Python pyramid settings not available in registry
According to the docs, it should be possible to access the settings from the .ini files throughout the application, even where there is no access to the request object. I would find this very useful because I want to specify different database connection strings in the .ini files and be able to access them not only in `__init__`, but also from other modules. However, whenever and wherever I try it, the value of `settings` is `None`. Why? (Pyramid v1.5.7)
# Fetch the registry through Pyramid's thread-local API, i.e. without a
# request object in hand.
registry = pyramid.threadlocal.get_current_registry()
# As reported in the question, this attribute comes back as None when the
# snippet runs outside of a configured application/request.
settings = registry.settings
# Subscripting that None value is what produces
# "TypeError: 'NoneType' object has no attribute '__getitem__'".
debug_frobnosticator = settings['debug_frobnosticator']
This is how the application is initialized:
from pyramid.config import Configurator
from pyramid.authentication import AuthTktAuthenticationPolicy
from pyramid.authorization import ACLAuthorizationPolicy
from pyramid.security import authenticated_userid
from bang.model import RootFactory, groupFinder
from pyramid.renderers import JSON
import json
from datetime import datetime, date
from bson.objectid import ObjectId
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from pytz import utc
import platform
from bang.classes import * # the db string is required in this module
import logging
from datetime import timedelta
# Main app config: the WSGI application factory.
#
# Receives the parsed .ini values as **settings, hands them to the
# Configurator, installs the authentication/authorization policies, syncs the
# Mongo sequence counters, optionally starts a background scheduler for this
# host, registers request methods, the JSON renderer and the routes, and
# returns the WSGI application.
#
# NOTE(review): indentation was lost in this listing, so the nesting of the
# statements below has to be inferred — confirm against the real file.
# NOTE(review): `cn`, `log`, `agent_scan`, `agent_jobs`, `User`, `Agent`,
# `Asset` and `Contract` are not defined in this listing; presumably they come
# from `from bang.classes import *` — confirm.
def main(global_config, **settings):
# Auth-ticket authentication policy; groupFinder is the principal callback.
# NOTE(review): the secret is a hard-coded literal (looks like a redacted
# placeholder) — consider reading it from `settings` instead.
authn_policy = AuthTktAuthenticationPolicy(
secret='xxxxxx',
callback=groupFinder,
hashalg='sha512',
include_ip=True,
timeout=36000)
authz_policy = ACLAuthorizationPolicy()
# The .ini settings become available as config.registry.settings from here on.
config = Configurator(
settings=settings,
root_factory=RootFactory)
config.set_authentication_policy(authn_policy)
config.set_authorization_policy(authz_policy)
# Restrict these third-party loggers to ERROR and above.
logging.getLogger("apscheduler.scheduler").setLevel(logging.ERROR)
logging.getLogger("apscheduler.executors.default").setLevel(logging.ERROR)
logging.getLogger("requests.packages.urllib3.connectionpool").setLevel(logging.ERROR)
### first use of db from bang.classes here
# set house_id auto_inc to the highest current house_id
# (sort descending on house_id_int and take the first document)
# NOTE(review): the [0] lookup has no guard for an empty result — confirm the
# collection can never be empty at startup.
new_int = Asset().find().sort('house_id_int', -1)[0].house_id_int
Asset()._database['Sequences'].find_and_modify({'_id': 'house_id_int'}, {'$set': {'seq': new_int}}, upsert=True)
# set code_int auto_inc to the highest current Contract code_int
# (same pattern as above, applied to the 'code_int' sequence)
new_int = Contract().find().sort('code_int', -1)[0].code_int
Contract()._database['Sequences'].find_and_modify({'_id': 'code_int'}, {'$set': {'seq': new_int}}, upsert=True)
# Request method (registered further down as `request.user`): looks up the
# User whose _id is the authenticated user id, with the password field
# excluded, and returns a plain dict of selected attributes.
# NOTE(review): there is no handling for find_one() returning None (e.g. an
# unauthenticated request) — confirm this cannot happen or add a guard.
def get_user(request):
user = User().find_one({'_id': ObjectId(authenticated_userid(request))}, fields={'password': False})
return {'_id': user._id, 'defaults': user.defaults, 'full_name': user.full_name, 'first_name': user.first_name,
'last_name': user.last_name, 'login_count': user.login_count, 'permissions': user.permissions}
# if this Pyramid instance is an Agent host, then start a scheduler
# (an Agent document whose host_name equals this machine's node name)
agent = Agent().find_one({'host_name': platform.node()})
if agent:
# created background scheduler
# Jobs are stored in a per-host Mongo collection 'ScheduledJobs_<host_name>'.
# NOTE(review): as parenthesised here, `timezone=utc` and `job_defaults=...`
# are arguments to MongoDBJobStore(...), not to BackgroundScheduler(...) —
# confirm this is intended; they look like scheduler-level options.
scheduler = BackgroundScheduler(
jobstores={
'mongo': MongoDBJobStore(client=cn, database='foobar', collection='ScheduledJobs_%s' % agent.host_name, timezone=utc,
job_defaults={'coalesce': True, 'misfire_grace_time': 86400})})
log.info('created scheduler for agent_host %s' % agent.host_name)
# start the scheduler
scheduler.start()
# Expose the scheduler created above on every request as `request.scheduler`.
def get_scheduler(request):
return scheduler
config.add_request_method(get_scheduler, 'scheduler', reify=True)
# Re-query every Agent registered for this host; note that the loop variable
# reuses (and therefore rebinds) the name `agent` from the lookup above.
for agent in Agent().find({'host_name': platform.node()}):
# add a job to scan the agent
# (interval job every agent.scan_cycle seconds, id '<agent _id>_scan';
# replace_existing=True overwrites a stored job with the same id)
log.info('added scan schedule for %s on %s' % (agent.name, agent.host_name))
scheduler.add_job(agent_scan, trigger='interval', args=[str(agent._id)], jobstore='mongo',
id='%s_scan' % agent._id, seconds=agent.scan_cycle, replace_existing=True)
# add a job to process the agent joblist
# (interval job every 10 seconds, first run 60 seconds after startup;
# start_date is built from a naive datetime.utcnow())
log.info('added jobs schedule for %s on %s' % (agent.name, agent.host_name))
scheduler.add_job(agent_jobs, trigger='interval', args=[str(agent._id)], jobstore='mongo',
id='%s_jobs' % agent._id, seconds=10, start_date=datetime.utcnow() + timedelta(seconds=60), replace_existing=True)
# add the authenticated user to each request
config.add_request_method(get_user, 'user', reify=True)
# custom json adapters
custom_json = JSON()
# Adapter that serialises an object through its `.json` attribute.
# NOTE(review): nothing in this listing registers this adapter on
# `custom_json` — presumably elided from the paste; confirm.
def haystack_obj_adapter(obj, request):
return obj.json
# Register custom_json under the 'json' renderer name.
config.add_renderer('json', custom_json)
# routes for standard static images
config.add_route('apple-touch', '/apple-touch-icon.png')
# etc
# routes
config.add_route('remote.files', '/remote/files/{method}')
# etc
# Serve the 'static' directory at the site root with cache_max_age=0.
config.add_static_view('/', 'static', cache_max_age=0)
# Scan for decorated configuration, then build and return the WSGI app.
config.scan()
return config.make_wsgi_app()
TypeError: 'NoneType' object has no attribute '__getitem__'
source to share
The sample code is missing the complete traceback and the view code.
It doesn't actually work the way you might expect. The thread-local registry may not be set up at that point; if I understand correctly, this feature is mainly intended for `pyramid.testing` and unit tests. The thread-local lifecycle differs between regular requests and unit tests.
Please use `request.registry` in views, and pass the registry around explicitly everywhere else. If you need to access the registry in `main()`, it is available as `config.registry`.
source to share