Viewflow.io: performing a queue task

I would like to implement the following use case with the ViewFlow library :

Problem

Thread-specific processes started by a user must wait in the queue before executing a celery job. Each user has a queue of these processes. Scheduled or manually started, the next process in the queue is allowed.

Example

A node inside my thread enters a named queue. Other logic in the application determines for each queue when to resolve the next task. The next task in the queue is selected and its activation method () is called.

An example stream might look like this:

class MyFlow(Flow):

    start = flow.Start(...).Next(queue_wait)
    queue_wait = QueueWait("myQueue").Next(job)
    job = celery.Job(...).Next(end)
    end = flow.End()

      

Question

What would be the best approach to implementing queues? In the above example, I don't know what "QueueWait" should be.

I've read the doc and code, but it's not clear to me yet if this can be done with built-in node and Activation classes like func.Function, or if I need to extend custom classes.

+3


source to share


1 answer


After much experimentation, I came up with a practical and simple solution:

from viewflow.flow import base
from viewflow.flow.func import FuncActivation
from viewflow.activation import STATUS


class Queue(base.NextNodeMixin,
            base.UndoViewMixin,
            base.CancelViewMixin,
            base.DetailsViewMixin,
            base.Event):

    """
    Node that halts the flow and waits in a queue. To process the next waiting task
    call the dequeue method, optionally specifying the task owner.

    Example placing a job in a queue::

        class MyFlow(Flow):
            wait = Queue().Next(this.job)
            job = celery.Job(send_stuff).Next(this.end)
            end = flow.End()

        somewhere in the application code:
        MyFlow.wait.dequeue()
        or:
        MyFlow.wait.dequeue(process__myprocess__owner=user)

    Queues are logically separated by the task_type, so new queues defined in a
    subclass by overriding task_type attribute.
    """

    task_type = 'QUEUE'
    activation_cls = FuncActivation

    def __init__(self, **kwargs):
        super(Queue, self).__init__(**kwargs)

    def dequeue(self, **kwargs):
        """
        Process the next task in the queue by created date/time. kwargs is
        used to add task filter arguments, thereby effectively splitting the queue
        into subqueues. This could be used to implement per-user queues.

        Returns True if task was found and dequeued, False otherwise
        """
        filter_kwargs = {'flow_task_type': self.task_type, 'status': STATUS.NEW}
        if kwargs is not None:
            filter_kwargs.update(kwargs)

        task = self.flow_cls.task_cls.objects.filter(**filter_kwargs).order_by('created').first()
        if task is not None:
            lock = self.flow_cls.lock_impl(self.flow_cls.instance)
            with lock(self.flow_cls, task.process_id):
                task = self.flow_cls.task_cls._default_manager.get(pk=task.pk)
                activation = self.activation_cls()
                activation.initialize(self, task)
                activation.prepare()
                activation.done()
            return True

        return False

      



I tried to make it as general as possible and maintain the definition of multiple named queues as well as subnets such as queues for each user.

+2


source







All Articles