Rev 2828: Move write stream management into NewPack. in http://people.ubuntu.com/~robertc/baz2.0/repository
Robert Collins
robertc at robertcollins.net
Tue Oct 16 05:59:04 BST 2007
At http://people.ubuntu.com/~robertc/baz2.0/repository
------------------------------------------------------------
revno: 2828
revision-id: robertc at robertcollins.net-20071016045856-w4b1pjj0a6fp8itd
parent: robertc at robertcollins.net-20071016025911-x3eef87ruitb7so9
committer: Robert Collins <robertc at robertcollins.net>
branch nick: repository
timestamp: Tue 2007-10-16 14:58:56 +1000
message:
Move write stream management into NewPack.
modified:
bzrlib/repofmt/pack_repo.py pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
=== modified file 'bzrlib/repofmt/pack_repo.py'
--- a/bzrlib/repofmt/pack_repo.py 2007-10-16 02:59:11 +0000
+++ b/bzrlib/repofmt/pack_repo.py 2007-10-16 04:58:56 +0000
@@ -225,6 +225,26 @@
self.random_name = rand_chars(20) + upload_suffix
# when was this pack started ?
self.start_time = time.time()
+ # open an output stream for the data added to the pack.
+ self.write_stream = self.upload_transport.open_write_stream(
+ self.random_name)
+ if 'fetch' in debug.debug_flags:
+ mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
+ time.ctime(), self.upload_transport.base, self.random_name,
+ time.time() - self.start_time)
+ # create a callable for adding data
+ def _write_data(bytes, write=self.write_stream.write,
+ update=self._hash.update):
+ write(bytes)
+ update(bytes)
+ # expose this on self, for the occasion when clients want to add data.
+ self._write_data = _write_data
+
+ def abort(self):
+ """Cancel creating this pack."""
+ # Remove the temporary pack file.
+ self.upload_transport.delete(self.random_name)
+ # The indices have no state on disk.
def data_inserted(self):
"""True if data has been added to this pack."""
@@ -252,7 +272,7 @@
self._write_index('text', self.text_index, 'file texts')
self._write_index('signature', self.signature_index,
'revision signatures')
-
+ self.write_stream.close()
def make_index(self, index_type):
"""Construct a GraphIndex object for this packs index 'index_type'."""
@@ -523,18 +543,12 @@
'%s%s %s revisions wanted %s t=0',
time.ctime(), self._upload_transport.base, random_name,
plain_pack_list, rev_count)
- write_stream = self._upload_transport.open_write_stream(random_name)
- if 'fetch' in debug.debug_flags:
- mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
- time.ctime(), self._upload_transport.base, random_name,
- time.time() - new_pack.start_time)
buffer = []
- def write_data(bytes, update=new_pack._hash.update, write=write_stream.write):
+ def write_data(bytes, write=new_pack._write_data):
buffer.append(bytes)
if len(buffer) == 640:
bytes = ''.join(buffer)
write(bytes)
- update(bytes)
del buffer[:]
writer = pack.ContainerWriter(write_data)
writer.begin()
@@ -628,13 +642,12 @@
time.time() - new_pack.start_time)
# finish the pack
writer.end()
+ # flush the write-cache if needed.
if len(buffer):
bytes = ''.join(buffer)
- write_stream.write(bytes)
- new_pack._hash.update(bytes)
+ new_pack._write_data(bytes)
if not new_pack.data_inserted():
- # nothing was copied, discard the new pack.
- self._upload_transport.delete(random_name)
+ new_pack.abort()
return None
# write indices
new_pack.finish()
@@ -642,7 +655,6 @@
self.allocate(new_pack)
# rename into place. XXX: should rename each index too rather than just
# uploading blind under the chosen name.
- write_stream.close()
self._upload_transport.rename(random_name, '../packs/' + new_pack.name + '.pack')
result = new_pack
if 'fetch' in debug.debug_flags:
@@ -1098,24 +1110,18 @@
finally:
self.release_names()
- def setup(self):
+ def _start_write_group(self):
# Do not permit preparation for writing if we're not in a 'write lock'.
if not self.repo.is_write_locked():
raise errors.NotWriteLocked(self)
self._new_pack = NewPack(self._upload_transport, self._index_transport,
self._pack_transport, upload_suffix='.pack')
- def _start_write_group(self):
- self.setup()
random_name = self._new_pack.random_name
+
self.repo._open_pack_tuple = (self._upload_transport, random_name)
- write_stream = self._upload_transport.open_write_stream(random_name)
- self._write_stream = write_stream
- def write_data(bytes, write=write_stream.write,
- update=self._new_pack._hash.update):
- write(bytes)
- update(bytes)
- self._open_pack_writer = pack.ContainerWriter(write_data)
+
+ self._open_pack_writer = pack.ContainerWriter(self._new_pack._write_data)
self._open_pack_writer.begin()
self.repo._revision_store.setup()
self.repo.weave_store.setup()
@@ -1130,7 +1136,6 @@
if self._new_pack.data_inserted():
self._open_pack_writer.end()
self._new_pack.finish()
- self._write_stream.close()
self._upload_transport.rename(self.repo._open_pack_tuple[1],
'../packs/' + self._new_pack.name + '.pack')
# If this fails, its a hash collision. We should:
@@ -1154,12 +1159,10 @@
self.refresh_inventory_index()
self.refresh_revision_signature_indices()
else:
- # remove the pending upload
- self._upload_transport.delete(self.repo._open_pack_tuple[1])
+ self._new_pack.abort()
# forget what names there are - XXX should just refresh them and apply
# the delta to our pack list and object maps.
self.reset()
- self._write_stream = None
class GraphKnitRevisionStore(KnitRevisionStore):
More information about the bazaar-commits mailing list