Task to start airflow from UI, KeyError: No such transport

Celery related cfg airflow parameters:

broker_url = 'amqp://guest:guest@rabbitmq_server:8080'
celery_result_backend = db+postgresql://developer:password@postgres_server:5432/db_name

      

airflow webserver

works fine, but while executing a task from the airflow UI, I get an error. error when starting task from ui

I am wrong when running the airflow scheduler, tracecak:

 Traceback (most recent call last):
 File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1988, in wsgi_app
response = self.full_dispatch_request()
 File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1641, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1544, in handle_user_exception
reraise(exc_type, exc_value, tb)
File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1639, in full_dispatch_request
rv = self.dispatch_request()
File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1625, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/usr/local/lib/python2.7/dist-packages/flask_admin/base.py", line 69, in inner
return self._run_view(f, *args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/flask_admin/base.py", line 368, in _run_view
return fn(self, *args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/flask_login.py", line 755, in decorated_view
return func(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/airflow/www/utils.py", line 125, in wrapper
return f(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/airflow/www/utils.py", line 172, in wrapper
return f(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/airflow/www/views.py", line 952, in run
executor.heartbeat()
File "/usr/local/lib/python2.7/dist-packages/airflow/executors/base_executor.py", line 124, in heartbeat
self.execute_async(key, command=command, queue=queue)
File "/usr/local/lib/python2.7/dist-packages/airflow/executors/celery_executor.py", line 80, in execute_async
args=[command], queue=queue)
File "/usr/local/lib/python2.7/dist-packages/celery/app/task.py", line 536, in apply_async
**options
File "/usr/local/lib/python2.7/dist-packages/celery/app/base.py", line 734, in send_task
with self.producer_or_acquire(producer) as P:
File "/usr/local/lib/python2.7/dist-packages/celery/app/base.py", line 863, in producer_or_acquire
producer, self.producer_pool.acquire, block=True,
File "/usr/local/lib/python2.7/dist-packages/celery/app/base.py", line 1233, in producer_pool
return self.amqp.producer_pool
File "/usr/local/lib/python2.7/dist-packages/celery/app/amqp.py", line 614, in producer_pool
self.app.connection_for_write()]
File "/usr/local/lib/python2.7/dist-packages/celery/app/base.py", line 760, in connection_for_write
return self._connection(url or self.conf.broker_write_url, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/celery/app/base.py", line 828, in _connection
'broker_connection_timeout', connect_timeout
File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 179, in __init__
if not get_transport_cls(transport).can_parse_url:
File "/usr/local/lib/python2.7/dist-packages/kombu/transport/__init__.py", line 83, in get_transport_cls
_transport_cache[transport] = resolve_transport(transport)
File "/usr/local/lib/python2.7/dist-packages/kombu/transport/__init__.py", line 64, in resolve_transport
raise KeyError('No such transport: {0}'.format(transport))
KeyError: u'No such transport: '

      

My module versions:

  • air flow == 1.8
  • celery == 4.1.0
  • kombu == 4.1.0
  • python == 2.7.12
+3


source to share


1 answer


I spent a lot of time on this issue, the quotes in the cause of this error was broker_url = 'amqp://guest:guest@rabbitmq_server:8080'

, just removing the quotes: broker_url = amqp://guest:guest@rabbitmq_server:8080

solved the problem.



+6


source







All Articles