NDB models are not saved in memcache when using MapReduce
I have created two MapReduce pipelines for uploading CSV files to create categories and products in bulk. Each product is linked to a category via a KeyProperty. The Category and Product models are built on ndb.Model, so, based on the documentation, I would expect them to be automatically cached in memcache when retrieved from the datastore.
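For reference, the models look roughly like this (simplified; the Product fields match the mapper below, and Category's name property is just illustrative):

from google.appengine.ext import ndb

class Category(ndb.Model):
    # the CSV's category_id string is used as the key id
    name = ndb.StringProperty()

class Product(ndb.Model):
    # the CSV's product_id string is used as the key id
    category = ndb.KeyProperty(kind="Category")
    product_type = ndb.StringProperty()
    description = ndb.StringProperty()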
I ran these scripts on the server to load 30 categories and, after that, 3000 products. All the data appears in the datastore as expected.
However, it doesn't look like the product upload is making use of memcache when getting the categories. When I check the memcache viewer in the portal, it shows something along the lines of 180 hits and 60 misses. If I upload 3000 products and retrieve the category every time, shouldn't I have around 3000 hits from fetching the category (i.e. Category.get_by_id(category_id))? And probably 3000 more misses from trying to fetch an existing product before creating a new one (the algorithm handles both entity creation and updates).
Here's the relevant product mapping function, which takes a line from the CSV file and creates or updates a product:
import csv

def product_bulk_import_map(data):
    """Product Bulk Import map function."""
    result = {"status": "CREATED"}
    product_data = data

    try:
        # parse input parameter tuple
        byteoffset, line_data = data

        # parse base product data
        product_data = [x for x in csv.reader([line_data])][0]
        (p_id, c_id, p_type, p_description) = product_data

        # process category
        category = Category.get_by_id(c_id)
        if category is None:
            raise Exception(product_import_error_messages["category"] % c_id)

        # store in datastore
        product = Product.get_by_id(p_id)
        if product is not None:
            result["status"] = "UPDATED"
            product.category = category.key
            product.product_type = p_type
            product.description = p_description
        else:
            product = Product(
                id=p_id,
                category=category.key,
                product_type=p_type,
                description=p_description
            )
        product.put()
        result["entity"] = product.to_dict()

    except Exception as e:
        # catch any exceptions, and note failure in output
        result["status"] = "FAILED"
        result["entity"] = str(e)

    # return results
    yield (str(product_data), result)
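For context, the mapper is driven by a MapperPipeline that reads the uploaded CSV line by line. The wiring looks roughly like this (a sketch rather than my exact code: the module path main., the shard count, and the choice of BlobstoreLineInputReader are illustrative, the reader chosen because it yields the (byte_offset, line) tuples the function unpacks):

from mapreduce import base_handler, mapper_pipeline

class ProductBulkImportPipeline(base_handler.PipelineBase):
    """Runs the bulk import mapper over an uploaded CSV blob."""

    def run(self, blob_key):
        yield mapper_pipeline.MapperPipeline(
            "product_bulk_import",
            handler_spec="main.product_bulk_import_map",
            input_reader_spec="mapreduce.input_readers.BlobstoreLineInputReader",
            params={"blob_keys": [blob_key]},
            shards=4)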
MapReduce intentionally disables memcache for NDB.
See mapreduce/util.py, line 373, _set_ndb_cache_policy() (as of 2015-05-01):
def _set_ndb_cache_policy():
    """Tell NDB to never cache anything in memcache or in-process.

    This ensures that entities fetched from Datastore input_readers via NDB
    will not bloat up the request memory size and Datastore Puts will avoid
    doing calls to memcache. Without this you get soft memory limit exits,
    which hurts overall throughput.
    """
    ndb_ctx = ndb.get_context()
    ndb_ctx.set_cache_policy(lambda key: False)
    ndb_ctx.set_memcache_policy(lambda key: False)
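Note that these policies are plain functions of the key, so they can also be overridden selectively after MapReduce has installed them. For example (my own sketch, not something from the mapreduce source), you could re-enable memcache for categories only:

ndb_ctx = ndb.get_context()
# Cache only Category entities in memcache; Product stays uncached.
ndb_ctx.set_memcache_policy(lambda key: key.kind() == "Category")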
You can force get_by_id() and put() to use memcache, for example:

product = Product.get_by_id(p_id, use_memcache=True)
...
product.put(use_memcache=True)
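Applied to the mapper above, that would look something like this (a sketch; use_cache=False is my own addition to keep the in-process cache disabled, since that is the cache the docstring's memory concern is about):

# Per-call context options take precedence over the policies
# that MapReduce installed on the context.
category = Category.get_by_id(c_id, use_memcache=True, use_cache=False)
product = Product.get_by_id(p_id, use_memcache=True, use_cache=False)
product.put(use_memcache=True, use_cache=False)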
Alternatively, you can modify the NDB context if you are batching puts with mapreduce.operation. However, I don't know enough to tell whether this has other unwanted effects:
ndb_ctx = ndb.get_context()
ndb_ctx.set_memcache_policy(lambda key: True)
...
yield operation.db.Put(product)
As for the docstring's point about "soft memory limit exits", I don't understand why those would happen if only memcache were enabled (i.e. without the in-context cache).
It actually seems like you want memcache enabled for puts; otherwise your application may end up reading stale data from NDB's memcache after your mapper has modified the data underneath it.
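Here's the stale-read scenario I mean, as a sketch (assuming a product that some ordinary request already pulled into memcache before the mapper ran):

# 1. An ordinary request reads the entity, which populates memcache:
product = Product.get_by_id("p1")

# 2. The mapper later updates the same entity; with the memcache policy
#    set to False, the cached copy is never invalidated:
product.description = "updated by mapper"
product.put()

# 3. A later ordinary request is served the stale copy from memcache:
product = Product.get_by_id("p1")  # still sees the old description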
As Slawek Rewaj already mentioned, this is caused by the in-context cache. When retrieving an entity, NDB first tries the in-context cache, then memcache, and finally fetches the entity from the datastore if it is found in neither. The in-context cache is just a Python dictionary, and its lifetime and visibility are limited to the current request, but MapReduce makes multiple calls to product_bulk_import_map() within a single request.
More information about the in-context cache can be found here: https://cloud.google.com/appengine/docs/python/ndb/cache#incontext
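To illustrate the lookup order within a single request (a sketch; "c1" is a made-up category id):

# Both calls happen inside one request, i.e. one NDB context:
category = Category.get_by_id("c1")  # in-context miss -> memcache -> datastore
category = Category.get_by_id("c1")  # in-context hit; no memcache or datastore RPC

So when product_bulk_import_map() runs thousands of times within a few long-lived requests, most category lookups are absorbed by the in-context cache and never register as memcache hits.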