[Zope3-checkins] SVN: Zope3/trunk/src/zope/app/rdb/ Now SQL operation parameters will be properly encoded before executing

Dmitry Vasiliev "dima athlabs.spb.ru" at zope.org
Sat Aug 6 08:34:44 EDT 2005


Log message for revision 37757:
  Now SQL operation parameters will be properly encoded before executing
  

Changed:
  U   Zope3/trunk/src/zope/app/rdb/__init__.py
  U   Zope3/trunk/src/zope/app/rdb/tests/stubs.py
  U   Zope3/trunk/src/zope/app/rdb/tests/test_zopecursor.py

-=-
Modified: Zope3/trunk/src/zope/app/rdb/__init__.py
===================================================================
--- Zope3/trunk/src/zope/app/rdb/__init__.py	2005-08-06 04:53:36 UTC (rev 37756)
+++ Zope3/trunk/src/zope/app/rdb/__init__.py	2005-08-06 12:34:43 UTC (rev 37757)
@@ -286,15 +286,27 @@
     def execute(self, operation, parameters=None):
         """Executes an operation, registering the underlying
         connection with the transaction system.  """
-
-        if isinstance(operation, unicode):
-            encoding = self.connection.getTypeInfo().getEncoding()
-            operation = operation.encode(encoding)
+        operation, parameters = self._prepareOperation(operation, parameters)
         self.connection.registerForTxn()
         if parameters is None:
             return self.cursor.execute(operation)
         return self.cursor.execute(operation, parameters)
 
+    def _prepareOperation(self, operation, parameters):
+        encoding = self.connection.getTypeInfo().getEncoding()
+        if isinstance(operation, unicode):
+            operation = operation.encode(encoding)
+        if isinstance(parameters, (tuple, list)):
+            parameters = list(parameters)
+            for i, v in enumerate(parameters):
+                if isinstance(v, unicode):
+                    parameters[i] = v.encode(encoding)
+        elif isinstance(parameters, dict):
+            for k, v in parameters.items():
+                if isinstance(v, unicode):
+                    parameters[k] = v.encode(encoding)
+        return operation, parameters
+
     def __getattr__(self, key):
         return getattr(self.cursor, key)
 

Modified: Zope3/trunk/src/zope/app/rdb/tests/stubs.py
===================================================================
--- Zope3/trunk/src/zope/app/rdb/tests/stubs.py	2005-08-06 04:53:36 UTC (rev 37756)
+++ Zope3/trunk/src/zope/app/rdb/tests/stubs.py	2005-08-06 12:34:43 UTC (rev 37757)
@@ -47,5 +47,11 @@
     threadsafety = 0
     encoding = 'utf-8'
 
+    def setEncoding(self, encoding):
+        self.encoding = encoding
+
+    def getEncoding(self):
+        return self.encoding
+
     def getConverter(self, type):
         return lambda x: x

Modified: Zope3/trunk/src/zope/app/rdb/tests/test_zopecursor.py
===================================================================
--- Zope3/trunk/src/zope/app/rdb/tests/test_zopecursor.py	2005-08-06 04:53:36 UTC (rev 37756)
+++ Zope3/trunk/src/zope/app/rdb/tests/test_zopecursor.py	2005-08-06 12:34:43 UTC (rev 37757)
@@ -20,6 +20,7 @@
 from zope.app.rdb import ZopeCursor
 from zope.app.rdb.tests.stubs import *
 
+
 class MyConnectionStub(ConnectionStub):
     def cursor(self):
         return MyCursorStub()
@@ -42,6 +43,10 @@
 
     description = ((None, 'string'), (None, 'int'), (None, 'foo'))
 
+    def execute(self, query, args=None):
+        self.query = query
+        self.args = args
+
     def fetchone(self):
         if self._raw:
             return self._raw[0]
@@ -85,7 +90,8 @@
 class ZopeCursorTests(TestCase):
 
     def setUp(self):
-        zc = ZopeConnection(MyConnectionStub(), MyTypeInfoStub())
+        self.typeInfo = MyTypeInfoStub()
+        zc = ZopeConnection(MyConnectionStub(), self.typeInfo)
         self.cursor = ZopeCursor(zc.conn.cursor(), zc)
 
     def test_cursor_fetchone(self):
@@ -119,7 +125,34 @@
                    'got      %r,\n'
                    'expected %r' % (results, expected))
 
+    def test_cursor_query_encoding(self):
+        self.cursor.execute(u'\u0422\u0435\u0441\u0442')
+        self.assertEqual('\xd0\xa2\xd0\xb5\xd1\x81\xd1\x82',
+            self.cursor.cursor.query)
 
+        self.typeInfo.setEncoding("windows-1251")
+        self.cursor.execute(u'\u0422\u0435\u0441\u0442')
+        self.assertEqual('\xd2\xe5\xf1\xf2', self.cursor.cursor.query)
+
+    def test_cursor_tuple_args_encoding(self):
+        self.typeInfo.setEncoding("windows-1251")
+        self.cursor.execute("SELECT * FROM table",
+            (u'\u0422\u0435\u0441\u0442',))
+        self.assertEqual('\xd2\xe5\xf1\xf2', self.cursor.cursor.args[0])
+
+    def test_cursor_list_args_encoding(self):
+        self.typeInfo.setEncoding("windows-1251")
+        self.cursor.execute("SELECT * FROM table",
+            [u'\u0422\u0435\u0441\u0442'])
+        self.assertEqual('\xd2\xe5\xf1\xf2', self.cursor.cursor.args[0])
+
+    def test_cursor_dict_args_encoding(self):
+        self.typeInfo.setEncoding("windows-1251")
+        self.cursor.execute("SELECT * FROM table",
+            {"value": u'\u0422\u0435\u0441\u0442'})
+        self.assertEqual('\xd2\xe5\xf1\xf2', self.cursor.cursor.args["value"])
+
+
 def test_suite():
     return makeSuite(ZopeCursorTests)
 



More information about the Zope3-Checkins mailing list