Rev 4805: (robertc) Fix threaded access to chk_map functions corrupting a module level cache, in file:///home/pqm/archives/thelove/bzr/2.1/

Canonical.com Patch Queue Manager pqm at pqm.ubuntu.com
Fri Jan 29 16:20:21 GMT 2010


At file:///home/pqm/archives/thelove/bzr/2.1/

------------------------------------------------------------
revno: 4805 [merge]
revision-id: pqm at pqm.ubuntu.com-20100129162018-0a1o6pjjpky5zwql
parent: pqm at pqm.ubuntu.com-20100129100124-va4vnmro5vyjfods
parent: robertc at robertcollins.net-20100129151858-0c1m88b013wluaxn
committer: Canonical.com Patch Queue Manager <pqm at pqm.ubuntu.com>
branch nick: 2.1
timestamp: Fri 2010-01-29 16:20:18 +0000
message:
  (robertc) Fix threaded access to chk_map functions corrupting a
  	module level cache. (Robert Collins, John Arbash Meinel, bug 514090)
modified:
  NEWS                           NEWS-20050323055033-4e00b5db738777ff
  bzrlib/chk_map.py              chk_map.py-20081001014447-ue6kkuhofvdecvxa-1
  bzrlib/tests/test_chk_map.py   test_chk_map.py-20081001014447-ue6kkuhofvdecvxa-2
=== modified file 'NEWS'
--- a/NEWS	2010-01-29 09:13:30 +0000
+++ b/NEWS	2010-01-29 15:18:58 +0000
@@ -5,6 +5,24 @@
 .. contents:: List of Releases
    :depth: 1
 
+bzr 2.1.0 (not released yet)
+############################
+
+:Codename: 
+:2.1.0: 
+
+Bug Fixes
+*********
+
+* Using the ``bzrlib.chk_map`` module from within multiple threads at the
+  same time was broken due to race conditions with a module level page
+  cache. This shows up as a KeyError in the ``bzrlib.lru_cache`` code with
+  ``bzrlib.chk_map`` in the backtrace, and can be triggered without using
+  the same high level objects such as ``bzrlib.repository.Repository``
+  from different threads. chk_map now uses a thread local cache which may
+  increase memory pressure on processes using threads.
+  (Robert Collins, John Arbash Meinel, #514090)
+
 bzr 2.1.0rc2
 ############
 

=== modified file 'bzrlib/chk_map.py'
--- a/bzrlib/chk_map.py	2009-11-02 17:27:52 +0000
+++ b/bzrlib/chk_map.py	2010-01-29 15:18:58 +0000
@@ -38,6 +38,7 @@
 """
 
 import heapq
+import threading
 
 from bzrlib import lazy_import
 lazy_import.lazy_import(globals(), """
@@ -59,11 +60,31 @@
 # If each line is 50 bytes, and you have 255 internal pages, with 255-way fan
 # out, it takes 3.1MB to cache the layer.
 _PAGE_CACHE_SIZE = 4*1024*1024
-# We are caching bytes so len(value) is perfectly accurate
-_page_cache = lru_cache.LRUSizeCache(_PAGE_CACHE_SIZE)
+# Per thread caches for 2 reasons:
+# - in the server we may be serving very different content, so we get less
+#   cache thrashing.
+# - we avoid locking on every cache lookup.
+_thread_caches = threading.local()
+# The page cache.
+_thread_caches.page_cache = None
+
+def _get_cache():
+    """Get the per-thread page cache.
+
+    We need a function to do this because in a new thread the _thread_caches
+    threading.local object does not have the cache initialized yet.
+    """
+    page_cache = getattr(_thread_caches, 'page_cache', None)
+    if page_cache is None:
+        # We are caching bytes so len(value) is perfectly accurate
+        page_cache = lru_cache.LRUSizeCache(_PAGE_CACHE_SIZE)
+        _thread_caches.page_cache = page_cache
+    return page_cache
+
 
 def clear_cache():
-    _page_cache.clear()
+    _get_cache().clear()
+
 
 # If a ChildNode falls below this many bytes, we check for a remap
 _INTERESTING_NEW_SIZE = 50
@@ -161,11 +182,11 @@
 
     def _read_bytes(self, key):
         try:
-            return _page_cache[key]
+            return _get_cache()[key]
         except KeyError:
             stream = self._store.get_record_stream([key], 'unordered', True)
             bytes = stream.next().get_bytes_as('fulltext')
-            _page_cache[key] = bytes
+            _get_cache()[key] = bytes
             return bytes
 
     def _dump_tree(self, include_keys=False):
@@ -901,7 +922,7 @@
         bytes = ''.join(lines)
         if len(bytes) != self._current_size():
             raise AssertionError('Invalid _current_size')
-        _page_cache.add(self._key, bytes)
+        _get_cache().add(self._key, bytes)
         return [self._key]
 
     def refs(self):
@@ -1143,7 +1164,7 @@
             found_keys = set()
             for key in keys:
                 try:
-                    bytes = _page_cache[key]
+                    bytes = _get_cache()[key]
                 except KeyError:
                     continue
                 else:
@@ -1174,7 +1195,7 @@
                     prefix, node_key_filter = keys[record.key]
                     node_and_filters.append((node, node_key_filter))
                     self._items[prefix] = node
-                    _page_cache.add(record.key, bytes)
+                    _get_cache().add(record.key, bytes)
                 for info in node_and_filters:
                     yield info
 
@@ -1300,7 +1321,7 @@
             lines.append(serialised[prefix_len:])
         sha1, _, _ = store.add_lines((None,), (), lines)
         self._key = StaticTuple("sha1:" + sha1,).intern()
-        _page_cache.add(self._key, ''.join(lines))
+        _get_cache().add(self._key, ''.join(lines))
         yield self._key
 
     def _search_key(self, key):
@@ -1489,11 +1510,11 @@
         self._state = None
 
     def _read_nodes_from_store(self, keys):
-        # We chose not to use _page_cache, because we think in terms of records
-        # to be yielded. Also, we expect to touch each page only 1 time during
-        # this code. (We may want to evaluate saving the raw bytes into the
-        # page cache, which would allow a working tree update after the fetch
-        # to not have to read the bytes again.)
+        # We chose not to use _get_cache(), because we think in
+        # terms of records to be yielded. Also, we expect to touch each page
+        # only 1 time during this code. (We may want to evaluate saving the
+        # raw bytes into the page cache, which would allow a working tree
+        # update after the fetch to not have to read the bytes again.)
         as_st = StaticTuple.from_sequence
         stream = self._store.get_record_stream(keys, 'unordered', True)
         for record in stream:

=== modified file 'bzrlib/tests/test_chk_map.py'
--- a/bzrlib/tests/test_chk_map.py	2009-10-21 20:53:21 +0000
+++ b/bzrlib/tests/test_chk_map.py	2010-01-29 15:18:58 +0000
@@ -905,7 +905,7 @@
         # Unmapping the new node will check the existing nodes to see if they
         # would fit.
         # Clear the page cache so we ensure we have to read all the children
-        chk_map._page_cache.clear()
+        chk_map.clear_cache()
         chkmap.unmap(('aad',))
         self.assertIsInstance(chkmap._root_node._items['aaa'], LeafNode)
         self.assertIsInstance(chkmap._root_node._items['aab'], LeafNode)
@@ -945,12 +945,12 @@
         # Now clear the page cache, and only include 2 of the children in the
         # cache
         aab_key = chkmap._root_node._items['aab']
-        aab_bytes = chk_map._page_cache[aab_key]
+        aab_bytes = chk_map._get_cache()[aab_key]
         aac_key = chkmap._root_node._items['aac']
-        aac_bytes = chk_map._page_cache[aac_key]
-        chk_map._page_cache.clear()
-        chk_map._page_cache[aab_key] = aab_bytes
-        chk_map._page_cache[aac_key] = aac_bytes
+        aac_bytes = chk_map._get_cache()[aac_key]
+        chk_map.clear_cache()
+        chk_map._get_cache()[aab_key] = aab_bytes
+        chk_map._get_cache()[aac_key] = aac_bytes
 
         # Unmapping the new node will check the nodes from the page cache
         # first, and not have to read in 'aaa'




More information about the bazaar-commits mailing list