Celery - Sqlite DB query during task execution

I have a Python-based Flask application where I use a Celery task queue to handle a lot of email tasks. I would like the Celery task to be able to query the SQLite database that I have bound throughout the application to pull in and use certain data, but I keep getting the following error. If I remove the single line in the Celery task that queries my SQLite database, the task then executes without throwing this error, so my guess is that I am making a fundamental mistake in how I bind Celery and my database together.

[2015-07-18 21:36:25,168: ERROR/MainProcess] Process 'Worker-1' pid:6657 exited with 'signal 11 (SIGSEGV)'
[2015-07-18 21:36:25,187: ERROR/MainProcess] Task app.views.send_mail_task[4c3d5b1a-5ac3-4ab8-b633-b925eba5dd02] raised unexpected: WorkerLostError('Worker exited prematurely: signal 11 (SIGSEGV).',)
Traceback (most recent call last):
  File "/Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/billiard/pool.py", line 1171, in mark_as_worker_lost
    human_status(exitcode)),
WorkerLostError: Worker exited prematurely: signal 11 (SIGSEGV).

      

The relevant code looks like this:

models.py

...

class Company(Model):
  """Peewee model for a company record and its contact/outreach state.

  Rows are stored in ``DATABASE`` and ordered by ``have_contacted`` by
  default (see ``Meta``).
  """

  dont_contact_anymore = BooleanField(default=False)
  company_name = CharField()
  website = CharField(unique=True)  # uniqueness is what create_company relies on
  email_address = CharField()
  country = CharField()
  # NOTE(review): peewee documents ``formats`` as a *list* of format strings;
  # a bare string here is probably not doing what was intended -- confirm.
  scraped_on = DateTimeField(formats="%m-%d-%Y")
  have_contacted = BooleanField(default=False)
  current_pipeline_phase = IntegerField(default=0)

  sector = CharField()
  # User referenced by this company row; stored in column "contacted_by".
  contacted_by = ForeignKeyField(
    rel_model=User,
    related_name='contacted_by',
    db_column='contacted_by'
    )

  class Meta:
    database = DATABASE
    order_by = ('have_contacted',)

  @classmethod
  def create_company(cls, company_name, website, email_address):
    """Insert a new company row, stamping ``scraped_on`` with the current time.

    The insert runs inside a ``DATABASE.transaction()``. An ``IntegrityError``
    (e.g. from the unique ``website`` column) is caught and reported on
    stdout instead of being raised.
    """
    try:
      with DATABASE.transaction():
        cls.create(
          company_name=company_name,
          website=website,
          email_address=email_address,
          # Call now(): the original passed the function object itself,
          # which is not a storable datetime value.
          scraped_on=datetime.now(),
        )
        print('Saved {}'.format(company_name))
    except IntegrityError:
      print('{} already exists in the database'.format(company_name))

...

      

celery.js

/**
 * Start the e-mail task on the server and begin polling its progress.
 *
 * Reads the two id lists from get_the_ids(), POSTs them as JSON to
 * '/email_task', and on success hands the URL found in the response's
 * 'Location' header to update_progress().
 */
function start_the_magic() {
  // Get the ids of the companies to contact and remove
  var ids = get_the_ids();

  var contact = ids[0];
  var remove = ids[1];

  console.log(contact);
  console.log(remove);

  $.ajax({
      type: 'POST',
      url: '/email_task',
      data: JSON.stringify({contact: contact, remove: remove}),
      contentType: "application/json; charset=utf-8",
      dataType: 'json',
      success: function(data, status, request) {
          // Declared locally: the original assigned an implicit global,
          // which leaks state and throws in strict mode.
          var status_url = request.getResponseHeader('Location');
          update_progress(status_url, '#progress div');
      },
      error: function() {
          alert('Unexpected error');
      }
  });
}


/**
 * Poll the task status URL and reflect the progress in the page.
 *
 * @param {string} status_url  URL that returns the task status as JSON
 *                             (reads 'current', 'total', 'state', 'result').
 * @param {string} status_div  jQuery selector of the element that shows the
 *                             percentage text.
 *
 * While the reported state is 'PENDING' or 'PROGRESS' the function
 * re-schedules itself every 2 seconds; any other state stops the polling.
 */
function update_progress(status_url, status_div) {
  // send GET request to status URL
  $.getJSON(status_url, function(data) {
      // update UI
      // Local variable with an explicit radix (the original leaked an
      // implicit global and let parseInt guess the base).
      var percent = parseInt(data['current'] * 100 / data['total'], 10);
      // Remove the table and install the progress bar
      $('div.progress-container').html("").html(bootstrap_progress);
      $(status_div).text(percent + '%');
      if (data['state'] !== 'PENDING' && data['state'] !== 'PROGRESS') {
          if ('result' in data) {
              // show result
              console.log('update progress worked!');
              // $(status_div.childNodes[3]).text('Result: ' + data['result']);
          }
          else {
              // something unexpected happened
              console.log('something unexpected happened');
              // $(status_div.childNodes[3]).text('Result: ' + data['state']);
          }
      }
      else {
          // rerun in 2 seconds
          setTimeout(function() {
              update_progress(status_url, status_div);
          }, 2000);
      }
  });
}

      

views.py

...

@celery.task(base=DbTask, bind=True)
def send_mail_task(self, contact, remove):
    """Look up each company to contact and report progress via task state.

    Args:
        contact: sequence of ``Company`` ids to e-mail.
        remove: ids to remove; currently unused by this task.

    Returns:
        A dict with ``current``, ``total``, ``status`` and ``result`` keys
        describing the finished task.
    """
    total_contact_companies = len(contact)

    for num, i in enumerate(contact):
        # NOTE: this is the line where things are failing -- the worker
        # process exits with SIGSEGV when this query is executed.
        company = Company.select().where(Company.id == i).get()

        # Format the name explicitly: concatenating a str with the model
        # instance itself raises TypeError.
        print('Contacting {}'.format(company.company_name))
        message = 'E-mailing {} at {}...'.format(
            company.company_name, company.email_address)

        self.update_state(state='PROGRESS',
                          meta={'current': num,
                                'total': total_contact_companies,
                                'status': message})
        time.sleep(3)
    return {'current': 100, 'total': 100, 'status': 'Task completed!',
            'result': 42}

....

      

Basically, the error is triggered when I try to query my Sqlite database in the celery task.

company = Company.select().where(Company.id == i).get()

      

Also, I am using Redis database for Celery.

Update

Here's a more detailed trace from the Celery log:

[2015-07-19 21:13:46,326: INFO/MainProcess] Received task: app.views.send_mail_task[acff718c-a5e3-430c-96d7-789d969cdb4f]
[2015-07-19 21:13:46,328: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x10bf07aa0> (args:('app.views.send_mail_task', 'acff718c-a5e3-430c-96d7-789d969cdb4f', [[u'1869', u'1870', u'1871'], []], {}, {'utc': True, u'is_eager': False, 'chord': None, u'group': None, 'args': [[u'1869', u'1870', u'1871'], []], 'retries': 0, u'delivery_info': {u'priority': 0, u'redelivered': None, u'routing_key': u'celery', u'exchange': u'celery'}, 'expires': None, u'hostname': 'celery@JoeyOrlandoMacBookAir.local', 'task': 'app.views.send_mail_task', 'callbacks': None, u'correlation_id': u'acff718c-a5e3-430c-96d7-789d969cdb4f', 'errbacks': None, 'timelimit': (None, None), 'taskset': None, 'kwargs': {}, 'eta': None, u'reply_to': u'58c6e8a9-cab4-3f00-aacd-d761a3550934', 'id': 'acff718c-a5e3-430c-96d7-789d969cdb4f', u'headers': {}}) kwargs:{})
[2015-07-19 21:13:46,331: WARNING/Worker-1] this line will eventually get replaced with the company name that is getting contacted
[2015-07-19 21:13:46,332: DEBUG/MainProcess] Task accepted: app.views.send_mail_task[acff718c-a5e3-430c-96d7-789d969cdb4f] pid:44579
[2015-07-19 21:13:46,336: DEBUG/Worker-1] ('SELECT "t1"."id", "t1"."dont_contact_anymore", "t1"."company_name", "t1"."website", "t1"."email_address", "t1"."country", "t1"."scraped_on", "t1"."have_contacted", "t1"."current_pipeline_phase", "t1"."day_0_message_id", "t1"."day_0_response", "t1"."day_0_sent", "t1"."day_5_message_id", "t1"."day_5_response", "t1"."day_5_sent", "t1"."day_35_message_id", "t1"."day_35_response", "t1"."day_35_sent", "t1"."day_125_message_id", "t1"."day_125_response", "t1"."day_125_sent", "t1"."batch", "t1"."sector", "t1"."contacted_by" FROM "company" AS t1 WHERE ("t1"."id" = ?) ORDER BY "t1"."have_contacted" ASC LIMIT 1', [1869])
[2015-07-19 21:13:46,717: ERROR/MainProcess] Process 'Worker-1' pid:44579 exited with 'signal 11 (SIGSEGV)'
[2015-07-19 21:13:46,730: ERROR/MainProcess] Task app.views.send_mail_task[acff718c-a5e3-430c-96d7-789d969cdb4f] raised unexpected: WorkerLostError('Worker exited prematurely: signal 11 (SIGSEGV).',)
Traceback (most recent call last):
  File "/Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/billiard/pool.py", line 1171, in mark_as_worker_lost
    human_status(exitcode)),
WorkerLostError: Worker exited prematurely: signal 11 (SIGSEGV).

      

Update #2: stepping through the task with Celery's rdb remote debugger

~$ telnet localhost 6903
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
> /Users/wyssuser/Desktop/new_danish/app/views.py(107)send_mail_task()
-> company = Company.select().where(Company.id == i).get()
(Pdb) n
Connection closed by foreign host.
wyssuser@joeyorlcbookair:~$ telnet localhost 6902
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
> /Users/wyssuser/Desktop/new_danish/app/views.py(107)send_mail_task()
-> company = Company.select().where(Company.id == i).get()
(Pdb) s
--Call--
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(4092)select()
-> @classmethod
(Pdb) s
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(4094)select()
-> query = SelectQuery(cls, *selection)
(Pdb) s
--Call--
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(2536)__init__()
-> def __init__(self, model_class, *selection):
(Pdb) s
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(2537)__init__()
-> super(SelectQuery, self).__init__(model_class)
(Pdb) s
--Call--
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(2328)__init__()
-> def __init__(self, model_class):
(Pdb) s
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(2329)__init__()
-> super(Query, self).__init__()
(Pdb) s
--Call--
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(376)__init__()
-> def __init__(self):
(Pdb) s
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(377)__init__()
-> self._negated = False
(Pdb) s
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(378)__init__()
-> self._alias = None
(Pdb) s
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(379)__init__()
-> self._bind_to = None
(Pdb) s
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(380)__init__()
-> self._ordering = None  # ASC or DESC.
(Pdb) s
--Return--
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(380)__init__()->None
-> self._ordering = None  # ASC or DESC.
(Pdb) s
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(2331)__init__()
-> self.model_class = model_class
(Pdb) s
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(2332)__init__()
-> self.database = model_class._meta.database
(Pdb) s
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(2334)__init__()
-> self._dirty = True
(Pdb) s
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(2335)__init__()
-> self._query_ctx = model_class
(Pdb) s
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(2336)__init__()
-> self._joins = {self.model_class: []}  # Join graph as adjacency list.
(Pdb) s
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(2337)__init__()
-> self._where = None
(Pdb) s
--Return--
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(2337)__init__()->None
-> self._where = None
(Pdb) s
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(2538)__init__()
-> self.require_commit = self.database.commit_select
(Pdb) s
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(2539)__init__()
-> self.__select(*selection)
(Pdb) s
--Call--
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(2621)__select()
-> def __select(self, *selection):
(Pdb) s
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(2622)__select()
-> self._explicit_selection = len(selection) > 0
(Pdb) s
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(2623)__select()
-> selection = selection or self.model_class._meta.get_fields()
(Pdb) s
--Call--
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(3932)get_fields()
-> def get_fields(self):
(Pdb) s
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(3933)get_fields()
-> return [f[1] for f in self.get_sorted_fields()]
(Pdb) s
--Call--
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(3925)get_sorted_fields()
-> def get_sorted_fields(self):
(Pdb) s
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(3926)get_sorted_fields()
-> key = lambda i: i[1]._sort_key
(Pdb) s
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(3927)get_sorted_fields()
-> return sorted(self.fields.items(), key=key)
(Pdb) s
--Call--
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(3926)<lambda>()
-> key = lambda i: i[1]._sort_key
(Pdb) r
--Return--
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(3926)<lambda>()->(2, 42)
-> key = lambda i: i[1]._sort_key
(Pdb) r
--Return--
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(3927)get_sorted_fields()->[('id', <peewee....1228f910>), ('dont_contact_anymore', <peewee....12289e50>), ('company_name', <peewee....1228f190>), ('website', <peewee....1228f1d0>), ('email_address', <peewee....1228f210>), ('country', <peewee....1228f250>), ...]
-> return sorted(self.fields.items(), key=key)
(Pdb) s
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(3933)get_fields()
-> return [f[1] for f in self.get_sorted_fields()]
(Pdb) s
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(3933)get_fields()
-> return [f[1] for f in self.get_sorted_fields()]
(Pdb) s
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(3933)get_fields()
-> return [f[1] for f in self.get_sorted_fields()]
(Pdb) r
--Return--
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(3933)get_fields()->[<peewee....1228f910>, <peewee....12289e50>, <peewee....1228f190>, <peewee....1228f1d0>, <peewee....1228f210>, <peewee....1228f250>, ...]
-> return [f[1] for f in self.get_sorted_fields()]
(Pdb) r
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(2624)__select()
-> self._select = self._model_shorthand(selection)
(Pdb) r
--Return--
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(2624)__select()->None
-> self._select = self._model_shorthand(selection)
(Pdb) r
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(2540)__init__()
-> self._from = None
(Pdb) r
--Return--
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(2554)__init__()->None
-> self._qr = None
(Pdb) r
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(4095)select()
-> if cls._meta.order_by:
(Pdb) r
--Return--
> /Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/peewee.py(4097)select()-><class '...d" ASC []
-> return query
(Pdb) r
Connection closed by foreign host.

      

+3


source to share





All Articles