[storm] Cache for nonexistent result

Free Ekanayaka free at 64studio.com
Fri Jan 16 21:20:11 UTC 2015


See:

http://en.wikipedia.org/wiki/Isolation_%28database_systems%29

for reference.
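
The isolation concern raised in the quoted reply below can be illustrated with a small sketch. It is plain Python, not Storm code: `FakeDatabase` and `NegativeCachingStore` are hypothetical names made up for this example, and the dict stands in for a table that another transaction can write to.

```python
class FakeDatabase:
    """Stands in for a table shared by concurrent transactions."""

    def __init__(self):
        self.rows = {}

    def select(self, key):
        return self.rows.get(key)


class NegativeCachingStore:
    """Hypothetical store that remembers keys that returned no row."""

    def __init__(self, database):
        self.database = database
        self.missing = set()
        self.queries = 0

    def get(self, key):
        if key in self.missing:
            return None  # answered from the cache of misses, no query
        self.queries += 1
        row = self.database.select(key)
        if row is None:
            self.missing.add(key)
        return row


db = FakeDatabase()
store = NegativeCachingStore(db)

first = store.get((10, 3))        # queries the database, finds nothing
db.rows[(10, 3)] = "new profile"  # another transaction commits an insert
second = store.get((10, 3))       # the cached miss hides the new row
```

At isolation levels that allow phantom reads, a second query could have returned the new row, so the cached miss changes the result. Even where it does not, the cached keys would presumably have to be dropped at the end of the transaction.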

On Fri, Jan 16, 2015 at 10:19 PM, Free Ekanayaka <free at 64studio.com> wrote:

> Hi Ivan,
>
> it feels like what you suggest would work safely only for transactions set
> to the serializable isolation level, not for repeatable read down to read
> uncommitted (since phantom reads could occur there, and the cache of
> nonexistent keys would hide new results).
>
> Cheers
>
> On Fri, Jan 16, 2015 at 5:55 PM, Ivan Zakrevskyi <
> ivan.zakrevskyi at rebelmouse.com> wrote:
>
>> Hi, all. Thanks for the answer. I'll try to explain.
>>
>> First, try to get an existing object.
>>
>> In [2]: store.get(StTwitterProfile, (1,3))
>> base.py:50 =>
>> u'(0.001) SELECT ... FROM twitterprofile WHERE twitterprofile.context_id
>> = %s AND twitterprofile.user_id = %s LIMIT 1; args=(1, 3)'
>> Out[2]: <users.orm.TwitterProfile at 0x7f1e93b6d450>
>>
>> In [3]: store.get(StTwitterProfile, (1,3))
>> Out[3]: <users.orm.TwitterProfile at 0x7f1e93b6d450>
>>
>> In [4]: store.get(StTwitterProfile, (1,3))
>> Out[4]: <users.orm.TwitterProfile at 0x7f1e93b6d450>
>>
>> You can see that Storm made only one query.
>>
>> OK, now try to get a nonexistent Twitter profile for a given context:
>>
>> In [5]: store.get(StTwitterProfile, (10,3))
>> base.py:50 =>
>> u'(0.001) SELECT ... FROM twitterprofile WHERE twitterprofile.context_id
>> = %s AND twitterprofile.user_id = %s LIMIT 1; args=(1, 10)'
>>
>> In [6]: store.get(StTwitterProfile, (10,3))
>> base.py:50 =>
>> u'(0.001) SELECT ... FROM twitterprofile WHERE twitterprofile.context_id
>> = %s AND twitterprofile.user_id = %s LIMIT 1; args=(1, 10)'
>>
>> In [7]: store.get(StTwitterProfile, (10,3))
>> base.py:50 =>
>> u'(0.001) SELECT ... FROM twitterprofile WHERE twitterprofile.context_id
>> = %s AND twitterprofile.user_id = %s LIMIT 1; args=(1, 10)'
>>
>> Storm sends a query to the database each time.
>>
>> For example, suppose we have a utility like this:
>>
>> def myutil(user_id, *args, **kwargs):
>>     context_id = get_context_from_mongodb_redis_memcache_environment_etc(
>>         user_id, *args, **kwargs)
>>     twitter_profile = store.get(TwitterProfile, (context_id, user_id))
>>     return twitter_profile.some_attr
>>
>> In this case, Storm will send a query to the database each time.
>>
>> The situation is similar for a nonexistent relation.
>>
>> In [20]: u = store.get(StUser, 10)
>> base.py:50 =>
>> u'(0.001) SELECT ... FROM user WHERE user.id = %s LIMIT 1; args=(10,)'
>>
>>
>> In [22]: u.profile
>> base.py:50 =>
>> u'(0.001) SELECT ... FROM userprofile WHERE userprofile.user_id = %s
>> LIMIT 1; args=(10,)'
>>
>> In [23]: u.profile
>> base.py:50 =>
>> u'(0.001) SELECT ... FROM userprofile WHERE userprofile.user_id = %s
>> LIMIT 1; args=(10,)'
>>
>> In [24]: u.profile
>> base.py:50 =>
>> u'(0.001) SELECT ... FROM userprofile WHERE userprofile.user_id = %s
>> LIMIT 1; args=(10,)'
>>
>> I've created a temporary patch to reduce the number of DB queries (see
>> below). But I am sure that the solution could be more elegant (at the
>> library level).
>>
>>
>> class NonexistentCache(list):
>>
>>     _size = 1000
>>
>>     def add(self, val):
>>         if val in self:
>>             self.remove(val)
>>         self.insert(0, val)
>>         if len(self) > self._size:
>>             self.pop()
>>
>>
>> class Store(StoreOrig):
>>
>>     def __init__(self, database, cache=None):
>>         StoreOrig.__init__(self, database, cache)
>>         self.nonexistent_cache = NonexistentCache()
>>
>>     def get(self, cls, key, exists=False):
>>         """Get object of type cls with the given primary key from the
>>         database.
>>
>>         This method is patched to cache nonexistent values to reduce
>>         the number of DB queries.
>>         If the object is alive the database won't be touched.
>>
>>         @param cls: Class of the object to be retrieved.
>>         @param key: Primary key of object. May be a tuple for composed
>>             keys.
>>         @param exists: If true, skip the cache of nonexistent keys and
>>             query the database when the object is not alive.
>>
>>         @return: The object found with the given primary key, or None
>>             if no object is found.
>>         """
>>
>>         if self._implicit_flush_block_count == 0:
>>             self.flush()
>>
>>         if type(key) != tuple:
>>             key = (key,)
>>
>>         cls_info = get_cls_info(cls)
>>
>>         assert len(key) == len(cls_info.primary_key)
>>
>>         primary_vars = []
>>         for column, variable in zip(cls_info.primary_key, key):
>>             if not isinstance(variable, Variable):
>>                 variable = column.variable_factory(value=variable)
>>             primary_vars.append(variable)
>>
>>         primary_values = tuple(var.get(to_db=True)
>>                                for var in primary_vars)
>>
>>         # Patched
>>         alive_key = (cls_info.cls, primary_values)
>>         obj_info = self._alive.get(alive_key)
>>         if obj_info is not None and not obj_info.get("invalidated"):
>>             return self._get_object(obj_info)
>>
>>         if (obj_info is None and not exists
>>                 and alive_key in self.nonexistent_cache):
>>             return None
>>         # End of patch
>>
>>         where = compare_columns(cls_info.primary_key, primary_vars)
>>
>>         select = Select(cls_info.columns, where,
>>                         default_tables=cls_info.table, limit=1)
>>
>>         result = self._connection.execute(select)
>>         values = result.get_one()
>>         if values is None:
>>             # Patched
>>             self.nonexistent_cache.add(alive_key)
>>             # End of patch
>>             return None
>>         return self._load_object(cls_info, result, values)
>>
>>     def get_multi(self, cls, keys, exists=False):
>>         """Get objects of type cls with the given primary keys from the
>>         database.
>>
>>         If an object is alive the database won't be touched for it.
>>
>>         @param cls: Class of the objects to be retrieved.
>>         @param keys: Collection of primary keys of objects (each may be
>>             a tuple for composed keys).
>>         @param exists: If true, skip the cache of nonexistent keys and
>>             query the database for objects that are not alive.
>>
>>         @return: A dict mapping each given key to the object found, or
>>             to None if no object is found for that key.
>>         """
>>         result = {}
>>         missing = {}
>>         if self._implicit_flush_block_count == 0:
>>             self.flush()
>>
>>         for key in keys:
>>             key_orig = key
>>             if type(key) != tuple:
>>                 key = (key,)
>>
>>             cls_info = get_cls_info(cls)
>>
>>             assert len(key) == len(cls_info.primary_key)
>>
>>             primary_vars = []
>>             for column, variable in zip(cls_info.primary_key, key):
>>                 if not isinstance(variable, Variable):
>>                     variable = column.variable_factory(value=variable)
>>                 primary_vars.append(variable)
>>
>>             primary_values = tuple(var.get(to_db=True)
>>                                    for var in primary_vars)
>>
>>             alive_key = (cls_info.cls, primary_values)
>>             obj_info = self._alive.get(alive_key)
>>             if obj_info is not None and not obj_info.get("invalidated"):
>>                 result[key_orig] = self._get_object(obj_info)
>>                 continue
>>
>>             if (obj_info is None and not exists
>>                     and alive_key in self.nonexistent_cache):
>>                 result[key_orig] = None
>>                 continue
>>
>>             missing[primary_values] = key_orig
>>
>>         if not missing:
>>             return result
>>
>>         # One IN clause per primary key column. For composed keys this
>>         # can also match rows that were not requested, so those are
>>         # skipped below.
>>         wheres = []
>>         for i, column in enumerate(cls_info.primary_key):
>>             wheres.append(In(column, tuple(v[i] for v in missing)))
>>         where = And(*wheres) if len(wheres) > 1 else wheres[0]
>>
>>         for obj in self.find(cls, where):
>>             primary_vars = get_obj_info(obj).get("primary_vars")
>>             found = tuple(var.get(to_db=True) for var in primary_vars)
>>             if found in missing:
>>                 result[missing.pop(found)] = obj
>>
>>         for primary_values, key_orig in missing.items():
>>             self.nonexistent_cache.add((cls, primary_values))
>>             result[key_orig] = None
>>
>>         return result
>>
>>     def reset(self):
>>         StoreOrig.reset(self)
>>         del self.nonexistent_cache[:]
>>
>>
>>
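
One point about get_multi with composed keys may be worth a small illustration: combining one IN clause per key column with AND matches the cross product of the requested values, so rows that were never asked for can come back. A minimal sketch in plain Python, with made-up rows standing in for the twitterprofile table:

```python
# Keys the caller asked for, as (context_id, user_id) pairs.
wanted = {(1, 3), (2, 4)}

# Hypothetical rows present in the table.
table = [(1, 3), (1, 4), (2, 4)]

context_ids = {key[0] for key in wanted}
user_ids = {key[1] for key in wanted}

# Equivalent of: context_id IN (1, 2) AND user_id IN (3, 4)
matched = [row for row in table
           if row[0] in context_ids and row[1] in user_ids]

# Rows the query returned although nobody asked for them.
unexpected = [row for row in matched if row not in wanted]
```

Here `unexpected` contains the (1, 4) row, so the loop over the query results has to ignore keys that are not among the requested ones.
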
>> 2015-01-16 9:03 GMT+02:00 Free Ekanayaka <free at 64studio.com>:
>>
>>> Hi Ivan
>>>
>>> On Thu, Jan 15, 2015 at 10:23 PM, Ivan Zakrevskyi <
>>> ivan.zakrevskyi at rebelmouse.com> wrote:
>>>
>>>> Hi all.
>>>>
>>>> Storm has excellent caching behavior, but it stores only existent
>>>> objects in Store._alive. If an object does not exist for some key, Storm
>>>> makes a DB query again and again.
>>>>
>>>> Are you planning to add caching for keys of nonexistent objects to
>>>> prevent these DB queries?
>>>>
>>>
>>> If an object doesn't exist in the cache it means that either it was not
>>> yet loaded at all, or it was loaded but it's now marked as "invalidated"
>>> (for example the transaction in which it was loaded fresh has terminated).
>>>
>>> So I'm not sure what you mean in your question, but I don't think there
>>> is anything more that could be cached (in terms of key->object values).
>>>
>>> Cheers
>>>
>>>
>>
>
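
As a possible refinement of the list-based NonexistentCache in the quoted patch, where both the membership test and remove() scan the whole list, here is a sketch of a bounded key set built on collections.OrderedDict. `LRUKeySet` and its methods are hypothetical names for illustration, not part of Storm.

```python
from collections import OrderedDict


class LRUKeySet(object):
    """Bounded set of keys that evicts the least recently added entry."""

    def __init__(self, size=1000):
        self._size = size
        self._keys = OrderedDict()

    def add(self, key):
        # Re-adding a key moves it to the most recent position.
        self._keys.pop(key, None)
        self._keys[key] = True
        if len(self._keys) > self._size:
            self._keys.popitem(last=False)  # drop the oldest entry

    def discard(self, key):
        self._keys.pop(key, None)

    def clear(self):
        self._keys.clear()

    def __contains__(self, key):
        return key in self._keys

    def __len__(self):
        return len(self._keys)


cache = LRUKeySet(size=2)
cache.add(("TwitterProfile", (10, 3)))
cache.add(("TwitterProfile", (11, 3)))
cache.add(("TwitterProfile", (12, 3)))  # evicts the (10, 3) entry
```

A store using something like this would probably also need to call clear() on commit and rollback, and discard() when an object with a cached key is added, since the patch as posted only empties the cache in reset().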