Creating a stream queue balancer

My project involves image processing for clients in bulk. Clients send image files encrypted, causing ImageMagick command line scripts to fail for each image. The problem I'm trying to solve is that if these commands are queued in the order that I receive them, then a client that needs to process 10k images will take hours to process all resources. My solution is to round each client queue so that everyone slows each other down equally. I created this class to implement this:

class QueueBalancer():
    def __init__(self, cycle_list=[]):
        self.cycle_list = cycle_list
        self.update_status()

    def cmd_gen(self):
        index = -1
        while True:
            try:
                if self.cycle_list:
                    self.processing = True
                    index += 1
                    commands = self.cycle_list[index]["commands"]
                    if commands:
                        command = commands.pop(0)
                        if len(commands) == 0:
                            del self.cycle_list[index]
                            index -= 1
                        self.update_status()
                        yield command
                else:
                    yield None
            except IndexError:
                index = -1

    def get_user(self, user):
        return next((item for item in self.cycle_list[0] if item["user"] == user), None)

    def create_or_append(self, user, commands):
        existing_user = self.get_user(user)
        if existing_user:
            index = self.cycle_list.index(existing_user)
            self.cycle_list[index]["commands"] += commands
        else:
            self.cycle_list += [{
                                      "user"     : user,
                                      "commands" : commands
                                   }]

    def update_status(self):
        if next((item for item in self.cycle_list if item["commands"] != []), None):
            self.processing = True
        else:
            self.processing = False

    def status(self):
        return self.processing

      

As you can see from the else clause create_or_append()

, cycle_list

this is a list of dictionaries like this:

{"user": "test1", "commands": ["command1", "command2"]},
{"user": "test2", "commands": ["x", "y", "z"]},
{"user": "test3", "commands": ["a", "b", "c"]}

      

(removed real commands, used example lines)

One instance cmd_gen()

will be used to feed commands to my shell, and I will use create_or_append()

to add users and commands on the fly, while the commands in the queue are still being processed. This seems to work fine so far in my initial tests, but is it theoretically thread safe? If not, what should I do to verify this?

+3


source to share


3 answers


I thought I had a chance to create an overall balanced queue as you described - this is the result. I think there are still some pathological cases where a user can have many jobs being processed sequentially, but this has to do with adding other user jobs to specific times / orders, so I don't think this will happen in real jobs and Will not be able to be exploited if several users do not agree.

from threading import Lock


class UserBalancedJobQueue(object):

    def __init__(self):
        self._user_jobs = {}
        self._user_list = []
        self._user_index = 0
        self._lock = Lock()

    def pop_user_job(self):
        with self._lock:
            if not self._user_jobs:
                raise ValueError("No jobs to run")

            if self._user_index >= len(self._user_list):
                self._user_index = 0
            user = self._user_list[self._user_index]

            jobs = self._user_jobs[user]
            job = jobs.pop(0)

            if not jobs:
                self._delete_current_user()

            self._user_index += 1
            return user, job

    def _delete_current_user(self):
        user = self._user_list.pop(self._user_index)
        del self._user_jobs[user]

    def add_user_job(self, user, job):
        with self._lock:
            if user not in self._user_jobs:
                self._user_list.append(user)
                self._user_jobs[user] = []
            self._user_jobs[user].append(job)


if __name__ == "__main__":
    q = UserBalancedJobQueue()
    q.add_user_job("tom", "job1")
    q.add_user_job("tom", "job2")
    q.add_user_job("tom", "job3")
    q.add_user_job("fred", "job4")
    q.add_user_job("fred", "job5")

    for i in xrange(3):
        print q.pop_user_job()

    print "Adding more jobs"
    q.add_user_job("dave", "job6")
    q.add_user_job("dave", "job7")
    q.add_user_job("dave", "job8")
    q.add_user_job("dave", "job9")

    try:
        while True:
            print q.pop_user_job()
    except ValueError:
        pass

      

With that in mind, an alternative implementation would have to remember for each user when their last job was done and then select the next user where the last job was the oldest. This would probably be more "correct", but it would have (perhaps negligible) additional memory overhead to remember the last uptime for each user.



Edit: so this is a slow day - here's this different approach. I think I prefer this above, although it is slower due to O (N) looking for the user with the oldest previous job.

from collections import defaultdict
from threading import Lock
import time


class UserBalancedJobQueue(object):

    def __init__(self):
        self._user_jobs = defaultdict(list)
        self._user_last_run = defaultdict(lambda: 0.0)
        self._lock = Lock()

    def pop_user_job(self):

        with self._lock:
            if not self._user_jobs:
                raise ValueError("No jobs to run")

            user = min(
                self._user_jobs.keys(),
                key=lambda u: self._user_last_run[u]
            )
            self._user_last_run[user] = time.time()

            jobs = self._user_jobs[user]
            job = jobs.pop(0)

            if not jobs:
                del self._user_jobs[user]

            return user, job

    def add_user_job(self, user, job):
        with self._lock:
            self._user_jobs[user].append(job)

      

+1


source


I have doubts about the thread safety of the next part:

def create_or_append(self, user, commands):
    existing_user = self.get_user(user)
    if existing_user:
        index = self.cycle_list.index(existing_user)
        self.cycle_list[index]["commands"] += commands
    else:
        self.cycle_list += [{
                                  "user"     : user,
                                  "commands" : commands
                               }]

      

If 2 threads run a method create_or_append

, there is a possibility for 2 threads to be in the else closure and then mess up your data a bit. Perhaps defining a lock might be a good idea in this function.



from threading import Lock

class QueueBalancer():

    def __init__(self, cycle_list=None):
        self.cycle_list = [] if cycle_list is None else cycle_list
        self.lock = Lock()

    # .../...

    def create_or_append(self, user, commands):
        with self.lock:
            # ...

      

EDIT: as @matino pointed out, you may also have a problem with the function update_status

, since it modifies the instance attribute processing

. I would recommend using another lock for this to ensure thread safety.

def update_status(self):
    with self.update_lock:
        if next((item for item in self.cycle_list if item["commands"] != []), None):
            self.processing = True
        else:
            self.processing = False

      

+1


source


Your class is definitely not thread safe as you are mutating its instance attributes:

  • In update_status

    you mutate the self.processing

    meaning
  • In create_or_append

    you changeself.cycle_list

    Without blocking these attributes, your class will not be thread safe.

Side note : always initialize all instance attributes in a method __init__

. Since you are using self.processing

in your code, it should be in__init__

+1


source







All Articles