Rev 3911: Factor out tests that rely on the exact bytecode. in http://bzr.arbash-meinel.com/branches/bzr/brisbane/vilajam
John Arbash Meinel
john at arbash-meinel.com
Wed Mar 25 20:24:27 GMT 2009
At http://bzr.arbash-meinel.com/branches/bzr/brisbane/vilajam
------------------------------------------------------------
revno: 3911
revision-id: john at arbash-meinel.com-20090325202406-d3na661pqwtw75dx
parent: john at arbash-meinel.com-20090325193113-7crd62vmi7ryobh5
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: vilajam
timestamp: Wed 2009-03-25 15:24:06 -0500
message:
Factor out tests that rely on the exact bytecode.
The Pyrex compressor gives a slightly different encoding than the
Python compressor because it has a better diff engine.
-------------- next part --------------
=== modified file 'bzrlib/_groupcompress_py.py'
--- a/bzrlib/_groupcompress_py.py 2009-03-25 17:20:33 +0000
+++ b/bzrlib/_groupcompress_py.py 2009-03-25 20:24:06 +0000
@@ -44,6 +44,7 @@
def _update_matching_lines(self, new_lines, index):
matches = self._matching_lines
start_idx = len(self.lines)
+ assert len(new_lines) == len(index)
for idx, do_index in enumerate(index):
if not do_index:
continue
@@ -124,30 +125,6 @@
return ((min(copy_ends) - range_len, range_start, range_len)), pos, locations
-def parse(line_list):
- result = []
- lines = iter(line_list)
- next = lines.next
- label_line = next()
- sha1_line = next()
- if (not label_line.startswith('label: ') or
- not sha1_line.startswith('sha1: ')):
- raise AssertionError("bad text record %r" % lines)
- label = tuple(label_line[7:-1].split('\x00'))
- sha1 = sha1_line[6:-1]
- for header in lines:
- op = header[0]
- numbers = header[2:]
- numbers = [int(n) for n in header[2:].split(',')]
- if op == 'c':
- result.append((op, numbers[0], numbers[1], None))
- else:
- contents = [next() for i in xrange(numbers[0])]
- result.append((op, None, numbers[0], contents))
- ## return result
- return label, sha1, result
-
-
def apply_delta(basis, delta):
"""Apply delta to this object to become new_version_id."""
lines = []
@@ -159,15 +136,7 @@
lines.append(basis[start:start+count])
else:
lines.extend(delta_lines)
- trim_encoding_newline(lines)
return lines
-def trim_encoding_newline(lines):
- if lines[-1] == '\n':
- del lines[-1]
- else:
- lines[-1] = lines[-1][:-1]
-
-
### ^ imported from gc plugin at revno30
=== modified file 'bzrlib/groupcompress.py'
--- a/bzrlib/groupcompress.py 2009-03-25 19:31:13 +0000
+++ b/bzrlib/groupcompress.py 2009-03-25 20:24:06 +0000
@@ -767,9 +767,65 @@
def __init__(self):
"""Create a GroupCompressor."""
self.lines = []
+ self._last = None
self.endpoint = 0
self.input_bytes = 0
self.labels_deltas = {}
+ self._block = GroupCompressBlock()
+
+ def extract(self, key):
+ """Extract a key previously added to the compressor.
+
+ :param key: The key to extract.
+ :return: An iterable over bytes and the sha1.
+ """
+ delta_details = self.labels_deltas[key]
+ delta_chunks = self.lines[delta_details[0][1]:delta_details[1][1]]
+ stored_bytes = ''.join(delta_chunks)
+ # TODO: Fix this, we shouldn't really be peeking here
+ entry = self._block._entries[key]
+ if entry.type == 'fulltext':
+ if stored_bytes[0] != 'f':
+ raise ValueError('Index claimed fulltext, but stored bytes'
+ ' indicate %s' % (stored_bytes[0],))
+ fulltext_len, offset = decode_base128_int(stored_bytes[1:10])
+ if fulltext_len + 1 + offset != len(stored_bytes):
+ raise ValueError('Index claimed fulltext len, but stored bytes'
+ ' claim %s != %s'
+ % (len(stored_bytes),
+ fulltext_len + 1 + offset))
+ bytes = stored_bytes[offset + 1:]
+ else:
+ if entry.type != 'delta':
+ raise ValueError('Unknown entry type: %s' % (entry.type,))
+ # XXX: This is inefficient at best
+ source = ''.join(self.lines)
+ if stored_bytes[0] != 'd':
+ raise ValueError('Entry type claims delta, bytes claim %s'
+ % (stored_bytes[0],))
+ delta_len, offset = decode_base128_int(stored_bytes[1:10])
+ if delta_len + 1 + offset != len(stored_bytes):
+ raise ValueError('Index claimed delta len, but stored bytes'
+ ' claim %s != %s'
+ % (len(stored_bytes),
+ delta_len + 1 + offset))
+ bytes = apply_delta(source, stored_bytes[offset + 1:])
+ bytes_sha1 = osutils.sha_string(bytes)
+ if entry.sha1 != bytes_sha1:
+ raise ValueError('Recorded sha1 != measured %s != %s'
+ % (entry.sha1, bytes_sha1))
+ return bytes, entry.sha1
+
+ def pop_last(self):
+ """Call this if you want to 'revoke' the last compression.
+
+ After this, the data structures will be rolled back, but you cannot do
+ more compression.
+ """
+ self._delta_index = None
+ del self.lines[self._last[0]:]
+ self.endpoint = self._last[1]
+ self._last = None
def ratio(self):
"""Return the overall compression ratio."""
@@ -848,52 +904,65 @@
the group output so far.
:seealso VersionedFiles.add_lines:
"""
+ if not bytes: # empty, like a dir entry, etc
+ if nostore_sha == _null_sha1:
+ raise errors.ExistingContent()
+ self._block.add_entry(key, type='empty',
+ sha1=None, start=0,
+ length=0)
+ return _null_sha1, 0, 0, 'fulltext', 0
+ bytes_length = len(bytes)
new_lines = osutils.split_lines(bytes)
sha1 = osutils.sha_string(bytes)
+ if sha1 == nostore_sha:
+ raise errors.ExistingContent()
if key[-1] is None:
key = key[:-1] + ('sha1:' + sha1,)
- out_lines = []
- index_lines = []
+ # reserved for content type, content length, source_len, target_len
+ out_lines = ['', '', '', '']
+ index_lines = [False, False, False, False]
blocks = self.get_matching_blocks(new_lines, soft=soft)
current_line_num = 0
# We either copy a range (while there are reusable lines) or we
# insert new lines. To find reusable lines we traverse
for old_start, new_start, range_len in blocks:
if new_start != current_line_num:
- # non-matching region
- self.flush_insert(current_line_num, new_start,
- new_lines, out_lines, index_lines)
+ # non-matching region, insert the content
+ self._flush_insert(current_line_num, new_start,
+ new_lines, out_lines, index_lines)
current_line_num = new_start + range_len
- if not range_len:
- continue
- self._flush_copy(old_start, range_len,
- new_lines, out_lines, index_lines)
+ if range_len:
+ self._flush_copy(old_start, range_len, out_lines, index_lines)
+ delta_length = sum(map(len, out_lines))
+ if delta_length * 2 > bytes_length:
+ # The delta is longer than the fulltext, insert a fulltext
+ type = 'fulltext'
+ out_lines = ['f', encode_base128_int(bytes_length)]
+ out_lines.extend(new_lines)
+ index_lines = [False, False]
+ index_lines.extend([True] * len(new_lines))
+ out_length = len(out_lines[1]) + bytes_length + 1
+ else:
+ # this is a worthy delta, output it
+ type = 'delta'
+ out_lines[0] = 'd'
+ # A delta record includes the source and target lengths
+ out_lines[2] = encode_base128_int(self.endpoint)
+ out_lines[3] = encode_base128_int(bytes_length)
+ # Update the delta_length to include those two encoded integers
+ delta_length += len(out_lines[2]) + len(out_lines[3])
+ out_lines[1] = encode_base128_int(delta_length)
+ out_length = len(out_lines[3]) + 1 + delta_length
+ self._block.add_entry(key, type=type, sha1=sha1,
+ start=self.endpoint, length=out_length)
start = self.endpoint # Keep it
delta_start = (self.endpoint, len(self.lines))
self.output_lines(out_lines, index_lines)
- length = len(bytes)
- self.input_bytes += length
+ self.input_bytes += bytes_length
delta_end = (self.endpoint, len(self.lines))
self.labels_deltas[key] = (delta_start, delta_end)
# FIXME: lot of guessing below
- return sha1, start, self.endpoint, 'delta', length
-
- def extract(self, key):
- """Extract a key previously added to the compressor.
-
- :param key: The key to extract.
- :return: An iterable over bytes and the sha1.
- """
- delta_details = self.labels_deltas[key]
- delta_lines = self.lines[delta_details[0][1]:delta_details[1][1]]
- label, sha1, delta = parse(delta_lines)
- ## delta = parse(delta_lines)
- if label != key:
- raise AssertionError("wrong key: %r, wanted %r" % (label, key))
- # Perhaps we want to keep the line offsets too in memory at least?
- chunks = apply_delta(''.join(self.lines), delta)
- sha1 = osutils.sha_strings(chunks)
- return chunks, sha1
+ return sha1, start, self.endpoint, 'delta', out_length
def _flush_insert(self, start_linenum, end_linenum,
new_lines, out_lines, index_lines):
@@ -923,45 +992,14 @@
# code, we will also limit it to a 64kB copy
for start_byte in xrange(first_byte, stop_byte, 64*1024):
num_bytes = min(64*1024, stop_byte - first_byte)
- copy_command, copy_bytes = encode_copy_instruction(start_byte,
- num_bytes)
-
- def flush_range(self, new_line_start, source_line_start, match_num_lines,
- new_lines, out_lines, index_lines):
- """Insert the control codes for this copy & insert instruction.
-
- :param range_start:
- """
- if copy_start is not None:
- # range stops, flush and start a new copy range
- stop_byte = self.line_offsets[copy_start + range_len - 1]
- if copy_start == 0:
- start_byte = 0
- else:
- start_byte = self.line_offsets[copy_start - 1]
- bytes = stop_byte - start_byte
- copy_byte = 0
- copy_control_instruction =0
- new_lines.append(copy_control_instruction)
+ copy_bytes = encode_copy_instruction(start_byte, num_bytes)
+ out_lines.append(copy_bytes)
index_lines.append(False)
- return
- # not copying, or inserting is shorter than copying, so insert.
- new_lines.append(insert_instruction)
- new_lines.extend(lines[range_start:range_start+range_len])
- index_lines.append(False)
- index_lines.extend([copy_start is None]*range_len)
def flush(self):
# FIXME: ugly hack to masquerade ourself as the pyrex version
- class content(object):
-
- def __init__(self, s):
- self.s = s
-
- def to_bytes(self):
- return self.s
-
- return content(zlib.compress(''.join(self.lines)))
+ self._block.set_content(''.join(self.lines))
+ return self._block
def output_lines(self, new_lines, index_lines):
"""Output some lines.
@@ -974,6 +1012,7 @@
# if val and new_lines[idx] == '\n']
# if indexed_newlines:
# import pdb; pdb.set_trace()
+ self._last = (len(self.lines), self.endpoint)
endpoint = self.endpoint
self.line_locations.extend_lines(new_lines, index_lines)
for line in new_lines:
@@ -999,11 +1038,9 @@
"""
def __init__(self):
- super(PythonGroupCompressor, self).__init__()
+ super(PyrexGroupCompressor, self).__init__()
self.num_keys = 0
- self._last = None
self._delta_index = DeltaIndex()
- self._block = GroupCompressBlock()
def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
"""Compress lines with label key.
@@ -1150,17 +1187,6 @@
endpoint += sum(map(len, new_chunks))
self.endpoint = endpoint
- def pop_last(self):
- """Call this if you want to 'revoke' the last compression.
-
- After this, the data structures will be rolled back, but you cannot do
- more compression.
- """
- self._delta_index = None
- del self.lines[self._last[0]:]
- self.endpoint = self._last[1]
- self._last = None
-
def make_pack_factory(graph, delta, keylength):
"""Create a factory for creating a pack based groupcompress.
@@ -2002,6 +2028,11 @@
return node[0], start, stop, basis_end, delta_end
+from bzrlib._groupcompress_py import (
+ apply_delta,
+ EquivalenceTable,
+ _get_longest_match,
+ )
try:
from bzrlib._groupcompress_pyx import (
apply_delta,
@@ -2009,11 +2040,5 @@
)
GroupCompressor = PyrexGroupCompressor
except ImportError:
- from bzrlib._groupcompress_py import (
- apply_delta,
- EquivalenceTable,
- _get_longest_match,
- trim_encoding_newline,
- )
GroupCompressor = PythonGroupCompressor
=== modified file 'bzrlib/tests/test_groupcompress.py'
--- a/bzrlib/tests/test_groupcompress.py 2009-03-25 19:31:13 +0000
+++ b/bzrlib/tests/test_groupcompress.py 2009-03-25 20:24:06 +0000
@@ -32,28 +32,74 @@
)
+def load_tests(standard_tests, module, loader):
+ """Parameterize tests for view-aware vs not."""
+ to_adapt, result = tests.split_suite_by_condition(
+ standard_tests, tests.condition_isinstance(TestAllGroupCompressors))
+ scenarios = [
+ ('python', {'compressor': groupcompress.PythonGroupCompressor}),
+ ]
+ if CompiledGroupcompressFeature.available():
+ scenarios.append(('C',
+ {'compressor': groupcompress.PyrexGroupCompressor}))
+ return multiply_tests(to_adapt, scenarios, result)
+
+
+class _CompiledGroupcompressFeature(tests.Feature):
+
+ def _probe(self):
+ try:
+ import bzrlib._groupcompress_pyx
+ except ImportError:
+ return False
+ return True
+
+ def feature_name(self):
+ return "bzrlib._groupcompress_pyx"
+
+CompiledGroupcompressFeature = _CompiledGroupcompressFeature()
+
+
class TestGroupCompressor(tests.TestCase):
+
+ def _chunks_to_repr_lines(self, chunks):
+ return '\n'.join(map(repr, ''.join(chunks).split('\n')))
+
+ def assertEqualDiffEncoded(self, expected, actual):
+ """Compare the actual content to the expected content.
+
+ :param expected: A group of chunks that we expect to see
+ :param actual: The measured 'chunks'
+
+ We will transform the chunks back into lines, and then run 'repr()'
+ over them to handle non-ascii characters.
+ """
+ self.assertEqualDiff(self._chunks_to_repr_lines(expected),
+ self._chunks_to_repr_lines(actual))
+
+
+class TestAllGroupCompressors(TestGroupCompressor):
"""Tests for GroupCompressor"""
+ compressor = None # Set by multiply_tests
+
def test_empty_delta(self):
- compressor = groupcompress.GroupCompressor()
+ compressor = self.compressor()
self.assertEqual([], compressor.lines)
def test_one_nosha_delta(self):
# diff against NUKK
- compressor = groupcompress.GroupCompressor()
+ compressor = self.compressor()
sha1, start_point, end_point, _, _ = compressor.compress(('label',),
'strange\ncommon\n', None)
self.assertEqual(sha_string('strange\ncommon\n'), sha1)
- expected_lines = [
- 'f', '\x0f', 'strange\ncommon\n',
- ]
- self.assertEqual(expected_lines, compressor.lines)
+ expected_lines = 'f' '\x0f' 'strange\ncommon\n'
+ self.assertEqual(expected_lines, ''.join(compressor.lines))
self.assertEqual(0, start_point)
self.assertEqual(sum(map(len, expected_lines)), end_point)
def test_empty_content(self):
- compressor = groupcompress.GroupCompressor()
+ compressor = self.compressor()
# Adding empty bytes should return the 'null' record
sha1, start_point, end_point, kind, _ = compressor.compress(('empty',),
'', None)
@@ -73,92 +119,10 @@
self.assertEqual('fulltext', kind)
self.assertEqual(groupcompress._null_sha1, sha1)
- def _chunks_to_repr_lines(self, chunks):
- return '\n'.join(map(repr, ''.join(chunks).split('\n')))
-
- def assertEqualDiffEncoded(self, expected, actual):
- """Compare the actual content to the expected content.
-
- :param expected: A group of chunks that we expect to see
- :param actual: The measured 'chunks'
-
- We will transform the chunks back into lines, and then run 'repr()'
- over them to handle non-ascii characters.
- """
- self.assertEqualDiff(self._chunks_to_repr_lines(expected),
- self._chunks_to_repr_lines(actual))
-
- def test_two_nosha_delta(self):
- compressor = groupcompress.GroupCompressor()
- sha1_1, _, _, _, _ = compressor.compress(('label',),
- 'strange\ncommon long line\nthat needs a 16 byte match\n', None)
- expected_lines = list(compressor.lines)
- sha1_2, start_point, end_point, _, _ = compressor.compress(('newlabel',),
- 'common long line\nthat needs a 16 byte match\ndifferent\n', None)
- self.assertEqual(sha_string('common long line\n'
- 'that needs a 16 byte match\n'
- 'different\n'), sha1_2)
- expected_lines.extend([
- # 'delta', delta length
- 'd\x10',
- # source and target length
- '\x36\x36',
- # copy the line common
- '\x91\x0a\x2c', #copy, offset 0x0a, len 0x2c
- # add the line different, and the trailing newline
- '\x0adifferent\n', # insert 10 bytes
- ])
- self.assertEqualDiffEncoded(expected_lines, compressor.lines)
- self.assertEqual(sum(map(len, expected_lines)), end_point)
-
- def test_three_nosha_delta(self):
- # The first interesting test: make a change that should use lines from
- # both parents.
- compressor = groupcompress.GroupCompressor()
- sha1_1, _, _, _, _ = compressor.compress(('label',),
- 'strange\ncommon very very long line\nwith some extra text\n', None)
- sha1_2, _, _, _, _ = compressor.compress(('newlabel',),
- 'different\nmoredifferent\nand then some more\n', None)
- expected_lines = list(compressor.lines)
- sha1_3, start_point, end_point, _, _ = compressor.compress(('label3',),
- 'new\ncommon very very long line\nwith some extra text\n'
- 'different\nmoredifferent\nand then some more\n',
- None)
- self.assertEqual(
- sha_string('new\ncommon very very long line\nwith some extra text\n'
- 'different\nmoredifferent\nand then some more\n'),
- sha1_3)
- expected_lines.extend([
- # 'delta', delta length
- 'd\x0c',
- # source and target length
- '\x67\x5f'
- # insert new
- '\x03new',
- # Copy of first parent 'common' range
- '\x91\x09\x31' # copy, offset 0x09, 0x31 bytes
- # Copy of second parent 'different' range
- '\x91\x3c\x2b' # copy, offset 0x3c, 0x2b bytes
- ])
- self.assertEqualDiffEncoded(expected_lines, compressor.lines)
- self.assertEqual(sum(map(len, expected_lines)), end_point)
-
- def test_stats(self):
- compressor = groupcompress.GroupCompressor()
- compressor.compress(('label',), 'strange\ncommon long line\n'
- 'plus more text\n', None)
- compressor.compress(('newlabel',),
- 'common long line\nplus more text\n'
- 'different\nmoredifferent\n', None)
- compressor.compress(('label3',),
- 'new\ncommon long line\nplus more text\n'
- '\ndifferent\nmoredifferent\n', None)
- self.assertAlmostEqual(1.4, compressor.ratio(), 1)
-
def test_extract_from_compressor(self):
# Knit fetching will try to reconstruct texts locally which results in
# reading something that is in the compressor stream already.
- compressor = groupcompress.GroupCompressor()
+ compressor = self.compressor()
sha1_1, _, _, _, _ = compressor.compress(('label',),
'strange\ncommon long line\nthat needs a 16 byte match\n', None)
expected_lines = list(compressor.lines)
@@ -174,6 +138,151 @@
compressor.extract(('newlabel',)))
+class TestPyrexGroupCompressor(TestGroupCompressor):
+
+ _test_needs_features = [CompiledGroupcompressFeature]
+ compressor = groupcompress.PyrexGroupCompressor
+
+ def test_stats(self):
+ compressor = self.compressor()
+ compressor.compress(('label',), 'strange\ncommon long line\n'
+ 'plus more text\n', None)
+ compressor.compress(('newlabel',),
+ 'common long line\nplus more text\n'
+ 'different\nmoredifferent\n', None)
+ compressor.compress(('label3',),
+ 'new\ncommon long line\nplus more text\n'
+ '\ndifferent\nmoredifferent\n', None)
+ self.assertAlmostEqual(1.4, compressor.ratio(), 1)
+
+ def test_two_nosha_delta(self):
+ compressor = self.compressor()
+ sha1_1, _, _, _, _ = compressor.compress(('label',),
+ 'strange\ncommon long line\nthat needs a 16 byte match\n', None)
+ expected_lines = list(compressor.lines)
+ sha1_2, start_point, end_point, _, _ = compressor.compress(('newlabel',),
+ 'common long line\nthat needs a 16 byte match\ndifferent\n', None)
+ self.assertEqual(sha_string('common long line\n'
+ 'that needs a 16 byte match\n'
+ 'different\n'), sha1_2)
+ expected_lines.extend([
+ # 'delta', delta length
+ 'd\x10',
+ # source and target length
+ '\x36\x36',
+ # copy the line common
+ '\x91\x0a\x2c', #copy, offset 0x0a, len 0x2c
+ # add the line different, and the trailing newline
+ '\x0adifferent\n', # insert 10 bytes
+ ])
+ self.assertEqualDiffEncoded(expected_lines, compressor.lines)
+ self.assertEqual(sum(map(len, expected_lines)), end_point)
+
+ def test_three_nosha_delta(self):
+ # The first interesting test: make a change that should use lines from
+ # both parents.
+ compressor = self.compressor()
+ sha1_1, _, _, _, _ = compressor.compress(('label',),
+ 'strange\ncommon very very long line\nwith some extra text\n', None)
+ sha1_2, _, _, _, _ = compressor.compress(('newlabel',),
+ 'different\nmoredifferent\nand then some more\n', None)
+ expected_lines = list(compressor.lines)
+ sha1_3, start_point, end_point, _, _ = compressor.compress(('label3',),
+ 'new\ncommon very very long line\nwith some extra text\n'
+ 'different\nmoredifferent\nand then some more\n',
+ None)
+ self.assertEqual(
+ sha_string('new\ncommon very very long line\nwith some extra text\n'
+ 'different\nmoredifferent\nand then some more\n'),
+ sha1_3)
+ expected_lines.extend([
+ # 'delta', delta length
+ 'd\x0c',
+ # source and target length
+ '\x67\x5f'
+ # insert new
+ '\x03new',
+ # Copy of first parent 'common' range
+ '\x91\x09\x31' # copy, offset 0x09, 0x31 bytes
+ # Copy of second parent 'different' range
+ '\x91\x3c\x2b' # copy, offset 0x3c, 0x2b bytes
+ ])
+ self.assertEqualDiffEncoded(expected_lines, compressor.lines)
+ self.assertEqual(sum(map(len, expected_lines)), end_point)
+
+
+class TestPythonGroupCompressor(TestGroupCompressor):
+
+ compressor = groupcompress.PythonGroupCompressor
+
+ def test_stats(self):
+ compressor = self.compressor()
+ compressor.compress(('label',), 'strange\ncommon long line\n'
+ 'plus more text\n', None)
+ compressor.compress(('newlabel',),
+ 'common long line\nplus more text\n'
+ 'different\nmoredifferent\n', None)
+ compressor.compress(('label3',),
+ 'new\ncommon long line\nplus more text\n'
+ '\ndifferent\nmoredifferent\n', None)
+ self.assertAlmostEqual(1.1, compressor.ratio(), 1)
+
+ def test_two_nosha_delta(self):
+ compressor = self.compressor()
+ sha1_1, _, _, _, _ = compressor.compress(('label',),
+ 'strange\ncommon long line\nthat needs a 16 byte match\n', None)
+ expected_lines = list(compressor.lines)
+ sha1_2, start_point, end_point, _, _ = compressor.compress(('newlabel',),
+ 'common long line\nthat needs a 16 byte match\ndifferent\n', None)
+ self.assertEqual(sha_string('common long line\n'
+ 'that needs a 16 byte match\n'
+ 'different\n'), sha1_2)
+ expected_lines.extend([
+ # 'delta', delta length
+ 'd\x10',
+ # source and target length
+ '\x36\x36',
+ # copy the line common
+ '\x91\x0a\x2c', #copy, offset 0x0a, len 0x2c
+ # add the line different, and the trailing newline
+ '\x0adifferent\n', # insert 10 bytes
+ ])
+ self.assertEqualDiffEncoded(expected_lines, compressor.lines)
+ self.assertEqual(sum(map(len, expected_lines)), end_point)
+
+ def test_three_nosha_delta(self):
+ # The first interesting test: make a change that should use lines from
+ # both parents.
+ compressor = self.compressor()
+ sha1_1, _, _, _, _ = compressor.compress(('label',),
+ 'strange\ncommon very very long line\nwith some extra text\n', None)
+ sha1_2, _, _, _, _ = compressor.compress(('newlabel',),
+ 'different\nmoredifferent\nand then some more\n', None)
+ expected_lines = list(compressor.lines)
+ sha1_3, start_point, end_point, _, _ = compressor.compress(('label3',),
+ 'new\ncommon very very long line\nwith some extra text\n'
+ 'different\nmoredifferent\nand then some more\n',
+ None)
+ self.assertEqual(
+ sha_string('new\ncommon very very long line\nwith some extra text\n'
+ 'different\nmoredifferent\nand then some more\n'),
+ sha1_3)
+ expected_lines.extend([
+ # 'delta', delta length
+ 'd\x0d',
+ # source and target length
+ '\x67\x5f'
+ # insert new
+ '\x04new\n',
+ # Copy of first parent 'common' range
+ '\x91\x0a\x30' # copy, offset 0x0a, 0x30 bytes
+ # Copy of second parent 'different' range
+ '\x91\x3c\x2b' # copy, offset 0x3c, 0x2b bytes
+ ])
+ self.assertEqualDiffEncoded(expected_lines, compressor.lines)
+ self.assertEqual(sum(map(len, expected_lines)), end_point)
+
+
class TestEncodeCopyInstruction(tests.TestCase):
def assertCopyInstruction(self, expected, offset, length):
More information about the bazaar-commits
mailing list