[ZODB-Dev] Deadlock problem in ZODB (Not in Zope)
John D. Heintz
jheintz@isogen.com
Thu, 19 Apr 2001 11:20:58 -0500
This is a multi-part message in MIME format.
--------------020807060705000402050001
Content-Type: text/plain; charset=us-ascii; format=flowed
Content-Transfer-Encoding: 7bit
Hello all,
I have just now gotten some stack traces for a lock up problem our CORBA
ZODB application has been having.
<Python Rant>
Python makes it very hard to diagnose multi-threaded problems! No
multi-threaded debugger, not thread stack dumper!
</Python Rant>
I've attached the code I used to get this info in case it might help
anyone else. Every CORBA request calls tracer.start_trace() and
tracer.end_trace() around the actual processing. I ran my server in
interactive Python mode and waited for the lock up, then imported this
module and poked at the frames I found there.
I am going to start inspecting the code to see how this deadlock can
happen, but would sure like help. Oh, this output was done using ZEO,
but we were getting the same problem with a local FileStorage. I'll
repeat the test with only FileStorage if anyone thinks it's important.
I am using Andrew Kuchling's latest ZODB code from sourceforge with a
slight change to Connection.py, which I've also attached. Basically
I've taken out the LOG statement on line 552.
Here is the output I got printing the stack traces with the attached
code. Note: Last frame is printed first, different from most python
track tracing. (Easier to code this way...)
>>> pst(f1)
C:\Programs\ZODB\ZEO\ClientStorage.py 447
C:\Programs\ZODB\ZODB\Connection.py 617
C:\Programs\ZODB\ZODB\Transaction.py 299
corbaFrameworks\ZODBCorbaFramework.py 398
corbaFrameworks\ZODBCorbaFramework.py 390
c:\programs\cygwin\home\jheintz\src\bonnell\products\thor_corba\thorCorbaServer.py
120
corbaFrameworks\ZODBCorbaFramework.py 478
>>> pst(f2)
C:\Programs\ZODB\ZEO\ClientStorage.py 447
C:\Programs\ZODB\ZODB\Connection.py 617
C:\Programs\ZODB\ZODB\Transaction.py 299
corbaFrameworks\ZODBCorbaFramework.py 398
corbaFrameworks\ZODBCorbaFramework.py 390
c:\programs\cygwin\home\jheintz\src\bonnell\products\thor_corba\thorCorbaServer.py
120
corbaFrameworks\ZODBCorbaFramework.py 478
>>> pst(f3)
C:\Programs\ZODB\ZEO\ClientStorage.py 447
C:\Programs\ZODB\ZODB\Connection.py 617
C:\Programs\ZODB\ZODB\Transaction.py 299
corbaFrameworks\ZODBCorbaFramework.py 398
corbaFrameworks\ZODBCorbaFramework.py 390
c:\programs\cygwin\home\jheintz\src\bonnell\products\thor_corba\thorCorbaServer.py
120
corbaFrameworks\ZODBCorbaFramework.py 478
>>> pst(f4)
C:\Programs\ZODB\ZODB\DB.py 321
C:\Programs\ZODB\ZODB\Connection.py 660
C:\Programs\ZODB\ZEO\ClientStorage.py 480
C:\Programs\ZODB\ZODB\Connection.py 653
C:\Programs\ZODB\ZODB\Transaction.py 323
corbaFrameworks\ZODBCorbaFramework.py 398
corbaFrameworks\ZODBCorbaFramework.py 390
c:\programs\cygwin\home\jheintz\src\bonnell\products\thor_corba\thorCorbaServer.py
120
corbaFrameworks\ZODBCorbaFramework.py 478
--------------020807060705000402050001
Content-Type: text/plain;
name="tracer.py"
Content-Transfer-Encoding: 7bit
Content-Disposition: inline;
filename="tracer.py"
import sys
import thread
map = {}
ignore = {}
get_ident = thread.get_ident
verbose = 0
def _trace(a, b, c):
i = get_ident()
if ignore.has_key(i):
if verbose:
print "Ignoring %s" % i
return None
map[i] = (a, b, c)
if verbose:
print i, a, b, c
return _trace
def start_trace():
i = get_ident()
if ignore.has_key(i):
del ignore[i]
sys.settrace(_trace)
def end_trace():
i = get_ident()
ignore[i] = 1
del map[i]
def print_stack(f):
while f is not None:
print f.f_code.co_filename, f.f_lineno
f = f.f_back
--------------020807060705000402050001
Content-Type: text/plain;
name="Connection.py"
Content-Transfer-Encoding: 7bit
Content-Disposition: inline;
filename="Connection.py"
##############################################################################
#
# Zope Public License (ZPL) Version 1.0
# -------------------------------------
#
# Copyright (c) Digital Creations. All rights reserved.
#
# This license has been certified as Open Source(tm).
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions in source code must retain the above copyright
# notice, this list of conditions, and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions, and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
#
# 3. Digital Creations requests that attribution be given to Zope
# in any manner possible. Zope includes a "Powered by Zope"
# button that is installed by default. While it is not a license
# violation to remove this button, it is requested that the
# attribution remain. A significant investment has been put
# into Zope, and this effort will continue if the Zope community
# continues to grow. This is one way to assure that growth.
#
# 4. All advertising materials and documentation mentioning
# features derived from or use of this software must display
# the following acknowledgement:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# In the event that the product being advertised includes an
# intact Zope distribution (with copyright and license included)
# then this clause is waived.
#
# 5. Names associated with Zope or Digital Creations must not be used to
# endorse or promote products derived from this software without
# prior written permission from Digital Creations.
#
# 6. Modified redistributions of any form whatsoever must retain
# the following acknowledgment:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# Intact (re-)distributions of any official Zope release do not
# require an external acknowledgement.
#
# 7. Modifications are encouraged but must be packaged separately as
# patches to official Zope releases. Distributions that do not
# clearly separate the patches from the original work must be clearly
# labeled as unofficial distributions. Modifications which do not
# carry the name Zope may be packaged in any form, as long as they
# conform to all of the clauses above.
#
#
# Disclaimer
#
# THIS SOFTWARE IS PROVIDED BY DIGITAL CREATIONS ``AS IS'' AND ANY
# EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DIGITAL CREATIONS OR ITS
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
#
# This software consists of contributions made by Digital Creations and
# many individuals on behalf of Digital Creations. Specific
# attributions are listed in the accompanying credits file.
#
##############################################################################
"""Database connection support
$Id: Connection.py,v 1.5 2001/03/31 02:09:00 akuchling Exp $"""
__version__='$Revision: 1.5 $'[11:-2]
from cPickleCache import PickleCache
from POSException import ConflictError, ExportError
from cStringIO import StringIO
from cPickle import Unpickler, Pickler
from ExtensionClass import Base
from time import time
import Transaction, string, ExportImport, sys, traceback, TmpStore
from zLOG import LOG, ERROR, BLATHER
from coptimizations import new_persistent_id
from ConflictResolution import ResolvedSerial
ExtensionKlass=Base.__class__
class HelperClass: pass
ClassType=type(HelperClass)
class Connection(ExportImport.ExportImport):
"""Object managers for individual object space.
An object space is a version of collection of objects. In a
multi-threaded application, each thread get's it's own object
space.
The Connection manages movement of objects in and out of object storage.
"""
_tmp=None
_debug_info=()
_opened=None
# Experimental. Other connections can register to be closed
# when we close by putting something here.
def __init__(self, version='', cache_size=400,
cache_deactivate_after=60):
"""Create a new Connection"""
self._version=version
self._cache=cache=PickleCache(self, cache_size, cache_deactivate_after)
self._incrgc=self.cacheGC=cache.incrgc
self._invalidated=d={}
self._invalid=d.has_key
self._committed=[]
def _breakcr(self):
try: del self._cache
except: pass
try: del self._incrgc
except: pass
try: del self.cacheGC
except: pass
def __getitem__(self, oid,
tt=type(()), ct=type(HelperClass)):
cache=self._cache
if cache.has_key(oid): return cache[oid]
__traceback_info__ = (oid)
p, serial = self._storage.load(oid, self._version)
__traceback_info__ = (oid, p)
file=StringIO(p)
unpickler=Unpickler(file)
unpickler.persistent_load=self._persistent_load
try:
object = unpickler.load()
except:
raise "Could not load oid %s, pickled data in traceback info may\
contain clues" % (oid)
klass, args = object
if type(klass) is tt:
module, name = klass
klass=self._db._classFactory(self, module, name)
if (args is None or
not args and not hasattr(klass,'__getinitargs__')):
object=klass.__basicnew__()
else:
object=apply(klass,args)
if klass is not ExtensionKlass:
object.__dict__.clear()
object._p_oid=oid
object._p_jar=self
object._p_changed=None
object._p_serial=serial
cache[oid]=object
if oid=='\0\0\0\0\0\0\0\0': self._root_=object # keep a ref
return object
def _persistent_load(self,oid,
d={'__builtins__':{}},
tt=type(()), st=type(''), ct=type(HelperClass)):
__traceback_info__=oid
cache=self._cache
if type(oid) is tt:
# Quick instance reference. We know all we need to know
# to create the instance wo hitting the db, so go for it!
oid, klass = oid
if cache.has_key(oid): return cache[oid]
if type(klass) is tt:
module, name = klass
try: klass=self._db._classFactory(self, module, name)
except:
# Eek, we couldn't get the class. Hm.
# Maybe their's more current data in the
# object's actual record!
return self[oid]
object=klass.__basicnew__()
object._p_oid=oid
object._p_jar=self
object._p_changed=None
cache[oid]=object
return object
if cache.has_key(oid): return cache[oid]
return self[oid]
def _setDB(self, odb):
"""Begin a new transaction.
Any objects modified since the last transaction are invalidated.
"""
self._db=odb
self._storage=s=odb._storage
self.new_oid=s.new_oid
self._cache.invalidate(self._invalidated)
self._opened=time()
return self
def abort(self, object, transaction):
"""Abort the object in the transaction.
This just deactivates the thing.
"""
if object is self:
self._cache.invalidate(self._invalidated)
else:
self._cache.invalidate(object._p_oid)
def cacheFullSweep(self, dt=0): self._cache.full_sweep(dt)
def cacheMinimize(self, dt=0): self._cache.minimize(dt)
__onCloseCallbacks=()
def onCloseCallback(self, f):
self.__onCloseCallbacks=self.__onCloseCallbacks+(f,)
def close(self):
self._incrgc() # This is a good time to do some GC
db=self._db
# Call the close callbacks.
for f in self.__onCloseCallbacks:
try: f()
except:
f=getattr(f, 'im_self', f)
LOG('ZODB',ERROR, 'Close callback failed for %s' % f,
error=sys.exc_info())
self.__onCloseCallbacks=()
self._db=self._storage=self._tmp=self.new_oid=self._opened=None
self._debug_info=()
# Return the connection to the pool.
db._closeConnection(self)
def commit(self, object, transaction, _type=type, _st=type('')):
if object is self:
return # we registered ourself
oid=object._p_oid
invalid=self._invalid
if oid is None or object._p_jar is not self:
# new object
oid = self.new_oid()
object._p_jar=self
object._p_oid=oid
self._creating.append(oid)
elif object._p_changed:
if (
(invalid(oid) and not hasattr(object, '_p_resolveConflict'))
or
invalid(None)
):
raise ConflictError, `oid`
self._invalidating.append(oid)
else:
# Nothing to do
return
stack=[object]
# Create a special persistent_id that passes T and the subobject
# stack along:
#
# def persistent_id(object,
# self=self,
# stackup=stackup, new_oid=self.new_oid):
# if (not hasattr(object, '_p_oid') or
# type(object) is ClassType): return None
#
# oid=object._p_oid
#
# if oid is None or object._p_jar is not self:
# oid = self.new_oid()
# object._p_jar=self
# object._p_oid=oid
# stackup(object)
#
# klass=object.__class__
#
# if klass is ExtensionKlass: return oid
#
# if hasattr(klass, '__getinitargs__'): return oid
#
# module=getattr(klass,'__module__','')
# if module: klass=module, klass.__name__
#
# return oid, klass
file=StringIO()
seek=file.seek
pickler=Pickler(file,1)
pickler.persistent_id=new_persistent_id(self, stack.append)
dbstore=self._storage.store
file=file.getvalue
cache=self._cache
get=cache.get
dump=pickler.dump
clear_memo=pickler.clear_memo
version=self._version
while stack:
object=stack[-1]
del stack[-1]
oid=object._p_oid
serial=getattr(object, '_p_serial', '\0\0\0\0\0\0\0\0')
if serial == '\0\0\0\0\0\0\0\0':
# new object
self._creating.append(oid)
else:
#XXX We should never get here
if (
(invalid(oid) and
not hasattr(object, '_p_resolveConflict'))
or
invalid(None)
):
raise ConflictError, `oid`
self._invalidating.append(oid)
klass = object.__class__
if klass is ExtensionKlass:
# Yee Ha!
dict={}
dict.update(object.__dict__)
del dict['_p_jar']
args=object.__name__, object.__bases__, dict
state=None
else:
if hasattr(klass, '__getinitargs__'):
args = object.__getinitargs__()
len(args) # XXX Assert it's a sequence
else:
args = None # New no-constructor protocol!
module=getattr(klass,'__module__','')
if module: klass=module, klass.__name__
__traceback_info__=klass, oid, self._version
state=object.__getstate__()
seek(0)
clear_memo()
dump((klass,args))
dump(state)
p=file(1)
s=dbstore(oid,serial,p,version,transaction)
if s:
# Note that if s is false, then the storage defered the return
if _type(s) is _st:
# normal case
if s == ResolvedSerial:
# resolved conflict
object._p_changed=None
else:
object._p_serial=s
object._p_changed=0
else:
# defered returns
for oi, s in s:
if _type(s) is not _st: raise s
o=get(oi, oi)
if o is not oi:
if s == ResolvedSerial:
o._p_changed=None
else:
o._p_serial=s
o._p_changed=0
elif oi == oid:
if s == ResolvedSerial:
object._p_changed=None
else:
object._p_serial=s
object._p_changed=0
try: cache[oid]=object
except:
# Dang, I bet its wrapped:
if hasattr(object, 'aq_base'):
cache[oid]=object.aq_base
else:
raise
def commit_sub(self, t,
_type=type, _st=type(''), _None=None):
tmp=self._tmp
if tmp is _None: return
src=self._storage
LOG('ZODB', BLATHER,
'Commiting subtransaction of size %s' % src.getSize())
self._storage=tmp
self._tmp=_None
tmp.tpc_begin(t)
load=src.load
store=tmp.store
dest=self._version
get=self._cache.get
oids=src._index.keys()
# Copy invalidating and creating info from temporary storage:
invalidating=self._invalidating
invalidating[len(invalidating):]=oids
creating=self._creating
creating[len(creating):]=src._creating
for oid in oids:
data, serial = load(oid, src)
s=store(oid, serial, data, dest, t)
if s:
if _type(s) is _st:
o=get(oid, _None)
if o is not _None: o._p_serial=s
else:
for oid, s in s:
if _type(s) is not _st: raise s
o=get(oid, _None)
if o is not _None: o._p_serial=s
def abort_sub(self, t):
"""Abort work done in subtransactions"""
tmp=self._tmp
if tmp is None: return
src=self._storage
self._tmp=None
self._storage=tmp
self._cache.invalidate(src._index.keys())
self._invalidate_creating(src._creating)
def _invalidate_creating(self, creating=None):
"""Dissown any objects newly saved in an uncommitted transaction.
"""
if creating is None:
creating=self._creating
self._creating=[]
cache=self._cache
cache_get=cache.get
for oid in creating:
o=cache_get(oid, None)
if o is not None:
del o._p_jar
del o._p_oid
del cache[oid]
#XXX
def db(self): return self._db
def getVersion(self): return self._version
def invalidate(self, oid):
"""Invalidate a particular oid
This marks the oid as invalid, but doesn't actually invalidate
it. The object data will be actually invalidated at certain
transaction boundaries.
"""
self._invalidated[oid]=1
def modifiedInVersion(self, oid):
try: return self._db.modifiedInVersion(oid)
except KeyError:
return self._version
def root(self): return self['\0\0\0\0\0\0\0\0']
def setstate(self,object):
try:
oid=object._p_oid
p, serial = self._storage.load(oid, self._version)
# XXX this is quite conservative!
# We need, however, to avoid reading data from a transaction
# that committed after the current "session" started, as
# that might lead to mixing of cached data from earlier
# transactions and new inconsistent data.
#
# Note that we (carefully) wait until after we call the
# storage to make sure that we don't miss an invaildation
# notifications between the time we check and the time we
# read.
invalid=self._invalid
if invalid(oid) or invalid(None):
if not hasattr(object.__class__, '_p_independent'):
get_transaction().register(self)
raise ConflictError(`oid`, `object.__class__`)
invalid=1
else:
invalid=0
file=StringIO(p)
unpickler=Unpickler(file)
unpickler.persistent_load=self._persistent_load
unpickler.load()
state = unpickler.load()
if hasattr(object, '__setstate__'):
object.__setstate__(state)
else:
d=object.__dict__
for k,v in state.items(): d[k]=v
object._p_serial=serial
if invalid:
if object._p_independent():
try: del self._invalidated[oid]
except KeyError: pass
else:
get_transaction().register(self)
raise ConflictError(`oid`, `object.__class__`)
except:
raise
def oldstate(self, object, serial):
oid=object._p_oid
p = self._storage.loadSerial(oid, serial)
file=StringIO(p)
unpickler=Unpickler(file)
unpickler.persistent_load=self._persistent_load
unpickler.load()
return unpickler.load()
def setklassstate(self, object,
tt=type(()), ct=type(HelperClass)):
try:
oid=object._p_oid
__traceback_info__=oid
p, serial = self._storage.load(oid, self._version)
file=StringIO(p)
unpickler=Unpickler(file)
unpickler.persistent_load=self._persistent_load
copy = unpickler.load()
klass, args = copy
if klass is not ExtensionKlass:
LOG('ZODB',ERROR,
"Unexpected klass when setting class state on %s"
% getattr(object,'__name__','(?)'))
return
copy=apply(klass,args)
object.__dict__.clear()
object.__dict__.update(copy.__dict__)
object._p_oid=oid
object._p_jar=self
object._p_changed=0
object._p_serial=serial
except:
LOG('ZODB',ERROR, 'setklassstate failed', error=sys.exc_info())
raise
def tpc_abort(self, transaction):
self._storage.tpc_abort(transaction)
cache=self._cache
cache.invalidate(self._invalidated)
cache.invalidate(self._invalidating)
self._invalidate_creating()
def tpc_begin(self, transaction, sub=None):
if self._invalid(None): # Some nitwit invalidated everything!
raise ConflictError, "transaction already invalidated"
self._invalidating=[]
self._creating=[]
if sub:
# Sub-transaction!
_tmp=self._tmp
if _tmp is None:
_tmp=TmpStore.TmpStore(self._version)
self._tmp=self._storage
self._storage=_tmp
_tmp.registerDB(self._db, 0)
self._storage.tpc_begin(transaction)
def tpc_vote(self, transaction,
_type=type, _st=type('')):
try: vote=self._storage.tpc_vote
except: return
s=vote(transaction)
if s:
get=self._cache.get
for oid, s in s:
o=get(oid, oid)
if o is not oid:
if _type(s) is not _st: raise s
if s == ResolvedSerial:
o._p_changed=None
else:
o._p_serial=s
o._p_changed=0
def tpc_finish(self, transaction):
# It's important that the storage call the function we pass
# (self.tpc_finish_) while it still has it's lock. We don't
# want another thread to be able to read any updated data
# until we've had a chance to send an invalidation message to
# all of the other connections!
if self._tmp is not None:
# Commiting a subtransaction!
# There is no need to invalidate anything.
self._storage.tpc_finish(transaction, self._invalidate_sub)
self._storage._creating[:0]=self._creating
del self._creating[:]
else:
self._storage.tpc_finish(transaction,
self._invalidate_invalidating)
self._cache.invalidate(self._invalidated)
self._incrgc() # This is a good time to do some GC
def _invalidate_invalidating(self):
invalidate=self._db.invalidate
for oid in self._invalidating: invalidate(oid, self)
def _invalidate_sub(self):
# There's no point in invalidating any objects in a subtransaction
#
# Because we may ultimately abort the containing transaction.
pass
def sync(self):
get_transaction().abort()
sync=getattr(self._storage, 'sync', 0)
if sync != 0: sync()
self._cache.invalidate(self._invalidated)
self._incrgc() # This is a good time to do some GC
def getDebugInfo(self): return self._debug_info
def setDebugInfo(self, *args): self._debug_info=self._debug_info+args
######################################################################
# Just plain weird. Don't try this at home kids.
def exchange(self, old, new):
oid=old._p_oid
new._p_oid=oid
new._p_jar=self
new._p_changed=1
get_transaction().register(new)
self._cache[oid]=new
class tConnection(Connection):
def close(self):
self._breakcr()
--------------020807060705000402050001--