Rev 2828: Move write stream management into NewPack. in http://people.ubuntu.com/~robertc/baz2.0/repository

Robert Collins robertc at robertcollins.net
Tue Oct 16 05:59:04 BST 2007


At http://people.ubuntu.com/~robertc/baz2.0/repository

------------------------------------------------------------
revno: 2828
revision-id: robertc at robertcollins.net-20071016045856-w4b1pjj0a6fp8itd
parent: robertc at robertcollins.net-20071016025911-x3eef87ruitb7so9
committer: Robert Collins <robertc at robertcollins.net>
branch nick: repository
timestamp: Tue 2007-10-16 14:58:56 +1000
message:
  Move write stream management into NewPack.
modified:
  bzrlib/repofmt/pack_repo.py    pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
=== modified file 'bzrlib/repofmt/pack_repo.py'
--- a/bzrlib/repofmt/pack_repo.py	2007-10-16 02:59:11 +0000
+++ b/bzrlib/repofmt/pack_repo.py	2007-10-16 04:58:56 +0000
@@ -225,6 +225,26 @@
         self.random_name = rand_chars(20) + upload_suffix
         # when was this pack started ?
         self.start_time = time.time()
+        # open an output stream for the data added to the pack.
+        self.write_stream = self.upload_transport.open_write_stream(
+            self.random_name)
+        if 'fetch' in debug.debug_flags:
+            mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
+                time.ctime(), self.upload_transport.base, self.random_name,
+                time.time() - self.start_time)
+        # create a callable for adding data
+        def _write_data(bytes, write=self.write_stream.write,
+                       update=self._hash.update):
+            write(bytes)
+            update(bytes)
+        # expose this on self, for the occasion when clients want to add data.
+        self._write_data = _write_data
+
+    def abort(self):
+        """Cancel creating this pack."""
+        # Remove the temporary pack file.
+        self.upload_transport.delete(self.random_name)
+        # The indices have no state on disk.
 
     def data_inserted(self):
         """True if data has been added to this pack."""
@@ -252,7 +272,7 @@
         self._write_index('text', self.text_index, 'file texts')
         self._write_index('signature', self.signature_index,
             'revision signatures')
-
+        self.write_stream.close()
 
     def make_index(self, index_type):
         """Construct a GraphIndex object for this packs index 'index_type'."""
@@ -523,18 +543,12 @@
                 '%s%s %s revisions wanted %s t=0',
                 time.ctime(), self._upload_transport.base, random_name,
                 plain_pack_list, rev_count)
-        write_stream = self._upload_transport.open_write_stream(random_name)
-        if 'fetch' in debug.debug_flags:
-            mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
-                time.ctime(), self._upload_transport.base, random_name,
-                time.time() - new_pack.start_time)
         buffer = []
-        def write_data(bytes, update=new_pack._hash.update, write=write_stream.write):
+        def write_data(bytes, write=new_pack._write_data):
             buffer.append(bytes)
             if len(buffer) == 640:
                 bytes = ''.join(buffer)
                 write(bytes)
-                update(bytes)
                 del buffer[:]
         writer = pack.ContainerWriter(write_data)
         writer.begin()
@@ -628,13 +642,12 @@
                 time.time() - new_pack.start_time)
         # finish the pack
         writer.end()
+        # flush the write-cache if needed.
         if len(buffer):
             bytes = ''.join(buffer)
-            write_stream.write(bytes)
-            new_pack._hash.update(bytes)
+            new_pack._write_data(bytes)
         if not new_pack.data_inserted():
-            # nothing was copied, discard the new pack.
-            self._upload_transport.delete(random_name)
+            new_pack.abort()
             return None
         # write indices
         new_pack.finish()
@@ -642,7 +655,6 @@
         self.allocate(new_pack)
         # rename into place. XXX: should rename each index too rather than just
         # uploading blind under the chosen name.
-        write_stream.close()
         self._upload_transport.rename(random_name, '../packs/' + new_pack.name + '.pack')
         result = new_pack
         if 'fetch' in debug.debug_flags:
@@ -1098,24 +1110,18 @@
         finally:
             self.release_names()
 
-    def setup(self):
+    def _start_write_group(self):
         # Do not permit preparation for writing if we're not in a 'write lock'.
         if not self.repo.is_write_locked():
             raise errors.NotWriteLocked(self)
         self._new_pack = NewPack(self._upload_transport, self._index_transport,
             self._pack_transport, upload_suffix='.pack')
 
-    def _start_write_group(self):
-        self.setup()
         random_name = self._new_pack.random_name
+
         self.repo._open_pack_tuple = (self._upload_transport, random_name)
-        write_stream = self._upload_transport.open_write_stream(random_name)
-        self._write_stream = write_stream
-        def write_data(bytes, write=write_stream.write,
-                       update=self._new_pack._hash.update):
-            write(bytes)
-            update(bytes)
-        self._open_pack_writer = pack.ContainerWriter(write_data)
+
+        self._open_pack_writer = pack.ContainerWriter(self._new_pack._write_data)
         self._open_pack_writer.begin()
         self.repo._revision_store.setup()
         self.repo.weave_store.setup()
@@ -1130,7 +1136,6 @@
         if self._new_pack.data_inserted():
             self._open_pack_writer.end()
             self._new_pack.finish()
-            self._write_stream.close()
             self._upload_transport.rename(self.repo._open_pack_tuple[1],
                 '../packs/' + self._new_pack.name + '.pack')
             # If this fails, its a hash collision. We should:
@@ -1154,12 +1159,10 @@
             self.refresh_inventory_index()
             self.refresh_revision_signature_indices()
         else:
-            # remove the pending upload
-            self._upload_transport.delete(self.repo._open_pack_tuple[1])
+            self._new_pack.abort()
         # forget what names there are - XXX should just refresh them and apply
         # the delta to our pack list and object maps.
         self.reset()
-        self._write_stream = None
 
 
 class GraphKnitRevisionStore(KnitRevisionStore):



More information about the bazaar-commits mailing list