Rev 2692: Allow adaption of KnitData to pack files. in http://people.ubuntu.com/~robertc/baz2.0/repository

Robert Collins robertc at robertcollins.net
Thu Aug 2 07:26:57 BST 2007


At http://people.ubuntu.com/~robertc/baz2.0/repository

------------------------------------------------------------
revno: 2692
revision-id: robertc at robertcollins.net-20070802062654-klu22dv5vnelnk4m
parent: robertc at robertcollins.net-20070802041827-sllv9yvivttnw3az
committer: Robert Collins <robertc at robertcollins.net>
branch nick: repository
timestamp: Thu 2007-08-02 16:26:54 +1000
message:
  Allow adaption of KnitData to pack files.
modified:
  bzrlib/knit.py                 knit.py-20051212171256-f056ac8f0fbe1bd9
  bzrlib/tests/test_knit.py      test_knit.py-20051212171302-95d4c00dd5f11f2b
=== modified file 'bzrlib/knit.py'
--- a/bzrlib/knit.py	2007-08-01 08:00:22 +0000
+++ b/bzrlib/knit.py	2007-08-02 06:26:54 +0000
@@ -70,6 +70,12 @@
 import warnings
 
 import bzrlib
+from bzrlib.lazy_import import lazy_import
+lazy_import(globals(), """
+from bzrlib import (
+    pack,
+    )
+""")
 from bzrlib import (
     cache_utf8,
     errors,
@@ -339,7 +345,6 @@
 def make_empty_knit(transport, relpath):
     """Construct a empty knit at the specified location."""
     k = KnitVersionedFile(transport, relpath, 'w', KnitPlainFactory)
-    k._data._open_file()
 
 
 class KnitVersionedFile(VersionedFile):
@@ -394,10 +399,11 @@
                 dir_mode=dir_mode)
         else:
             self._index = index
-        self._data = _KnitData(transport, relpath + DATA_SUFFIX,
-            access_mode, create=create and not len(self), file_mode=file_mode,
-            create_parent_dir=create_parent_dir, delay_create=delay_create,
-            dir_mode=dir_mode)
+        _access = _KnitAccess(transport, relpath + DATA_SUFFIX, file_mode, dir_mode,
+            ((create and not len(self)) and delay_create), create_parent_dir)
+        if create and not len(self) and not delay_create:
+            _access.create()
+        self._data = _KnitData(_access)
 
     def __repr__(self):
         return '%s(%s)' % (self.__class__.__name__, 
@@ -496,12 +502,13 @@
                      the preceding records sizes.
         """
         # write all the data
-        pos = self._data.add_raw_record(data)
+        raw_record_sizes = [record[3] for record in records]
+        positions = self._data.add_raw_records(raw_record_sizes, data)
         offset = 0
         index_entries = []
-        for (version_id, options, parents, size) in records:
-            index_entries.append((version_id, options, pos+offset,
-                                  size, parents))
+        for (version_id, options, parents, size), (pos, length) in zip(
+            records, positions):
+            index_entries.append((version_id, options, pos, size, parents))
             if self._data._do_cache:
                 self._data._cache[version_id] = data[offset:offset+size]
             offset += size
@@ -1073,9 +1080,6 @@
             raise KnitHeaderError(badline=line,
                               filename=self._transport.abspath(self._filename))
 
-    def commit(self):
-        """Commit is a nop."""
-
     def __repr__(self):
         return '%s(%s)' % (self.__class__.__name__, self._filename)
 
@@ -1661,18 +1665,180 @@
         
     def _version_ids_to_keys(self, version_ids):
         return set((version_id, ) for version_id in version_ids)
-        
-
-class _KnitData(_KnitComponentFile):
-    """Contents of the knit data file"""
-
-    def __init__(self, transport, filename, mode, create=False, file_mode=None,
-                 create_parent_dir=False, delay_create=False,
-                 dir_mode=None):
-        _KnitComponentFile.__init__(self, transport, filename, mode,
-                                    file_mode=file_mode,
-                                    create_parent_dir=create_parent_dir,
-                                    dir_mode=dir_mode)
+
+
+class _KnitAccess(object):
+    """Access to knit records in a .knit file."""
+
+    def __init__(self, transport, filename, _file_mode, _dir_mode,
+        _need_to_create, _create_parent_dir):
+        """Create a _KnitAccess for accessing and inserting data.
+
+        :param transport: The transport the .knit is located on.
+        :param filename: The filename of the .knit.
+        """
+        self._transport = transport
+        self._filename = filename
+        self._file_mode = _file_mode
+        self._dir_mode = _dir_mode
+        self._need_to_create = _need_to_create
+        self._create_parent_dir = _create_parent_dir
+
+    def add_raw_records(self, sizes, raw_data):
+        """Add raw knit bytes to a storage area.
+
+        The data is spooled to wherever the access method is storing data.
+
+        :param sizes: An iterable containing the size of each raw data segment.
+        :param raw_data: A bytestring containing the data.
+        :return: A list of memos to retrieve the record later. For the .knit access
+            method these are readv pairs - offset, length. Note that this is
+            matched to a particular index engine, so can vary between
+            access methods.
+        """
+        assert type(raw_data) == str, \
+            'data must be plain bytes was %s' % type(raw_data)
+        if not self._need_to_create:
+            base = self._transport.append_bytes(self._filename, raw_data)
+        else:
+            self._transport.put_bytes_non_atomic(self._filename, raw_data,
+                                   create_parent_dir=self._create_parent_dir,
+                                   mode=self._file_mode,
+                                   dir_mode=self._dir_mode)
+            self._need_to_create = False
+            base = 0
+        result = []
+        for size in sizes:
+            result.append((base, size))
+            base += size
+        return result
+
+    def create(self):
+        """IFF this data access has its own storage area, initialise it.
+
+        :return: None.
+        """
+        self._transport.put_bytes_non_atomic(self._filename, '',
+                                             mode=self._file_mode)
+
+    def open_file(self):
+        """IFF this data access can be represented as a single file, open it.
+
+        For knits that are not mapped to a single file on disk this will
+        always return None.
+
+        :return: None or a file handle.
+        """
+        try:
+            return self._transport.get(self._filename)
+        except NoSuchFile:
+            pass
+        return None
+
+    def get_raw_records(self, memos_for_retrieval):
+        """Get the raw bytes for the given records.
+
+        :param memos_for_retrieval: An iterable containing the access method
+            specific memo for retrieving the bytes. For the .knit method this is
+            a readv tuple.
+        :return: An iterator over the bytes of the records.
+        """
+        for pos, data in self._transport.readv(self._filename, memos_for_retrieval):
+            yield data
+
+
+class _PackAccess(object):
+    """Access to knit records via a collection of packs."""
+
+    def __init__(self, index_to_packs, writer=None):
+        """Create a _PackAccess object.
+
+        :param index_to_packs: A dict mapping index objects to the transport
+            and file names for obtaining data.
+        :param writer: A tuple (pack.ContainerWriter, write_index) which
+            contains the writer for the pack to write to, and the index
+            that reads from that pack will be associated with.
+        """
+        if writer:
+            self.container_writer = writer[0]
+            self.write_index = writer[1]
+        else:
+            self.container_writer = None
+            self.write_index = None
+        self.indices = index_to_packs
+
+    def add_raw_records(self, sizes, raw_data):
+        """Add raw knit bytes to a storage area.
+
+        The data is spooled to the container writer in one bytes record per
+        raw data item.
+
+        :param sizes: An iterable containing the size of each raw data segment.
+        :param raw_data: A bytestring containing the data.
+        :return: A list of memos to retrieve the record later. For the pack
+            access method these are the pack offset, length pairs with a
+            pack key of the write index.
+            Note that this is matched to a particular index engine, so can vary
+            between access methods.
+        """
+        assert type(raw_data) == str, \
+            'data must be plain bytes was %s' % type(raw_data)
+        result = []
+        offset = 0
+        for size in sizes:
+            p_offset, p_length = self.container_writer.add_bytes_record(
+                raw_data[offset:offset+size], [])
+            offset += size
+            result.append((self.write_index, p_offset, p_length))
+        return result
+
+    def create(self):
+        """Pack based knits do not get individually created."""
+
+    def get_raw_records(self, memos_for_retrieval):
+        """Get the raw bytes for the given records.
+
+        :param memos_for_retrieval: An iterable containing the access method
+            specific memo for retrieving the bytes. For the Pack access method
+            this is a tuple (index, offset, length).
+        :return: An iterator over the bytes of the records.
+        """
+        # first pass, group into same-index requests
+        request_lists = []
+        current_index = None
+        for (index, offset, length) in memos_for_retrieval:
+            if current_index == index:
+                current_list.append((offset, length))
+            else:
+                if current_index is not None:
+                    request_lists.append((current_index, current_list))
+                current_index = index
+                current_list = [(offset, length)]
+        # handle the last entry
+        if current_index is not None:
+            request_lists.append((current_index, current_list))
+        for index, offsets in request_lists:
+            transport, path = self.indices[index]
+            reader = pack.make_readv_reader(transport, path, offsets)
+            for names, read_func in reader.iter_records():
+                yield read_func(None)
+
+    def open_file(self):
+        """Pack based knits have no single file."""
+        return None
+
+
+class _KnitData(object):
+    """Manage extraction of data from a KnitAccess, caching and decompressing."""
+
+    def __init__(self, access):
+        """Create a KnitData object.
+
+        :param access: The access method to use. Access methods such as
+            _KnitAccess manage the insertion of raw records and the subsequent
+            retrieval of the same.
+        """
+        self._access = access
         self._checked = False
         # TODO: jam 20060713 conceptually, this could spill to disk
         #       if the cached size gets larger than a certain amount
@@ -1680,12 +1846,6 @@
         #       a simple dictionary
         self._cache = {}
         self._do_cache = False
-        if create:
-            if delay_create:
-                self._need_to_create = create
-            else:
-                self._transport.put_bytes_non_atomic(self._filename, '',
-                                                     mode=self._file_mode)
 
     def enable_cache(self):
         """Enable caching of reads."""
@@ -1697,11 +1857,7 @@
         self._cache = {}
 
     def _open_file(self):
-        try:
-            return self._transport.get(self._filename)
-        except NoSuchFile:
-            pass
-        return None
+        return self._access.open_file()
 
     def _record_to_data(self, version_id, digest, lines):
         """Convert version_id, digest, lines into a raw data block.
@@ -1724,39 +1880,27 @@
         sio.seek(0)
         return length, sio
 
-    def add_raw_record(self, raw_data):
+    def add_raw_records(self, sizes, raw_data):
         """Append a prepared record to the data file.
         
-        :return: the offset in the data file raw_data was written.
+        :param sizes: An iterable containing the size of each raw data segment.
+        :param raw_data: A bytestring containing the data.
+        :return: a list of index data for the way the data was stored.
+            See the access method add_raw_records documentation for more
+            details.
         """
-        assert isinstance(raw_data, str), 'data must be plain bytes'
-        if not self._need_to_create:
-            return self._transport.append_bytes(self._filename, raw_data)
-        else:
-            self._transport.put_bytes_non_atomic(self._filename, raw_data,
-                                   create_parent_dir=self._create_parent_dir,
-                                   mode=self._file_mode,
-                                   dir_mode=self._dir_mode)
-            self._need_to_create = False
-            return 0
+        return self._access.add_raw_records(sizes, raw_data)
         
     def add_record(self, version_id, digest, lines):
-        """Write new text record to disk.  Returns the position in the
-        file where it was written."""
+        """Write new text record to disk. 
+        
+        Returns index data for retrieving it later, as per add_raw_records.
+        """
         size, sio = self._record_to_data(version_id, digest, lines)
-        # write to disk
-        if not self._need_to_create:
-            start_pos = self._transport.append_file(self._filename, sio)
-        else:
-            self._transport.put_file_non_atomic(self._filename, sio,
-                               create_parent_dir=self._create_parent_dir,
-                               mode=self._file_mode,
-                               dir_mode=self._dir_mode)
-            self._need_to_create = False
-            start_pos = 0
+        result = self.add_raw_records([size], sio.getvalue())
         if self._do_cache:
             self._cache[version_id] = sio.getvalue()
-        return start_pos, size
+        return result[0]
 
     def _parse_record_header(self, version_id, raw_data):
         """Parse a record header for consistency.
@@ -1768,7 +1912,7 @@
         try:
             rec = self._check_header(version_id, df.readline())
         except Exception, e:
-            raise KnitCorrupt(self._filename,
+            raise KnitCorrupt(self._access,
                               "While reading {%s} got %s(%s)"
                               % (version_id, e.__class__.__name__, str(e)))
         return df, rec
@@ -1776,10 +1920,10 @@
     def _check_header(self, version_id, line):
         rec = line.split()
         if len(rec) != 4:
-            raise KnitCorrupt(self._filename,
+            raise KnitCorrupt(self._access,
                               'unexpected number of elements in record header')
         if rec[1] != version_id:
-            raise KnitCorrupt(self._filename,
+            raise KnitCorrupt(self._access,
                               'unexpected version, wanted %r, got %r'
                               % (version_id, rec[1]))
         return rec
@@ -1794,7 +1938,7 @@
         try:
             record_contents = df.readlines()
         except Exception, e:
-            raise KnitCorrupt(self._filename,
+            raise KnitCorrupt(self._access,
                               "While reading {%s} got %s(%s)"
                               % (version_id, e.__class__.__name__, str(e)))
         header = record_contents.pop(0)
@@ -1802,13 +1946,13 @@
 
         last_line = record_contents.pop()
         if len(record_contents) != int(rec[2]):
-            raise KnitCorrupt(self._filename,
+            raise KnitCorrupt(self._access,
                               'incorrect number of lines %s != %s'
                               ' for version {%s}'
                               % (len(record_contents), int(rec[2]),
                                  version_id))
         if last_line != 'end %s\n' % rec[1]:
-            raise KnitCorrupt(self._filename,
+            raise KnitCorrupt(self._access,
                               'unexpected version end line %r, wanted %r' 
                               % (last_line, version_id))
         df.close()
@@ -1833,14 +1977,14 @@
                 needed_offsets = [(pos, size) for version_id, pos, size
                                                in records]
 
-            raw_records = self._transport.readv(self._filename, needed_offsets)
+            raw_records = self._access.get_raw_records(needed_offsets)
 
         for version_id, pos, size in records:
             if version_id in self._cache:
                 # This data has already been validated
                 data = self._cache[version_id]
             else:
-                pos, data = raw_records.next()
+                data = raw_records.next()
                 if self._do_cache:
                     self._cache[version_id] = data
 
@@ -1885,11 +2029,11 @@
 
         # The transport optimizes the fetching as well 
         # (ie, reads continuous ranges.)
-        readv_response = self._transport.readv(self._filename,
+        raw_data = self._access.get_raw_records(
             [(pos, size) for version_id, pos, size in needed_records])
 
-        for (version_id, pos, size), (pos, data) in \
-                izip(iter(needed_records), readv_response):
+        for (version_id, pos, size), data in \
+                izip(iter(needed_records), raw_data):
             content, digest = self._parse_record(version_id, data)
             if self._do_cache:
                 self._cache[version_id] = data

=== modified file 'bzrlib/tests/test_knit.py'
--- a/bzrlib/tests/test_knit.py	2007-08-01 08:00:22 +0000
+++ b/bzrlib/tests/test_knit.py	2007-08-02 06:26:54 +0000
@@ -26,6 +26,7 @@
     errors,
     generate_ids,
     knit,
+    pack,
     )
 from bzrlib.errors import (
     RevisionAlreadyPresent,
@@ -40,13 +41,20 @@
     KnitVersionedFile,
     KnitPlainFactory,
     KnitAnnotateFactory,
+    _KnitAccess,
     _KnitData,
     _KnitIndex,
+    _PackAccess,
     WeaveToKnit,
     KnitSequenceMatcher,
     )
 from bzrlib.osutils import split_lines
-from bzrlib.tests import TestCase, TestCaseWithTransport, Feature
+from bzrlib.tests import (
+    Feature,
+    TestCase,
+    TestCaseWithMemoryTransport,
+    TestCaseWithTransport,
+    )
 from bzrlib.transport import TransportLogger, get_transport
 from bzrlib.transport.memory import MemoryTransport
 from bzrlib.weave import Weave
@@ -145,6 +153,76 @@
         return queue_call
 
 
+class KnitRecordAccessTestsMixin(object):
+    """Tests for getting and putting knit records."""
+
+    def assertAccessExists(self, access):
+        """Ensure the data area for access has been initialised/exists."""
+        raise NotImplementedError(self.assertAccessExists)
+
+    def test_add_raw_records(self):
+        """Add_raw_records adds records retrievable later."""
+        access = self.get_access()
+        memos = access.add_raw_records([10], '1234567890')
+        self.assertEqual(['1234567890'], list(access.get_raw_records(memos)))
+    
+    def test_create(self):
+        """create() should make a file on disk."""
+        access = self.get_access()
+        access.create()
+        self.assertAccessExists(access)
+
+    def test_open_file(self):
+        """open_file never errors."""
+        access = self.get_access()
+        access.open_file()
+
+# what are the key interface elements - what code do I want to write:
+# insertion:
+# here is a raw record. please write it somewhere and return the readv I should
+# make to get it back.
+# here are many records -> returns many readvs
+# here is a readv I was given earlier, please return the raw data
+
+
+class TestKnitKnitAccess(TestCaseWithMemoryTransport, KnitRecordAccessTestsMixin):
+    """Tests for the .knit access implementation."""
+
+    def assertAccessExists(self, access):
+        self.assertNotEqual(None, access.open_file())
+
+    def get_access(self):
+        """Get a .knit style access instance."""
+        access = _KnitAccess(self.get_transport(), "foo.knit", None, None,
+            False, False)
+        return access
+    
+
+class TestPackKnitAccess(TestCaseWithMemoryTransport, KnitRecordAccessTestsMixin):
+    """Tests for the pack based access."""
+
+    def assertAccessExists(self, access):
+        # as pack based access has no backing unless an index maps data, this
+        # is a no-op.
+        pass
+
+    def get_access(self):
+        transport = self.get_transport()
+        def write_data(bytes):
+            transport.append_bytes('packfile', bytes)
+        writer = pack.ContainerWriter(write_data)
+        writer.begin()
+        index = "FOO"
+        indices = {"FOO":(transport, 'packfile')}
+        access = _PackAccess(indices, writer=(writer, 'FOO'))
+        return access
+
+# missing tests:
+# - add several records
+# - read from several packs
+# - add data readonly?
+
+
 class LowLevelKnitDataTests(TestCase):
 
     def create_gz_content(self, text):
@@ -162,7 +240,8 @@
                                         'end rev-id-1\n'
                                         % (sha1sum,))
         transport = MockTransport([gz_txt])
-        data = _KnitData(transport, 'filename', mode='r')
+        access = _KnitAccess(transport, 'filename', None, None, False, False)
+        data = _KnitData(access=access)
         records = [('rev-id-1', 0, len(gz_txt))]
 
         contents = data.read_records(records)
@@ -179,7 +258,8 @@
                                         'end rev-id-1\n'
                                         % (sha1sum,))
         transport = MockTransport([gz_txt])
-        data = _KnitData(transport, 'filename', mode='r')
+        access = _KnitAccess(transport, 'filename', None, None, False, False)
+        data = _KnitData(access=access)
         records = [('rev-id-1', 0, len(gz_txt))]
         self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
@@ -196,7 +276,8 @@
                                         'end rev-id-1\n'
                                         % (sha1sum,))
         transport = MockTransport([gz_txt])
-        data = _KnitData(transport, 'filename', mode='r')
+        access = _KnitAccess(transport, 'filename', None, None, False, False)
+        data = _KnitData(access=access)
         records = [('rev-id-1', 0, len(gz_txt))]
         self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
@@ -212,7 +293,8 @@
                                         'end rev-id-1\n'
                                         % (sha1sum,))
         transport = MockTransport([gz_txt])
-        data = _KnitData(transport, 'filename', mode='r')
+        access = _KnitAccess(transport, 'filename', None, None, False, False)
+        data = _KnitData(access=access)
         # We are asking for rev-id-2, but the data is rev-id-1
         records = [('rev-id-2', 0, len(gz_txt))]
         self.assertRaises(errors.KnitCorrupt, data.read_records, records)
@@ -229,7 +311,8 @@
                'end rev-id-1\n'
                % (sha1sum,))
         transport = MockTransport([txt])
-        data = _KnitData(transport, 'filename', mode='r')
+        access = _KnitAccess(transport, 'filename', None, None, False, False)
+        data = _KnitData(access=access)
         records = [('rev-id-1', 0, len(txt))]
 
         # We don't have valid gzip data ==> corrupt
@@ -249,7 +332,8 @@
         # Change 2 bytes in the middle to \xff
         gz_txt = gz_txt[:10] + '\xff\xff' + gz_txt[12:]
         transport = MockTransport([gz_txt])
-        data = _KnitData(transport, 'filename', mode='r')
+        access = _KnitAccess(transport, 'filename', None, None, False, False)
+        data = _KnitData(access=access)
         records = [('rev-id-1', 0, len(gz_txt))]
 
         self.assertRaises(errors.KnitCorrupt, data.read_records, records)



More information about the bazaar-commits mailing list