Using a MySQL table as a queue with threads in Python

I have a database with a `queue` table, and new records are constantly being inserted into it.

I want a Python script to process the queue as fast as possible, and I think I need some kind of multi-threaded code to do that, running like a daemon.

But I can't figure out how to use the DB as a queue.

I am considering this example:

import MySQLdb
from Queue import Queue
from threading import Thread

def do_stuff(q):
    while True:
        print q.get()
        q.task_done()

q = Queue(maxsize=0)
num_threads = 10

for i in range(num_threads):
    worker = Thread(target=do_stuff, args=(q,))
    worker.setDaemon(True)
    worker.start()

# TODO: Use the DB
db = MySQLdb.connect(...)
cursor = db.cursor()
q = cursor.execute("SELECT * FROM queue")

for x in range(100):
    q.put(x)
q.join()

      



2 answers


2 quick points:

  • Assuming you are using CPython, the GIL will effectively render threads useless for CPU-bound work, only allowing one thread through the interpreter at a time. A couple of workarounds:

    • The gevent library

      [ source ]

      gevent is a coroutine-based Python networking library that uses greenlet to provide a high-level synchronous API on top of the libev event loop.

    • The multiprocessing module

      With this you can create multiple processes for true concurrency in Python.

    • The concurrent.futures module

      New in Python 3, with a port available for Python 2. [source]

      This is a new high-level library that operates only at the "job" level, which means you no longer have to fuss with synchronization or with managing threads or processes. You simply specify a thread or process pool with a certain number of "workers", submit jobs, and collate the results. It's new in Python 3.2, but a port for Python 2.6+ is available at http://code.google.com/p/pythonfutures .
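As a minimal sketch of that last option (assuming Python 3.2+ or the `futures` backport; `process_row` is an invented stand-in for whatever per-row work you do):

```python
# Sketch of concurrent.futures at the "job" level: you hand the pool
# callables and arguments, it handles the threads for you.
from concurrent.futures import ThreadPoolExecutor

def process_row(row_id):
    # Stand-in for real per-row work (e.g. handling one queue record).
    return row_id * 2

# The pool manages worker threads and synchronization; map() submits
# one job per item and returns results in order.
with ThreadPoolExecutor(max_workers=10) as pool:
    results = list(pool.map(process_row, range(5)))

print(results)  # [0, 2, 4, 6, 8]
```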

You can use SSDictCursor() with MySQLdb and call fetchone(). It's a streaming cursor, and you can run it in an infinite while loop to make it behave like a queue:



conn = MySQLdb.connect(...)
cur = conn.cursor(MySQLdb.cursors.SSDictCursor)
cur.execute(query)

while True:
    row = cur.fetchone()
    if not row:
        break          # or sleep() and poll again
    # ... process row here ...

Having said all this, I suggest you take a look at tools such as celery or mongodb for emulating queues and workers. Relational databases just aren't cut out for this kind of work and suffer from needless fragmentation. Here's a great source if you want to know more about fragmentation in MySQL.
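If you do stay on a relational database, the overall pattern looks like this. A self-contained sketch, using sqlite3 in place of MySQLdb so it runs anywhere; the table layout, the `claimed` flag, and the uppercase "processing" are all invented for illustration:

```python
# Pattern: the main thread polls the table, claims unprocessed rows,
# and feeds them to worker threads via an in-memory Queue.
import sqlite3
import threading
try:
    from Queue import Queue      # Python 2
except ImportError:
    from queue import Queue      # Python 3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE queue "
           "(id INTEGER PRIMARY KEY, payload TEXT, claimed INTEGER DEFAULT 0)")
db.executemany("INSERT INTO queue (payload) VALUES (?)",
               [("job-%d" % i,) for i in range(5)])

work = Queue()
done = []
lock = threading.Lock()

def worker():
    while True:
        row_id, payload = work.get()
        with lock:
            done.append(payload.upper())   # stand-in for real processing
        work.task_done()

for _ in range(3):
    t = threading.Thread(target=worker)
    t.daemon = True
    t.start()

# Only the main thread touches the DB: select unclaimed rows, mark them
# claimed so a later poll cannot hand them out twice, then enqueue them.
rows = db.execute("SELECT id, payload FROM queue WHERE claimed = 0").fetchall()
db.executemany("UPDATE queue SET claimed = 1 WHERE id = ?",
               [(r[0],) for r in rows])
for r in rows:
    work.put(r)
work.join()
print(sorted(done))  # ['JOB-0', 'JOB-1', 'JOB-2', 'JOB-3', 'JOB-4']
```

Keeping all DB access in the main thread also sidesteps sharing one connection between threads, which most drivers dislike.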


I'm not sure if this is the best solution, but I would structure it so that a main thread reads the DB and populates the queue. Be sure to avoid duplicates. Perhaps an auto-incrementing primary key will make that easy to check.
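That duplicate check can be as simple as remembering the highest id already handed out. A sketch of one polling pass, using sqlite3 for illustration (the real code would use MySQLdb; table and helper names are invented):

```python
# Duplicate-free polling via an auto-increment primary key: each pass
# fetches only rows with id greater than the last one seen.
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE queue "
           "(id INTEGER PRIMARY KEY AUTOINCREMENT, payload TEXT)")
db.executemany("INSERT INTO queue (payload) VALUES (?)", [("a",), ("b",)])

last_id = 0
seen = []

def poll_once():
    # One polling pass: fetch rows newer than last_id, advance last_id.
    global last_id
    rows = db.execute(
        "SELECT id, payload FROM queue WHERE id > ? ORDER BY id",
        (last_id,)).fetchall()
    for row_id, payload in rows:
        seen.append(payload)       # stand-in for q.put(payload)
        last_id = row_id

poll_once()                                      # picks up "a" and "b"
db.execute("INSERT INTO queue (payload) VALUES (?)", ("c",))
poll_once()                                      # picks up only "c"
print(seen)  # ['a', 'b', 'c']
```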

The worker structure is nice, but as mentioned in the comments: the GIL prevents any speedup. However, you can use multiprocessing if your "do_stuff" does not depend on the script itself (for example, if the tasks are pictures and "do_stuff" is "rotate image 90°"). AFAIK it does not suffer from the GIL.



See https://docs.python.org/2/library/subprocess.html for more info on this.

PS: English is not my first language.
