Rev 2692: Allow adaption of KnitData to pack files. in http://people.ubuntu.com/~robertc/baz2.0/repository
Robert Collins
robertc at robertcollins.net
Thu Aug 2 07:26:57 BST 2007
At http://people.ubuntu.com/~robertc/baz2.0/repository
------------------------------------------------------------
revno: 2692
revision-id: robertc at robertcollins.net-20070802062654-klu22dv5vnelnk4m
parent: robertc at robertcollins.net-20070802041827-sllv9yvivttnw3az
committer: Robert Collins <robertc at robertcollins.net>
branch nick: repository
timestamp: Thu 2007-08-02 16:26:54 +1000
message:
Allow adaption of KnitData to pack files.
modified:
bzrlib/knit.py knit.py-20051212171256-f056ac8f0fbe1bd9
bzrlib/tests/test_knit.py test_knit.py-20051212171302-95d4c00dd5f11f2b
=== modified file 'bzrlib/knit.py'
--- a/bzrlib/knit.py 2007-08-01 08:00:22 +0000
+++ b/bzrlib/knit.py 2007-08-02 06:26:54 +0000
@@ -70,6 +70,12 @@
import warnings
import bzrlib
+from bzrlib.lazy_import import lazy_import
+lazy_import(globals(), """
+from bzrlib import (
+ pack,
+ )
+""")
from bzrlib import (
cache_utf8,
errors,
@@ -339,7 +345,6 @@
def make_empty_knit(transport, relpath):
"""Construct a empty knit at the specified location."""
k = KnitVersionedFile(transport, relpath, 'w', KnitPlainFactory)
- k._data._open_file()
class KnitVersionedFile(VersionedFile):
@@ -394,10 +399,11 @@
dir_mode=dir_mode)
else:
self._index = index
- self._data = _KnitData(transport, relpath + DATA_SUFFIX,
- access_mode, create=create and not len(self), file_mode=file_mode,
- create_parent_dir=create_parent_dir, delay_create=delay_create,
- dir_mode=dir_mode)
+ _access = _KnitAccess(transport, relpath + DATA_SUFFIX, file_mode, dir_mode,
+ ((create and not len(self)) and delay_create), create_parent_dir)
+ if create and not len(self) and not delay_create:
+ _access.create()
+ self._data = _KnitData(_access)
def __repr__(self):
return '%s(%s)' % (self.__class__.__name__,
@@ -496,12 +502,13 @@
the preceding records sizes.
"""
# write all the data
- pos = self._data.add_raw_record(data)
+ raw_record_sizes = [record[3] for record in records]
+ positions = self._data.add_raw_records(raw_record_sizes, data)
offset = 0
index_entries = []
- for (version_id, options, parents, size) in records:
- index_entries.append((version_id, options, pos+offset,
- size, parents))
+ for (version_id, options, parents, size), (pos, length) in zip(
+ records, positions):
+ index_entries.append((version_id, options, pos, size, parents))
if self._data._do_cache:
self._data._cache[version_id] = data[offset:offset+size]
offset += size
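To make the new bookkeeping concrete: for the .knit access method the
memos returned by add_raw_records are plain (offset, length) readv
pairs, and they zip against the record tuples one for one. A standalone
sketch with illustrative values (not bzrlib code):

    # Each record is (version_id, options, parents, size); assume two
    # records were appended starting at byte 100 of the .knit file.
    records = [('rev-1', [], [], 10), ('rev-2', [], ['rev-1'], 25)]
    positions = [(100, 10), (110, 25)]
    index_entries = []
    for (version_id, options, parents, size), (pos, length) in zip(
            records, positions):
        index_entries.append((version_id, options, pos, size, parents))
    # index_entries == [('rev-1', [], 100, 10, []),
    #                   ('rev-2', [], 110, 25, ['rev-1'])]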
@@ -1073,9 +1080,6 @@
raise KnitHeaderError(badline=line,
filename=self._transport.abspath(self._filename))
- def commit(self):
- """Commit is a nop."""
-
def __repr__(self):
return '%s(%s)' % (self.__class__.__name__, self._filename)
@@ -1661,18 +1665,180 @@
def _version_ids_to_keys(self, version_ids):
return set((version_id, ) for version_id in version_ids)
-
-
-class _KnitData(_KnitComponentFile):
- """Contents of the knit data file"""
-
- def __init__(self, transport, filename, mode, create=False, file_mode=None,
- create_parent_dir=False, delay_create=False,
- dir_mode=None):
- _KnitComponentFile.__init__(self, transport, filename, mode,
- file_mode=file_mode,
- create_parent_dir=create_parent_dir,
- dir_mode=dir_mode)
+
+
+class _KnitAccess(object):
+ """Access to knit records in a .knit file."""
+
+ def __init__(self, transport, filename, _file_mode, _dir_mode,
+ _need_to_create, _create_parent_dir):
+ """Create a _KnitAccess for accessing and inserting data.
+
+ :param transport: The transport the .knit is located on.
+ :param filename: The filename of the .knit.
+ """
+ self._transport = transport
+ self._filename = filename
+ self._file_mode = _file_mode
+ self._dir_mode = _dir_mode
+ self._need_to_create = _need_to_create
+ self._create_parent_dir = _create_parent_dir
+
+ def add_raw_records(self, sizes, raw_data):
+ """Add raw knit bytes to a storage area.
+
+ The data is spooled to wherever the access method is storing data.
+
+ :param sizes: An iterable containing the size of each raw data segment.
+ :param raw_data: A bytestring containing the data.
+ :return: A list of memos to retrieve the record later. For the .knit access
+ method these are readv pairs - offset, length. Note that this is
+ matched to a particular index engine, so can vary between
+ access methods.
+ """
+ assert type(raw_data) == str, \
+ 'data must be plain bytes was %s' % type(raw_data)
+ if not self._need_to_create:
+ base = self._transport.append_bytes(self._filename, raw_data)
+ else:
+ self._transport.put_bytes_non_atomic(self._filename, raw_data,
+ create_parent_dir=self._create_parent_dir,
+ mode=self._file_mode,
+ dir_mode=self._dir_mode)
+ self._need_to_create = False
+ base = 0
+ result = []
+ for size in sizes:
+ result.append((base, size))
+ base += size
+ return result
+
+ def create(self):
+ """IFF this data access has its own storage area, initialise it.
+
+ :return: None.
+ """
+ self._transport.put_bytes_non_atomic(self._filename, '',
+ mode=self._file_mode)
+
+ def open_file(self):
+ """IFF this data access can be represented as a single file, open it.
+
+ For knits that are not mapped to a single file on disk this will
+ always return None.
+
+ :return: None or a file handle.
+ """
+ try:
+ return self._transport.get(self._filename)
+ except NoSuchFile:
+ pass
+ return None
+
+ def get_raw_records(self, memos_for_retrieval):
+ """Get the raw bytes for a records.
+
+ :param memos_for_retrieval: An iterable containing the access method
+ specific memo for retrieving the bytes. For the .knit method this is
+ a readv tuple.
+ :return: An iterator over the bytes of the records.
+ """
+ for pos, data in self._transport.readv(self._filename, memos_for_retrieval):
+ yield data
+
+
+class _PackAccess(object):
+ """Access to knit records via a collection of packs."""
+
+ def __init__(self, index_to_packs, writer=None):
+ """Create a _PackAccess object.
+
+ :param index_to_packs: A dict mapping index objects to the transport
+ and file names for obtaining data.
+ :param writer: A tuple (pack.ContainerWriter, write_index) which
+ contains the pack to write to, and the index that reads from
+ it will be associated with.
+ """
+ if writer:
+ self.container_writer = writer[0]
+ self.write_index = writer[1]
+ else:
+ self.container_writer = None
+ self.write_index = None
+ self.indices = index_to_packs
+
+ def add_raw_records(self, sizes, raw_data):
+ """Add raw knit bytes to a storage area.
+
+ The data is spooled to the container writer in one bytes record per
+ raw data item.
+
+ :param sizes: An iterable containing the size of each raw data segment.
+ :param raw_data: A bytestring containing the data.
+ :return: A list of memos to retrieve the record later. For the pack
+ access method these are (write_index, offset, length) tuples: an
+ offset, length pair within the pack, keyed by the write index.
+ Note that this is matched to a particular index engine, so can vary
+ between access methods.
+ """
+ assert type(raw_data) == str, \
+ 'data must be plain bytes was %s' % type(raw_data)
+ result = []
+ offset = 0
+ for size in sizes:
+ p_offset, p_length = self.container_writer.add_bytes_record(
+ raw_data[offset:offset+size], [])
+ offset += size
+ result.append((self.write_index, p_offset, p_length))
+ return result
+
+ def create(self):
+ """Pack based knits do not get individually created."""
+
+ def get_raw_records(self, memos_for_retrieval):
+ """Get the raw bytes for a records.
+
+ :param memos_for_retrieval: An iterable containing the access method
+ specific memo for retrieving the bytes. For the Pack access method
+ this is a tuple (index, offset, length).
+ :return: An iterator over the bytes of the records.
+ """
+ # first pass, group into same-index requests
+ request_lists = []
+ current_index = None
+ for (index, offset, length) in memos_for_retrieval:
+ if current_index == index:
+ current_list.append((offset, length))
+ else:
+ if current_index is not None:
+ request_lists.append((current_index, current_list))
+ current_index = index
+ current_list = [(offset, length)]
+ # handle the last entry
+ if current_index is not None:
+ request_lists.append((current_index, current_list))
+ for index, offsets in request_lists:
+ transport, path = self.indices[index]
+ reader = pack.make_readv_reader(transport, path, offsets)
+ for names, read_func in reader.iter_records():
+ yield read_func(None)
+
+ def open_file(self):
+ """Pack based knits have no single file."""
+ return None
+
+
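A note on the grouping pass in _PackAccess.get_raw_records above: it
only batches adjacent requests that hit the same pack, which is exactly
what itertools.groupby does over consecutive keys. A compact equivalent,
as a sketch rather than the committed code:

    from itertools import groupby
    from operator import itemgetter

    def group_by_index(memos):
        # memos are (index, offset, length) tuples; consecutive memos
        # sharing an index collapse into one readv request list.
        for index, group in groupby(memos, key=itemgetter(0)):
            yield index, [(offset, length) for _, offset, length in group]

Either form issues one readv per run of same-index memos, so requests
interleaved across packs produce more, smaller reads.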
+class _KnitData(object):
+ """Manage extraction of data from a KnitAccess, caching and decompressing."""
+
+ def __init__(self, access):
+ """Create a KnitData object.
+
+ :param access: The access method to use. Access methods such as
+ _KnitAccess manage the insertion of raw records and the subsequent
+ retrieval of the same.
+ """
+ self._access = access
self._checked = False
# TODO: jam 20060713 conceptually, this could spill to disk
# if the cached size gets larger than a certain amount
@@ -1680,12 +1846,6 @@
# a simple dictionary
self._cache = {}
self._do_cache = False
- if create:
- if delay_create:
- self._need_to_create = create
- else:
- self._transport.put_bytes_non_atomic(self._filename, '',
- mode=self._file_mode)
def enable_cache(self):
"""Enable caching of reads."""
@@ -1697,11 +1857,7 @@
self._cache = {}
def _open_file(self):
- try:
- return self._transport.get(self._filename)
- except NoSuchFile:
- pass
- return None
+ return self._access.open_file()
def _record_to_data(self, version_id, digest, lines):
"""Convert version_id, digest, lines into a raw data block.
@@ -1724,39 +1880,27 @@
sio.seek(0)
return length, sio
- def add_raw_record(self, raw_data):
+ def add_raw_records(self, sizes, raw_data):
"""Append a prepared record to the data file.
- :return: the offset in the data file raw_data was written.
+ :param sizes: An iterable containing the size of each raw data segment.
+ :param raw_data: A bytestring containing the data.
+ :return: a list of index data for the way the data was stored.
+ See the access method add_raw_records documentation for more
+ details.
"""
- assert isinstance(raw_data, str), 'data must be plain bytes'
- if not self._need_to_create:
- return self._transport.append_bytes(self._filename, raw_data)
- else:
- self._transport.put_bytes_non_atomic(self._filename, raw_data,
- create_parent_dir=self._create_parent_dir,
- mode=self._file_mode,
- dir_mode=self._dir_mode)
- self._need_to_create = False
- return 0
+ return self._access.add_raw_records(sizes, raw_data)
def add_record(self, version_id, digest, lines):
- """Write new text record to disk. Returns the position in the
- file where it was written."""
+ """Write new text record to disk.
+
+ Returns index data for retrieving it later, as per add_raw_records.
+ """
size, sio = self._record_to_data(version_id, digest, lines)
- # write to disk
- if not self._need_to_create:
- start_pos = self._transport.append_file(self._filename, sio)
- else:
- self._transport.put_file_non_atomic(self._filename, sio,
- create_parent_dir=self._create_parent_dir,
- mode=self._file_mode,
- dir_mode=self._dir_mode)
- self._need_to_create = False
- start_pos = 0
+ result = self.add_raw_records([size], sio.getvalue())
if self._do_cache:
self._cache[version_id] = sio.getvalue()
- return start_pos, size
+ return result[0]
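The caller-visible change: add_record used to return a (start_pos,
size) pair for the .knit file; it now returns whatever memo the access
method produced. Roughly, with hypothetical knit_data and pack_data
instances:

    # .knit access: the memo is an (offset, length) readv pair
    offset, length = knit_data.add_record('rev-1', digest, lines)
    # pack access: the memo also names the index the pack is keyed by
    index, offset, length = pack_data.add_record('rev-1', digest, lines)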
def _parse_record_header(self, version_id, raw_data):
"""Parse a record header for consistency.
@@ -1768,7 +1912,7 @@
try:
rec = self._check_header(version_id, df.readline())
except Exception, e:
- raise KnitCorrupt(self._filename,
+ raise KnitCorrupt(self._access,
"While reading {%s} got %s(%s)"
% (version_id, e.__class__.__name__, str(e)))
return df, rec
@@ -1776,10 +1920,10 @@
def _check_header(self, version_id, line):
rec = line.split()
if len(rec) != 4:
- raise KnitCorrupt(self._filename,
+ raise KnitCorrupt(self._access,
'unexpected number of elements in record header')
if rec[1] != version_id:
- raise KnitCorrupt(self._filename,
+ raise KnitCorrupt(self._access,
'unexpected version, wanted %r, got %r'
% (version_id, rec[1]))
return rec
@@ -1794,7 +1938,7 @@
try:
record_contents = df.readlines()
except Exception, e:
- raise KnitCorrupt(self._filename,
+ raise KnitCorrupt(self._access,
"While reading {%s} got %s(%s)"
% (version_id, e.__class__.__name__, str(e)))
header = record_contents.pop(0)
@@ -1802,13 +1946,13 @@
last_line = record_contents.pop()
if len(record_contents) != int(rec[2]):
- raise KnitCorrupt(self._filename,
+ raise KnitCorrupt(self._access,
'incorrect number of lines %s != %s'
' for version {%s}'
% (len(record_contents), int(rec[2]),
version_id))
if last_line != 'end %s\n' % rec[1]:
- raise KnitCorrupt(self._filename,
+ raise KnitCorrupt(self._access,
'unexpected version end line %r, wanted %r'
% (last_line, version_id))
df.close()
@@ -1833,14 +1977,14 @@
needed_offsets = [(pos, size) for version_id, pos, size
in records]
- raw_records = self._transport.readv(self._filename, needed_offsets)
+ raw_records = self._access.get_raw_records(needed_offsets)
for version_id, pos, size in records:
if version_id in self._cache:
# This data has already been validated
data = self._cache[version_id]
else:
- pos, data = raw_records.next()
+ data = raw_records.next()
if self._do_cache:
self._cache[version_id] = data
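The shape change being absorbed in this hunk and the next:
transport.readv yields (offset, bytes) pairs, while the access layer
yields the byte strings alone, in request order. Schematically
(transport, access, handle and the argument names are placeholders):

    # before: the transport reports where each chunk came from
    for pos, data in transport.readv(filename, offsets):
        handle(data)
    # after: the access method resolves the memos itself
    for data in access.get_raw_records(memos):
        handle(data)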
@@ -1885,11 +2029,11 @@
# The transport optimizes the fetching as well
# (ie, reads continuous ranges.)
- readv_response = self._transport.readv(self._filename,
+ raw_data = self._access.get_raw_records(
[(pos, size) for version_id, pos, size in needed_records])
- for (version_id, pos, size), (pos, data) in \
- izip(iter(needed_records), readv_response):
+ for (version_id, pos, size), data in \
+ izip(iter(needed_records), raw_data):
content, digest = self._parse_record(version_id, data)
if self._do_cache:
self._cache[version_id] = data
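Taken together, the knit.py changes are what let a _KnitData be backed
by a pack instead of a .knit file. A sketch of the wiring, modelled on
the test code below ('packfile' and the 'FOO' index key are arbitrary
values borrowed from the tests):

    from bzrlib import pack
    from bzrlib.knit import _KnitData, _PackAccess

    def pack_backed_knit_data(transport):
        # Append container records to a single pack file on the transport.
        def write_data(bytes):
            transport.append_bytes('packfile', bytes)
        writer = pack.ContainerWriter(write_data)
        writer.begin()
        index = 'FOO'  # stand-in for a real index object
        access = _PackAccess({index: (transport, 'packfile')},
                             writer=(writer, index))
        return _KnitData(access)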
=== modified file 'bzrlib/tests/test_knit.py'
--- a/bzrlib/tests/test_knit.py 2007-08-01 08:00:22 +0000
+++ b/bzrlib/tests/test_knit.py 2007-08-02 06:26:54 +0000
@@ -26,6 +26,7 @@
errors,
generate_ids,
knit,
+ pack,
)
from bzrlib.errors import (
RevisionAlreadyPresent,
@@ -40,13 +41,20 @@
KnitVersionedFile,
KnitPlainFactory,
KnitAnnotateFactory,
+ _KnitAccess,
_KnitData,
_KnitIndex,
+ _PackAccess,
WeaveToKnit,
KnitSequenceMatcher,
)
from bzrlib.osutils import split_lines
-from bzrlib.tests import TestCase, TestCaseWithTransport, Feature
+from bzrlib.tests import (
+ Feature,
+ TestCase,
+ TestCaseWithMemoryTransport,
+ TestCaseWithTransport,
+ )
from bzrlib.transport import TransportLogger, get_transport
from bzrlib.transport.memory import MemoryTransport
from bzrlib.weave import Weave
@@ -145,6 +153,76 @@
return queue_call
+class KnitRecordAccessTestsMixin(object):
+ """Tests for getting and putting knit records."""
+
+ def assertAccessExists(self, access):
+ """Ensure the data area for access has been initialised/exists."""
+ raise NotImplementedError(self.assertAccessExists)
+
+ def test_add_raw_records(self):
+ """Add_raw_records adds records retrievable later."""
+ access = self.get_access()
+ memos = access.add_raw_records([10], '1234567890')
+ self.assertEqual(['1234567890'], list(access.get_raw_records(memos)))
+
+ def test_create(self):
+ """create() should make a file on disk."""
+ access = self.get_access()
+ access.create()
+ self.assertAccessExists(access)
+
+ def test_open_file(self):
+ """open_file never errors."""
+ access = self.get_access()
+ access.open_file()
+
+# what are the key interface elements - what code do I want to write:
+# insertion:
+# here is a raw record. please write it somewhere and return the readv I should
+# make to get it back.
+# here are many records -> returns many readvs
+# here is a readv I was given earlier, please return the raw data
+
+
+class TestKnitKnitAccess(TestCaseWithMemoryTransport, KnitRecordAccessTestsMixin):
+ """Tests for the .kndx implementation."""
+
+ def assertAccessExists(self, access):
+ self.assertNotEqual(None, access.open_file())
+
+ def get_access(self):
+ """Get a .knit style access instance."""
+ access = _KnitAccess(self.get_transport(), "foo.knit", None, None,
+ False, False)
+ return access
+
+
+class TestPackKnitAccess(TestCaseWithMemoryTransport, KnitRecordAccessTestsMixin):
+ """Tests for the pack based access."""
+
+ def assertAccessExists(self, access):
+ # as pack based access has no backing unless an index maps data, this
+ # is a no-op.
+ pass
+
+ def get_access(self):
+ transport = self.get_transport()
+ def write_data(bytes):
+ transport.append_bytes('packfile', bytes)
+ writer = pack.ContainerWriter(write_data)
+ writer.begin()
+ index = "FOO"
+ indices = {"FOO":(transport, 'packfile')}
+ access = _PackAccess(indices, writer=(writer, 'FOO'))
+ return access
+
+# missing tests (the first is sketched below):
+# - add several records
+# - read from several packs
+# - add data readonly?
+
+
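A sketch of the first missing test, written against the mixin above
(hypothetical, not part of this commit):

    def test_add_several_raw_records(self):
        """Records added in one call are individually retrievable."""
        access = self.get_access()
        memos = access.add_raw_records([10, 2, 5], '12345678901234567')
        self.assertEqual(['1234567890', '12', '34567'],
                         list(access.get_raw_records(memos)))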
class LowLevelKnitDataTests(TestCase):
def create_gz_content(self, text):
@@ -162,7 +240,8 @@
'end rev-id-1\n'
% (sha1sum,))
transport = MockTransport([gz_txt])
- data = _KnitData(transport, 'filename', mode='r')
+ access = _KnitAccess(transport, 'filename', None, None, False, False)
+ data = _KnitData(access=access)
records = [('rev-id-1', 0, len(gz_txt))]
contents = data.read_records(records)
@@ -179,7 +258,8 @@
'end rev-id-1\n'
% (sha1sum,))
transport = MockTransport([gz_txt])
- data = _KnitData(transport, 'filename', mode='r')
+ access = _KnitAccess(transport, 'filename', None, None, False, False)
+ data = _KnitData(access=access)
records = [('rev-id-1', 0, len(gz_txt))]
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
@@ -196,7 +276,8 @@
'end rev-id-1\n'
% (sha1sum,))
transport = MockTransport([gz_txt])
- data = _KnitData(transport, 'filename', mode='r')
+ access = _KnitAccess(transport, 'filename', None, None, False, False)
+ data = _KnitData(access=access)
records = [('rev-id-1', 0, len(gz_txt))]
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
@@ -212,7 +293,8 @@
'end rev-id-1\n'
% (sha1sum,))
transport = MockTransport([gz_txt])
- data = _KnitData(transport, 'filename', mode='r')
+ access = _KnitAccess(transport, 'filename', None, None, False, False)
+ data = _KnitData(access=access)
# We are asking for rev-id-2, but the data is rev-id-1
records = [('rev-id-2', 0, len(gz_txt))]
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
@@ -229,7 +311,8 @@
'end rev-id-1\n'
% (sha1sum,))
transport = MockTransport([txt])
- data = _KnitData(transport, 'filename', mode='r')
+ access = _KnitAccess(transport, 'filename', None, None, False, False)
+ data = _KnitData(access=access)
records = [('rev-id-1', 0, len(txt))]
# We don't have valid gzip data ==> corrupt
@@ -249,7 +332,8 @@
# Change 2 bytes in the middle to \xff
gz_txt = gz_txt[:10] + '\xff\xff' + gz_txt[12:]
transport = MockTransport([gz_txt])
- data = _KnitData(transport, 'filename', mode='r')
+ access = _KnitAccess(transport, 'filename', None, None, False, False)
+ data = _KnitData(access=access)
records = [('rev-id-1', 0, len(gz_txt))]
self.assertRaises(errors.KnitCorrupt, data.read_records, records)