Resumable queue multiprocessing
I am trying to figure out how to write a Python program that uses a multiprocessing queue.
I have multiple servers and one of them will provide a queue remotely using this:
from multiprocessing.managers import BaseManager
import Queue
import daemonme

queue = Queue.Queue()

class QueueManager(BaseManager):
    pass

daemonme.createDaemon()
QueueManager.register('get_job', callable=lambda:queue)
m = QueueManager(address=('', 50000), authkey='')
s = m.get_server()
s.serve_forever()
Now I want to use my dual quad-core Xeon server (8 cores in total) to process jobs from this remote queue. The jobs are completely independent of each other. Since I have 8 cores, I would like to start 7 processes, each of which picks a job from the queue, processes it, and then comes back for the next one, but I can't wrap my head around the structure of this program.
Can anyone give me some educated ideas about the basic structure of this?
Thanks in advance.
Look at the documentation on how to retrieve the queue from a manager (section 17.6.2.7), then use a pool of workers (section 17.6.2.9) to start 7 jobs, passing the queue to each of them.
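A minimal sketch of that idea, written for Python 3 (so `authkey` has to be a bytes value). Note that it swaps in a variation: instead of handing one queue proxy to a pool, each worker process opens its own connection to the manager. The `handle` function, the `STOP` message and the helper names are my own assumptions, not part of the server shown in the question; only the registered name `get_job` comes from there.

```python
import multiprocessing as mp
from multiprocessing.managers import BaseManager

N_WORKERS = 7   # from the question: 8 cores, one left free
STOP = None     # hypothetical stop message; the original server never sends one


class QueueManager(BaseManager):
    """Client-side manager class."""


# Must match the name the server registered ('get_job' in the question).
QueueManager.register('get_job')


def handle(job):
    """Hypothetical job handler; replace with the real work."""
    return job


def drain(jobs, handler=handle):
    """Take jobs until STOP arrives; return how many were processed."""
    done = 0
    while True:
        job = jobs.get()        # blocks until a job is available
        if job is STOP:
            return done
        handler(job)
        done += 1


def worker(address, authkey):
    """Runs in each child process: connect, fetch the queue proxy, drain it."""
    manager = QueueManager(address=address, authkey=authkey)
    manager.connect()
    drain(manager.get_job())


def start_workers(address, authkey, n=N_WORKERS):
    """Start n independent worker processes and return them."""
    procs = [mp.Process(target=worker, args=(address, authkey))
             for _ in range(n)]
    for p in procs:
        p.start()
    return procs
```

You would call `start_workers((server_host, 50000), authkey)` from under an `if __name__ == '__main__':` guard, where `server_host` stands for whatever machine runs the queue server. Without a stop message the workers simply block in `get()` until the next job shows up.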
Alternatively, you can treat this as a producer/consumer problem:
# Producer: connects to the manager and puts jobs on the shared queue
from multiprocessing.managers import BaseManager
import random
import time

class Producer():
    def __init__(self):
        BaseManager.register('queue')
        self.m = BaseManager(address=('hostname', 50000), authkey='jgsjgfdjs')
        self.m.connect()
        self.cm_queue = self.m.queue()
        while 1:
            # wait 1 to 3 seconds between jobs
            time.sleep(random.randint(1,3))
            self.cm_queue.put(<PUT-HERE-JOBS>)

# Consumer: connects to the same manager and executes jobs from the queue
from multiprocessing.managers import BaseManager

class Consumer():
    def __init__(self):
        BaseManager.register('queue')
        self.m = BaseManager(address=('host', 50000), authkey='jgsjgfdjs')
        self.m.connect()
        self.queue = self.m.queue()
        while 1:
            # get() blocks until a job is available
            job = self.queue.get()
            # <EXECUTE(job)>

# Manager: owns the queue and serves it to producers and consumers
from multiprocessing.managers import BaseManager
import Queue

class Manager():
    def __init__(self):
        self.queue = Queue.Queue()
        # register under 'queue', the name the Producer and Consumer request
        BaseManager.register('queue', callable=lambda:self.queue)
        self.m = BaseManager(address=('host', 50000), authkey='jgsjgfdjs')
        self.s = self.m.get_server()
        self.s.serve_forever()
You should use the master-slave (a.k.a. farmer-worker) pattern. The initial process is the master and creates the jobs. It:
- creates a queue
- creates 7 slave processes passing the queue as a parameter
- starts writing jobs to the queue
Slave processes continuously read from the queue and execute jobs (possibly until they receive a stop message from the queue). In this scenario, you do not need to use Manager objects, AFAICT.
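The steps above could look roughly like this in Python 3, using a plain `multiprocessing.Queue` and no manager. This is only a sketch: `process_job`, the `STOP` sentinel and the result queue are assumptions I added for illustration.

```python
import multiprocessing as mp

STOP = None  # assumed stop message: one is sent per worker


def process_job(job):
    """Hypothetical job handler; replace with the real work."""
    return job * job


def worker(jobs, results):
    """Slave: read jobs until the stop message arrives, reporting each result."""
    while True:
        job = jobs.get()            # blocks until something is available
        if job is STOP:
            break
        results.put((job, process_job(job)))


def run_master(job_list, n_workers=7):
    """Master: create the queues, start the slaves, write jobs, collect results."""
    job_list = list(job_list)
    jobs, results = mp.Queue(), mp.Queue()
    procs = [mp.Process(target=worker, args=(jobs, results))
             for _ in range(n_workers)]
    for p in procs:
        p.start()
    for job in job_list:
        jobs.put(job)
    for _ in procs:                 # one stop message per slave
        jobs.put(STOP)
    # Collect every result before joining, so no slave blocks on a full pipe.
    collected = [results.get() for _ in job_list]
    for p in procs:
        p.join()
    return collected                # (job, result) pairs in completion order
```

Call `run_master(range(20))` from under an `if __name__ == '__main__':` guard; on platforms that spawn rather than fork, the guard is required so that child processes do not re-run the master code.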