SQLAlchemy executemany with generator

Question: Can you pass a generator that yields dictionaries, rather than a list of dictionaries, to the execute() method of the Connection object to get the "executemany" insert behavior?

More details: I am learning SQLAlchemy, working with the core expression language. As a test, I have a fairly large dataset, accessible from a file via an iterator, that I am trying to transfer into a PostgreSQL table, but inserting rows one at a time is quite slow (see Example 1 below). As per the documentation, the Connection object's execute() method will do the equivalent of executemany() if a list of dictionaries is passed in rather than a single dictionary. Some quick tests confirmed that this is quite a bit faster for groups of inserts. Unfortunately, with my large dataset I cannot build a complete list of dictionaries in memory, so my question is the one in the title.
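For illustration, a minimal sketch of the two calling conventions (the column name and values here are hypothetical):

# single dictionary: one INSERT per call
conn.execute(ins, {'value': 1})

# list of dictionaries: executed as one executemany()
conn.execute(ins, [{'value': 1}, {'value': 2}, {'value': 3}])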

Example 1: The following (pseudo)code is very slow for large amounts of data

from sqlalchemy import create_engine, MetaData, Table, Column

metadata = MetaData()
data = Table('data', metadata, Column...)  # column definitions elided

engine = create_engine('postgresql://user:pass@localhost/testdb')
metadata.create_all(engine)

conn = engine.connect()
ins = data.insert()
for datum in large_data_iterator:
    datum_dict = do_some_proc(datum)
    conn.execute(ins, datum_dict)  # one round trip per row -- slow

Since execute() can take multiple values, it would be nice to replace the final for loop with the following generator version:

def datagen(iterator):
    for datum in iterator:  # iterate over the argument, not the global
        datum_dict = do_some_proc(datum)
        yield datum_dict

conn = engine.connect()
ins = data.insert()
conn.execute(ins, datagen(large_data_iterator))

However, this throws an exception: AttributeError: 'list' object has no attribute 'keys'. Apparently the generator is not recognized as the list-of-dictionaries case.

Does anyone know if there is a way to make the generator version work? A better way to do this would also be welcome. Thanks!

Note: I tried a modified generator (below) that yields chunks as lists of dictionaries, and it is faster than individual inserts. However, I do not know how to choose the optimal chunk size, and I am concerned that the added complexity makes my generator code more error-prone. (But if that's the only way ...)

def datagen(iterator):
    output = []
    N = 0
    for datum in iterator:  # iterate over the argument, not the global
        datum_dict = do_some_proc(datum)
        output.append(datum_dict)
        N += 1
        if N == 100:  # chunk size -- or whatever
            yield output
            N = 0
            output = []
    if output:  # don't drop the final partial chunk
        yield output

1 answer


There are execution_options for Connection that accept a stream_results parameter, but unfortunately the documentation notes that "currently the flag is only understood by the psycopg2 dialect", even though there are other drivers that support streaming (e.g. oursql).
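For reference, a sketch of how those options would be set; note that stream_results affects how the result rows of a SELECT are fetched (via a server-side cursor), not how INSERT parameters are sent, so it is only tangentially related here. select_stmt and process are placeholders:

# hedged sketch: stream result rows instead of buffering them all in memory
streaming_conn = conn.execution_options(stream_results=True)
result = streaming_conn.execute(select_stmt)  # select_stmt is a placeholder
for row in result:
    process(row)  # rows arrive incrementally from the server-side cursor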



Until this is fully supported in SQLAlchemy, you can easily write a helper function that breaks any iterable into chunks, avoiding the error-prone modification of your generators.
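For example, a minimal sketch of such a helper built on itertools.islice (the chunk size is illustrative); the one-row-at-a-time datagen from the question stays untouched:

from itertools import islice

def chunks(iterable, size=100):
    # yield successive lists of at most `size` items from any iterable
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk

# only the loop driving the inserts changes; each chunk is a list of
# dictionaries, so execute() takes the executemany() path
for chunk in chunks(datagen(large_data_iterator), size=100):
    conn.execute(ins, chunk)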
