pátek 3. února 2012

Další spouštěč úloh

Tentokrát paralelně. Úkolem je pustit graf úloh, kde jednotlivé větve mají běžet paralelně. Používám zde jen python s balíčkem multiprocessing, což má svůj důvod. Multiprocessing funguje na jedničku (bez hvězdičky), ale procesu nedovoluje paralelně pouštět další procesy. Pokusíte-li se o to přesto, odmění Vás python tímto hlášením ve stylu fantasy románů: "daemonic processes are not allowed to have children". To ale nevadí, protože každý orientovaný graf bez cyklů lze dekomponovat na jednotlivé větve. A přesně to dělám v přiloženém skriptu. Skript pouští tento graf úloh:



Procedura job(x) trvá x sekund. Celý graf úloh by měl trvat 12 sekund, a také trvá. Ale nejsem spokojen s cyklem, který procesy pouští, protože neumím čekat na první konec nějakého procesu jinak než pomocí timoutu. Předpokládám, že jednotlivé úlohy trvají alespoň minuty, ale spíš hodiny, takže to tolik nevadí, ale není to čisté řešení a nelíbí se mi. 

 # -*- coding: cp1250 -*-  
 import sys  
 import copy  
 from multiprocessing import Process, Pool, Lock, Manager, Queue  
 def decompose(bash, reservedIDs = [0], parentsConditions = [0]):  
  processes = []  
  process = None  
  conditions = copy.copy(parentsConditions)  
  for i in range(len(bash)):  
   command = bash[i][0]  
   if command == "branch":  
    conditions2 = copy.copy(conditions)  
    for j in range(len(bash[i][1])):  
     for subprocess in decompose(bash[i][1][j], reservedIDs, conditions2):  
      processes.append(subprocess)  
      conditions.append(subprocess["ID"])  
    process = None  
   else:  
    if process == None:  
     processID = max(reservedIDs) + 1  
     reservedIDs.append(processID)  
     process = {  
      "ID": processID,  
      "bash": [],  
      "conditions": copy.copy(list(set(conditions)))  
     }  
     processes.append(process)  
     conditions.append(processID)  
    process["bash"].append(bash[i])     
  return processes  
 def doDecomposedBash(bash, jobs):  
  for [command, argument] in bash:  
   jobs[command](*argument)  
 def job(t):  
  print "Doing some staff ..."  
  import time  
  time.sleep(t)  
  print "... done (%f sec)." % t  
 if __name__ == "__main__":  
  manager = Manager()  
  jobs = {  
   "job": job,  
  }  
  enviroment = manager.dict()  
  bash = [  
   ["branch",  
    [  
     [  
      ["job", [1.0]],  
      ["branch",  
       [  
        [  
         ["job", [2.0]],  
        ],  
        [  
         ["job", [3.0]],  
        ],  
       ],  
      ],  
      ["job", [4.0]],  
     ],  
     [  
      ["job", [5.0]],  
      ["branch",  
       [  
        [  
         ["job", [6.0]],  
        ],  
        [  
         ["job", [7.0]],  
        ],  
       ],  
      ],  
     ],  
    ],  
   ],  
  ]  
  processes = decompose(bash)  
  for process in processes:  
   process["process"] = Process(target = doDecomposedBash, args = (process["bash"], jobs))  
   process["state"] = "waiting"  
  closedProcesses = {0: 1}   
  # starting processes  
  while True:  
   theEnd = True  
   for process in processes:  
    if process["state"] == "running" and not process["process"].is_alive():  
     process["state"] = "closed"  
    if process["state"] == "closed":  
     closedProcesses[process["ID"]] = 1  
    else:  
     theEnd = False  
   if theEnd:  
    break  
   for process in processes:  
    if process["state"] == "waiting":  
     run = True  
     for condition in process["conditions"]:  
      if not condition in closedProcesses:  
       run = False  
     if run:  
      process["state"] = "running"  
      process["process"].start()  
   for process in processes:  
    if process["state"] == "running":  
     process["process"].join(0.01)  
     break       
   # All processes are done now.  

2 komentáře:

Unknown řekl(a)...

Tomu kódu úplně nerozumím (i když jsem tomu moc nedal), ale v poslední době mě zaujal tenhle modul -- pbs. Je to Pythoní spouštění libovolných příkazů s poměrně přirozeným přenesením shellových schopností do pythonu (argumenty příkazu -> argumenty funkce, pipy -> vnořené volání, spuštění na pozadí ampersandem -> keyword argument _bg=True atd.) Třeba Vám k něčemu bude.

Giovanni řekl(a)...

Děkuji, vypadá to docela dobře. Já stějně časem výpočty přesunu na metacentrum, takže tohle by mohlo být ideální řešení.