[Checkins] SVN: relstorage/trunk/relstorage/adapters/ PostgreSQL 8.1 doesn't support multi-row insert, but 8.2+ does.
Shane Hathaway
shane at hathawaymix.org
Thu Oct 8 12:11:38 EDT 2009
Log message for revision 104939:
PostgreSQL 8.1 doesn't support multi-row insert, but 8.2+ does.
Changed:
U relstorage/trunk/relstorage/adapters/batch.py
U relstorage/trunk/relstorage/adapters/locker.py
U relstorage/trunk/relstorage/adapters/mover.py
U relstorage/trunk/relstorage/adapters/postgresql.py
-=-
Modified: relstorage/trunk/relstorage/adapters/batch.py
===================================================================
--- relstorage/trunk/relstorage/adapters/batch.py 2009-10-08 16:10:03 UTC (rev 104938)
+++ relstorage/trunk/relstorage/adapters/batch.py 2009-10-08 16:11:37 UTC (rev 104939)
@@ -25,6 +25,7 @@
row_limit = 100
size_limit = 1<<20
database_name = None
+ support_batch_insert = True
def __init__(self, cursor):
self.cursor = cursor
@@ -76,17 +77,35 @@
def do_inserts(self):
items = sorted(self.inserts.items())
for (command, header, row_schema), rows in items:
- parts = []
- params = []
- s = "(%s)" % row_schema
- for row in rows.values():
- parts.append(s)
- params.extend(row)
- parts = ',\n'.join(parts)
- stmt = "%s INTO %s VALUES %s" % (command, header, parts)
- self.cursor.execute(stmt, tuple(params))
+ if self.support_batch_insert:
+ parts = []
+ params = []
+ s = "(%s)" % row_schema
+ for row in rows.values():
+ parts.append(s)
+ params.extend(row)
+ parts = ',\n'.join(parts)
+ stmt = "%s INTO %s VALUES %s" % (command, header, parts)
+ self.cursor.execute(stmt, tuple(params))
+ else:
+ for row in rows.values():
+ stmt = "%s INTO %s VALUES (%s)" % (
+ command, header, row_schema)
+ self.cursor.execute(stmt, tuple(row))
+class PostgreSQLRowBatcher(RowBatcher):
+
+ def __init__(self, cursor, version_detector):
+ super(PostgreSQLRowBatcher, self).__init__(cursor)
+ self.support_batch_insert = (
+ version_detector.get_version(cursor) >= (8, 2))
+
+
+class MySQLRowBatcher(RowBatcher):
+ pass
+
+
oracle_rowvar_re = re.compile(":([a-zA-Z0-9_]+)")
class OracleRowBatcher(RowBatcher):
@@ -137,3 +156,4 @@
if stmt_inputsizes:
self.cursor.setinputsizes(**stmt_inputsizes)
self.cursor.execute(stmt, params)
+
Modified: relstorage/trunk/relstorage/adapters/locker.py
===================================================================
--- relstorage/trunk/relstorage/adapters/locker.py 2009-10-08 16:10:03 UTC (rev 104938)
+++ relstorage/trunk/relstorage/adapters/locker.py 2009-10-08 16:11:37 UTC (rev 104939)
@@ -17,7 +17,6 @@
from relstorage.adapters.interfaces import ILocker
from ZODB.POSException import StorageError
from zope.interface import implements
-import re
commit_lock_timeout = 30
@@ -32,6 +31,11 @@
class PostgreSQLLocker(Locker):
implements(ILocker)
+ def __init__(self, keep_history, lock_exceptions, version_detector):
+ super(PostgreSQLLocker, self).__init__(
+ keep_history=keep_history, lock_exceptions=lock_exceptions)
+ self.version_detector = version_detector
+
def hold_commit_lock(self, cursor, ensure_current=False):
if ensure_current:
# Hold commit_lock to prevent concurrent commits
@@ -57,19 +61,9 @@
# no action needed
pass
- def _pg_version(self, cursor):
- """Return the (major, minor) version of PostgreSQL"""
- cursor.execute("SELECT version()")
- v = cursor.fetchone()[0]
- m = re.search(r"([0-9]+)[.]([0-9]+)", v)
- if m is None:
- raise AssertionError("Unable to detect PostgreSQL version: " + v)
- else:
- return int(m.group(1)), int(m.group(2))
-
def _pg_has_advisory_locks(self, cursor):
"""Return true if this version of PostgreSQL supports advisory locks"""
- return self._pg_version(cursor) >= (8, 2)
+ return self.version_detector.get_version(cursor) >= (8, 2)
def create_pack_lock(self, cursor):
if not self._pg_has_advisory_locks(cursor):
Modified: relstorage/trunk/relstorage/adapters/mover.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mover.py 2009-10-08 16:10:03 UTC (rev 104938)
+++ relstorage/trunk/relstorage/adapters/mover.py 2009-10-08 16:11:37 UTC (rev 104939)
@@ -17,8 +17,9 @@
from base64 import decodestring
from base64 import encodestring
from relstorage.adapters.interfaces import IObjectMover
+from relstorage.adapters.batch import MySQLRowBatcher
from relstorage.adapters.batch import OracleRowBatcher
-from relstorage.adapters.batch import RowBatcher
+from relstorage.adapters.batch import PostgreSQLRowBatcher
from zope.interface import implements
try:
@@ -45,6 +46,7 @@
'load_before',
'get_object_tid_after',
'on_store_opened',
+ 'make_batcher',
'store_temp',
'restore',
'detect_conflict',
@@ -54,7 +56,8 @@
)
def __init__(self, database_name, keep_history, runner=None,
- Binary=None, inputsize_BLOB=None, inputsize_BINARY=None):
+ Binary=None, inputsize_BLOB=None, inputsize_BINARY=None,
+ version_detector=None):
# The inputsize parameters are for Oracle only.
self.database_name = database_name
self.keep_history = keep_history
@@ -65,20 +68,15 @@
'blobdata': inputsize_BLOB,
'rawdata': inputsize_BINARY,
}
+ self.version_detector = version_detector
for method_name in self._method_names:
method = getattr(self, '%s_%s' % (database_name, method_name))
setattr(self, method_name, method)
- def make_batcher(self, cursor):
- if self.database_name == 'oracle':
- return OracleRowBatcher(cursor, self.inputsizes)
- else:
- return RowBatcher(cursor)
-
def postgresql_load_current(self, cursor, oid):
"""Returns the current pickle and integer tid for an object.
@@ -392,6 +390,18 @@
+ def postgresql_make_batcher(self, cursor):
+ return PostgreSQLRowBatcher(cursor, self.version_detector)
+
+ def mysql_make_batcher(self, cursor):
+ return MySQLRowBatcher(cursor)
+
+ def oracle_make_batcher(self, cursor):
+ return OracleRowBatcher(cursor, self.inputsizes)
+
+
+
+
def postgresql_store_temp(self, cursor, batcher, oid, prev_tid, data):
"""Store an object in the temporary table."""
if self.keep_history:
Modified: relstorage/trunk/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py 2009-10-08 16:10:03 UTC (rev 104938)
+++ relstorage/trunk/relstorage/adapters/postgresql.py 2009-10-08 16:11:37 UTC (rev 104939)
@@ -16,6 +16,7 @@
import logging
import psycopg2
import psycopg2.extensions
+import re
from zope.interface import implements
from relstorage.adapters.connmanager import AbstractConnectionManager
@@ -60,6 +61,7 @@
self.keep_history = options.keep_history
else:
self.keep_history = True
+ self.version_detector = PostgreSQLVersionDetector()
self.connmanager = Psycopg2ConnectionManager(
dsn=dsn,
options=options,
@@ -69,6 +71,7 @@
self.locker = PostgreSQLLocker(
keep_history=self.keep_history,
lock_exceptions=(psycopg2.DatabaseError,),
+ version_detector=self.version_detector,
)
self.schema = PostgreSQLSchemaInstaller(
connmanager=self.connmanager,
@@ -80,6 +83,7 @@
database_name='postgresql',
keep_history=self.keep_history,
runner=self.runner,
+ version_detector=self.version_detector,
)
self.connmanager.set_on_store_opened(self.mover.on_store_opened)
self.oidallocator = PostgreSQLOIDAllocator()
@@ -221,7 +225,22 @@
FROM object_state
ORDER BY tid DESC
LIMIT 1
- """
+ """
cursor.execute(stmt)
return conn, cursor
+
+class PostgreSQLVersionDetector(object):
+
+ version = None
+
+ def get_version(self, cursor):
+ """Return the (major, minor) version of the database"""
+ if self.version is None:
+ cursor.execute("SELECT version()")
+ v = cursor.fetchone()[0]
+ m = re.search(r"([0-9]+)[.]([0-9]+)", v)
+ if m is None:
+ raise AssertionError("Unable to detect database version: " + v)
+ self.version = int(m.group(1)), int(m.group(2))
+ return self.version
More information about the checkins
mailing list