Resumable queue multiprocessing

I am trying to figure out how to write a Python program that uses a multiprocessing queue.

I have multiple servers and one of them will provide a queue remotely using this:

from multiprocessing.managers import BaseManager
import Queue
import daemonme

queue = Queue.Queue()

class QueueManager(BaseManager):
    pass

daemonme.createDaemon()
QueueManager.register('get_job', callable=lambda:queue)
m = QueueManager(address=('', 50000), authkey='')
s = m.get_server()
s.serve_forever()


Now I want to use my dual quad-core Xeon server (8 cores) to process jobs from this remote queue. The jobs are completely independent of each other, so with 8 cores I would like to start 7 processes that each pick a job from the queue, process it, and then come back for the next one. Each of the 7 processes will do this, but I can't quite wrap my head around the structure of this program.

Can anyone give me some educated ideas about the basic structure of this?

Thanks in advance.


2 answers


Look at the documentation on how to return a queue from a manager (section 17.6.2.7), then start 7 workers from a pool (section 17.6.2.9), passing the queue to each of them.
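As a rough sketch of that approach (written for Python 3, where the `Queue` module became `queue`; the loopback address, port 50007, and authkey are placeholders), a manager can serve a queue under a registered name and a client can connect to that same name. Here the server runs in a background thread only so the example is self-contained; in the question's setup it would be the separate daemonized server process:

```python
import queue
import threading
import time
from multiprocessing.managers import BaseManager

job_queue = queue.Queue()

class QueueManager(BaseManager):
    pass

# Server side: expose the queue under the name 'get_job'.
QueueManager.register('get_job', callable=lambda: job_queue)

def serve():
    manager = QueueManager(address=('127.0.0.1', 50007), authkey=b'secret')
    server = manager.get_server()
    server.serve_forever()

threading.Thread(target=serve, daemon=True).start()

# Client side: register the same name (no callable), connect, use the proxy.
class QueueClient(BaseManager):
    pass

QueueClient.register('get_job')
client = QueueClient(address=('127.0.0.1', 50007), authkey=b'secret')

for _ in range(50):            # retry until the server thread is listening
    try:
        client.connect()
        break
    except ConnectionRefusedError:
        time.sleep(0.1)

remote_queue = client.get_job()
remote_queue.put('job-1')
print(remote_queue.get())      # prints 'job-1'
```

Each worker process would do the client half of this, then loop on `remote_queue.get()`.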

Alternatively, you can think of this as a producer/consumer problem:



from multiprocessing.managers import BaseManager
import time
import random

class Producer():
    def __init__(self):
        BaseManager.register('queue')
        self.m = BaseManager(address=('hostname', 50000), authkey='jgsjgfdjs')
        self.m.connect()
        self.cm_queue = self.m.queue()
        while 1:
            time.sleep(random.randint(1, 3))
            self.cm_queue.put(<PUT-HERE-JOBS>)

from multiprocessing.managers import BaseManager

class Consumer():
    def __init__(self):
        BaseManager.register('queue')
        self.m = BaseManager(address=('host', 50000), authkey='jgsjgfdjs')
        self.m.connect()
        self.queue = self.m.queue()
        while 1:
            job = self.queue.get()
            # <EXECUTE job here>


from multiprocessing.managers import BaseManager
import Queue

class Manager():

    def __init__(self):
        self.queue = Queue.Queue()

        # register under the same name ('queue') that the
        # Producer and Consumer look up
        BaseManager.register('queue', callable=lambda: self.queue)

        self.m = BaseManager(address=('host', 50000), authkey='jgsjgfdjs')
        self.s = self.m.get_server()
        self.s.serve_forever()




You should use the master/slave (a.k.a. farmer/worker) pattern. The initial process acts as the master and creates the jobs. It:

  • creates a queue
  • creates 7 slave processes, passing the queue to each as a parameter
  • starts writing jobs to the queue


Slave processes continuously read from the queue and execute jobs (possibly until they receive a stop message from the queue). In this scenario, you do not need to use Manager objects, AFAICT.
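A minimal sketch of that master/slave structure (Python 3; squaring numbers is a stand-in for the real job processing, the `None` sentinel is the stop message, and the worker count of 7 mirrors the question):

```python
import multiprocessing

def worker(job_queue, result_queue):
    """Each slave loops: take a job, process it, go back for the next one."""
    while True:
        job = job_queue.get()
        if job is None:              # stop message from the master
            break
        result_queue.put(job * job)  # stand-in for real processing

def master(jobs, n_workers=7):
    job_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()

    # create the slave processes, passing the queues as parameters
    workers = [
        multiprocessing.Process(target=worker, args=(job_queue, result_queue))
        for _ in range(n_workers)
    ]
    for w in workers:
        w.start()

    jobs = list(jobs)
    for job in jobs:                 # master writes jobs to the queue
        job_queue.put(job)
    for _ in range(n_workers):       # one stop message per worker
        job_queue.put(None)

    for w in workers:
        w.join()

    return sorted(result_queue.get() for _ in range(len(jobs)))

if __name__ == '__main__':
    print(master(range(20)))
```

Results come back in whatever order the workers finish, hence the `sorted` at the end; a real program would attach a job id to each result instead.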


