[Checkins] SVN: relstorage/trunk/ Merge the postgres_blob_oid branch into trunk
Martijn Pieters
mj at zopatista.com
Wed Jun 15 13:40:03 EDT 2011
Log message for revision 121952:
Merge the postgres_blob_oid branch into trunk
Changed:
U relstorage/trunk/CHANGES.txt
U relstorage/trunk/notes/migrate-to-1.5.txt
U relstorage/trunk/relstorage/adapters/mover.py
U relstorage/trunk/relstorage/adapters/schema.py
U relstorage/trunk/relstorage/tests/blob/testblob.py
U relstorage/trunk/relstorage/tests/testmysql.py
U relstorage/trunk/relstorage/tests/testoracle.py
U relstorage/trunk/relstorage/tests/testpostgresql.py
-=-
Modified: relstorage/trunk/CHANGES.txt
===================================================================
--- relstorage/trunk/CHANGES.txt 2011-06-15 17:29:09 UTC (rev 121951)
+++ relstorage/trunk/CHANGES.txt 2011-06-15 17:40:02 UTC (rev 121952)
@@ -11,8 +11,12 @@
- Fix object reference downloading performance for large Oracle RelStorage
database during the garbage collection phase of a pack.
-- On Oracle, switch to storing ZODB blob in chunks up to 4GB, (the maximum
- supported by cx_Oracle) to maximize blob reading and writing performance.
+- On Oracle and PostgreSQL, switch to storing ZODB blobs in chunks of up to
+  4GB (the maximum supported by cx_Oracle) or 2GB (the PostgreSQL maximum
+  blob size) to maximize blob reading and writing performance.
+
+  The PostgreSQL blob_chunk schema changed to support this; see
+  notes/migrate-to-1.5.txt for instructions on updating existing databases.
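[Editor's note: for a rough sense of what those ceilings mean, here is a small illustrative sketch, not part of the commit; the constants mirror the limits named above and the helper name is made up.]

    # Illustrative only: how many blob_chunk rows a file needs under the
    # new per-chunk ceilings (2GB on PostgreSQL, 4GB with cx_Oracle).
    POSTGRESQL_MAX_CHUNK = 1 << 31
    ORACLE_MAX_CHUNK = 1 << 32

    def chunk_count(filesize, max_chunk):
        if filesize <= 0:
            return 1  # an empty blob still gets one (empty) chunk
        return (filesize + max_chunk - 1) // max_chunk

    print chunk_count(5 * 1024 ** 3, POSTGRESQL_MAX_CHUNK)  # -> 3
    print chunk_count(5 * 1024 ** 3, ORACLE_MAX_CHUNK)      # -> 2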
1.5.0b2 (2011-03-02)
--------------------
Modified: relstorage/trunk/notes/migrate-to-1.5.txt
===================================================================
--- relstorage/trunk/notes/migrate-to-1.5.txt 2011-06-15 17:29:09 UTC (rev 121951)
+++ relstorage/trunk/notes/migrate-to-1.5.txt 2011-06-15 17:40:02 UTC (rev 121952)
@@ -21,7 +21,20 @@
ALTER TABLE object_state ALTER COLUMN state_size SET NOT NULL;
COMMIT;
+If you used a RelStorage 1.5.0 release earlier than 1.5.0b3, you will need to
+migrate your blob_chunk table schema:
+ BEGIN;
+ ALTER TABLE blob_chunk RENAME COLUMN chunk TO oldbytea;
+ ALTER TABLE blob_chunk ADD COLUMN chunk OID;
+ UPDATE blob_chunk bc SET chunk = (
+ SELECT oid FROM (
+ SELECT oid, lowrite(lo_open(oid, 131072), bc.oldbytea)
+ FROM lo_create(0) o(oid)) x);
+ ALTER TABLE blob_chunk ALTER COLUMN chunk SET NOT NULL;
+ ALTER TABLE blob_chunk DROP COLUMN oldbytea;
+ COMMIT;
+
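[Editor's note: after the migration, a quick consistency check can confirm that every chunk now points at an existing large object. This is a sketch only, not part of this commit, and it assumes PostgreSQL 9.0 or later for pg_largeobject_metadata.]

    -- Sanity-check sketch: every chunk OID should resolve to a large
    -- object.  A count of 0 means the migration left no dangling OIDs.
    SELECT count(*)
    FROM blob_chunk bc
    LEFT JOIN pg_largeobject_metadata lom ON lom.oid = bc.chunk
    WHERE lom.oid IS NULL;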
MySQL history-preserving
------------------------
Modified: relstorage/trunk/relstorage/adapters/mover.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mover.py 2011-06-15 17:29:09 UTC (rev 121951)
+++ relstorage/trunk/relstorage/adapters/mover.py 2011-06-15 17:40:02 UTC (rev 121952)
@@ -389,13 +389,19 @@
) ON COMMIT DROP;
CREATE UNIQUE INDEX temp_store_zoid ON temp_store (zoid);
+ DROP TABLE IF EXISTS temp_blob_chunk;
CREATE TEMPORARY TABLE temp_blob_chunk (
zoid BIGINT NOT NULL,
chunk_num BIGINT NOT NULL,
- chunk BYTEA
- ) ON COMMIT DROP;
+ chunk OID
+ ) ON COMMIT DELETE ROWS;
CREATE UNIQUE INDEX temp_blob_chunk_key
ON temp_blob_chunk (zoid, chunk_num);
+ -- Trigger to clean out oids that did not get copied to blob_chunk
+ CREATE TRIGGER temp_blob_chunk_delete
+ BEFORE DELETE ON temp_blob_chunk
+ FOR EACH ROW
+ EXECUTE PROCEDURE blob_chunk_delete_trigger();
"""
cursor.execute(stmt)
@@ -992,9 +998,54 @@
- def generic_download_blob(self, cursor, oid, tid, filename):
+ def postgresql_download_blob(self, cursor, oid, tid, filename):
"""Download a blob into a file."""
stmt = """
+ SELECT chunk_num, chunk
+ FROM blob_chunk
+ WHERE zoid = %s
+ AND tid = %s
+ ORDER BY chunk_num
+ """
+
+ f = None
+ bytes = 0
+
+ try:
+ cursor.execute(stmt, (oid, tid))
+ for chunk_num, loid in cursor.fetchall():
+ blob = cursor.connection.lobject(loid, 'rb')
+
+ if chunk_num == 0:
+ # Use the native psycopg2 blob export functionality
+ blob.export(filename)
+ blob.close()
+ bytes = os.path.getsize(filename)
+ continue
+
+ if f is None:
+ f = open(filename, 'ab') # Append, chunk 0 was an export
+ read_chunk_size = self.blob_chunk_size
+ while True:
+ read_chunk = blob.read(read_chunk_size)
+ if read_chunk:
+ f.write(read_chunk)
+ bytes += len(read_chunk)
+ else:
+ break
+ except:
+ if f is not None:
+ f.close()
+ os.remove(filename)
+ raise
+
+ if f is not None:
+ f.close()
+ return bytes
+
+ def mysql_download_blob(self, cursor, oid, tid, filename):
+ """Download a blob into a file."""
+ stmt = """
SELECT chunk
FROM blob_chunk
WHERE zoid = %s
@@ -1002,12 +1053,6 @@
AND chunk_num = %s
"""
- use_base64 = False
- if self.database_name == 'postgresql':
- use_base64 = True
- stmt = stmt.replace(
- "SELECT chunk", "SELECT encode(chunk, 'base64')")
-
f = None
bytes = 0
try:
@@ -1023,8 +1068,6 @@
# all, then this method should not write a file.
break
- if use_base64:
- chunk = decodestring(chunk)
if f is None:
f = open(filename, 'wb')
f.write(chunk)
@@ -1040,9 +1083,6 @@
f.close()
return bytes
- mysql_download_blob = generic_download_blob
- postgresql_download_blob = generic_download_blob
-
def oracle_download_blob(self, cursor, oid, tid, filename):
"""Download a blob into a file."""
stmt = """
@@ -1102,7 +1142,7 @@
- def generic_upload_blob(self, cursor, oid, tid, filename):
+ def postgresql_upload_blob(self, cursor, oid, tid, filename):
"""Upload a blob from a file.
If serial is None, upload to the temporary table.
@@ -1121,8 +1161,9 @@
use_tid = True
insert_stmt = """
INSERT INTO blob_chunk (zoid, tid, chunk_num, chunk)
- VALUES (%s, %s, %s, CHUNK)
+ VALUES (%(oid)s, %(tid)s, %(chunk_num)s, %(loid)s)
"""
+
else:
use_tid = False
delete_stmt = "DELETE FROM temp_blob_chunk WHERE zoid = %s"
@@ -1130,17 +1171,81 @@
insert_stmt = """
INSERT INTO temp_blob_chunk (zoid, chunk_num, chunk)
- VALUES (%s, %s, CHUNK)
+ VALUES (%(oid)s, %(chunk_num)s, %(loid)s)
"""
- use_base64 = False
- if self.database_name == 'postgresql':
- use_base64 = True
- insert_stmt = insert_stmt.replace(
- "CHUNK", "decode(%s, 'base64')", 1)
+ blob = None
+ # PostgreSQL only supports up to 2GB of data per BLOB.
+ maxsize = 1<<31
+ filesize = os.path.getsize(filename)
+
+ if filesize <= maxsize:
+ # File is small enough to fit in one chunk, just use
+ # psycopg2 native file copy support
+ blob = cursor.connection.lobject(0, 'wb', 0, filename)
+ blob.close()
+ params = dict(oid=oid, chunk_num=0, loid=blob.oid)
+ if use_tid:
+ params['tid'] = tid
+ cursor.execute(insert_stmt, params)
+ return
+
+ # We need to divide this up into multiple chunks
+ f = open(filename, 'rb')
+ try:
+ chunk_num = 0
+ while True:
+ blob = cursor.connection.lobject(0, 'wb')
+ params = dict(oid=oid, chunk_num=chunk_num, loid=blob.oid)
+ if use_tid:
+ params['tid'] = tid
+ cursor.execute(insert_stmt, params)
+
+ write_chunk_size = self.blob_chunk_size
+ for i in xrange(maxsize / write_chunk_size):
+ write_chunk = f.read(write_chunk_size)
+ if not blob.write(write_chunk):
+ # EOF.
+ return
+ if not blob.closed:
+ blob.close()
+ chunk_num += 1
+ finally:
+ f.close()
+ if blob is not None and not blob.closed:
+ blob.close()
+
+ def mysql_upload_blob(self, cursor, oid, tid, filename):
+ """Upload a blob from a file.
+
+ If serial is None, upload to the temporary table.
+ """
+ if tid is not None:
+ if self.keep_history:
+ delete_stmt = """
+ DELETE FROM blob_chunk
+ WHERE zoid = %s AND tid = %s
+ """
+ cursor.execute(delete_stmt, (oid, tid))
+ else:
+ delete_stmt = "DELETE FROM blob_chunk WHERE zoid = %s"
+ cursor.execute(delete_stmt, (oid,))
+
+ use_tid = True
+ insert_stmt = """
+ INSERT INTO blob_chunk (zoid, tid, chunk_num, chunk)
+ VALUES (%s, %s, %s, %s)
+ """
else:
- insert_stmt = insert_stmt.replace("CHUNK", "%s")
+ use_tid = False
+ delete_stmt = "DELETE FROM temp_blob_chunk WHERE zoid = %s"
+ cursor.execute(delete_stmt, (oid,))
+ insert_stmt = """
+ INSERT INTO temp_blob_chunk (zoid, chunk_num, chunk)
+ VALUES (%s, %s, %s)
+ """
+
f = open(filename, 'rb')
try:
chunk_num = 0
@@ -1150,8 +1255,6 @@
# EOF. Note that we always write at least one
# chunk, even if the blob file is empty.
break
- if use_base64:
- chunk = encodestring(chunk)
if use_tid:
params = (oid, tid, chunk_num, chunk)
else:
@@ -1161,9 +1264,6 @@
finally:
f.close()
- mysql_upload_blob = generic_upload_blob
- postgresql_upload_blob = generic_upload_blob
-
def oracle_upload_blob(self, cursor, oid, tid, filename):
"""Upload a blob from a file.
Modified: relstorage/trunk/relstorage/adapters/schema.py
===================================================================
--- relstorage/trunk/relstorage/adapters/schema.py 2011-06-15 17:29:09 UTC (rev 121951)
+++ relstorage/trunk/relstorage/adapters/schema.py 2011-06-15 17:40:02 UTC (rev 121952)
@@ -104,9 +104,10 @@
tid BIGINT NOT NULL,
chunk_num BIGINT NOT NULL,
PRIMARY KEY (zoid, tid, chunk_num),
- chunk BYTEA NOT NULL
+ chunk OID NOT NULL
);
CREATE INDEX blob_chunk_lookup ON blob_chunk (zoid, tid);
+ CREATE INDEX blob_chunk_loid ON blob_chunk (chunk);
ALTER TABLE blob_chunk ADD CONSTRAINT blob_chunk_fk
FOREIGN KEY (zoid, tid)
REFERENCES object_state (zoid, tid)
@@ -394,6 +395,38 @@
CREATE SEQUENCE zoid_seq;
"""
+postgresql_history_preserving_plpgsql = """
+CREATE OR REPLACE FUNCTION blob_chunk_delete_trigger() RETURNS TRIGGER
+AS $blob_chunk_delete_trigger$
+ -- Version: %s
+ -- Unlink the large object after a blob_chunk row is deleted
+ DECLARE
+ expect integer;
+ cnt integer;
+ BEGIN
+ expect = 1; -- The number of rows where we'll unlink the oid
+ IF (TG_TABLE_NAME != 'blob_chunk') THEN
+ expect = 0; -- Deleting from elsewhere means we expect 0
+ END IF;
+ SELECT count(*) into cnt FROM blob_chunk WHERE chunk=OLD.chunk;
+ IF (cnt = expect) THEN
+ -- Last reference to this oid, unlink
+ PERFORM lo_unlink(OLD.chunk);
+ END IF;
+ RETURN OLD;
+ END;
+$blob_chunk_delete_trigger$ LANGUAGE plpgsql;
+/
+
+DROP TRIGGER IF EXISTS blob_chunk_delete ON blob_chunk;
+/
+CREATE TRIGGER blob_chunk_delete
+ BEFORE DELETE ON blob_chunk
+ FOR EACH ROW
+ EXECUTE PROCEDURE blob_chunk_delete_trigger();
+/
+""" % relstorage_op_version
+
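[Editor's note: to see what the trigger buys, here is a minimal illustration of a delete that lets blob_chunk_delete_trigger() reclaim the underlying large objects. A sketch only, not part of this commit; the zoid literal is hypothetical.]

    -- Sketch: deleting the last blob_chunk row that references a chunk OID
    -- makes the trigger lo_unlink() that large object.
    BEGIN;
    DELETE FROM blob_chunk WHERE zoid = 12345;  -- hypothetical zoid
    -- The chunk OIDs that row referenced no longer exist as large objects.
    ROLLBACK;  -- large object removal is transactional, so a rollback restores them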
oracle_history_preserving_plsql = """
CREATE OR REPLACE PACKAGE relstorage_op AS
TYPE numlist IS TABLE OF NUMBER(20) INDEX BY BINARY_INTEGER;
@@ -494,7 +527,7 @@
chunk_num BIGINT NOT NULL,
PRIMARY KEY (zoid, chunk_num),
tid BIGINT NOT NULL,
- chunk BYTEA NOT NULL
+ chunk OID NOT NULL
);
CREATE INDEX blob_chunk_lookup ON blob_chunk (zoid);
ALTER TABLE blob_chunk ADD CONSTRAINT blob_chunk_fk
@@ -674,6 +707,8 @@
CREATE SEQUENCE zoid_seq;
"""
+postgresql_history_free_plpgsql = postgresql_history_preserving_plpgsql
+
oracle_history_free_plsql = """
CREATE OR REPLACE PACKAGE relstorage_op AS
TYPE numlist IS TABLE OF NUMBER(20) INDEX BY BINARY_INTEGER;
@@ -885,6 +920,41 @@
connmanager, runner, keep_history)
self.locker = locker
+ def prepare(self):
+ """Create the database schema if it does not already exist."""
+ def callback(conn, cursor):
+ tables = self.list_tables(cursor)
+ if not 'object_state' in tables:
+ self.create(cursor)
+ else:
+ self.check_compatibility(cursor, tables)
+ self.update_schema(cursor, tables)
+ triggers = self.list_triggers(cursor)
+ if triggers.get('blob_chunk_delete_trigger') != relstorage_op_version:
+ self.install_triggers(cursor)
+ triggers = self.list_triggers(cursor)
+ if triggers.get('blob_chunk_delete_trigger') != relstorage_op_version:
+ raise AssertionError(
+ "Could not get version information after "
+ "installing the blob_chunk_delete_trigger trigger.")
+ self.connmanager.open_and_call(callback)
+
+ def install_triggers(self, cursor):
+ """Install the PL/pgSQL triggers"""
+ if self.keep_history:
+ plpgsql = postgresql_history_preserving_plpgsql
+ else:
+ plpgsql = postgresql_history_free_plpgsql
+
+ lines = []
+ for line in plpgsql.splitlines():
+ if line.strip() == '/':
+ # end of a statement
+ cursor.execute('\n'.join(lines))
+ lines = []
+ elif line.strip():
+ lines.append(line)
+
def create(self, cursor):
"""Create the database tables."""
super(PostgreSQLSchemaInstaller, self).create(cursor)
@@ -899,7 +969,34 @@
cursor.execute("SELECT relname FROM pg_class WHERE relkind = 'S'")
return [name for (name,) in cursor]
+ def list_triggers(self, cursor):
+ """Returns {trigger name: version}. version may be None."""
+ stmt = """
+ SELECT proname, prosrc
+ FROM pg_catalog.pg_namespace n
+ JOIN pg_catalog.pg_proc p ON pronamespace = n.oid
+ JOIN pg_catalog.pg_type t ON prorettype = t.oid
+ WHERE nspname = 'public' AND typname = 'trigger'
+ """
+ cursor.execute(stmt)
+ res = {}
+ for (name, text) in cursor:
+ version = None
+ match = re.search(r'Version:\s*([0-9a-zA-Z.]+)', text)
+ if match is not None:
+ version = match.group(1)
+ res[name.lower()] = version
+ return res
+ def drop_all(self):
+ def callback(conn, cursor):
+ # make sure we clean up our blob oids first
+ if 'blob_chunk' in self.list_tables(cursor):
+ cursor.execute("DELETE FROM blob_chunk")
+ self.connmanager.open_and_call(callback)
+ super(PostgreSQLSchemaInstaller, self).drop_all()
+
+
class MySQLSchemaInstaller(AbstractSchemaInstaller):
implements(ISchemaInstaller)
Modified: relstorage/trunk/relstorage/tests/blob/testblob.py
===================================================================
--- relstorage/trunk/relstorage/tests/blob/testblob.py 2011-06-15 17:29:09 UTC (rev 121951)
+++ relstorage/trunk/relstorage/tests/blob/testblob.py 2011-06-15 17:40:02 UTC (rev 121952)
@@ -17,6 +17,8 @@
from zope.testing import doctest
import atexit
+import collections
+import datetime
import os
import random
import re
@@ -34,6 +36,12 @@
import ZODB.tests.util
import zope.testing.renormalizing
+try:
+ from hashlib import md5
+except ImportError:
+ from md5 import new as md5
+
+
def new_time():
"""Create a _new_ time stamp.
@@ -49,6 +57,51 @@
return new_time
+def random_file(size, fd):
+ """Create a random data of at least the given size, writing to fd.
+
+ See http://jessenoller.com/2008/05/30/making-re-creatable-random-data-files-really-fast-in-python/
+ for the technique used.
+
+ Returns the md5 sum of the file contents for easy comparison.
+
+ """
+ def fdata():
+ seed = "1092384956781341341234656953214543219"
+ # Just use this module as the source of our data
+ words = open(__file__, "r").read().replace("\n", '').split()
+ a = collections.deque(words)
+ b = collections.deque(seed)
+ while True:
+ yield ' '.join(list(a)[0:1024])
+ a.rotate(int(b[0]))
+ b.rotate(1)
+ datagen = fdata()
+ bytes = 0
+ md5sum = md5()
+ while bytes < size:
+ data = datagen.next()
+ md5sum.update(data)
+ fd.write(data)
+ bytes += len(data)
+ return md5sum.hexdigest()
+
+
+def md5sum(fd):
+ md5sum = md5()
+ blocksize = md5sum.block_size << 8
+ for data in iter(lambda: fd.read(blocksize), ''):
+ md5sum.update(data)
+ return md5sum.hexdigest()
+
+
+def sizeof_fmt(num):
+ for x in ['bytes', 'KB', 'MB', 'GB', 'TB']:
+ if num < 1024.0:
+ return "%3.1f%s" % (num, x)
+ num /= 1024.0
+
+
class BlobTestBase(ZODB.tests.StorageTestBase.StorageTestBase):
def setUp(self):
@@ -210,6 +263,47 @@
self.compare(self._storage, self._dst)
+class LargeBlobTest(BlobTestBase):
+ """Test large blob upload and download.
+
+ Note that this test exercises the blob storage and only makes sense
+ when shared_blob_dir=False.
+
+ """
+ level = 2 # Only run when selecting -a 2 or higher, or --all
+ testsize = 0 # Set on the auto-generated parent class
+
+ def _log(self, msg):
+ print '%s [%s]: %s' % (
+ datetime.datetime.now().isoformat(' '),
+ self.__class__.__name__, msg)
+
+ def testLargeBlob(self):
+ # Large blobs are chunked into multiple pieces; we want to know
+ # whether they come out the same way they went in.
+ db = DB(self._storage)
+ conn = db.open()
+ blob = conn.root()[1] = ZODB.blob.Blob()
+ size = sizeof_fmt(self.testsize)
+ self._log('Creating %s blob file' % size)
+ signature = random_file(self.testsize, blob.open('w'))
+ self._log('Committing %s blob file' % size)
+ transaction.commit()
+
+ # Clear the cache
+ for base, dir, files in os.walk('.'):
+ for f in files:
+ if f.endswith('.blob'):
+ ZODB.blob.remove_committed(os.path.join(base, f))
+
+ # Re-download blob
+ self._log('Caching %s blob file' % size)
+ conn = db.open()
+ blob = conn.root()[1].open('r')
+ self._log('Creating signature for %s blob cache' % size)
+ self.assertEqual(md5sum(blob), signature)
+
+
def packing_with_uncommitted_data_non_undoing():
"""
This covers regression for bug #130459.
@@ -495,6 +589,7 @@
keep_history=True,
pack_test_name='blob_packing.txt',
test_blob_cache=False,
+ large_blob_size=None
):
"""Return a test suite for a generic IBlobStorage.
@@ -539,10 +634,11 @@
blob_dir = '%s.bobs' % name
return factory(name, blob_dir, **kw)
- def add_test_based_on_test_class(class_):
+ def add_test_based_on_test_class(class_, **attr):
+ attr.update(create_storage=create_storage)
new_class = class_.__class__(
prefix+class_.__name__, (class_, ),
- dict(create_storage=create_storage),
+ attr,
)
suite.addTest(unittest.makeSuite(new_class))
@@ -550,6 +646,8 @@
add_test_based_on_test_class(RecoveryBlobStorage)
if test_undo:
add_test_based_on_test_class(BlobUndoTests)
+ if large_blob_size:
+ add_test_based_on_test_class(LargeBlobTest, testsize=large_blob_size)
suite.layer = MinimalTestLayer(prefix+'BlobTests')
Modified: relstorage/trunk/relstorage/tests/testmysql.py
===================================================================
--- relstorage/trunk/relstorage/tests/testmysql.py 2011-06-15 17:29:09 UTC (rev 121951)
+++ relstorage/trunk/relstorage/tests/testmysql.py 2011-06-15 17:40:02 UTC (rev 121952)
@@ -202,6 +202,10 @@
else:
pack_test_name = 'blob_packing_history_free.txt'
+ # MySQL is limited to the blob_chunk_size as there is no
+ # native blob streaming support.
+ blob_size = Options().blob_chunk_size
+
suite.addTest(storage_reusable_suite(
prefix, create_storage,
test_blob_storage_recovery=True,
@@ -209,6 +213,7 @@
test_undo=keep_history,
pack_test_name=pack_test_name,
test_blob_cache=(not shared_blob_dir),
+ large_blob_size=(not shared_blob_dir) and blob_size + 100
))
return suite
Modified: relstorage/trunk/relstorage/tests/testoracle.py
===================================================================
--- relstorage/trunk/relstorage/tests/testoracle.py 2011-06-15 17:29:09 UTC (rev 121951)
+++ relstorage/trunk/relstorage/tests/testoracle.py 2011-06-15 17:40:02 UTC (rev 121952)
@@ -22,6 +22,7 @@
from relstorage.tests.hptestbase import HistoryPreservingToFileStorage
import logging
import os
+import sys
import unittest
@@ -211,6 +212,10 @@
else:
pack_test_name = 'blob_packing_history_free.txt'
+ # cx_Oracle blob support can only address up to sys.maxint on
+ # 32-bit systems, 4GB otherwise.
+ blob_size = min(sys.maxint, 1<<32)
+
suite.addTest(storage_reusable_suite(
prefix, create_storage,
test_blob_storage_recovery=True,
@@ -218,6 +223,7 @@
test_undo=keep_history,
pack_test_name=pack_test_name,
test_blob_cache=(not shared_blob_dir),
+ large_blob_size=(not shared_blob_dir) and blob_size + 100,
))
return suite
Modified: relstorage/trunk/relstorage/tests/testpostgresql.py
===================================================================
--- relstorage/trunk/relstorage/tests/testpostgresql.py 2011-06-15 17:29:09 UTC (rev 121951)
+++ relstorage/trunk/relstorage/tests/testpostgresql.py 2011-06-15 17:40:02 UTC (rev 121952)
@@ -203,6 +203,8 @@
test_undo=keep_history,
pack_test_name=pack_test_name,
test_blob_cache=(not shared_blob_dir),
+ # PostgreSQL blob chunks are max 2GB in size
+ large_blob_size=(not shared_blob_dir) and (1<<31) + 100,
))
return suite