Tentokrát paralelně. Úkolem je pustit graf úloh, kde jednotlivé větve mají běžet paralelně. Používám zde jen python s balíčkem multiprocessing, což má svůj důvod. Multiprocessing funguje na jedničku (bez hvězdičky), ale procesu nedovoluje paralelně pouštět další procesy. Pokusíte-li se o to přesto, odmění Vás python tímto hlášením ve stylu fantasy románů: "daemonic processes are not allowed to have children". To ale nevadí, protože každý orientovaný graf bez cyklů lze dekomponovat na jednotlivé větve. A přesně to dělám v přiloženém skriptu. Skript pouští tento graf úloh:
Procedura job(x) trvá x sekund. Celý graf úloh by měl trvat 12 sekund, a také trvá. Ale nejsem spokojen s cyklem, který procesy pouští, protože neumím čekat na první konec nějakého procesu jinak než pomocí timoutu. Předpokládám, že jednotlivé úlohy trvají alespoň minuty, ale spíš hodiny, takže to tolik nevadí, ale není to čisté řešení a nelíbí se mi.
# -*- coding: cp1250 -*-
import sys
import copy
from multiprocessing import Process, Pool, Lock, Manager, Queue
def decompose(bash, reservedIDs = [0], parentsConditions = [0]):
processes = []
process = None
conditions = copy.copy(parentsConditions)
for i in range(len(bash)):
command = bash[i][0]
if command == "branch":
conditions2 = copy.copy(conditions)
for j in range(len(bash[i][1])):
for subprocess in decompose(bash[i][1][j], reservedIDs, conditions2):
processes.append(subprocess)
conditions.append(subprocess["ID"])
process = None
else:
if process == None:
processID = max(reservedIDs) + 1
reservedIDs.append(processID)
process = {
"ID": processID,
"bash": [],
"conditions": copy.copy(list(set(conditions)))
}
processes.append(process)
conditions.append(processID)
process["bash"].append(bash[i])
return processes
def doDecomposedBash(bash, jobs):
for [command, argument] in bash:
jobs[command](*argument)
def job(t):
print "Doing some staff ..."
import time
time.sleep(t)
print "... done (%f sec)." % t
if __name__ == "__main__":
manager = Manager()
jobs = {
"job": job,
}
enviroment = manager.dict()
bash = [
["branch",
[
[
["job", [1.0]],
["branch",
[
[
["job", [2.0]],
],
[
["job", [3.0]],
],
],
],
["job", [4.0]],
],
[
["job", [5.0]],
["branch",
[
[
["job", [6.0]],
],
[
["job", [7.0]],
],
],
],
],
],
],
]
processes = decompose(bash)
for process in processes:
process["process"] = Process(target = doDecomposedBash, args = (process["bash"], jobs))
process["state"] = "waiting"
closedProcesses = {0: 1}
# starting processes
while True:
theEnd = True
for process in processes:
if process["state"] == "running" and not process["process"].is_alive():
process["state"] = "closed"
if process["state"] == "closed":
closedProcesses[process["ID"]] = 1
else:
theEnd = False
if theEnd:
break
for process in processes:
if process["state"] == "waiting":
run = True
for condition in process["conditions"]:
if not condition in closedProcesses:
run = False
if run:
process["state"] = "running"
process["process"].start()
for process in processes:
if process["state"] == "running":
process["process"].join(0.01)
break
# All processes are done now.
2 komentáře:
Tomu kódu úplně nerozumím (i když jsem tomu moc nedal), ale v poslední době mě zaujal tenhle modul -- pbs. Je to Pythoní spouštění libovolných příkazů s poměrně přirozeným přenesením shellových schopností do pythonu (argumenty příkazu -> argumenty funkce, pipy -> vnořené volání, spuštění na pozadí ampersandem -> keyword argument _bg=True atd.) Třeba Vám k něčemu bude.
Děkuji, vypadá to docela dobře. Já stějně časem výpočty přesunu na metacentrum, takže tohle by mohlo být ideální řešení.
Okomentovat