Parallel DAG processing
I am trying to figure out how to process a directed acyclic graph in parallel. Each node should be able to "execute" only once all of its input nodes have been processed. Imagine a class Task with the following interface:
class Task(object):
    result = None

    def inputs(self):
        ''' List all requirements of the task. '''
        return ()

    def run(self):
        pass
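To make the intended semantics concrete, here is a sequential (not parallel) baseline. It uses two hypothetical Task subclasses, Const and Add, invented only for illustration. run_sequential runs every input before the task that needs it and keeps a done set, so a task shared by several dependents (the diamond below) runs only once.

```python
class Task(object):
    result = None

    def inputs(self):
        ''' List all requirements of the task. '''
        return ()

    def run(self):
        pass


class Const(Task):
    """Hypothetical leaf task that produces a fixed value."""
    def __init__(self, value):
        self.value = value

    def run(self):
        self.result = self.value


class Add(Task):
    """Hypothetical task that sums the results of its inputs."""
    def __init__(self, *deps):
        self.deps = deps

    def inputs(self):
        return self.deps

    def run(self):
        self.result = sum(d.result for d in self.deps)


def run_sequential(task, done=None):
    """Run each reachable task once, inputs first (depth-first post-order)."""
    if done is None:
        done = set()
    if task in done:
        return
    for dep in task.inputs():
        run_sequential(dep, done)
    task.run()
    done.add(task)


# A diamond: `a` is an input of both `b` and `c`.
a = Const(1)
b = Add(a)
c = Add(a)
d = Add(b, c)
run_sequential(d)
print(d.result)  # 2
```

Any parallel scheme has to produce the same results; it only changes which independent tasks may run at the same time.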
I can think of only one way to process a graph represented with this structure asynchronously while keeping the maximum number of workers busy at the same time.
I think optimal processing would come from creating a thread for each task that waits for all of its inputs to be processed. However, spawning a thread for every task immediately, rather than only when the task is ready to be processed, does not seem like a good idea to me.
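import threading

class Runner(threading.Thread):
    def __init__(self, task):
        super(Runner, self).__init__()
        self.task = task
        self.start()  # each Runner starts itself as soon as it is created

    def run(self):
        # Spawn one thread per input, wait for all of them, then run the task.
        # Note: a task that is an input of several tasks is run once per dependent.
        threads = [Runner(r) for r in self.task.inputs()]
        for t in threads:
            t.join()
        self.task.run()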
Is there a better way to achieve this behavior? Also, this approach currently has no way to limit the number of tasks running at the same time.
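One possible way to address both issues while keeping the thread-per-task design is sketched below; ThreadedRunner and the Sleepy task are hypothetical names, not part of any library. A lock-guarded dictionary gives each task exactly one thread (the Runner above runs a shared input once per dependent), and a threading.BoundedSemaphore caps how many run() calls execute at once. It still creates a thread for every task up front, so it bounds the actual work, not the number of threads.

```python
import threading
import time


class Task(object):
    result = None

    def inputs(self):
        return ()

    def run(self):
        pass


class ThreadedRunner(object):
    """One thread per task, but each task runs once and at most
    max_workers run() calls execute at the same time."""

    def __init__(self, max_workers):
        self._slots = threading.BoundedSemaphore(max_workers)
        self._lock = threading.Lock()
        self._threads = {}  # task -> the single thread that runs it

    def _thread_for(self, task):
        # A task shared by several dependents gets exactly one thread.
        # start() is called under the lock so nobody can join() it first.
        with self._lock:
            thread = self._threads.get(task)
            if thread is None:
                thread = threading.Thread(target=self._run, args=(task,))
                self._threads[task] = thread
                thread.start()
        return thread

    def _run(self, task):
        deps = [self._thread_for(dep) for dep in task.inputs()]
        for thread in deps:
            thread.join()      # waiting for inputs does not hold a slot
        with self._slots:      # blocks while max_workers tasks are running
            task.run()

    def run(self, root):
        self._thread_for(root).join()


class Sleepy(Task):
    """Hypothetical task that sleeps briefly and records peak concurrency."""
    lock = threading.Lock()
    active = peak = 0

    def __init__(self, *deps):
        self.deps = deps
        self.runs = 0

    def inputs(self):
        return self.deps

    def run(self):
        with Sleepy.lock:
            Sleepy.active += 1
            Sleepy.peak = max(Sleepy.peak, Sleepy.active)
        time.sleep(0.05)
        with Sleepy.lock:
            Sleepy.active -= 1
        self.runs += 1


leaves = [Sleepy() for _ in range(6)]
shared = Sleepy(*leaves)
root = Sleepy(shared, Sleepy(shared))
ThreadedRunner(max_workers=2).run(root)
print(Sleepy.peak, shared.runs)  # peak never exceeds 2; shared runs once
```

Error handling is omitted: if a run() raises, its thread ends and dependents still proceed.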
Have one master thread push tasks onto a queue as soon as they are ready for processing, and have a pool of worker threads take tasks from that queue. (Python provides a synchronized queue in the Queue module, renamed to lowercase queue in Python 3.)
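A minimal sketch of the worker-pool half of this idea, assuming Python 3 (the queue module): three worker threads pull jobs from a shared queue.Queue until each receives a None stop sentinel. The jobs here are plain callables standing in for tasks; the sentinel convention is an illustrative choice, not something the queue module requires.

```python
import queue
import threading


def worker(q, results, lock):
    # Take jobs off the shared queue until the None sentinel arrives.
    while True:
        job = q.get()
        if job is None:
            break
        value = job()
        with lock:
            results.append(value)


q = queue.Queue()
results, lock = [], threading.Lock()
workers = [threading.Thread(target=worker, args=(q, results, lock))
           for _ in range(3)]
for w in workers:
    w.start()
for i in range(10):
    q.put(lambda i=i: i * i)   # jobs are plain callables here
for _ in workers:
    q.put(None)                # one stop sentinel per worker
for w in workers:
    w.join()
print(sorted(results))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Because the queue is FIFO and the sentinels are added last, every job is taken before any worker sees a sentinel.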
The master first builds a map from each task to the tasks that depend on it. Every task that has no dependencies can be queued immediately. Each time a task completes, the master uses this map to find the tasks that depend on it and enqueues each one whose dependencies are now all complete.
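A sketch of the whole master/worker scheme, assuming Python 3 and the Task interface from the question; run_dag, collect, Const and Add are hypothetical names. The master builds the dependents map plus a count of unfinished inputs per task, seeds the ready queue with tasks that have no inputs, then decrements counts as completions arrive on a second queue. Only the master thread updates these counts, so they need no lock. Error handling is omitted: if a run() raises, that worker dies and the master waits forever on the done queue.

```python
import queue
import threading
from collections import defaultdict


class Task(object):
    result = None

    def inputs(self):
        return ()

    def run(self):
        pass


def collect(root):
    """Return every task reachable from root through inputs()."""
    seen, stack = set(), [root]
    while stack:
        task = stack.pop()
        if task not in seen:
            seen.add(task)
            stack.extend(task.inputs())
    return seen


def run_dag(root, max_workers=4):
    tasks = collect(root)
    # dependents: task -> tasks that need it; pending: unfinished inputs.
    dependents = defaultdict(list)
    pending = {}
    for task in tasks:
        deps = set(task.inputs())
        pending[task] = len(deps)
        for dep in deps:
            dependents[dep].append(task)

    ready, done = queue.Queue(), queue.Queue()

    def worker():
        while True:
            task = ready.get()
            if task is None:        # stop sentinel
                return
            task.run()
            done.put(task)          # report completion to the master

    workers = [threading.Thread(target=worker) for _ in range(max_workers)]
    for w in workers:
        w.start()

    # Master loop: queue tasks with no inputs, then react to completions.
    for task in tasks:
        if pending[task] == 0:
            ready.put(task)
    for _ in range(len(tasks)):
        finished = done.get()
        for child in dependents[finished]:
            pending[child] -= 1
            if pending[child] == 0:
                ready.put(child)

    for _ in workers:
        ready.put(None)
    for w in workers:
        w.join()


class Const(Task):
    def __init__(self, value):
        self.value = value

    def run(self):
        self.result = self.value


class Add(Task):
    def __init__(self, *deps):
        self.deps = deps

    def inputs(self):
        return self.deps

    def run(self):
        self.result = sum(d.result for d in self.deps)


a = Const(1)
b, c = Add(a), Add(a)
d = Add(b, c, Const(10))
run_dag(d, max_workers=2)
print(d.result)  # 12
```

The pool size (max_workers) is fixed, which also bounds how many tasks run at a time, and a thread is never created per task.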
Celery (http://www.celeryproject.org/) is the leading task management tool for Python. It should be able to help you with this.