[Checkins] SVN: relstorage/trunk/relstorage/adapters/ PostgreSQL 8.1 doesn't support multi-row insert, but 8.2+ does.

Shane Hathaway shane at hathawaymix.org
Thu Oct 8 12:11:38 EDT 2009


Log message for revision 104939:
  PostgreSQL 8.1 doesn't support multi-row insert, but 8.2+ does.
  

Changed:
  U   relstorage/trunk/relstorage/adapters/batch.py
  U   relstorage/trunk/relstorage/adapters/locker.py
  U   relstorage/trunk/relstorage/adapters/mover.py
  U   relstorage/trunk/relstorage/adapters/postgresql.py

-=-
Modified: relstorage/trunk/relstorage/adapters/batch.py
===================================================================
--- relstorage/trunk/relstorage/adapters/batch.py	2009-10-08 16:10:03 UTC (rev 104938)
+++ relstorage/trunk/relstorage/adapters/batch.py	2009-10-08 16:11:37 UTC (rev 104939)
@@ -25,6 +25,7 @@
     row_limit = 100
     size_limit = 1<<20
     database_name = None
+    support_batch_insert = True
 
     def __init__(self, cursor):
         self.cursor = cursor
@@ -76,17 +77,35 @@
     def do_inserts(self):
         items = sorted(self.inserts.items())
         for (command, header, row_schema), rows in items:
-            parts = []
-            params = []
-            s = "(%s)" % row_schema
-            for row in rows.values():
-                parts.append(s)
-                params.extend(row)
-            parts = ',\n'.join(parts)
-            stmt = "%s INTO %s VALUES %s" % (command, header, parts)
-            self.cursor.execute(stmt, tuple(params))
+            if self.support_batch_insert:
+                parts = []
+                params = []
+                s = "(%s)" % row_schema
+                for row in rows.values():
+                    parts.append(s)
+                    params.extend(row)
+                parts = ',\n'.join(parts)
+                stmt = "%s INTO %s VALUES %s" % (command, header, parts)
+                self.cursor.execute(stmt, tuple(params))
+            else:
+                for row in rows.values():
+                    stmt = "%s INTO %s VALUES (%s)" % (
+                        command, header, row_schema)
+                    self.cursor.execute(stmt, tuple(row))
 
 
+class PostgreSQLRowBatcher(RowBatcher):
+
+    def __init__(self, cursor, version_detector):
+        super(PostgreSQLRowBatcher, self).__init__(cursor)
+        self.support_batch_insert = (
+            version_detector.get_version(cursor) >= (8, 2))
+
+
+class MySQLRowBatcher(RowBatcher):
+    pass
+
+
 oracle_rowvar_re = re.compile(":([a-zA-Z0-9_]+)")
 
 class OracleRowBatcher(RowBatcher):
@@ -137,3 +156,4 @@
                 if stmt_inputsizes:
                     self.cursor.setinputsizes(**stmt_inputsizes)
                 self.cursor.execute(stmt, params)
+

Modified: relstorage/trunk/relstorage/adapters/locker.py
===================================================================
--- relstorage/trunk/relstorage/adapters/locker.py	2009-10-08 16:10:03 UTC (rev 104938)
+++ relstorage/trunk/relstorage/adapters/locker.py	2009-10-08 16:11:37 UTC (rev 104939)
@@ -17,7 +17,6 @@
 from relstorage.adapters.interfaces import ILocker
 from ZODB.POSException import StorageError
 from zope.interface import implements
-import re
 
 commit_lock_timeout = 30
 
@@ -32,6 +31,11 @@
 class PostgreSQLLocker(Locker):
     implements(ILocker)
 
+    def __init__(self, keep_history, lock_exceptions, version_detector):
+        super(PostgreSQLLocker, self).__init__(
+            keep_history=keep_history, lock_exceptions=lock_exceptions)
+        self.version_detector = version_detector
+
     def hold_commit_lock(self, cursor, ensure_current=False):
         if ensure_current:
             # Hold commit_lock to prevent concurrent commits
@@ -57,19 +61,9 @@
         # no action needed
         pass
 
-    def _pg_version(self, cursor):
-        """Return the (major, minor) version of PostgreSQL"""
-        cursor.execute("SELECT version()")
-        v = cursor.fetchone()[0]
-        m = re.search(r"([0-9]+)[.]([0-9]+)", v)
-        if m is None:
-            raise AssertionError("Unable to detect PostgreSQL version: " + v)
-        else:
-            return int(m.group(1)), int(m.group(2))
-
     def _pg_has_advisory_locks(self, cursor):
         """Return true if this version of PostgreSQL supports advisory locks"""
-        return self._pg_version(cursor) >= (8, 2)
+        return self.version_detector.get_version(cursor) >= (8, 2)
 
     def create_pack_lock(self, cursor):
         if not self._pg_has_advisory_locks(cursor):

Modified: relstorage/trunk/relstorage/adapters/mover.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mover.py	2009-10-08 16:10:03 UTC (rev 104938)
+++ relstorage/trunk/relstorage/adapters/mover.py	2009-10-08 16:11:37 UTC (rev 104939)
@@ -17,8 +17,9 @@
 from base64 import decodestring
 from base64 import encodestring
 from relstorage.adapters.interfaces import IObjectMover
+from relstorage.adapters.batch import MySQLRowBatcher
 from relstorage.adapters.batch import OracleRowBatcher
-from relstorage.adapters.batch import RowBatcher
+from relstorage.adapters.batch import PostgreSQLRowBatcher
 from zope.interface import implements
 
 try:
@@ -45,6 +46,7 @@
         'load_before',
         'get_object_tid_after',
         'on_store_opened',
+        'make_batcher',
         'store_temp',
         'restore',
         'detect_conflict',
@@ -54,7 +56,8 @@
         )
 
     def __init__(self, database_name, keep_history, runner=None,
-            Binary=None, inputsize_BLOB=None, inputsize_BINARY=None):
+            Binary=None, inputsize_BLOB=None, inputsize_BINARY=None,
+            version_detector=None):
         # The inputsize parameters are for Oracle only.
         self.database_name = database_name
         self.keep_history = keep_history
@@ -65,20 +68,15 @@
             'blobdata': inputsize_BLOB,
             'rawdata': inputsize_BINARY,
             }
+        self.version_detector = version_detector
 
         for method_name in self._method_names:
             method = getattr(self, '%s_%s' % (database_name, method_name))
             setattr(self, method_name, method)
 
-    def make_batcher(self, cursor):
-        if self.database_name == 'oracle':
-            return OracleRowBatcher(cursor, self.inputsizes)
-        else:
-            return RowBatcher(cursor)
 
 
 
-
     def postgresql_load_current(self, cursor, oid):
         """Returns the current pickle and integer tid for an object.
 
@@ -392,6 +390,18 @@
 
 
 
+    def postgresql_make_batcher(self, cursor):
+        return PostgreSQLRowBatcher(cursor, self.version_detector)
+
+    def mysql_make_batcher(self, cursor):
+        return MySQLRowBatcher(cursor)
+
+    def oracle_make_batcher(self, cursor):
+        return OracleRowBatcher(cursor, self.inputsizes)
+
+
+
+
     def postgresql_store_temp(self, cursor, batcher, oid, prev_tid, data):
         """Store an object in the temporary table."""
         if self.keep_history:

Modified: relstorage/trunk/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py	2009-10-08 16:10:03 UTC (rev 104938)
+++ relstorage/trunk/relstorage/adapters/postgresql.py	2009-10-08 16:11:37 UTC (rev 104939)
@@ -16,6 +16,7 @@
 import logging
 import psycopg2
 import psycopg2.extensions
+import re
 from zope.interface import implements
 
 from relstorage.adapters.connmanager import AbstractConnectionManager
@@ -60,6 +61,7 @@
             self.keep_history = options.keep_history
         else:
             self.keep_history = True
+        self.version_detector = PostgreSQLVersionDetector()
         self.connmanager = Psycopg2ConnectionManager(
             dsn=dsn,
             options=options,
@@ -69,6 +71,7 @@
         self.locker = PostgreSQLLocker(
             keep_history=self.keep_history,
             lock_exceptions=(psycopg2.DatabaseError,),
+            version_detector=self.version_detector,
             )
         self.schema = PostgreSQLSchemaInstaller(
             connmanager=self.connmanager,
@@ -80,6 +83,7 @@
             database_name='postgresql',
             keep_history=self.keep_history,
             runner=self.runner,
+            version_detector=self.version_detector,
             )
         self.connmanager.set_on_store_opened(self.mover.on_store_opened)
         self.oidallocator = PostgreSQLOIDAllocator()
@@ -221,7 +225,22 @@
             FROM object_state
             ORDER BY tid DESC
             LIMIT 1
-            """            
+            """
         cursor.execute(stmt)
         return conn, cursor
 
+
+class PostgreSQLVersionDetector(object):
+
+    version = None
+
+    def get_version(self, cursor):
+        """Return the (major, minor) version of the database"""
+        if self.version is None:
+            cursor.execute("SELECT version()")
+            v = cursor.fetchone()[0]
+            m = re.search(r"([0-9]+)[.]([0-9]+)", v)
+            if m is None:
+                raise AssertionError("Unable to detect database version: " + v)
+            self.version = int(m.group(1)), int(m.group(2))
+        return self.version



More information about the checkins mailing list