[Checkins] SVN: relstorage/branches/postgres_blob_oid/ Uploading and downloading of ZODB blobs on PostgreSQL.

Martijn Pieters mj at zopatista.com
Tue Jun 14 15:51:37 EDT 2011


Log message for revision 121935:
  Uploading and downloading of ZODB blobs on PostgreSQL.
  
  Note that we use lo_import and lo_export where possible, relying on PostgreSQL's native large-object upload and download paths for maximum performance.
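
  As a rough illustration (not part of this commit; the connection
  parameters and file names below are made up), the "native" paths are
  psycopg2's lobject() taking a source file, which drives libpq's
  lo_import, and lobject.export(), which drives lo_export:

      import psycopg2

      conn = psycopg2.connect("dbname=zodb")   # hypothetical DSN

      # Upload: passing a file name to lobject() streams the file into a
      # brand new large object via lo_import.
      lob = conn.lobject(0, 'wb', 0, '/tmp/blob.dat')
      loid = lob.oid           # this oid is what blob_chunk rows store
      lob.close()

      # Download: export() writes the large object straight back to a
      # client-side file via lo_export.
      lob = conn.lobject(loid, 'rb')
      lob.export('/tmp/blob-copy.dat')
      lob.close()

      conn.commit()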
  
  Also clean out any orphaned large-object oids when rows are deleted from the temp_blob_chunk table without having been copied over to blob_chunk. The blob_chunk_delete_trigger procedure can handle this directly.
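
  The blob_chunk_delete_trigger procedure itself lives in the schema
  definition and is not part of this diff. Purely as a sketch of the idea
  (not the actual definition), a trigger function of this kind could be
  installed roughly like so, lo_unlink()ing the deleted row's large object
  when no blob_chunk row still references it:

      def install_blob_chunk_delete_trigger(cursor):
          # `cursor` is assumed to be an open psycopg2 cursor.
          stmt = """
          CREATE OR REPLACE FUNCTION blob_chunk_delete_trigger()
          RETURNS TRIGGER AS $blob_chunk_delete_trigger$
          DECLARE
              cnt integer;
          BEGIN
              -- Is the large object still referenced by a committed chunk?
              SELECT COUNT(*) INTO cnt FROM blob_chunk
                  WHERE chunk = OLD.chunk;
              IF cnt = 0 THEN
                  -- Orphaned: reclaim the large object's storage.
                  PERFORM lo_unlink(OLD.chunk);
              END IF;
              RETURN OLD;
          END;
          $blob_chunk_delete_trigger$ LANGUAGE plpgsql;
          """
          cursor.execute(stmt)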

Changed:
  U   relstorage/branches/postgres_blob_oid/CHANGES.txt
  U   relstorage/branches/postgres_blob_oid/relstorage/adapters/mover.py

-=-
Modified: relstorage/branches/postgres_blob_oid/CHANGES.txt
===================================================================
--- relstorage/branches/postgres_blob_oid/CHANGES.txt	2011-06-14 18:30:15 UTC (rev 121934)
+++ relstorage/branches/postgres_blob_oid/CHANGES.txt	2011-06-14 19:51:37 UTC (rev 121935)
@@ -11,8 +11,9 @@
 - Fix object reference downloading performance for large Oracle RelStorage
   database during the garbage collection phase of a pack.
 
-- On Oracle, switch to storing ZODB blob in chunks up to 4GB, (the maximum
-  supported by cx_Oracle) to maximize blob reading and writing performance.
+- On Oracle and PostgreSQL, switch to storing ZODB blobs in chunks up to 4GB
+  (the maximum supported by cx_Oracle) or 2GB (PostgreSQL maximum blob size)
+  to maximize blob reading and writing performance.
 
 1.5.0b2 (2011-03-02)
 --------------------

Modified: relstorage/branches/postgres_blob_oid/relstorage/adapters/mover.py
===================================================================
--- relstorage/branches/postgres_blob_oid/relstorage/adapters/mover.py	2011-06-14 18:30:15 UTC (rev 121934)
+++ relstorage/branches/postgres_blob_oid/relstorage/adapters/mover.py	2011-06-14 19:51:37 UTC (rev 121935)
@@ -396,6 +396,11 @@
         ) ON COMMIT DROP;
         CREATE UNIQUE INDEX temp_blob_chunk_key
             ON temp_blob_chunk (zoid, chunk_num);
+        -- Trigger to clean out oids that did not get copied to blob_chunk
+        CREATE TRIGGER temp_blob_chunk_delete 
+            BEFORE DELETE ON temp_blob_chunk
+            FOR EACH ROW
+            EXECUTE PROCEDURE blob_chunk_delete_trigger();
         """
         cursor.execute(stmt)
 
@@ -994,7 +999,49 @@
 
     def postgresql_download_blob(self, cursor, oid, tid, filename):
         """Download a blob into a file."""
+        stmt = """
+        SELECT chunk_num, chunk
+        FROM blob_chunk
+        WHERE zoid = %s
+            AND tid = %s
+        ORDER BY chunk_num
+        """
 
+        f = None
+        bytes = 0
+        
+        try:
+            cursor.execute(stmt, (oid, tid))
+            for chunk_num, loid in cursor.fetchall():
+                blob = cursor.connection.lobject(loid, 'rb')
+
+                if chunk_num == 0:
+                    # Use the native psycopg2 blob export functionality
+                    blob.export(filename)
+                    blob.close()
+                    bytes = os.path.getsize(filename)
+                    continue
+
+                if f is None:
+                    f = open(filename, 'ab') # Append, chunk 0 was an export
+                read_chunk_size = self.blob_chunk_size
+                while True:
+                    read_chunk = blob.read(read_chunk_size)
+                    if read_chunk:
+                        f.write(read_chunk)
+                        bytes += len(read_chunk)
+                    else:
+                        break
+        except:
+            if f is not None:
+                f.close()
+                os.remove(filename)
+            raise
+
+        if f is not None:
+            f.close()
+        return bytes
+
     def mysql_download_blob(self, cursor, oid, tid, filename):
         """Download a blob into a file."""
         stmt = """
@@ -1101,7 +1148,74 @@
 
         If serial is None, upload to the temporary table.
         """
+        if tid is not None:
+            if self.keep_history:
+                delete_stmt = """
+                DELETE FROM blob_chunk
+                WHERE zoid = %s AND tid = %s
+                """
+                cursor.execute(delete_stmt, (oid, tid))
+            else:
+                delete_stmt = "DELETE FROM blob_chunk WHERE zoid = %s"
+                cursor.execute(delete_stmt, (oid,))
 
+            use_tid = True
+            insert_stmt = """
+            INSERT INTO blob_chunk (zoid, tid, chunk_num, chunk)
+            VALUES (%(oid)s, %(tid)s, %(chunk_num)s, %(loid)s)
+            """
+
+        else:
+            use_tid = False
+            delete_stmt = "DELETE FROM temp_blob_chunk WHERE zoid = %s"
+            cursor.execute(delete_stmt, (oid,))
+
+            insert_stmt = """
+            INSERT INTO temp_blob_chunk (zoid, chunk_num, chunk)
+            VALUES (%(oid)s, %(chunk_num)s, %(loid)s)
+            """
+
+        blob = None
+        # PostgreSQL only supports up to 2GB of data per BLOB.
+        maxsize = 1<<31
+        filesize = os.path.getsize(filename)
+
+        if filesize <= maxsize:
+            # File is small enough to fit in one chunk, just use
+            # psycopg2 native file copy support
+            blob = cursor.connection.lobject(0, 'wb', 0, filename)
+            blob.close()
+            params = dict(oid=oid, chunk_num=0, loid=blob.oid)
+            if use_tid:
+                params['tid'] = tid
+            cursor.execute(insert_stmt, params)
+            return
+
+        # We need to divide this up into multiple chunks
+        f = open(filename, 'rb')
+        try:
+            chunk_num = 0
+            while True:
+                blob = cursor.connection.lobject(0, 'wb')
+                params = dict(oid=oid, chunk_num=chunk_num, loid=blob.oid)
+                if use_tid:
+                    params['tid'] = tid
+                cursor.execute(insert_stmt, params)
+                
+                write_chunk_size = self.blob_chunk_size
+                for i in xrange(maxsize / write_chunk_size):
+                    write_chunk = f.read(write_chunk_size)
+                    if not blob.write(write_chunk):
+                        # EOF.
+                        return
+                if not blob.closed:
+                    blob.close()
+                chunk_num += 1
+        finally:
+            f.close()
+            if blob is not None and not blob.closed:
+                blob.close()
+
     def mysql_upload_blob(self, cursor, oid, tid, filename):
         """Upload a blob from a file.
 


