[Zope-CVS] SVN: psycopgda/trunk/adapter.py Explicitly set transaction isolation level and client_encoding setting.

Stuart Bishop stuart at stuartbishop.net
Sun Feb 12 19:44:23 EST 2006


Log message for revision 41611:
  Explicitly set transaction isolation level and client_encoding setting.
  Catch deadlock and serialization exceptions and reraise them as Retry
  exceptions, causing Z3 to retry the transaction.
  
  

Changed:
  U   psycopgda/trunk/adapter.py

-=-
Modified: psycopgda/trunk/adapter.py
===================================================================
--- psycopgda/trunk/adapter.py	2006-02-12 18:07:46 UTC (rev 41610)
+++ psycopgda/trunk/adapter.py	2006-02-13 00:44:23 UTC (rev 41611)
@@ -1,6 +1,6 @@
 ##############################################################################
 #
-# Copyright (c) 2002-2004 Zope Corporation and Contributors.
+# Copyright (c) 2002-2006 Zope Corporation and Contributors.
 # All Rights Reserved.
 #
 # This software is subject to the provisions of the Zope Public License,
@@ -15,12 +15,18 @@
 
 $Id$
 """
-from zope.app.rdb import ZopeDatabaseAdapter, parseDSN
+from zope.app.rdb import (
+        ZopeDatabaseAdapter, parseDSN, ZopeConnection, ZopeCursor
+        )
+from zope.app.rdb.interfaces import DatabaseException
+from zope.publisher.interfaces import Retry
 
 from datetime import date, time, datetime, timedelta
 import psycopg
 import re
+import sys
 
+PG_ENCODING = 'UTF-8'
 
 # These OIDs are taken from include/server/pg_type.h from PostgreSQL headers.
 # Unfortunatelly psycopg does not export them as constants, and
@@ -321,17 +327,86 @@
     it might be something like '1 month', which is a variable number of days.
     """
 
+    def connect(self):
+        if not self.isConnected():
+            try:
+                self._v_connection = PsycopgConnection(
+                        self._connection_factory(), self
+                        )
+            except psycopg.Error, error:
+                raise DatabaseException, str(error)
+
     def _connection_factory(self):
         """Create a Psycopg DBI connection based on the DSN"""
+        registerTypes(PG_ENCODING)
         conn_info = parseDSN(self.dsn)
         conn_list = []
         for dsnname, optname in dsn2option_mapping.iteritems():
             if conn_info[dsnname]:
                 conn_list.append('%s=%s' % (optname, conn_info[dsnname]))
         conn_str = ' '.join(conn_list)
-        self._registerTypes()
-        return psycopg.connect(conn_str)
+        connection = psycopg.connect(conn_str)
 
-    def _registerTypes(self):
-        """Register type conversions for psycopg"""
-        registerTypes(self.getEncoding())
+        # Ensure we are in SERIALIZABLE transaction isolation level.
+        # This is the default under psycopg1, but changed to READ COMMITTED
+        # under psycopg2. This should become an option if anyone wants
+        # different isolation levels.
+        connection.set_isolation_level(3)
+
+        # Ensure the client_encoding for this connection matches PG_ENCODING
+        cur = connection.cursor()
+        cur.execute("SET client_encoding TO UNICODE")
+        connection.commit()
+        return connection
+
+
+def _handle_psycopg_exception(error):
+    """Called from a exception handler for psycopg.Error.
+
+    If we have a serialization exception or a deadlock, we should retry the
+    transaction by raising a Retry exception. Otherwise, we reraise.
+    """
+    if not error.args:
+        raise
+    msg = error.args[0]
+    # These messages are from PostgreSQL 8.0. They may change between
+    # PostgreSQL releases - if so, the different messages should be added
+    # rather than the existing ones changed so this logic works with
+    # different versions.
+    if msg.startswith(
+            'ERROR:  could not serialize access due to concurrent update'
+            ):
+        raise Retry(sys.exc_info())
+    if msg.startswith('ERROR:  deadlock detected'):
+        raise Retry(sys.exc_info())
+    raise
+
+
+class PsycopgConnection(ZopeConnection):
+    def cursor(self):
+        """See IZopeConnection"""
+        return PsycopgCursor(self.conn.cursor(), self)
+
+    def commit(self):
+        try:
+            ZopeConnection.commit(self)
+        except psycopg.Error, error:
+            _handle_psycopg_exception(error)
+
+
+class PsycopgCursor(ZopeCursor):
+    def execute(self, operation, parameters=None):
+        """See IZopeCursor"""
+        try:
+            return ZopeCursor.execute(self, operation, parameters)
+        except psycopg.Error, error:
+            _handle_psycopg_exception(error)
+
+    def executemany(operation, seq_of_parameters=None):
+        """See IZopeCursor"""
+        raise RuntimeError, 'Oos'
+        try:
+            return ZopeCursor.execute(self, operation, seq_of_parameters)
+        except psycopg.Error, error:
+            _handle_psycopg_exception(error)
+



More information about the Zope-CVS mailing list