[Zope-Checkins] CVS: Products/DCOracle2/test - dbapi20.py:1.1 procdef.sql:1.16

Matthew T. Kromer matt at zope.com
Wed Sep 24 11:51:04 EDT 2003


Update of /cvs-repository/Products/DCOracle2/test
In directory cvs.zope.org:/tmp/cvs-serv27663

Modified Files:
	procdef.sql 
Added Files:
	dbapi20.py 
Log Message:
Add an old copy of Stuart Bishop's dbapi test suite


=== Added File Products/DCOracle2/test/dbapi20.py ===
#!/usr/bin/env python
'''
$Id: dbapi20.py,v 1.1 2003/09/24 15:51:03 matt Exp $
'''

__rcs_id__  = '$Id: dbapi20.py,v 1.1 2003/09/24 15:51:03 matt Exp $'
__version__ = '$Revision: 1.1 $'[11:-2]
__author__ = 'Stuart Bishop <zen at shangri-la.dropbear.id.au>'

import unittest
import time

class test_DBAPI20:
    ''' Test a database self.driver for DB API 2.0 compatibility.
        This implementation tests Gadfly, but the TestCase
        is structured so that other self.drivers can subclass this 
        test case to ensure compiliance with the DB-API. It is 
        expected that this TestCase may be expanded in the future
        if ambiguities or edge conditions are discovered.

        The 'Optional Extensions' are not yet being tested.

        self.drivers should subclass this test, overriding setUp, tearDown,
        self.driver, connect_args and connect_kw_args. Class specification
        should be as follows:
        import dbapi20
        class mytest(dbapi20.test_DBAPI20,unittest.TestCase):
           [...] 
    '''

    # The self.driver module. This should be the module where the 'connect'
    # method is to be found
    driver = None
    connect_args = () # List of arguments to pass to connect
    connect_kw_args = {} # Keyword arguments for connect

    ddl1 = 'create table booze (name varchar)'
    ddl2 = 'create table barflys (name varchar)'
    xddl1 = 'drop table booze'
    xddl2 = 'drop table barflys'
        
    def setUp(self):
        ''' self.drivers should override this method to perform required setup
            if any is necessary, such as creating the database.
        '''
        pass

    def tearDown(self):
        ''' self.drivers should override this method to perform required cleanup
            if any is necessary, such as deleting the dest database.
            The default drops the tables that may be created.
        '''
        con = self._connect()
        try:
            cur = con.cursor()
            for ddl in (self.xddl1,self.xddl2):
                try: 
                    cur.execute(ddl)
                except self.driver.Error: 
                    # Assume table didn't exist. Other tests will check if
                    # execute is busted.
                    pass
        finally:
            con.close()

    def _connect(self):
        try:
            return self.driver.connect(
                *self.connect_args,**self.connect_kw_args
                )
        except AttributeError:
            self.fail("No connect method found in self.driver module")

    def test_connect(self):
        con = self._connect()
        con.close()

    def test_apilevel(self):
        try:
            # Must exist
            apilevel = self.driver.apilevel
            # Must equal 2.0
            self.assertEqual(apilevel,'2.0')
        except AttributeError:
            self.fail("Driver doesn't define apilevel")

    def test_threadsafety(self):
        try:
            # Must exist
            threadsafety = self.driver.threadsafety
            # Must be a valid value
            self.failUnless(threadsafety in (0,1,2,3))
        except AttributeError:
            self.fail("Driver doesn't define threadsafety")

    def test_paramstyle(self):
        try:
            # Must exist
            paramstyle = self.driver.paramstyle
            # Must be a valid value
            self.failUnless(paramstyle in (
                'qmark','numeric','named','format','pyformat'
                ))
        except AttributeError:
            self.fail("Driver doesn't define paramstyle")

    def test_Exceptions(self):
        # Make sure required exceptions exist, and are in the
        # defined heirarchy.
        self.failUnless(issubclass(self.driver.Warning,StandardError))
        self.failUnless(issubclass(self.driver.Error,StandardError))
        self.failUnless(issubclass(self.driver.InterfaceError,StandardError))
        self.failUnless(issubclass(self.driver.DatabaseError,StandardError))
        self.failUnless(issubclass(self.driver.OperationalError,StandardError))
        self.failUnless(issubclass(self.driver.IntegrityError,StandardError))
        self.failUnless(issubclass(self.driver.InternalError,StandardError))
        self.failUnless(issubclass(self.driver.ProgrammingError,StandardError))
        self.failUnless(issubclass(self.driver.NotSupportedError,StandardError))

    def test_commit(self):
        con = self._connect()
        try:
            # Commit must work, even if it doesn't do anything
            con.commit()
        finally:
            con.close()

    def test_rollback(self):
        con = self._connect()
        # If rollback is defined, it should either work or throw
        # the documented exception
        if hasattr(con,'rollback'):
            try:
                con.rollback()
            except NotSupportedError:
                pass
    
    def test_cursor(self):
        con = self._connect()
        try:
            cur = con.cursor()
        finally:
            con.close()

    def test_cursor_isolation(self):
        con = self._connect()
        try:
            # Make sure cursors created from the same connection have
            # the documented transaction isolation level
            cur1 = con.cursor()
            cur2 = con.cursor()
            cur1.execute(self.ddl1)
            cur1.execute("insert into booze values ('Victoria Bitter')")
            cur2.execute("select name from booze")
            booze = cur2.fetchall()
            self.assertEqual(len(booze),1)
            self.assertEqual(len(booze[0]),1)
            self.assertEqual(booze[0][0],'Victoria Bitter')
        finally:
            con.close()

    def test_description(self):
        con = self._connect()
        try:
            cur = con.cursor()
            cur.execute(self.ddl1)
            self.assertEqual(cur.description,None,
                'cursor.description should be none after executing a '
                'statement that can return no rows (such as DDL)'
                )
            cur.execute('select name from booze')
            self.assertEqual(len(cur.description),1,
                'cursor.description describes too many columns'
                )
            self.assertEqual(len(cur.description[0]),7,
                'cursor.description[x] tuples must have 7 elements'
                )
            self.assertEqual(cur.description[0][0].lower(),'name',
                'cursor.description[x][0] must return column name'
                )
            self.assertEqual(self.driver.STRING,cur.description[0][1],
                'cursor.description[x][1] must return column type. Got %r'
                    % cur.description[0][1]
                )

            # Make sure self.description gets reset
            cur.execute(self.ddl2)
            self.assertEqual(cur.description,None,
                'cursor.description not being set to None when executing '
                'no-result statements (eg. DDL)'
                )
        finally:
            con.close()

    def test_rowcount(self):
        con = self._connect()
        try:
            cur = con.cursor()
            cur.execute(self.ddl1)
            self.assertEqual(cur.rowcount,-1,
                'cursor.rowcount should be -1 after executing no-result '
                'statements'
                )
            cur.execute("insert into booze values ('Victoria Bitter')")
            self.failUnless(cur.rowcount in (-1,1),
                'cursor.rowcount should == number or rows inserted, or '
                'set to -1 after executing an insert statement'
                )
            cur.execute("select name from booze")
            self.failUnless(cur.rowcount in (-1,1),
                'cursor.rowcount should == number of rows returned, or '
                'set to -1 after executing a select statement'
                )
            cur.execute(self.ddl2)
            self.assertEqual(cur.rowcount,-1,
                'cursor.rowcount not being reset to -1 after executing '
                'no-result statements'
                )
        finally:
            con.close()

    def test_callproc(self):
        # Cannot write a generic test. self.driver's implementations of this
        # TestCase should implement this test. self.drivers that don't support
        # stored procedures should just 'pass'
        raise NotImplementedError,'Driver testcase should override this test'

    def test_close(self):
        con = self._connect()
        try:
            cur = con.cursor()
        finally:
            con.close()

        # cursor.execute should raise an Error if called after connection
        # closed
        self.assertRaises(self.driver.Error,cur.execute,self.ddl1)

        # connection.commit should raise an Error if called after connection'
        # closed.'
        self.assertRaises(self.driver.Error,con.commit)

        # connection.close should raise an Error if called more than once
        self.assertRaises(self.driver.Error,con.close)

    def test_execute(self):
        con = self._connect()
        try:
            cur = con.cursor()
            self._paraminsert(cur)
        finally:
            con.close()

    def _paraminsert(self,cur):
        cur.execute(self.ddl1)
        cur.execute("insert into booze values ('Victoria Bitter')")
        if self.driver.paramstyle == 'qmark':
            cur.execute('insert into booze values (?)',("Cooper's",))
        elif self.driver.paramstyle == 'numeric':
            cur.execute('insert into booze values (:1)',("Cooper's",))
        elif self.driver.paramstyle == 'named':
            cur.execute(
                'insert into booze values (:beer)', {'beer':"Cooper's"}
                )
        elif self.driver.paramstyle == 'format':
            cur.execute('insert into booze values (%s)',("Cooper's",))
        elif self.driver.paramstyle == 'pyformat':
            cur.execute(
                'insert into booze values (%(beer)s)',{'beer':"Cooper's"}
                )
        else:
            self.fail('Invalid paramstyle')
        cur.execute('select name from booze')
        res = cur.fetchall()
        self.assertEqual(len(res),2,'cursor.fetchall returned too few rows')
        beers = [res[0][0],res[1][0]]
        beers.sort()
        self.assertEqual(beers[0],"Cooper's",
            'cursor.fetchall retrieved invalid data, or data inserted '
            'incorrectly'
            )
        self.assertEqual(beers[1],"Victoria Bitter",
            'cursor.fetchall retrieved invalid data, or data inserted '
            'incorrectly'
            )

    def test_executemany(self):
        con = self._connect()
        try:
            cur = con.cursor()
            cur.execute(self.ddl1)
            largs = [ ("Cooper's",) , ("Boag's",) ]
            margs = [ {'beer': "Cooper's"}, {'beer': "Boag's"} ]
            if self.driver.paramstyle == 'qmark':
                cur.executemany('insert into booze values (?)',largs)
            elif self.driver.paramstyle == 'numeric':
                cur.executemany('insert into booze values (:1)',largs)
            elif self.driver.paramstyle == 'named':
                cur.executemany('insert into booze values (:beer)',margs)
            elif self.driver.paramstyle == 'format':
                cur.executemany('insert into booze values (%s)',largs)
            elif self.driver.paramstyle == 'pyformat':
                cur.executemany('insert into booze values (%(beer)s)',margs)
            else:
                self.fail('Unknown paramstyle')
            cur.execute('select name from booze')
            res = cur.fetchall()
            self.assertEqual(len(res),2,
                'cursor.fetchall retrieved incorrect number of rows'
                )
            beers = [res[0][0],res[1][0]]
            beers.sort()
            self.assertEqual(beers[0],"Boag's",'incorrect data retrieved')
            self.assertEqual(beers[1],"Cooper's",'incorrect data retrieved')
        finally:
            con.close()

    def test_fetchone(self):
        con = self._connect()
        try:
            cur = con.cursor()

            # cursor.fetchone should raise an Error if called before
            # executing a select-type query
            self.assertRaises(self.driver.Error,cur.fetchone)

            # cursor.fetchone should raise an Error if called after
            # executing a query that cannnot return rows
            cur.execute(self.ddl1)
            self.assertRaises(self.driver.Error,cur.fetchone)

            # cursor.fetchone should raise an Error if called after
            # executing a query that cannnot return rows
            cur.execute("insert into booze values ('Victoria Bitter')")
            self.assertRaises(self.driver.Error,cur.fetchone)

            cur.execute('select name from booze')
            r = cur.fetchone()
            self.assertEqual(len(r),1,
                'cursor.fetchone should have retrieved a single row'
                )
            self.assertEqual(r[0],'Victoria Bitter',
                'cursor.fetchone retrieved incorrect data'
                )
            self.assertEqual(cur.fetchone(),None,
                'cursor.fetchone should return None if no more rows available'
                )
        finally:
            con.close()

    samples = [
        'Carlton Cold',
        'Carlton Draft',
        'Mountain Goat',
        'Redback',
        'Victoria Bitter',
        'XXXX'
        ]

    populate = [ddl1,] +  \
        ["insert into booze values ('%s')" % s for s in samples]

    def test_fetchmany(self):
        con = self._connect()
        try:
            cur = con.cursor()

            # cursor.fetchmany should raise an Error if called without
            #issuing a query
            self.assertRaises(self.driver.Error,cur.fetchmany,4)

            for sql in self.populate:
                cur.execute(sql)

            cur.arraysize=10
            cur.execute('select name from booze')
            r = cur.fetchmany(4) # Should get 4 rows
            self.assertEqual(len(r),4,
                'cursor.fetchmany retrieved incorrect number of rows'
                )
            r = cur.fetchmany(4) # Should get 2 more
            self.assertEqual(len(r),2,
                'cursor.fetchmany retrieved incorrect number of rows'
                )
            r = cur.fetchmany(4) # Should be an empty sequence
            self.assertEqual(len(r),0,
                'cursor.fetchmany should return an empty sequence after '
                'results are exhausted'
            )

            # Same as above, using cursor.arraysize
            cur.arraysize=4
            cur.execute('select name from booze')
            r = cur.fetchmany() # Should get 4 rows
            self.assertEqual(len(r),4,
                'cursor.arraysize not being honoured by fetchmany'
                )
            r = cur.fetchmany() # Should get 2 more
            self.assertEqual(len(r),2)
            r = cur.fetchmany() # Should be an empty sequence
            self.assertEqual(len(r),0)

            cur.arraysize=6
            cur.execute('select name from booze')
            rows = cur.fetchmany() # Should get all rows
            self.assertEqual(len(rows),6)
            self.assertEqual(len(rows),6)
            rows = [r[0] for r in rows]
            rows.sort()
          
            # Make sure we get the right data back out
            for i in range(0,6):
                self.assertEqual(rows[i],self.samples[i],
                    'incorrect data retrieved by cursor.fetchmany'
                    )

            cur.execute(self.ddl2)
            cur.execute('select name from barflys')
            r = cur.fetchmany() # Should get empty sequence
            self.assertEqual(len(r),0,
                'cursor.fetchmany should return an empty sequence if '
                'query retrieved no rows'
                )

        finally:
            con.close()

    def test_fetchall(self):
        con = self._connect()
        try:
            cur = con.cursor()
            # cursor.fetchall should raise an Error if called
            # without executing a query that may return rows (such
            # as a select)
            self.assertRaises(self.driver.Error, cur.fetchall)

            for sql in self.populate:
                cur.execute(sql)

            # cursor.fetchall should raise an Error if called
            # after executing a a statement that cannot return rows
            self.assertRaises(self.driver.Error,cur.fetchall)

            cur.execute('select name from booze')
            rows = cur.fetchall()
            self.assertEqual(len(rows),len(self.samples),
                'cursor.fetchall did not retrieve all rows'
                )
            rows = [r[0] for r in rows]
            rows.sort()
            for i in range(0,len(self.samples)):
                self.assertEqual(rows[i],self.samples[i],
                'cursor.fetchall retrieved incorrect rows'
                )

            cur.execute(self.ddl2)
            cur.execute('select name from barflys')
            rows = cur.fetchall()
            self.assertEqual(len(rows),0,
                'cursor.fetchall should return an empty list if '
                'a select query returns no rows'
                )
            
        finally:
            con.close()
    
    def test_mixedfetch(self):
        con = self._connect()
        try:
            cur = con.cursor()
            for sql in self.populate:
                cur.execute(sql)

            cur.execute('select name from booze')
            rows1  = cur.fetchone()
            rows23 = cur.fetchmany(2)
            rows4  = cur.fetchone()
            rows56 = cur.fetchall()
            self.assertEqual(len(rows23),2,
                'fetchmany returned incorrect number of rows'
                )
            self.assertEqual(len(rows56),2,
                'fetchall returned incorrect number of rows'
                )

            rows = [rows1[0]]
            rows.extend([rows23[0][0],rows23[1][0]])
            rows.append(rows4[0])
            rows.extend([rows56[0][0],rows56[1][0]])
            rows.sort()
            for i in range(0,len(self.samples)):
                self.assertEqual(rows[i],self.samples[i],
                    'incorrect data retrieved or inserted'
                    )
        finally:
            con.close()

    def test_nextset(self):
        raise NotImplementedError,'Drivers need to override this test'

    def test_arraysize(self):
        # Not much here - rest of the tests for this are in test_fetchmany
        con = self._connect()
        try:
            cur = con.cursor()
            self.failUnless(hasattr(cur,'arraysize'),
                'cursor.arraysize must be defined'
                )
        finally:
            con.close()

    def test_setinputsizes(self):
        con = self._connect()
        try:
            cur = con.cursor()
            cur.execute(self.ddl1)
            cur.setinputsizes( (25,) )
            self._paraminsert(cur)
        finally:
            con.close()

    def test_setoutputsize_basic(self):
        # Basic test is to make sure it doesn't blow up
        con = self._connect()
        try:
            cur = con.cursor()
            cur.execute(self.ddl1)
            cur.setoutputsize(1000)
            cur.setoutputsize(2000,0)
            self._paraminsert(cur)
        finally:
            con.close()

    def test_setoutputsize(self):
        # Real test for setoutputsize is driver dependant
        raise NotImplementedError,'Driver need to override this test'

    def test_Date(self):
        d1 = self.driver.Date(2002,12,25)
        d2 = self.driver.DateFromTicks(time.mktime((2002,12,25,0,0,0,0,0,0)))
        # Can we assume this? API doesn't specify, but it seems implied
        # self.assertEqual(str(d1),str(d2))

    def test_Time(self):
        t1 = self.driver.Time(13,45,30)
        t2 = self.driver.TimeFromTicks(time.mktime((0,0,0,13,45,30,0,0,0)))
        # Can we assume this? API doesn't specify, but it seems implied
        # self.assertEqual(str(t1),str(t2))

    def test_Timestamp(self):
        t1 = self.driver.Timestamp(2002,12,25,13,45,30)
        t2 = self.driver.TimestampFromTicks(
            time.mktime((2002,12,25,13,45,30,0,0,0))
            )
        # Can we assume this? API doesn't specify, but it seems implied
        # self.assertEqual(str(t1),str(t2))

    def test_Binary(self):
        b = self.driver.Binary('Something')
        b = self.driver.Binary('')

    def test_STRING(self):
        self.failUnless(hasattr(self.driver,'STRING'),
            'module.STRING must be defined'
            )

    def test_BINARY(self):
        self.failUnless(hasattr(self.driver,'BINARY'),
            'module.BINARY must be defined.'
            )

    def test_NUMBER(self):
        self.failUnless(hasattr(self.driver,'NUMBER'),
            'module.NUMBER must be defined.'
            )

    def test_DATETIME(self):
        self.failUnless(hasattr(self.driver,'DATETIME'),
            'module.DATETIME must be defined.'
            )

    def test_ROWID(self):
        self.failUnless(hasattr(self.driver,'ROWID'),
            'module.ROWID must be defined.'
            )



=== Products/DCOracle2/test/procdef.sql 1.15 => 1.16 ===
--- Products/DCOracle2/test/procdef.sql:1.15	Wed Jun 12 14:46:37 2002
+++ Products/DCOracle2/test/procdef.sql	Wed Sep 24 11:51:03 2003
@@ -10,6 +10,16 @@
 /
 show errors;
 
+create or replace procedure TESTINSERT (
+		I_NAME in varchar2,
+		I_ID in integer 
+	) as 
+	begin 
+		insert into TEST (NAME, ID) values (I_NAME, I_ID);
+	end TESTINSERT;
+/
+show errors;
+
 create or replace procedure FINDDEF (
 		EMPID in integer default 7902,
 		EMPNAME out varchar2




More information about the Zope-Checkins mailing list