SQLAlchemy executemany with a generator
Question: Can the execute() function of the Connection object accept a generator that yields dictionaries, rather than a list of dictionaries, to perform an "executemany" insert?
More details: I am trying to learn SQLAlchemy, working with the basic expression language. As a test, I have a fairly large dataset, accessible from a file via an iterator, that I am trying to transfer into a PostgreSQL table, but inserting individual rows is quite slow (see Example 1 below). According to the documentation, the execute() function of the Connection object will do the equivalent of executemany() if a list of dictionaries is passed in rather than a single dictionary. I did some quick tests, and indeed this method speeds things up quite a bit for groups of inserts. Unfortunately, with my large dataset I cannot create the complete list of dictionaries in memory, so my question is ...
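For reference, this is the list-of-dictionaries form the documentation describes (the column names here are placeholders of my own):

ins = data.insert()
conn.execute(ins, [
    {'id': 1, 'value': 'a'},
    {'id': 2, 'value': 'b'},
])  # a list of dicts triggers the executemany() path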
Example 1: The following (pseudo)code is very slow for a large amount of data
from sqlalchemy import MetaData, Table, Column, create_engine

metadata = MetaData()
data = Table('data', metadata, Column...)  # column definitions elided

engine = create_engine('postgresql://user:pass@localhost/testdb')
metadata.create_all(engine)

conn = engine.connect()
ins = data.insert()
for datum in large_data_iterator:
    datum_dict = do_some_proc(datum)
    conn.execute(ins, datum_dict)  # one INSERT round-trip per row
Since execute() can take multiple values, it would be nice to replace the final for loop with the following generator version:
def datagen(iterator):
    for datum in iterator:
        datum_dict = do_some_proc(datum)
        yield datum_dict

conn = engine.connect()
ins = data.insert()
conn.execute(ins, datagen(large_data_iterator))
However, this raises the following exception: AttributeError: 'list' object has no attribute 'keys'.
Does anyone know if it is possible to make the generator version work? A better way to do this would be great too. Thanks!
Note: I tried a modified generator that yields chunks as lists of dictionaries (below), and it is faster than individual inserts. However, I do not know how to choose the optimal chunk size, and I am concerned that the added complexity makes my generator code more error-prone. (But if that's the only way ...)
def datagen(iterator):
    output = []
    N = 0
    for datum in iterator:
        datum_dict = do_some_proc(datum)
        output.append(datum_dict)
        N += 1
        if N == 100:  # or whatever
            yield output
            N = 0
            output = []
    if output:
        yield output
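Note that the call site also changes with this version, since each yielded item is now a list of dictionaries rather than a single one; something like:

for chunk in datagen(large_data_iterator):
    conn.execute(ins, chunk)  # one executemany() call per chunk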
There is an execution_options() method on Connection that takes a stream_results parameter, but unfortunately the documentation says at the bottom that "currently the flag is only understood by the psycopg2 dialect", even though there are other drivers with streaming support (e.g. oursql).
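For completeness, setting the option would look roughly like this; note that, as far as I can tell, stream_results affects how result rows are fetched, so it does not directly solve the insert problem:

conn = engine.connect().execution_options(stream_results=True)
result = conn.execute(data.select())  # rows streamed instead of pre-buffered (psycopg2 only)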
Until this is fully supported in SQLAlchemy, you can easily write a helper function to break any iterable into chunks, which avoids error-prone modifications to your generators.
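A minimal sketch of such a helper using itertools.islice (the name grouper and the default chunk size are my own choices; tune the size for your workload):

from itertools import islice

def grouper(iterable, size=1000):
    # Yield successive lists of at most `size` items from any iterable.
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            break
        yield chunk

# The original one-dict-at-a-time generator stays untouched;
# the chunking lives entirely in the helper.
for chunk in grouper(datagen(large_data_iterator)):
    conn.execute(ins, chunk)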