How to deal with queue corruption when using Process.terminate()

I am creating a Python script/application that launches several so-called Fetchers. Each of them does some work and puts its data on a queue.

I want to make sure the Fetchers don't run for more than 60 seconds (because the whole application is started several times an hour).

While reading the Python docs I noticed that they say to be careful when using Process.terminate(), as it can corrupt the queue.

My current code:

from multiprocessing import Process, Queue

# Result Queue
resultQueue = Queue()

# Create Fetcher Instance
fetcher = fetcherClass()

# Create Fetcher Process List
fetcherProcesses = []

# Run Fetchers
for config in configList:
    # Create Process to encapsulate Fetcher
    log.debug("Creating Fetcher for Target: %s" % config['object_name'])
    fetcherProcess = Process(target=fetcher.Run, args=(config,resultQueue))

    log.debug("Starting Fetcher for Target: %s" % config['object_name'])
    fetcherProcess.start()
    fetcherProcesses.append((config, fetcherProcess))

# Wait for all Workers to complete
for config, fetcherProcess in fetcherProcesses:
    log.debug("Waiting for Thread to complete (%s)." % str(config['object_name']))
    fetcherProcess.join(DEFAULT_FETCHER_TIMEOUT)
    if fetcherProcess.is_alive():
        log.critical("Fetcher thread for object %s Timed Out! Terminating..." % config['object_name'])
        fetcherProcess.terminate()

# Loop thru results, and save them in RRD
while not resultQueue.empty():
    config, fetcherResult = resultQueue.get()
    result = storage.Save(config, fetcherResult)

      

I want to make sure my queue is not damaged when one of my Fetchers times out.

What's the best way to do this?

Edit: In response to the chat with sebdelsol, a few clarifications:

1) I want to start processing the data as soon as possible, because otherwise I have to perform many disk-intensive operations all at once. So having the main thread sleep for the full timeout is not an option.

2) I only need to wait for the timeout once, but per process. So if the main thread launches 50 Fetchers and that takes anywhere from a few seconds to half a minute, I need to compensate for it.

3) I want to be sure that the data coming out of Queue.get() was put there by a Fetcher that did not time out (it is theoretically possible that a Fetcher was putting data on the queue when the timeout hit and it was killed). Such data must be discarded.

A timeout is not a disaster. It is not a desirable situation, but corrupted data is worse.

+3




2 answers


You could pass a new multiprocessing.Lock() to every fetcher you start.

In the fetcher's process, be sure to wrap Queue.put() with this lock:

with self.lock:
    self.queue.put(result)

      

When you need to terminate a fetcher's process, take its lock first:



with fetcherLock:
    fetcherProcess.terminate()

      

This way, your queue will not be corrupted by killing a fetcher while it is accessing the queue.

Some fetchers' locks may end up corrupted. But this is not a problem, since every new fetcher you launch gets a brand-new lock.
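Putting the two snippets above together, a minimal end-to-end sketch might look like the following. The names `fetch`, `run_fetchers`, and the `duration` config key are hypothetical stand-ins, not part of the original code, and the sketch assumes each result is small. It also gives every fetcher its own deadline counted from its start, and drops any result that belongs to a fetcher that was terminated.

```python
import multiprocessing as mp
import queue
import time


def fetch(config, result_queue, lock):
    # Hypothetical fetcher: the sleep stands in for the real work.
    time.sleep(config["duration"])
    with lock:  # the parent cannot terminate this process while the lock is held
        result_queue.put((config["object_name"], "data"))


def run_fetchers(configs, timeout):
    result_queue = mp.Queue()
    started = []
    for config in configs:
        lock = mp.Lock()  # a fresh lock for every fetcher
        proc = mp.Process(target=fetch, args=(config, result_queue, lock))
        proc.start()
        # Each fetcher gets its own deadline, counted from its own start.
        started.append((config, proc, lock, time.monotonic() + timeout))

    timed_out = set()
    for config, proc, lock, deadline in started:
        proc.join(max(0.0, deadline - time.monotonic()))
        if proc.is_alive():
            with lock:  # blocks while the fetcher is inside put()
                proc.terminate()
            proc.join()
            timed_out.add(config["object_name"])

    # All fetchers have exited or been killed, so drain what is left.
    results = []
    while True:
        try:
            name, data = result_queue.get(timeout=0.5)
        except queue.Empty:
            break
        if name not in timed_out:  # discard anything from a terminated fetcher
            results.append((name, data))
    return results, timed_out
```

One caveat worth keeping in mind: multiprocessing.Queue.put() hands the object to a background feeder thread that writes to the underlying pipe later, so the lock narrows the window for a kill in mid-write rather than closing it completely. With large results the parent should also read from the queue while it waits, because a child does not exit until its buffered data has been flushed.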

+4




Why not:

  • create a new queue and start all the fetchers that will use this queue.
  • have your script actually sleep for as long as you want the fetcher processes to have to produce a result.
  • get everything from resultQueue. It won't be corrupted, since you haven't had to kill any process yet.
  • finally, terminate any processes that are still alive.
  • loop!
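
The steps above could be sketched roughly as follows. This is a variation rather than a literal transcription: instead of a fixed sleep it blocks on get() until a shared deadline, so results can be handled as they arrive. `fetch`, `run_batch`, and the `duration` config key are hypothetical names used only for illustration.

```python
import multiprocessing as mp
import queue
import time


def fetch(config, result_queue):
    # Hypothetical fetcher: the sleep stands in for the real work.
    time.sleep(config["duration"])
    result_queue.put((config["object_name"], "data"))


def run_batch(configs, timeout):
    result_queue = mp.Queue()  # a new queue for every batch
    procs = [mp.Process(target=fetch, args=(c, result_queue)) for c in configs]
    for proc in procs:
        proc.start()

    # Collect results until every fetcher has reported or the deadline passes.
    deadline = time.monotonic() + timeout
    results = []
    while len(results) < len(procs):
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            results.append(result_queue.get(timeout=remaining))
        except queue.Empty:
            break

    # Only now kill the stragglers; this queue is never read again.
    for proc in procs:
        if proc.is_alive():
            proc.terminate()
        proc.join()
    return results
```

Because the queue is created inside run_batch and dropped when it returns, anything a terminated fetcher left half-written is never read by the next batch.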
0

