[Zope-CVS] CVS: Products/Sessions/tests - testBrowserIdManager.py:1.1 testSessionDataManager.py:1.1

Matthew T. Kromer matt@zope.com
Thu, 8 Nov 2001 16:22:08 -0500


Update of /cvs-repository/Products/Sessions/tests
In directory cvs.zope.org:/tmp/cvs-serv29032

Added Files:
	testBrowserIdManager.py testSessionDataManager.py 
Log Message:
Added Session test cases


=== Added File Products/Sessions/tests/testBrowserIdManager.py ===
##############################################################################
# 
# Zope Public License (ZPL) Version 1.0
# -------------------------------------
# 
# Copyright (c) Digital Creations.  All rights reserved.
# 
# This license has been certified as Open Source(tm).
# 
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
# 
# 1. Redistributions in source code must retain the above copyright
#    notice, this list of conditions, and the following disclaimer.
# 
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions, and the following disclaimer in
#    the documentation and/or other materials provided with the
#    distribution.
# 
# 3. Digital Creations requests that attribution be given to Zope
#    in any manner possible. Zope includes a "Powered by Zope"
#    button that is installed by default. While it is not a license
#    violation to remove this button, it is requested that the
#    attribution remain. A significant investment has been put
#    into Zope, and this effort will continue if the Zope community
#    continues to grow. This is one way to assure that growth.
# 
# 4. All advertising materials and documentation mentioning
#    features derived from or use of this software must display
#    the following acknowledgement:
# 
#      "This product includes software developed by Digital Creations
#      for use in the Z Object Publishing Environment
#      (http://www.zope.org/)."
# 
#    In the event that the product being advertised includes an
#    intact Zope distribution (with copyright and license included)
#    then this clause is waived.
# 
# 5. Names associated with Zope or Digital Creations must not be used to
#    endorse or promote products derived from this software without
#    prior written permission from Digital Creations.
# 
# 6. Modified redistributions of any form whatsoever must retain
#    the following acknowledgment:
# 
#      "This product includes software developed by Digital Creations
#      for use in the Z Object Publishing Environment
#      (http://www.zope.org/)."
# 
#    Intact (re-)distributions of any official Zope release do not
#    require an external acknowledgement.
# 
# 7. Modifications are encouraged but must be packaged separately as
#    patches to official Zope releases.  Distributions that do not
#    clearly separate the patches from the original work must be clearly
#    labeled as unofficial distributions.  Modifications which do not
#    carry the name Zope may be packaged in any form, as long as they
#    conform to all of the clauses above.
# 
# 
# Disclaimer
# 
#   THIS SOFTWARE IS PROVIDED BY DIGITAL CREATIONS ``AS IS'' AND ANY
#   EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#   IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
#   PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL DIGITAL CREATIONS OR ITS
#   CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
#   USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
#   ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
#   OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
#   OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
#   SUCH DAMAGE.
# 
# 
# This software consists of contributions made by Digital Creations and
# many individuals on behalf of Digital Creations.  Specific
# attributions are listed in the accompanying credits file.
# 
##############################################################################
"""
Test suite for session id manager.

$Id: testBrowserIdManager.py,v 1.1 2001/11/08 21:22:07 matt Exp $
"""
__version__ = "$Revision: 1.1 $"[11:-2]

import sys
if __name__ == "__main__":
    sys.path.insert(0, '../../..')
    sys.path.insert(0, '..')
import ZODB
from Products.Sessions.BrowserIdManager import BrowserIdManager, BrowserIdManagerErr
from unittest import TestCase, TestSuite, TextTestRunner, makeSuite
from ZPublisher.HTTPRequest import HTTPRequest
from ZPublisher.HTTPResponse import HTTPResponse
from sys import stdin
from os import environ

class TestBrowserIdManager(TestCase):
    """Unit tests for BrowserIdManager settings and token handling.

    Individual tests are described with comments instead of docstrings
    because unittest uses a test method's docstring in its report output.
    """

    def setUp(self):
        # Build a manager attached to a minimal request.  The server
        # variables are placed in a private copy of the environment so the
        # fixture does not modify os.environ for the rest of the process.
        self.m = BrowserIdManager('foo')
        resp = HTTPResponse()
        env = {}
        for name in environ.keys():
            env[name] = environ[name]
        env['SERVER_NAME'] = 'fred'
        env['SERVER_PORT'] = '80'
        self.m.REQUEST = HTTPRequest(stdin, env, resp)

    def tearDown(self):
        del self.m

    def _plantToken(self, namespace):
        # Create a token and record it on the request both as the current
        # browser token and as a value found in `namespace` ('cookies' or
        # 'form').  Returns the (token, tokenkey) pair.
        request = self.m.REQUEST
        token = self.m.getToken()
        request.browser_token_ = token
        request.browser_token_ns_ = namespace
        tokenkey = self.m.getTokenKey()
        getattr(request, namespace)[tokenkey] = token
        return token, tokenkey

    def testSetTokenKey(self):
        self.m.setTokenKey('foo')
        assert self.m.getTokenKey() == 'foo'

    def testSetBadKeyString(self):
        # Neither an empty string nor a non-string is an acceptable key.
        for badkey in ('', 1):
            self.assertRaises(BrowserIdManagerErr,
                              self.m.setTokenKey, badkey)

    def testSetBadNamespaces(self):
        d = {1: 'gummy', 2: 'froopy'}
        self.assertRaises(BrowserIdManagerErr,
                          self.m.setTokenKeyNamespaces, d)

    def testSetGoodNamespaces(self):
        d = {1: 'cookies', 2: 'form'}
        self.m.setTokenKeyNamespaces(d)
        assert self.m.getTokenKeyNamespaces() == d

    def testSetBadCookiePath(self):
        self.assertRaises(BrowserIdManagerErr,
                          self.m.setCookiePath, '/;')

    def testSetGoodCookiePath(self):
        self.m.setCookiePath('/foo')
        assert self.m.getCookiePath() == '/foo'

    def testSetBadCookieLifeDays(self):
        self.assertRaises(BrowserIdManagerErr,
                          self.m.setCookieLifeDays, '')

    def testSetGoodCookieLifeDays(self):
        # This test used to be defined twice with identical bodies; one
        # definition is enough.
        self.m.setCookieLifeDays(1)
        assert self.m.getCookieLifeDays() == 1

    def testSetNoCookieDomain(self):
        domain = ''
        self.m.setCookieDomain(domain)
        setdomain = self.m.getCookieDomain()
        assert setdomain == domain, "%s" % setdomain

    def testSetBadCookieDomain(self):
        # Two methods used to share this name, so the 'gubble' case was
        # replaced by the later definition and never executed.  All cases
        # now live in one test.
        bad_domains = (
            'gubble',      # presumably rejected like 'zope.org' -- no dots
            'zope.org',    # not enough dots
            {1: 1},        # must be stringtype
            '.zope.org;',  # semicolon follows
        )
        for domain in bad_domains:
            self.assertRaises(BrowserIdManagerErr,
                              self.m.setCookieDomain, domain)

    def testSetGoodCookieDomain(self):
        self.m.setCookieDomain('.zope.org')
        setdomain = self.m.getCookieDomain()
        assert setdomain == '.zope.org', "%s" % setdomain

    def testSetCookieSecure(self):
        self.m.setCookieSecure(1)
        assert self.m.getCookieSecure() == 1

    def testDelegateToParent(self):
        # A turned-off manager with nothing to delegate to must refuse.
        self.m.turnOff()
        self.assertRaises(BrowserIdManagerErr, self.m.hasToken)

    def testGetTokenCookie(self):
        token, tokenkey = self._plantToken('cookies')
        a = self.m.getToken()
        assert a == token, repr(a)
        assert self.m.isTokenFromCookie()

    def testSetSessionTokenDontCreate(self):
        a = self.m.getToken(0)
        assert a is None

    def testSetSessionTokenCreate(self):
        a = self.m.getToken(1)
        tokenkey = self.m.getTokenKey()
        b = self.m.REQUEST.RESPONSE.cookies[tokenkey]
        assert a == b['value'], (a, b)

    def testHasToken(self):
        assert not self.m.hasToken()
        self.m.getToken()
        assert self.m.hasToken()

    def testTokenIsNew(self):
        self.m.getToken()
        assert self.m.isTokenNew()

    def testIsTokenFromCookieFirst(self):
        self._plantToken('cookies')
        self.m.setTokenKeyNamespaces({1: 'cookies', 2: 'form'})
        self.m.getToken()
        assert self.m.isTokenFromCookie()

    def testIsTokenFromFormFirst(self):
        self._plantToken('form')
        self.m.setTokenKeyNamespaces({1: 'form', 2: 'cookies'})
        self.m.getToken()
        assert self.m.isTokenFromForm()

    def testIsTokenFromCookieOnly(self):
        self._plantToken('cookies')
        self.m.setTokenKeyNamespaces({1: 'cookies'})
        self.m.getToken()
        assert self.m.isTokenFromCookie()
        assert not self.m.isTokenFromForm()

    def testIsTokenFromFormOnly(self):
        self._plantToken('form')
        self.m.setTokenKeyNamespaces({1: 'form'})
        self.m.getToken()
        assert self.m.isTokenFromForm()
        assert not self.m.isTokenFromCookie()

    def testFlushTokenCookie(self):
        token, tokenkey = self._plantToken('cookies')
        a = self.m.getToken()
        assert a == token, repr(a)
        assert self.m.isTokenFromCookie()
        self.m.flushTokenCookie()
        c = self.m.REQUEST.RESPONSE.cookies[tokenkey]
        assert c['value'] == 'deleted', c

    def testDelegateToParentFail(self):
        self.m.turnOff()
        self.assertRaises(BrowserIdManagerErr, self.m.getToken)

    def testDelegateToParentSucceed(self):
        # Give the turned-off manager a parent holding an object under the
        # manager's own id; getToken() is expected to return that object's
        # answer.
        self.m.turnOff()
        class foo:
            pass
        class bar:
            def getToken(unself, create=1):
                return 'worked'
        fooi = foo()
        bari = bar()
        setattr(fooi, self.m.id, bari)
        self.m.aq_parent = fooi
        assert self.m.getToken() == 'worked'

    def testEncodeUrl(self):
        keystring = self.m.getTokenKey()
        key = self.m.getToken()
        # A URL without a query string gets one started with '?'.
        u = '/home/chrism/foo'
        r = self.m.encodeUrl(u)
        assert r == '%s?%s=%s' % (u, keystring, key)
        # A URL that already has a query string is extended with '&'.
        u = 'http://www.zope.org/Members/mcdonc?foo=bar&spam=eggs'
        r = self.m.encodeUrl(u)
        assert r == '%s&%s=%s' % (u, keystring, key)
            
        
if __name__ == '__main__':
    # Collect every method of TestBrowserIdManager whose name starts with
    # 'test' and run the resulting suite with the plain text runner.
    TextTestRunner().run(makeSuite(TestBrowserIdManager, 'test'))


=== Added File Products/Sessions/tests/testSessionDataManager.py ===
##############################################################################
# 
# Zope Public License (ZPL) Version 1.0
# -------------------------------------
# 
# Copyright (c) Digital Creations.  All rights reserved.
# 
# This license has been certified as Open Source(tm).
# 
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
# 
# 1. Redistributions in source code must retain the above copyright
#    notice, this list of conditions, and the following disclaimer.
# 
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions, and the following disclaimer in
#    the documentation and/or other materials provided with the
#    distribution.
# 
# 3. Digital Creations requests that attribution be given to Zope
#    in any manner possible. Zope includes a "Powered by Zope"
#    button that is installed by default. While it is not a license
#    violation to remove this button, it is requested that the
#    attribution remain. A significant investment has been put
#    into Zope, and this effort will continue if the Zope community
#    continues to grow. This is one way to assure that growth.
# 
# 4. All advertising materials and documentation mentioning
#    features derived from or use of this software must display
#    the following acknowledgement:
# 
#      "This product includes software developed by Digital Creations
#      for use in the Z Object Publishing Environment
#      (http://www.zope.org/)."
# 
#    In the event that the product being advertised includes an
#    intact Zope distribution (with copyright and license included)
#    then this clause is waived.
# 
# 5. Names associated with Zope or Digital Creations must not be used to
#    endorse or promote products derived from this software without
#    prior written permission from Digital Creations.
# 
# 6. Modified redistributions of any form whatsoever must retain
#    the following acknowledgment:
# 
#      "This product includes software developed by Digital Creations
#      for use in the Z Object Publishing Environment
#      (http://www.zope.org/)."
# 
#    Intact (re-)distributions of any official Zope release do not
#    require an external acknowledgement.
# 
# 7. Modifications are encouraged but must be packaged separately as
#    patches to official Zope releases.  Distributions that do not
#    clearly separate the patches from the original work must be clearly
#    labeled as unofficial distributions.  Modifications which do not
#    carry the name Zope may be packaged in any form, as long as they
#    conform to all of the clauses above.
# 
# 
# Disclaimer
# 
#   THIS SOFTWARE IS PROVIDED BY DIGITAL CREATIONS ``AS IS'' AND ANY
#   EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#   IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
#   PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL DIGITAL CREATIONS OR ITS
#   CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
#   USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
#   ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
#   OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
#   OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
#   SUCH DAMAGE.
# 
# 
# This software consists of contributions made by Digital Creations and
# many individuals on behalf of Digital Creations.  Specific
# attributions are listed in the accompanying credits file.
# 
##############################################################################
import sys, os, time
if __name__ == "__main__":
    sys.path.insert(0, '../../..')
    #os.chdir('../../..')
from Testing import makerequest
import ZODB # in order to get Persistence.Persistent working
import Acquisition
from Acquisition import aq_base
from Products.Sessions.BrowserIdManager import BrowserIdManager
from Products.Sessions.SessionDataManager import \
    SessionDataManager, SessionDataManagerErr
from Products.Transience.Transience import \
    TransientObjectContainer, TransientObject
from ZODB.POSException import InvalidObjectReference
from DateTime import DateTime
from unittest import TestCase, TestSuite, TextTestRunner, makeSuite
import time, threading, whrandom
from cPickle import UnpickleableError

# Attribute name under which the tests look up the browser id manager on
# the application root (via getattr(self.app, idmgr_name)).
idmgr_name = 'browser_id_manager'
# Name of the transient object container; not used by any active code in
# this file.
toc_name = 'temp_transient_container'

def f(sdo):
    """Accept one argument (presumably a session data object) and ignore it."""
    return None

# Empty implicit-acquisition class.  Instances are wrapped with __of__() by
# the test that stores an acquisition-wrapped object in session data.
class Foo(Acquisition.Implicit):
    pass

class TestBase(TestCase):
    """Shared fixture giving each test a request-wrapped application root."""

    def setUp(self):
        # Zope is imported inside the fixture and the local name is removed
        # again as soon as the application root has been opened.
        import Zope
        root = Zope.app()
        del Zope
        self.app = makerequest.makerequest(root)
        self.timeout = 1

        # Leans on the fact that the browser id manager, the transient
        # object container and the session data manager exist by app init.
        #
        # An earlier, now disabled, version of this fixture created those
        # three objects itself, replaced any objects already stored under
        # the same ids in the root, and changed the ownership of the session
        # data manager to an 'admin' user (raising if no such user existed).

    def tearDown(self):
        # Discard whatever the test changed, then release the connection
        # and drop the reference to the application root.
        get_transaction().abort()
        connection = self.app._p_jar
        connection.close()
        del self.app

class TestSessionManager(TestBase):
    """Tests for the 'session_data_manager' object in the application root.

    Individual tests are described with comments instead of docstrings
    because unittest uses a test method's docstring in its report output.
    """

    def testHasId(self):
        assert self.app.session_data_manager.id == 'session_data_manager'

    def testHasTitle(self):
        assert self.app.session_data_manager.title == 'Session Data Manager'

    def testGetSessionDataNoCreate(self):
        sd = self.app.session_data_manager.getSessionData(0)
        assert sd is None, repr(sd)

    def testGetSessionDataCreate(self):
        sd = self.app.session_data_manager.getSessionData(1)
        assert sd.__class__ is TransientObject, repr(sd)

    def testHasSessionData(self):
        self.app.session_data_manager.getSessionData()
        assert self.app.session_data_manager.hasSessionData()

    def testNewSessionDataObjectIsValid(self):
        sdType = type(TransientObject(1))
        sd = self.app.session_data_manager.getSessionData()
        # Compare against the unwrapped object when sd has an aq_base.
        assert type(getattr(sd, 'aq_base', sd)) is sdType
        assert not hasattr(sd, '_invalid')

    def testInvalidateMarksSessionDataObject(self):
        # This check used to be named testInvalidateSessionDataObject, the
        # same as the test further down, so it was replaced by that later
        # definition and never ran.  It now has its own name.
        sd = self.app.session_data_manager.getSessionData()
        sd.invalidate()
        assert hasattr(sd, '_invalid')

    def testSessionTokenIsSet(self):
        self.app.session_data_manager.getSessionData()
        mgr = getattr(self.app, idmgr_name)
        assert mgr.hasToken()

    def testGetSessionDataByKey(self):
        sd = self.app.session_data_manager.getSessionData()
        mgr = getattr(self.app, idmgr_name)
        token = mgr.getToken()
        bykeysd = self.app.session_data_manager.getSessionDataByKey(token)
        assert sd == bykeysd, (sd, bykeysd, token)

    def testBadExternalSDCPath(self):
        self.app.session_data_manager.REQUEST['REQUEST_METHOD'] = 'GET' # fake out webdav
        self.app.session_data_manager.setContainerPath('/fudgeffoloo')
        try:
            self.app.session_data_manager.getSessionData()
        except SessionDataManagerErr:
            pass
        else:
            # Report the configured path when the lookup wrongly succeeds.
            self.fail(
                self.app.session_data_manager.getSessionDataContainerPath())

    def testInvalidateSessionDataObject(self):
        # Data stored before invalidate() must not be visible in the
        # session data returned afterwards.
        sd = self.app.session_data_manager.getSessionData()
        sd['test'] = 'Its alive!  Alive!'
        sd.invalidate()
        assert not self.app.session_data_manager.getSessionData().has_key('test')

    def testGhostUnghostSessionManager(self):
        get_transaction().commit()
        sd = self.app.session_data_manager.getSessionData()
        sd.set('foo', 'bar')
        # Setting _p_changed to None on the manager before committing;
        # the stored value must still be readable afterwards.
        self.app.session_data_manager._p_changed = None
        get_transaction().commit()
        assert self.app.session_data_manager.getSessionData().get('foo') == 'bar'

    def testSubcommit(self):
        sd = self.app.session_data_manager.getSessionData()
        sd.set('foo', 'bar')
        assert get_transaction().commit(1) is None

    def testForeignObject(self):
        self.assertRaises(InvalidObjectReference, self._foreignAdd)

    def _foreignAdd(self):
        # Store the session data manager itself inside session data and
        # commit; used as the callable for testForeignObject.
        ob = self.app.session_data_manager
        sd = self.app.session_data_manager.getSessionData()
        sd.set('foo', ob)
        get_transaction().commit()

    def testAqWrappedObjectsFail(self):
        a = Foo()
        b = Foo()
        aq_wrapped = a.__of__(b)
        sd = self.app.session_data_manager.getSessionData()
        sd.set('foo', aq_wrapped)
        self.assertRaises(UnpickleableError, get_transaction().commit)

class TestMultiThread(TestBase):
    """Run reader and writer threads against the session data manager.

    Individual tests are described with comments instead of docstrings
    because unittest uses a test method's docstring in its report output.
    """

    def testNonOverlappingSids(self):
        # Every thread is given its own freshly generated token.
        self._runReadersAndWriters(share_sid=0)

    def testOverlappingSids(self):
        # All threads are given one and the same token.
        self._runReadersAndWriters(share_sid=1)

    def _runReadersAndWriters(self, share_sid):
        # Start the reader threads, then the writer threads, wait for all
        # of them to finish and check that no reader recorded a miss.
        #
        # share_sid -- true to hand one token to every thread, false to
        #              generate a new token per thread.
        readiters = 20
        writeiters = 5
        numreaders = 20
        numwriters = 5
        session_data_managername = 'session_data_manager'
        mgr = getattr(self.app, idmgr_name)

        shared_sid = None
        if share_sid:
            shared_sid = mgr._getNewToken()

        readers = []
        for i in range(numreaders):
            if share_sid:
                sid = shared_sid
            else:
                sid = mgr._getNewToken()
            app = aq_base(self.app)
            readers.append(
                ReaderThread(sid, app, readiters, session_data_managername))

        writers = []
        for i in range(numwriters):
            if share_sid:
                sid = shared_sid
            else:
                sid = mgr._getNewToken()
            app = aq_base(self.app)
            # Writers iterate `writeiters` times.  They used to be handed
            # `readiters`, leaving `writeiters` defined but unused.
            writers.append(
                WriterThread(sid, app, writeiters, session_data_managername))

        # Stagger the start-up slightly, readers first.
        for thread in readers + writers:
            thread.start()
            time.sleep(0.1)

        # Wait for exactly the threads started here instead of polling the
        # process-wide active thread count.
        for thread in readers + writers:
            thread.join()

        for thread in readers:
            assert thread.out == [], thread.out

class ReaderThread(threading.Thread):
    """Thread that stores one key in session data and keeps re-reading it.

    Each iteration in which the key is missing appends 1 to ``self.out``,
    so an empty ``out`` list after the run means the key was always found.
    """

    def __init__(self, sid, app, iters, session_data_managername):
        threading.Thread.__init__(self)
        self.out = []
        self.session_data_managername = session_data_managername
        self.iters = iters
        self.app = app
        self.sid = sid

    def run(self):
        self.app = makerequest.makerequest(self.app)
        # NOTE(review): the browser id manager tests set
        # REQUEST.browser_token_; confirm session_token_ is the attribute
        # the manager actually reads.
        self.app.REQUEST.session_token_ = self.sid
        manager = getattr(self.app, self.session_data_managername)

        # Store a marker keyed by the current time and commit it.
        session = manager.getSessionData(create=1)
        marker = time.time()
        session[marker] = 1
        get_transaction().commit()

        for _ in range(self.iters):
            session = manager.getSessionData()
            found = session.has_key(marker)
            if not found:
                self.out.append(1)
            # Pause 0, 1 or 2 seconds before committing this iteration.
            pause = whrandom.choice(range(3))
            time.sleep(pause)
            get_transaction().commit()
            
class WriterThread(threading.Thread):
    """Thread that repeatedly writes to session data, then commits or aborts."""

    def __init__(self, sid, app, iters, session_data_managername):
        threading.Thread.__init__(self)
        self.session_data_managername = session_data_managername
        self.iters = iters
        self.app = app
        self.sid = sid

    def run(self):
        self.app = makerequest.makerequest(self.app)
        # NOTE(review): the browser id manager tests set
        # REQUEST.browser_token_; confirm session_token_ is the attribute
        # the manager actually reads.
        self.app.REQUEST.session_token_ = self.sid
        manager = getattr(self.app, self.session_data_managername)

        for _ in range(self.iters):
            session = manager.getSessionData()
            session[time.time()] = 1
            # Sleep 0-7 seconds; an odd delay aborts the write, an even
            # delay commits it.
            pause = whrandom.choice(range(8))
            time.sleep(pause)
            if pause % 2:
                get_transaction().abort()
            else:
                get_transaction().commit()
                
def test_suite():
    """Return one suite holding the session manager and multi-thread tests."""
    suites = []
    for case in (TestSessionManager, TestMultiThread):
        # Collect every method of the class whose name starts with 'test'.
        suites.append(makeSuite(case, 'test'))
    return TestSuite(tuple(suites))

if __name__ == '__main__':
    # Run the combined suite with the text runner at high verbosity.
    TextTestRunner(verbosity=9, descriptions=9).run(test_suite())