SQLAlchemy: session.rollback() after IntegrityError triggers QueuePool "No handlers could be found" warning and kills my cursor?

I have the following table:

class Feedback(Base):
  __tablename__ = 'feedbacks'
  __table_args__ = (UniqueConstraint('user_id', 'look_id'),)
  id = Column(Integer, primary_key=True)
  user_id = Column(Integer, ForeignKey('users.id'), nullable=False)
  look_id = Column(Integer, ForeignKey('looks.id'), nullable=False)

      

I am currently inserting a lot of records into this table, some of which violate this UniqueConstraint.

I am using the following code:

  for comment in session.query(Comment).filter(Comment.type == Comment.TYPE_LOOK).yield_per(100):
    feedback = Feedback()
    feedback.user_id = User.get_or_create(comment.model_id).id
    feedback.look_id = comment.commentable_id
    session.add(feedback)
    try:
      session.flush()
    except IntegrityError,e:
      print "IntegrityError", e
      session.rollback()
  session.commit()

      

and I am getting the following error:

IntegrityError (IntegrityError) duplicate key value violates unique constraint "feedbacks_user_id_look_id_key"
DETAIL:  Key (user_id, look_id)=(140, 263008) already exists.
 'INSERT INTO feedbacks (user_id, look_id, score) VALUES (%(user_id)s, %(look_id)s, %(score)s) RETURNING feedbacks.id' {'user_id': 140, 'score': 1, 'look_id': 263008}
IntegrityError (IntegrityError) duplicate key value violates unique constraint "feedbacks_user_id_look_id_key"
...
(there about 24 of these integrity errors here)
...
DETAIL:  Key (user_id, look_id)=(173, 263008) already exists.
 'INSERT INTO feedbacks (user_id, look_id, score) VALUES (%(user_id)s, %(look_id)s, %(score)s) RETURNING feedbacks.id' {'user_id': 173, 'score': 1, 'look_id': 263008}
No handlers could be found for logger "sqlalchemy.pool.QueuePool"
Traceback (most recent call last):
  File "load.py", line 40, in <module>
    load_crawl_data_into_feedback()
  File "load.py", line 21, in load_crawl_data_into_feedback
    for comment in session.query(Comment).filter(Comment.type == Comment.TYPE_LOOK).yield_per(100):
  File "/Volumes/Data2/Dropbox/projects/Giordano/venv/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2337, in instances
    fetch = cursor.fetchmany(self._yield_per)
  File "/Volumes/Data2/Dropbox/projects/Giordano/venv/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 3230, in fetchmany
    self.cursor, self.context)
  File "/Volumes/Data2/Dropbox/projects/Giordano/venv/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 3223, in fetchmany
    l = self.process_rows(self._fetchmany_impl(size))
  File "/Volumes/Data2/Dropbox/projects/Giordano/venv/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 3343, in _fetchmany_impl
    row = self._fetchone_impl()
  File "/Volumes/Data2/Dropbox/projects/Giordano/venv/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 3333, in _fetchone_impl
    self.__buffer_rows()
  File "/Volumes/Data2/Dropbox/projects/Giordano/venv/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 3326, in __buffer_rows
    self.__rowbuffer = collections.deque(self.cursor.fetchmany(size))
sqlalchemy.exc.ProgrammingError: (ProgrammingError) named cursor isn't valid anymore None None

      

Before anyone jumps to the conclusion that this error is caused by yield_per, I can assure you that yield_per is not the culprit here.

I tried the same code with no unique constraints and I didn't experience any errors at all.

I believe the integrity errors are somehow triggering the No handlers could be found for logger "sqlalchemy.pool.QueuePool" message.

I assume that every integrity error is killing a "thread" (connection) in the pool.

Can someone enlighten me on what is going on?

If there is not much I can do about the data itself at the moment, what would you recommend?

+3




2 answers


That message comes only from the Python logging module; the pool class is trying to log something, but you haven't configured any logging handlers. Setting up logging is easy, and then you can see what it is actually trying to tell you.
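For example, a minimal setup might look like this (adjust the logger level to taste):

import logging

# Install a default handler so SQLAlchemy's messages are printed instead of
# the "No handlers could be found for logger ..." warning.
logging.basicConfig()

# Turn the pool logger up so you can see what QueuePool was trying to say.
logging.getLogger('sqlalchemy.pool').setLevel(logging.DEBUG)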

I'm not really sure what's going on here, but it certainly doesn't help that you are blowing away the top-level transaction dozens of times. A rollback ends the transaction and expires every live row object in the session, and that is definitely not going to interact well with yield_per.

If your database supports savepoints or nested transactions (i.e., Postgres or Oracle... or maybe newer MySQL?), try running a nested transaction for each attempt:

for comment in session.query(Comment).filter(Comment.type == Comment.TYPE_LOOK).yield_per(100):
    try:
        with session.begin_nested():
            feedback = Feedback()
            feedback.user_id = User.get_or_create(comment.model_id).id
            feedback.look_id = comment.commentable_id
            session.add(feedback)
            session.flush()
    except IntegrityError, e:
        print "IntegrityError", e

session.commit()

      



The with block rolls back on error and commits on success, so a failed flush won't wreak havoc on the rest of your outer transaction.

If your backend doesn't support that, a couple of other reasonable options come to mind:

  • Rework your query: do a LEFT JOIN against your feedback table, so you know in the application whether a feedback row already exists (a rough sketch follows this list).

  • If you're willing to make (user_id, look_id) your primary key, I believe you can use session.merge(feedback). This acts as an insert-or-update keyed on the primary key: if SQLAlchemy can find an existing row with the same pk it updates it, otherwise it inserts a new one. It does come at the cost of an extra SELECT for every new row.
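A rough sketch of the LEFT JOIN idea, assuming comment.model_id is the same id that ends up in feedback.user_id (if User.get_or_create maps it to a different id, join through the users table instead):

from sqlalchemy import and_

# Join condition between a comment and the feedback row it would create.
# Assumption: comment.model_id corresponds directly to feedback.user_id.
already_has_feedback = and_(Feedback.user_id == Comment.model_id,
                            Feedback.look_id == Comment.commentable_id)

# Only iterate over comments that have no matching feedback row yet,
# so the unique constraint is never violated in the first place.
query = (session.query(Comment)
         .outerjoin(Feedback, already_has_feedback)
         .filter(Comment.type == Comment.TYPE_LOOK)
         .filter(Feedback.id == None))

for comment in query.yield_per(100):
    ...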

+4




"Before jumping to conclusions about this yield_per error, I can assure you that yield_per is not the culprit here."

I'm not sure why you'd assume yield_per() is not significant here; that could be checked quickly by simply running the same test without yield_per() and seeing whether the behavior differs. With yield_per(), psycopg2's cursor stays open while the loop proceeds. But then you emit a ROLLBACK on that same psycopg2 connection via session.rollback(), and that is exactly what produces the "named cursor isn't valid anymore" error. In fact, the only reason there is a named cursor at all is that you are using server-side cursors with psycopg2, which is part of what yield_per() gives you.



"I've tried the same code with no unique constraints and I haven't experienced any errors at all."

That's because without the constraint no exception is raised, so rollback() is never reached.
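To illustrate the mechanism, here is a minimal sketch (not from the original answer) of one way to keep the ROLLBACK off the connection that holds the named cursor: read with one session and write with a separate one, so rolling back the writer never invalidates the reader's server-side cursor. It assumes Session is your application's sessionmaker and reuses the imports and models from the question.

read_session = Session()    # owns the server-side cursor created by yield_per()
write_session = Session()   # owns the inserts; safe to roll back

for comment in read_session.query(Comment).filter(Comment.type == Comment.TYPE_LOOK).yield_per(100):
    feedback = Feedback()
    feedback.user_id = User.get_or_create(comment.model_id).id
    feedback.look_id = comment.commentable_id
    write_session.add(feedback)
    try:
        write_session.flush()
    except IntegrityError, e:
        print "IntegrityError", e
        write_session.rollback()    # only rolls back the write connection

write_session.commit()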

+3








