##############################################################################
#
# Copyright (c) 2001, 2002, 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""RPC stubs for interface exported by StorageServer."""

# DM 2004-12-10 -- upper bound (in seconds) on how long StorageServer.__init__
# waits for the peer to report its protocol version.
ZEO_MAX_RESPONSE_TIME = 60 # s, maybe controllable via an environment variable?

from time import time

##
# ZEO storage server.
#
# Remote method calls can be synchronous or asynchronous.  If the call
# is synchronous, the client thread blocks until the call returns.  A
# single client can only have one synchronous request outstanding.  If
# several threads share a single client, threads other than the caller
# will block only if they attempt to make another synchronous call.
# An asynchronous call does not cause the client thread to block.  An
# exception raised by an asynchronous method is logged on the server,
# but is not returned to the client.

class StorageServer:

    """An RPC stub class for the interface exported by ClientStorage.

    This is the interface presented by the StorageServer to the
    ClientStorage; i.e. the ClientStorage calls these methods and they
    are executed in the StorageServer.

    See the StorageServer module for documentation on these methods,
    with the exception of _update(), which is documented here.

    When the environment variable ZEO_LOG_CLIENT is set (see the
    module-level ``_doLOG`` flag), most calls additionally write a
    'zeoServerStub' INFO log entry describing the call.
    """

    def __init__(self, rpc):
        """Constructor.

        The argument is a connection: an instance of the
        zrpc.connection.Connection class.

        Raises EnvironmentError (after closing the connection) when the
        peer has not reported its protocol version within
        ZEO_MAX_RESPONSE_TIME seconds.
        """
        self.rpc = rpc

        # Wait until we know what version the other side is using.
        # DM 2004-12-10: the former free running loop (calling
        # rpc.pending() without a timeout) occasionally did not finish
        # (reasons still unknown) and consumed an excessive amount of
        # CPU time.  We call "pending" with a timeout of 30s (this is
        # effective only for synchronous mode!) and in addition check
        # that we finish within "ZEO_MAX_RESPONSE_TIME".
        started = time()
        while rpc.peer_protocol_version is None:
            if time() - started > ZEO_MAX_RESPONSE_TIME:
                # the server took too long to report its version
                rpc.close()
                raise EnvironmentError(
                    "ZEO server's initial response exceeded %ss"
                    % ZEO_MAX_RESPONSE_TIME)
            rpc.pending(30)

        if rpc.peer_protocol_version == 'Z200':
            # These calls are not part of protocol version 2.0.0, so
            # shadow the methods with local no-ops on this instance.
            self.lastTransaction = lambda: None
            self.getInvalidations = lambda tid: None
            self.getAuthProtocol = lambda: None

    def _timed_call(self, method, *args):
        """Make a synchronous call; return (result, elapsed milliseconds)."""
        started = time()
        result = self.rpc.call(method, *args)
        return result, (time() - started) * 1000

    ##
    # Return a callable that invokes the remote method `name`
    # synchronously with whatever arguments it is given.

    def extensionMethod(self, name):
        return ExtensionMethodWrapper(self.rpc, name).call

    ##
    # Register current connection with a storage and a mode.
    # In effect, it is like an open call.
    # @param storage_name a string naming the storage.  This argument
    #        is primarily for backwards compatibility with servers
    #        that supported multiple storages.
    # @param read_only boolean
    # @exception ValueError unknown storage_name or already registered
    # @exception ReadOnlyError storage is read-only and a read-write
    #            connection was requested

    def register(self, storage_name, read_only):
        self.rpc.call('register', storage_name, read_only)

    ##
    # Return dictionary of meta-data about the storage.
    # @defreturn dict

    def get_info(self):
        return self.rpc.call('get_info')

    ##
    # Check whether the server requires authentication.  Returns
    # the name of the protocol.
    # @defreturn string

    def getAuthProtocol(self):
        # Not in protocol version 2.0.0; see __init__()
        return self.rpc.call('getAuthProtocol')

    ##
    # Return id of the last committed transaction
    # @defreturn string

    def lastTransaction(self):
        # Not in protocol version 2.0.0; see __init__()
        return self.rpc.call('lastTransaction')

    ##
    # Return invalidations for all transactions after tid.
    # @param tid transaction id
    # @defreturn 2-tuple, (tid, list)
    # @return tuple containing the last committed transaction
    #         and a list of oids that were invalidated.  Returns
    #         None and an empty list if the server does not have
    #         the list of oids available.

    def getInvalidations(self, tid):
        # Not in protocol version 2.0.0; see __init__()
        return self.rpc.call('getInvalidations', tid)

    ##
    # Check whether serial numbers s and sv are current for oid.
    # If one or both of the serial numbers are not current, the
    # server will make an asynchronous invalidateVerify() call.
    # @param oid object id
    # @param s serial number on non-version data
    # @param sv serial number of version data or None
    # @defreturn async

    def zeoVerify(self, oid, s, sv):
        self.rpc.callAsync('zeoVerify', oid, s, sv)

    ##
    # Check whether current serial number is valid for oid and version.
    # If the serial number is not current, the server will make an
    # asynchronous invalidateVerify() call.
    # @param oid object id
    # @param version name of version for oid
    # @param serial client's current serial number
    # @defreturn async

    def verify(self, oid, version, serial):
        self.rpc.callAsync('verify', oid, version, serial)

    ##
    # Signal to the server that cache verification is done.
    # @defreturn async

    def endZeoVerify(self):
        self.rpc.callAsync('endZeoVerify')

    ##
    # Generate a new set of oids.
    # @param n number of new oids to return
    # @defreturn list
    # @return list of oids

    def new_oids(self, n=None):
        # Omit the argument entirely when the caller gave none.
        if n is None:
            return self.rpc.call('new_oids')
        return self.rpc.call('new_oids', n)

    ##
    # Pack the storage.
    # @param t pack time
    # @param wait optional, boolean.  If true, the call will not
    #             return until the pack is complete.

    def pack(self, t, wait=None):
        if wait is None:
            self.rpc.call('pack', t)
        else:
            self.rpc.call('pack', t, wait)

    ##
    # Return current data for oid.  Version data is returned if
    # present.
    # @param oid object id
    # @defreturn 5-tuple
    # @return 5-tuple, current non-version data, serial number,
    #         version name, version data, version data serial number
    # @exception KeyError if oid is not found

    def zeoLoad(self, oid):
        if not _doLOG:
            return self.rpc.call('zeoLoad', oid)
        r, elapsed_ms = self._timed_call('zeoLoad', oid)
        _log_load('zeoLoad', oid, _record_data(r), elapsed_ms)
        return r

    ##
    # Return current data for oid in version, the tid of the transaction that
    # wrote the most recent revision, and the name of the version for the
    # data returned.  Versions make this hard to understand; in particular,
    # the version string returned may not equal the version string passed
    # in, and that's "a feature" I don't understand.  Similarly, the tid
    # returned is the tid of the most recent revision of oid, and that may
    # not equal the tid of the transaction that wrote the data returned.
    # @param oid object id
    # @param version string, name of version
    # @defreturn 3-tuple
    # @return data, transaction id, version
    #         where version is the name of the version the data came
    #         from or "" for non-version data
    # @exception KeyError if oid is not found

    def loadEx(self, oid, version):
        if not _doLOG:
            return self.rpc.call('loadEx', oid, version)
        r, elapsed_ms = self._timed_call('loadEx', oid, version)
        _log_load('loadEx', oid, _record_data(r), elapsed_ms)
        return r

    ##
    # Return non-current data along with transaction ids that identify
    # the lifetime of the specific revision.
    # @param oid object id
    # @param tid a transaction id that provides an upper bound on
    #            the lifetime of the revision.  That is, loadBefore
    #            returns the revision that was current before tid committed.
    # @defreturn 4-tuple
    # @return data, serial number, start transaction id, end transaction id

    def loadBefore(self, oid, tid):
        if not _doLOG:
            return self.rpc.call('loadBefore', oid, tid)
        r, elapsed_ms = self._timed_call('loadBefore', oid, tid)
        # _record_data() tolerates a non-indexable result (e.g. None), so
        # enabling logging cannot change what the caller observes.
        _log_load('loadBefore', oid, _record_data(r), elapsed_ms)
        return r

    ##
    # Store new revision of oid.
    # @param oid object id
    # @param serial serial number that this transaction read
    # @param data new data record for oid
    # @param version name of version or ""
    # @param id id of current transaction
    # @defreturn async

    def storea(self, oid, serial, data, version, id):
        if _doLOG:
            LOG('zeoServerStub', INFO,
                'storea: oid=%s class=%s size=%s'
                % (repr(oid), _getClass(data), _size_of(data)))
        self.rpc.callAsync('storea', oid, serial, data, version, id)

    ##
    # Start two-phase commit for a transaction
    # @param id id used by client to identify current transaction.  The
    #        only purpose of this argument is to distinguish among multiple
    #        threads using a single ClientStorage.
    # @param user name of user committing transaction (can be "")
    # @param descr string containing transaction metadata (can be "")
    # @param ext dictionary of extended metadata (?)
    # @param tid optional explicit tid to pass to underlying storage
    # @param status optional status character, e.g "p" for pack
    # @return whatever the synchronous 'tpc_begin' call returns

    def tpc_begin(self, id, user, descr, ext, tid, status):
        if not _doLOG:
            return self.rpc.call('tpc_begin', id, user, descr, ext, tid,
                                 status)
        r, elapsed_ms = self._timed_call('tpc_begin', id, user, descr, ext,
                                         tid, status)
        LOG('zeoServerStub', INFO,
            'tpc_begin: tid=%s descr=%s time=%.3fms'
            % (repr(tid), repr(descr), elapsed_ms))
        return r

    ##
    # Forward 'vote' for transaction trans_id as a synchronous call.

    def vote(self, trans_id):
        if not _doLOG:
            return self.rpc.call('vote', trans_id)
        r, elapsed_ms = self._timed_call('vote', trans_id)
        LOG('zeoServerStub', INFO,
            'vote: tid=%s time=%.3fms' % (repr(trans_id), elapsed_ms))
        return r

    ##
    # Forward 'tpc_finish' for transaction id as a synchronous call.

    def tpc_finish(self, id):
        if not _doLOG:
            return self.rpc.call('tpc_finish', id)
        r, elapsed_ms = self._timed_call('tpc_finish', id)
        LOG('zeoServerStub', INFO,
            'tpc_finish: tid=%s time=%.3fms' % (repr(id), elapsed_ms))
        return r

    ##
    # Forward 'tpc_abort' for transaction id.
    # @defreturn async

    def tpc_abort(self, id):
        if _doLOG:
            # Logged under its own name (this entry used to be
            # mislabelled as 'vote').
            LOG('zeoServerStub', INFO, 'tpc_abort: tid=%s' % (repr(id),))
        self.rpc.callAsync('tpc_abort', id)

    def abortVersion(self, src, id):
        return self.rpc.call('abortVersion', src, id)

    def commitVersion(self, src, dest, id):
        return self.rpc.call('commitVersion', src, dest, id)

    def history(self, oid, version, length=None):
        # Omit the length argument entirely when the caller gave none.
        if length is None:
            return self.rpc.call('history', oid, version)
        return self.rpc.call('history', oid, version, length)

    def load(self, oid, version):
        if not _doLOG:
            return self.rpc.call('load', oid, version)
        r, elapsed_ms = self._timed_call('load', oid, version)
        _log_load('load', oid, _record_data(r), elapsed_ms,
                  ' version=%s' % repr(version))
        return r

    def getSerial(self, oid):
        return self.rpc.call('getSerial', oid)

    def loadSerial(self, oid, serial):
        if not _doLOG:
            return self.rpc.call('loadSerial', oid, serial)
        r, elapsed_ms = self._timed_call('loadSerial', oid, serial)
        # Here the result itself is logged as the data record.
        _log_load('loadSerial', oid, r, elapsed_ms,
                  ' serial=%s' % repr(serial))
        return r

    def modifiedInVersion(self, oid):
        return self.rpc.call('modifiedInVersion', oid)

    def new_oid(self):
        return self.rpc.call('new_oid')

    def store(self, oid, serial, data, version, trans):
        if not _doLOG:
            return self.rpc.call('store', oid, serial, data, version, trans)
        r, elapsed_ms = self._timed_call('store', oid, serial, data, version,
                                         trans)
        # The size is computed defensively so that logging can never make
        # a store fail; the measured duration is reported like elsewhere.
        LOG('zeoServerStub', INFO,
            'store: oid=%s class=%s size=%s time=%.3fms'
            % (repr(oid), _getClass(data), _size_of(data), elapsed_ms))
        return r

    def undo(self, trans_id, trans):
        return self.rpc.call('undo', trans_id, trans)

    def undoLog(self, first, last):
        return self.rpc.call('undoLog', first, last)

    def undoInfo(self, first, last, spec):
        return self.rpc.call('undoInfo', first, last, spec)

    def versionEmpty(self, vers):
        return self.rpc.call('versionEmpty', vers)

    def versions(self, max=None):
        # Omit the argument entirely when the caller gave none.
        if max is None:
            return self.rpc.call('versions')
        return self.rpc.call('versions', max)


# DM: extended logging support
#
# Client-side call logging is switched on by setting the environment
# variable ZEO_LOG_CLIENT to a non-empty value before this module is
# imported.
from os import environ
_doLOG = bool(environ.get('ZEO_LOG_CLIENT'))

if _doLOG:
    from zLOG import LOG, INFO
    from time import time
    from cPickle import loads
    from types import TupleType

    def _getClass(pickle, _tuple=TupleType):
        '''determine class of *pickle*.

        Returns the first element of the unpickled value (joined with
        "." when that element is a tuple), or '***exception***' when it
        cannot be determined.
        '''
        # NOTE(review): this unpickles record data purely for diagnostics.
        # Unpickling can execute code chosen by whoever produced the
        # pickle -- confirm the peer is trusted before enabling
        # ZEO_LOG_CLIENT.
        try:
            ci = loads(pickle)[0]
            if type(ci) is _tuple:
                ci = '.'.join(ci)
            return ci
        except Exception:
            # Best effort only: logging must never break the actual call.
            return '***exception***'


def _size_of(data):
    """Return len(data), or 'unknown' when *data* has no usable length."""
    try:
        return len(data)
    except Exception:
        return 'unknown'


def _record_data(result):
    """Return result[0], or None when *result* cannot be indexed."""
    try:
        return result[0]
    except (TypeError, IndexError, KeyError):
        return None


def _log_load(op, oid, data, elapsed_ms, extra=''):
    """Write the log entry for one load-style call.

    *extra* is inserted verbatim between the oid and class fields.
    Only call this when _doLOG is true (LOG, INFO and _getClass are
    defined only in that case).
    """
    LOG('zeoServerStub', INFO,
        '%s: oid=%s%s class=%s size=%s time=%.3fms'
        % (op, repr(oid), extra, _getClass(data), _size_of(data),
           elapsed_ms))


class ExtensionMethodWrapper:
    """Bind a remote method name to a connection.

    ``call`` forwards its arguments to ``rpc.call(name, ...)``.
    """

    def __init__(self, rpc, name):
        self.rpc = rpc
        self.name = name

    def call(self, *a, **kwa):
        return self.rpc.call(self.name, *a, **kwa)