[storm] Limit cache size by memory usage?

Stuart Bishop stuart.bishop at canonical.com
Mon Jul 27 11:31:45 BST 2009


So at the moment, we have to guess our cache sizes pretty much by
educated guesses and trial and error. Ideally I'd have a cache that
tunes itself, adapting to my changing data. I was experimenting with
adjusting the cache size based on memory usage. Unfortunately, it
looks like Python or maybe the OS is not aggressive enough at freeing
up RAM so the approach for my first experiment failed (I get spikes
cache size growth followed by lengthy lulls sitting at the minimum
size).

Can anyone think of any alternative approaches? I'm thinking my next
attempt when I'm bored will be a GenerationalCache that keeps bumping
up a global cache size when the cache is bumped until the memory limit
is reached, at which point it is fixed for the life of the program.
This should work for many applications, although will of course fail
for applications that delay loading their large objects until later in
the run time. I also thought of just growing the cache until I detect
swapping, but when I have multiple scripts running on the same server
that doesn't work.


=== modified file 'storm/cache.py'
--- storm/cache.py	2009-02-16 10:44:31 +0000
+++ storm/cache.py	2009-07-24 10:21:22 +0000
@@ -1,4 +1,8 @@
+import gc
 import itertools
+import os
+import threading
+import weakref


 class Cache(object):
@@ -160,3 +164,79 @@
         cached = self._new_cache.copy()
         cached.update(self._old_cache)
         return list(cached)
+
+
+class MemLimitedCache(GenerationalCache):
+    """A Cache that attempts to keep the process RAM usage within a limit.
+
+    The behavior is configured using class attributes:
+
+        MemLimitedCache.ram_limit is the size in bytes the process' data
+        set is allowed to grow to before the caches are shrunk.
+
+        MemLimitedCache.additions_between_checks is the number of
+        object additions to a MemLimitedCache that occur before a RAM
+        usage check is made.
+
+    This implementation is Unix specific and relies on /proc/[pid]/statm.
+    See the proc(5) man page to see if this is supported on your platform.
+    """
+
+    # When the process is using more than this many bytes of RAM,
+    # all the caches larger than their minimum size are bumped.
+    ram_limit = 200000000
+    additions_between_checks = 200
+
+    _cache_refs = []   # Class property.
+
+    def __init__(self):
+        if not os.path.exists("/proc/%s/statm" % os.getpid()):
+            raise NotImplementedError(
+                "/proc/[pid]/statm not available on this platform")
+        super(MemLimitedCache, self).__init__()
+        MemLimitedCache._cache_refs.append(
+            weakref.ref(self, MemLimitedCache._cache_refs.remove))
+
+    def add(self, obj_info):
+        """See `storm.store.Cache.add`."""
+        if obj_info not in self._new_cache:
+            MemLimitedCache._bump_generations()
+            self._new_cache[obj_info] = obj_info.get_obj()
+
+    _lock = threading.Lock()
+    _check_counter = 0
+
+    @staticmethod
+    def _bump_generations():
+        MemLimitedCache._check_counter += 1
+
+        should_check = (
+            MemLimitedCache._check_counter
+            >= MemLimitedCache.additions_between_checks)
+
+        if should_check and MemLimitedCache._lock.acquire(False):
+            try:
+                should_check = (
+                    MemLimitedCache._check_counter
+                    > MemLimitedCache.additions_between_checks)
+                if should_check:
+                    data_size = int(
+                        open("/proc/%s/statm" % os.getpid()).read().split()[5])
+                    print "data_size %d." % data_size,
+                    # Reset the counter here - we are interested in the
+                    # number of additions made since the last time the
+                    # memory usage was checked.
+                    MemLimitedCache._check_counter = 0
+                    if data_size > MemLimitedCache.ram_limit:
+                        print "over limit.",
+                        for ref in MemLimitedCache._cache_refs:
+                            cache = ref()
+                            if cache is not None:
+                                print "bumping. old objs %d. new objs %d" % (
+                                    len(cache._old_cache),
+                                    len(cache._new_cache)),
+                                cache._bump_generation()
+                        gc.collect()
+                    print
+            finally:
+                MemLimitedCache._lock.release()

=== added file 'testy.py'
--- testy.py	1970-01-01 00:00:00 +0000
+++ testy.py	2009-07-24 10:29:46 +0000
@@ -0,0 +1,50 @@
+import random
+from storm.locals import *
+from storm.cache import MemLimitedCache
+
+class Person(object):
+    __storm_table__ = "person"
+    id = Int(primary=True)
+    name = Unicode()
+
+random.seed(42)
+
+database = create_database("postgres:stub")
+MemLimitedCache.ram_limit = 50000
+MemLimitedCache.additions_between_checks = 1000
+cache = MemLimitedCache()
+
+store = Store(database, cache)
+
+store.execute("""
+    DROP TABLE IF EXISTS person
+    """, noresult=True)
+store.execute("""
+    CREATE TABLE person (
+        id serial PRIMARY KEY,
+        name VARCHAR)
+    """, noresult=True)
+
+p = None
+commit_every = 10000
+
+def checkpoint():
+    store.commit()
+    print "created %d. id was %d." % (commit_every, p.id)
+
+for i in range(0, 250000):
+    p = Person()
+    p.name = u"name %f" * 100
+    store.add(p)
+    if i % commit_every == 0:
+        checkpoint()
+checkpoint()
+
+max_id = p.id
+
+notify_every = 20000
+for i in range(0, 1000000):
+    p = store.get(Person, random.randint(1, max_id))
+    if i % notify_every == 0:
+        print "Loaded %d." % notify_every
+


-- 
Stuart Bishop <stuart at stuartbishop.net>
http://www.stuartbishop.net/



More information about the storm mailing list