[Checkins] SVN: relstorage/branches/postgres_blob_oid/ Uploading and downloading of ZODB blobs on PostgreSQL.
Martijn Pieters
mj at zopatista.com
Tue Jun 14 15:51:37 EDT 2011
Log message for revision 121935:
Uploading and downloading of ZODB blobs on PostgreSQL.
Note that we use lo_import and lo_export where possible, relying on PostgreSQL's native upload and download paths for maximum performance.
Also clean out any orphaned large-object oids when rows are deleted from the temp_blob_chunk table without having been copied over to blob_chunk; the blob_chunk_delete_trigger procedure can handle this directly.
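For reference, the psycopg2 calls behind those lo_import / lo_export paths look roughly like the standalone sketch below; the connection string and file names are made up, and the real code in mover.py goes through cursor.connection.lobject() on the storage's existing connection rather than opening a new one.

    import psycopg2

    # Illustrative connection; large objects must be used inside a
    # transaction, which psycopg2 opens by default (autocommit off).
    conn = psycopg2.connect("dbname=zodb_example")

    # Upload: create a new large object from a local file (lo_import
    # under the hood) and remember its oid for the blob_chunk row.
    lobj = conn.lobject(0, 'wb', 0, '/tmp/blob-upload.dat')
    loid = lobj.oid
    lobj.close()

    # Download: export an existing large object straight to a local
    # file (lo_export under the hood).
    lobj = conn.lobject(loid, 'rb')
    lobj.export('/tmp/blob-download.dat')
    lobj.close()

    conn.commit()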
Changed:
U relstorage/branches/postgres_blob_oid/CHANGES.txt
U relstorage/branches/postgres_blob_oid/relstorage/adapters/mover.py
-=-
Modified: relstorage/branches/postgres_blob_oid/CHANGES.txt
===================================================================
--- relstorage/branches/postgres_blob_oid/CHANGES.txt 2011-06-14 18:30:15 UTC (rev 121934)
+++ relstorage/branches/postgres_blob_oid/CHANGES.txt 2011-06-14 19:51:37 UTC (rev 121935)
@@ -11,8 +11,9 @@
- Fix object reference downloading performance for large Oracle RelStorage
database during the garbage collection phase of a pack.
-- On Oracle, switch to storing ZODB blob in chunks up to 4GB, (the maximum
- supported by cx_Oracle) to maximize blob reading and writing performance.
+- On Oracle and PostgreSQL, switch to storing ZODB blobs in chunks up to 4GB
+  (the maximum supported by cx_Oracle) or 2GB (the PostgreSQL maximum blob
+  size) to maximize blob reading and writing performance.
1.5.0b2 (2011-03-02)
--------------------
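As a rough illustration of the chunk limits described in the changelog entry above (the 2GB figure matches the maxsize constant used in mover.py below; the 5GB file size is made up):

    # Maximum bytes per chunk: nominally 4GB for cx_Oracle, 2GB for
    # PostgreSQL large objects (maxsize = 1 << 31 in mover.py).
    PG_MAX_CHUNK = 1 << 31

    filesize = 5 * (1 << 30)                 # a hypothetical 5GB blob file
    chunks = -(-filesize // PG_MAX_CHUNK)    # ceiling division -> 3 chunks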
Modified: relstorage/branches/postgres_blob_oid/relstorage/adapters/mover.py
===================================================================
--- relstorage/branches/postgres_blob_oid/relstorage/adapters/mover.py 2011-06-14 18:30:15 UTC (rev 121934)
+++ relstorage/branches/postgres_blob_oid/relstorage/adapters/mover.py 2011-06-14 19:51:37 UTC (rev 121935)
@@ -396,6 +396,11 @@
) ON COMMIT DROP;
CREATE UNIQUE INDEX temp_blob_chunk_key
ON temp_blob_chunk (zoid, chunk_num);
+ -- Trigger to clean out oids that did not get copied to blob_chunk
+ CREATE TRIGGER temp_blob_chunk_delete
+ BEFORE DELETE ON temp_blob_chunk
+ FOR EACH ROW
+ EXECUTE PROCEDURE blob_chunk_delete_trigger();
"""
cursor.execute(stmt)
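The blob_chunk_delete_trigger procedure referenced by the new trigger is not part of this changeset (it belongs to the schema setup). A minimal sketch, assuming it simply unlinks large objects that were never copied into blob_chunk, could look like this:

    # Hypothetical sketch only -- not the actual definition used by
    # RelStorage's schema manager.
    stmt = """
    CREATE OR REPLACE FUNCTION blob_chunk_delete_trigger() RETURNS TRIGGER
    AS $blob_chunk_delete_trigger$
    BEGIN
        -- Unlink the large object unless a blob_chunk row still refers
        -- to it (i.e. it was copied over before this delete).
        IF NOT EXISTS (SELECT 1 FROM blob_chunk WHERE chunk = OLD.chunk) THEN
            PERFORM lo_unlink(OLD.chunk);
        END IF;
        RETURN OLD;
    END;
    $blob_chunk_delete_trigger$ LANGUAGE plpgsql;
    """
    cursor.execute(stmt)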
@@ -994,7 +999,49 @@
def postgresql_download_blob(self, cursor, oid, tid, filename):
"""Download a blob into a file."""
+ stmt = """
+ SELECT chunk_num, chunk
+ FROM blob_chunk
+ WHERE zoid = %s
+ AND tid = %s
+ ORDER BY chunk_num
+ """
+ f = None
+ bytes = 0
+
+ try:
+ cursor.execute(stmt, (oid, tid))
+ for chunk_num, loid in cursor.fetchall():
+ blob = cursor.connection.lobject(loid, 'rb')
+
+ if chunk_num == 0:
+ # Use the native psycopg2 blob export functionality
+ blob.export(filename)
+ blob.close()
+ bytes = os.path.getsize(filename)
+ continue
+
+ if f is None:
+ f = open(filename, 'ab') # Append, chunk 0 was an export
+ read_chunk_size = self.blob_chunk_size
+ while True:
+ read_chunk = blob.read(read_chunk_size)
+ if read_chunk:
+ f.write(read_chunk)
+ bytes += len(read_chunk)
+ else:
+ break
+ except:
+ if f is not None:
+ f.close()
+ os.remove(filename)
+ raise
+
+ if f is not None:
+ f.close()
+ return bytes
+
def mysql_download_blob(self, cursor, oid, tid, filename):
"""Download a blob into a file."""
stmt = """
@@ -1101,7 +1148,74 @@
If serial is None, upload to the temporary table.
"""
+ if tid is not None:
+ if self.keep_history:
+ delete_stmt = """
+ DELETE FROM blob_chunk
+ WHERE zoid = %s AND tid = %s
+ """
+ cursor.execute(delete_stmt, (oid, tid))
+ else:
+ delete_stmt = "DELETE FROM blob_chunk WHERE zoid = %s"
+ cursor.execute(delete_stmt, (oid,))
+ use_tid = True
+ insert_stmt = """
+ INSERT INTO blob_chunk (zoid, tid, chunk_num, chunk)
+ VALUES (%(oid)s, %(tid)s, %(chunk_num)s, %(loid)s)
+ """
+
+ else:
+ use_tid = False
+ delete_stmt = "DELETE FROM temp_blob_chunk WHERE zoid = %s"
+ cursor.execute(delete_stmt, (oid,))
+
+ insert_stmt = """
+ INSERT INTO temp_blob_chunk (zoid, chunk_num, chunk)
+ VALUES (%(oid)s, %(chunk_num)s, %(loid)s)
+ """
+
+ blob = None
+ # PostgreSQL only supports up to 2GB of data per BLOB.
+ maxsize = 1<<31
+ filesize = os.path.getsize(filename)
+
+ if filesize <= maxsize:
+ # File is small enough to fit in one chunk, just use
+ # psycopg2 native file copy support
+ blob = cursor.connection.lobject(0, 'wb', 0, filename)
+ blob.close()
+ params = dict(oid=oid, chunk_num=0, loid=blob.oid)
+ if use_tid:
+ params['tid'] = tid
+ cursor.execute(insert_stmt, params)
+ return
+
+ # We need to divide this up into multiple chunks
+ f = open(filename, 'rb')
+ try:
+ chunk_num = 0
+ while True:
+ blob = cursor.connection.lobject(0, 'wb')
+ params = dict(oid=oid, chunk_num=chunk_num, loid=blob.oid)
+ if use_tid:
+ params['tid'] = tid
+ cursor.execute(insert_stmt, params)
+
+ write_chunk_size = self.blob_chunk_size
+ for i in xrange(maxsize / write_chunk_size):
+ write_chunk = f.read(write_chunk_size)
+ if not blob.write(write_chunk):
+ # EOF.
+ return
+ if not blob.closed:
+ blob.close()
+ chunk_num += 1
+ finally:
+ f.close()
+ if blob is not None and not blob.closed:
+ blob.close()
+
def mysql_upload_blob(self, cursor, oid, tid, filename):
"""Upload a blob from a file.