How to return an item to the queue. Queue

How to return an item to the queue. Queue? This would be useful in threading or multiprocessing, if the task fails, so the task cannot be lost.

The docs for queue.Queue.get () say the function can "Remove and return an item from the queue", but I believe the use of the word "return" here refers to a function returning an item to the calling thread rather than putting it back into item queue. This is demonstrated by the example code below, which simply blocks indefinitely on the second call to the queue.Queue.get()

main thread, instead of doing it in a call print()

on the thread.

import time
import threading
import queue


def threaded_func():
    thread_task = myqueue.get()
    print('thread_task: ' + thread_task)

myqueue = queue.Queue()
myqueue.put('some kind of task')
main_task = myqueue.get()
print('main_task: ' + main_task)

t = threading.Thread(target=threaded_func)
t.daemon = True
t.start()

time.sleep(5)
myqueue.get()   # This blocks indefinitely

      

I have to believe there is an easy way to return the task, so what is it? The call task_done()

and then put()

with the task of putting it back in the queue in two operations is not atomic and therefore could result in item loss.

One possible, but clumsy solution is to simply try to complete the task again, but then you will have to add a few extra lines to handle this complexity, and I'm not even sure that all unsuccessful tasks can be necessarily recovered this way.

+3


source to share


1 answer


Not all failed tasks can recover. You should not repeat them unless there is reason to believe they will pass later. For example, if your work item is a url and unsuccessful connection count, you can implement some sort of thing with max-retries.

Your biggest problem is that you have not yet implemented the viable worker model. You need 2 queues for a bi-directional worker conversation. One for publishing work items and one for getting status. Once you succeed, the receiver can always decide to cut this message in the work queue. Here's an example of a lazy worker who just conveys what he said.



import threading
import queue

def worker(in_q, out_q):
    while True:
        try:
            task, data = in_q.get()
            print('worker', task, data)
            if task == "done":
                return
            elif task == "pass this":
                out_q.put(("pass", data))
            else:
                out_q.put(("fail", data))
        except Exception as e:
            print('worker exception', e)
            out_q.put("exception", data)

in_que = queue.Queue()
out_que = queue.Queue()

work_thread = threading.Thread(target=worker, args=(in_que, out_que))
work_thread.start()

# lets make every other task a fail
in_que.put(('pass this', 0))
in_que.put(('fail this', 1))
in_que.put(('pass this', 2))
in_que.put(('fail this', 3))
in_que.put(('pass this', 4))
in_que.put(('fail this', 5))

pending_tasks = 6

while pending_tasks:
    status, data = out_que.get()
    if status == "pass":
        pending_tasks -= 1
    else:
        # make failing tast pass
        in_que.put(('pass this', data))

in_que.put(("done", None))
work_thread.join()
print('done')

      

+1


source







All Articles