[Checkins] SVN: Sandbox/J1m/zc.zk/ Initial
Jim Fulton
jim at zope.com
Sun Nov 27 19:44:54 UTC 2011
Log message for revision 123488:
Initial
Changed:
U Sandbox/J1m/zc.zk/buildout.cfg
U Sandbox/J1m/zc.zk/setup.py
A Sandbox/J1m/zc.zk/src/zc/zk/
A Sandbox/J1m/zc.zk/src/zc/zk/README.txt
A Sandbox/J1m/zc.zk/src/zc/zk/__init__.py
A Sandbox/J1m/zc.zk/src/zc/zk/tests.py
-=-
Modified: Sandbox/J1m/zc.zk/buildout.cfg
===================================================================
--- Sandbox/J1m/zc.zk/buildout.cfg 2011-11-27 18:47:08 UTC (rev 123487)
+++ Sandbox/J1m/zc.zk/buildout.cfg 2011-11-27 19:44:53 UTC (rev 123488)
@@ -1,12 +1,12 @@
[buildout]
-develop = .
-parts = test py
+develop = . ../thread
+parts = test
-[test]
-recipe = zc.recipe.testrunner
-eggs =
-
[py]
recipe = zc.recipe.egg
-eggs = ${test:eggs}
+eggs = zc.zk [test]
interpreter = py
+
+[test]
+recipe = zc.recipe.testrunner
+eggs = ${py:eggs}
Modified: Sandbox/J1m/zc.zk/setup.py
===================================================================
--- Sandbox/J1m/zc.zk/setup.py 2011-11-27 18:47:08 UTC (rev 123487)
+++ Sandbox/J1m/zc.zk/setup.py 2011-11-27 19:44:53 UTC (rev 123488)
@@ -11,15 +11,18 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
-name, version = 'zc.', '0'
+name, version = 'zc.zk', '0'
-install_requires = ['setuptools']
-extras_require = dict(test=['zope.testing'])
+install_requires = ['setuptools', 'zc.thread']
+extras_require = dict(
+ test=['zope.testing', 'zookeeper-static', 'mock', 'pytest'])
entry_points = """
"""
from setuptools import setup
+import os
+readme = open(os.path.join('src', 'zc', 'zk', 'README.txt')).read()
setup(
author = 'Jim Fulton',
@@ -27,8 +30,8 @@
license = 'ZPL 2.1',
name = name, version = version,
- long_description=open('README.txt').read(),
- description = open('README.txt').read().strip().split('\n')[0],
+ long_description=readme,
+ description = readme.strip().split('\n')[0],
packages = [name.split('.')[0], name],
namespace_packages = [name.split('.')[0]],
package_dir = {'': 'src'},
Added: Sandbox/J1m/zc.zk/src/zc/zk/README.txt
===================================================================
--- Sandbox/J1m/zc.zk/src/zc/zk/README.txt (rev 0)
+++ Sandbox/J1m/zc.zk/src/zc/zk/README.txt 2011-11-27 19:44:53 UTC (rev 123488)
@@ -0,0 +1,316 @@
+ZooKeeper Helpers
+=================
+
+The zc.zk package provides some high-level interfaces to the low-level
+zookeeper extension. It's not complete, in that it doesn't try, at
+this time, to be a complete high-level interface. Rather, it provides
+facilities we need to use Zookeeeper to services together:
+
+- ZODB database clients and servers
+- HTTP-based clients and services
+- Load balencers
+
+The current (initial) use cases are:
+
+- Register a server providing a service.
+- Get the addresses of servers providing a service.
+- Get abd set service configuration data.
+
+This package makes no effort to support Windows. (Patches to support
+Windows might be accepted if they don't add much complexity.)
+
+.. contents::
+
+Installation
+------------
+
+You can install this as you would any other distribution. Note,
+however, that you must also install the Python ZooKeeper binding
+provided with ZooKeeper. Because this binding is packaged a number of
+different ways, it isn't listed as a distribution requirement.
+
+Instantiating a ZooKeeper helper
+--------------------------------
+
+To use the helper API, create a ZooKeeper instance:
+
+.. test
+
+ >>> import zookeeper
+ >>> @side_effect(init)
+ ... def _(addr, func):
+ ... global session_watch
+ ... session_watch = func
+ ... func(0, zookeeper.SESSION_EVENT, zookeeper.CONNECTED_STATE, '')
+ ... assert_(addr=='zookeeper.example.com:2181', addr)
+
+ >>> @side_effect(state)
+ ... def _(handle):
+ ... assert_(handle==0)
+ ... return zookeeper.CONNECTED_STATE
+
+::
+
+ >>> import zc.zk
+ >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
+
+The ZooKeeper constructor takes a ZooKeeper connection string, which is a
+comma-separated list of addresses of the form HOST:PORT. It defaults
+to '127.0.0.1:2181', which is convenient during development.
+
+Register a server providing a service.
+--------------------------------------
+
+To register a server, use the ``register_server`` method, which takes
+a service path and the address a server is listing on
+
+.. test
+
+ >>> import os, json, zookeeper
+ >>> path = '/fooservice/servers'
+ >>> addrs = []
+ >>> child_handler = None
+ >>> @side_effect(create)
+ ... def _(handle, path_, data, acl, flags):
+ ... assert_(handle == 0)
+ ... path_, addr = path_.rsplit('/', 1)
+ ... assert_(path_ == path)
+ ... assert_(json.loads(data) == dict(pid=os.getpid()))
+ ... addrs.append(addr)
+ ... assert_(acl == [zc.zk.world_permission()])
+ ... assert_(flags == zookeeper.EPHEMERAL)
+ ... global child_handler
+ ... if child_handler is not None:
+ ... child_handler(handle, zookeeper.CHILD_EVENT,
+ ... zookeeper.CONNECTED_STATE, path_)
+ ... child_handler = None
+
+::
+
+ >>> zk.register_server('/fooservice/servers', ('192.168.0.42', 8080))
+
+
+``register_server`` creates a read-only ephemeral ZooKeeper node as a
+child of the given service path. The name of the new node is the
+given address. This allows clients to get the list of addresses by
+just getting the list of the names of children of the service path.
+
+Ephemeral nodes have the useful property that they're automatically
+removed when a ZooKeeper session is closed or when the process
+containing it dies. De-deregistration is automatic.
+
+When registering a server, you can optionally provide server (node)
+data as additional keyword arguments to register_server. By default,
+the process id is set as the ``pid`` server key. This is useful to
+tracking down the server process.
+
+Get the addresses of servers providing a service.
+-------------------------------------------------
+
+Getting the adresses providing a service is accomplished by getting the
+children of a service node.
+
+.. test
+
+ >>> @side_effect(get_children)
+ ... def _(handle, path, handler):
+ ... assert_(handle == 0, handle)
+ ... assert_(path == '/fooservice/servers', path)
+ ... global child_handler
+ ... child_handler = handler
+ ... return addrs
+
+::
+
+ >>> addresses = zk.children('/fooservice/servers')
+ >>> sorted(addresses)
+ ['192.168.0.42:8080']
+
+The ``children`` method returns an iterable of names of child nodes of
+the node specified by the given path. The iterable is automatically
+updated when new servers are registered::
+
+ >>> zk.register_server('/fooservice/servers', ('192.168.0.42', 8081))
+ >>> sorted(addresses)
+ ['192.168.0.42:8080', '192.168.0.42:8081']
+
+You can call the iterable with a callback function that is called
+whenenever the list of children changes::
+
+ >>> @zk.children('/fooservice/servers')
+ ... def addresses_updated(addresses):
+ ... print 'addresses changed'
+ ... print sorted(addresses)
+ addresses changed
+ ['192.168.0.42:8080', '192.168.0.42:8081']
+
+The callback is called immediately with the children. When we add
+another child, it'll be called again::
+
+ >>> zk.register_server('/fooservice/servers', ('192.168.0.42', 8082))
+ addresses changed
+ ['192.168.0.42:8080', '192.168.0.42:8081', '192.168.0.42:8082']
+
+Get service configuration data.
+-------------------------------
+
+You get service configuration data by getting data associated with a
+ZooKeeper node. The interface for getting data is similar to the
+interface for getting children:
+
+
+.. test
+
+ >>> node_data = json.dumps(dict(
+ ... database = "/databases/foomain",
+ ... threads = 1,
+ ... favorite_color= "red"))
+ >>> @side_effect(get)
+ ... def _(handle, path, handler):
+ ... assert_(handle == 0)
+ ... assert_(path == '/fooservice')
+ ... global get_handler
+ ... get_handler = handler
+ ... return node_data, {}
+
+::
+
+ >>> data = zk.properties('/fooservice')
+ >>> data['database']
+ u'/databases/foomain'
+ >>> data['threads']
+ 1
+
+The ``properties`` method returns a mapping object that provides access to
+node data. (ZooKeeper only stores string data for nodes. ``zc.zk``
+provides a higher-level data interface by storing JSON strings.)
+
+The properties objects can be called with callback functions and used
+as function decorators to get update notification:
+
+ >>> @zk.properties('/fooservice')
+ ... def data_updated(data):
+ ... print 'data updated'
+ ... for item in sorted(data.items()):
+ ... print '%s: %r' % item
+ data updated
+ database: u'/databases/foomain'
+ favorite_color: u'red'
+ threads: 1
+
+The callback is called immediately. It'll also be called when data are
+updated.
+
+Updating node data
+------------------
+
+You can't set data properties, but you can update data by calling it's
+``update`` method:
+
+.. test
+
+ >>> @side_effect(set)
+ ... def _(handle, path, data):
+ ... global node_data
+ ... node_data = data
+ ... get_handler(handle, zookeeper.CHANGED_EVENT,
+ ... zookeeper.CONNECTED_STATE, path)
+
+::
+
+ >>> data.update(threads=2, secret='123')
+ data updated
+ database: u'/databases/foomain'
+ favorite_color: u'red'
+ secret: u'123'
+ threads: 2
+
+or by calling it's ``set`` method, which removes keys not listed::
+
+ >>> data.set(threads=3, secret='1234')
+ data updated
+ secret: u'1234'
+ threads: 3
+
+ZooKeeper Session Management
+----------------------------
+
+``zc.zk`` takes care of ZooKeeper session management for you. It
+establishes and, if necessary, reestablishes sessions for you. In
+particular, it takes care of reestablishing ZooKeeper watches when a
+session is reestablished.
+
+ZooKeeper logging
+-----------------
+
+``zc.zk`` bridges the low-level ZooKeeper logging API and the Python
+logging API. ZooKeeper log messages are forwarded to the Python
+``'ZooKeeper'`` logger.
+
+``zc.zk.ZooKeeper``
+-------------------
+
+``zc.zk.ZooKeeper(connection_string)``
+ Return a new instance given a ZooKeeper connection string.
+
+``children(path)``
+ Return a `zc.zk.Children`_ for the path.
+
+``properties(path)``
+ Return a `zc.zk.Properties`_ for the path.
+
+``handle``
+ The ZooKeeper session handle
+
+ This attribute can be used to call the lower-level API provided by
+ the ``zookeeper`` extension.
+
+``register_server(path, address, **data)``
+ Register a server at a path with the address.
+
+ An ephemeral child node of ``path`` will be created with name equal
+ to the string representation (HOST:PORT) of the given address.
+
+ ``address`` must be a host and port tuple.
+
+ Optional node properties can be provided as keyword arguments.
+
+zc.zk.Children
+--------------
+
+``__iter__()``
+ Return an iterator over the child names.
+
+``__call__(callable)``
+ Register a callback to be called whenever a child node is added or
+ removed.
+
+ The callback is passed the children instance when a child node is
+ added or removed.
+
+zc.zk.Properties
+----------------
+
+Properties objects provide the usual read-only mapping methods,
+__getitem__, __len__, etc..
+
+``set(**properties)``
+ Set the properties for the node, replacing existing data.
+
+``update(**properties)``
+ Update the properties for the node.
+
+``__call__(callable)``
+ Register a callback to be called whenever a node's properties are changed.
+
+ The callback is passed the properties instance when properties are
+ changed.
+
+Node deletion
+-------------
+
+If a node is deleted and ``Children`` or ``Properties`` instances have
+been created for it, the instances' data will be cleared. Attempts to
+update properties will fail. If callbacks have been registered, they
+will be called without arguments, if possible. It would be bad, in
+practice, to remove a node that processes are watching.
Property changes on: Sandbox/J1m/zc.zk/src/zc/zk/README.txt
___________________________________________________________________
Added: svn:eol-style
+ native
Added: Sandbox/J1m/zc.zk/src/zc/zk/__init__.py
===================================================================
--- Sandbox/J1m/zc.zk/src/zc/zk/__init__.py (rev 0)
+++ Sandbox/J1m/zc.zk/src/zc/zk/__init__.py 2011-11-27 19:44:53 UTC (rev 123488)
@@ -0,0 +1,262 @@
+import collections
+import json
+import logging
+import os
+import sys
+import threading
+import zc.thread
+import zookeeper
+
+logger = logging.getLogger(__name__)
+
+ at zc.thread.Thread
+def loggingthread():
+ r, w = os.pipe()
+ zookeeper.set_log_stream(os.fdopen(w, 'w'))
+ log = logging.getLogger('ZooKeeper').log
+ f = os.fdopen(r)
+ levels = dict(ZOO_INFO = logging.INFO,
+ ZOO_WARN = logging.WARNING,
+ ZOO_ERROR = logging.ERROR,
+ ZOO_DEBUG = logging.DEBUG,
+ )
+ while 1:
+ line = f.readline().strip()
+ try:
+ if '@' in line:
+ level, message = line.split('@', 1)
+ level = levels.get(level.split(':')[-1])
+ else:
+ level = None
+
+ if level is None:
+ log(logging.INFO, line)
+ else:
+ log(level, message)
+ except Exception, v:
+ logging.getLogger('ZooKeeper').exception("Logging error: %s", v)
+
+
+def parse_addr(addr):
+ host, port = addr.split(':')
+ return host, int(port)
+
+def world_permission(perms=zookeeper.PERM_READ):
+ return dict(perms=perms, scheme='world', id='anyone')
+
+class CancelWatch(Exception):
+ pass
+
+class ZooKeeper:
+
+ def __init__(self, zkaddr=2181):
+ if isinstance(zkaddr, int):
+ zkaddr = "127.0.0.1:%s" % zkaddr
+ self.zkaddr = zkaddr
+ self.watches = set()
+ self.connected = threading.Event()
+ zookeeper.init(zkaddr, self._watch_session)
+ self.connected.wait()
+
+ handle = None
+ def _watch_session(self, handle, event_type, state, path):
+ assert event_type == zookeeper.SESSION_EVENT
+ assert not path
+ if state == zookeeper.CONNECTED_STATE:
+ if self.handle is None:
+ self.handle = handle
+ if self.watches:
+ # reestablish after session reestablished
+ watches = self.watches
+ self.watches = set()
+ for watch in watches:
+ self._watch(watch, False)
+ else:
+ assert handle == self.handle
+ self.connected.set()
+ logger.info('connected %s', handle)
+ elif state == zookeeper.CONNECTING_STATE:
+ self.connected.clear()
+ elif state == zookeeper.EXPIRED_SESSION_STATE:
+ self.connected.clear()
+ zookeeper.close(self.handle)
+ self.handle = None
+ zookeeper.init(self.zkaddr, self._watch_session)
+ else:
+ logger.critical('unexpected session event %s %s', handle, state)
+
+ def register_server(self, path, addr, **kw):
+ kw['pid'] = os.getpid()
+ self.connected.wait()
+ zookeeper.create(self.handle, path + '/%s:%s' % addr, json.dumps(kw),
+ [world_permission()], zookeeper.EPHEMERAL)
+
+ def _watch(self, watch, wait=True):
+ if wait:
+ self.connected.wait()
+ self.watches.add(watch)
+
+ def handler(h, t, state, p):
+ if watch not in self.watches:
+ return
+ assert h == self.handle
+ assert state == zookeeper.CONNECTED_STATE
+ assert p == watch.path
+ if t == zookeeper.DELETED_EVENT:
+ watch._deleted()
+ self.watches.remove(watch)
+ else:
+ assert t == watch.event_type
+ zkfunc = getattr(zookeeper, watch.zkfunc)
+ watch._notify(zkfunc(self.handle, watch.path, handler))
+
+ handler(self.handle, watch.event_type, self.state, watch.path)
+
+ def children(self, path):
+ return Children(self, path)
+
+ def properties(self, path):
+ return Properties(self, path)
+
+ def _set(self, path, data):
+ self.connected.wait()
+ return zookeeper.set(self.handle, path, data)
+
+ def print_tree(self, path='/', indent=0):
+ self.connected.wait()
+ prefix = ' '*indent
+ print prefix + path.split('/')[-1]+'/'
+ indent += 2
+ prefix += ' '
+ data = zookeeper.get(self.handle, path)[0].strip()
+ if data:
+ if data.startswith('{') and data.endswith('}'):
+ data = json.loads(data)
+ import pprint
+ print prefix+pprint.pformat(data).replace(
+ '\n', prefix+'\n')
+ else:
+ print prefix + repr(data)
+ for p in zookeeper.get_children(self.handle, path):
+ if not path.endswith('/'):
+ p = '/'+p
+ self.print_tree(path+p, indent)
+
+ def close(self):
+ zookeeper.close(self.handle)
+ del self.handle
+
+ @property
+ def state(self):
+ if self.handle is None:
+ return zookeeper.CONNECTING_STATE
+ return zookeeper.state(self.handle)
+
+
+class NodeInfo:
+
+ def __init__(self, session, path):
+ self.session = session
+ self.path = path
+ self.callbacks = set()
+ session._watch(self)
+
+ def setData(self, data):
+ self.data = data
+
+ deleted = False
+ def _deleted(self):
+ self.deleted = True
+ self.data = {}
+ for callback in self.callbacks:
+ try:
+ callback()
+ except TypeError:
+ pass
+ except:
+ logger.exception('Error %r calling %r', self, callback)
+
+ def __repr__(self):
+ return "%s.%s(%s, %s)" % (
+ self.__class__.__module__, self.__class__.__name__,
+ self.session.handle, self.path)
+
+ def _notify(self, data):
+ self.setData(data)
+ for callback in list(self.callbacks):
+ try:
+ callback(self)
+ except Exception, v:
+ self.callbacks.remove(callback)
+ if isinstance(v, CancelWatch):
+ logger.debug("cancelled watch(%r, %r)", self, callback)
+ else:
+ logger.exception("watch(%r, %r)", self, callback)
+
+ def __call__(self, func):
+ func(self)
+ self.callbacks.add(func)
+ return func
+
+ def __iter__(self):
+ return iter(self.data)
+
+class Children(NodeInfo):
+
+ event_type = zookeeper.CHILD_EVENT
+ zkfunc = 'get_children'
+
+class Properties(NodeInfo, collections.Mapping):
+
+ event_type = zookeeper.CHANGED_EVENT
+ zkfunc = 'get'
+
+ def setData(self, data):
+ sdata, self.meta_data = data
+ s = sdata.strip()
+ if not s:
+ data = {}
+ elif s.startswith('{') and s.endswith('}'):
+ try:
+ data = json.loads(s)
+ except:
+ logger.exception('bad json data in node at %r', self.path)
+ data = dict(string_value = sdata)
+ else:
+ data = dict(string_value = sdata)
+
+ self.data = data
+
+ def __getitem__(self, key):
+ return self.data[key]
+
+ def __len__(self):
+ return len(self.data)
+
+ def __contains__(self, key):
+ return key in self.data
+
+ def copy(self):
+ return self.data.copy()
+
+ def _set(self, data):
+ if not data:
+ sdata = ''
+ elif len(data) == 1 and 'string_value' in data:
+ sdata = data['string_value']
+ else:
+ sdata = json.dumps(data)
+ self.data = data
+ zookeeper.set(self.session.handle, self.path, sdata)
+
+ def set(self, **data):
+ self._set(data)
+
+ def update(self, **updates):
+ data = self.data.copy()
+ data.update(updates)
+ self._set(data)
+
+ def __hash__(self):
+ # Gaaaa, collections.Mapping
+ return hash(id(self))
Property changes on: Sandbox/J1m/zc.zk/src/zc/zk/__init__.py
___________________________________________________________________
Added: svn:keywords
+ Id
Added: svn:eol-style
+ native
Added: Sandbox/J1m/zc.zk/src/zc/zk/tests.py
===================================================================
--- Sandbox/J1m/zc.zk/src/zc/zk/tests.py (rev 0)
+++ Sandbox/J1m/zc.zk/src/zc/zk/tests.py 2011-11-27 19:44:53 UTC (rev 123488)
@@ -0,0 +1,361 @@
+import doctest
+import json
+import logging
+import mock
+import os
+import StringIO
+import time
+import zc.zk
+import zc.thread
+import zookeeper
+import zope.testing.loggingsupport
+import zope.testing.setupstack
+import unittest
+
+def wait_until(func, timeout=9):
+ if func():
+ return
+ deadline = time.time()+timeout
+ while not func():
+ time.sleep(.01)
+ if time.time() > deadline:
+ raise AssertionError('timeout')
+
+class LoggingTests(unittest.TestCase):
+
+ def test_logging(self):
+ logger = logging.getLogger('ZooKeeper')
+ f = StringIO.StringIO()
+ h = logging.StreamHandler(f)
+ logger.addHandler(h)
+ logger.setLevel(logging.ERROR)
+ handle = zookeeper.init('zookeeper.example.com:2181')
+ wait_until(lambda : 'error' in f.getvalue())
+ zookeeper.close(handle)
+ logger.setLevel(logging.NOTSET)
+ logger.removeHandler(h)
+
+def side_effect(mock):
+ return lambda func: setattr(mock, 'side_effect', func)
+
+class Tests(unittest.TestCase):
+
+ @mock.patch('zookeeper.init')
+ def setUp(self, init):
+ @zc.thread.Thread
+ def getzk():
+ zk = zc.zk.ZooKeeper()
+ return zk
+
+ wait_until(lambda : init.call_args)
+ (zkaddr, self.__session_watcher), kw = init.call_args
+ self.assertEqual((zkaddr, kw), ('127.0.0.1:2181', {}))
+ self.__session_watcher(
+ 0, zookeeper.SESSION_EVENT, zookeeper.CONNECTED_STATE, '')
+ getzk.join(1)
+ self.__zk = getzk.value
+ self.assertEqual(self.__zk.handle, 0)
+
+ def state_side_effect(self, handle):
+ self.assertEqual(handle, self.__zk.handle)
+ return zookeeper.CONNECTED_STATE
+
+ @mock.patch('zookeeper.create')
+ def test_register_server(self, create):
+ @side_effect(create)
+ def _(handle, path_, data, acl, flags):
+ self.assertEqual((handle, path_), (0, '/foo/127.0.0.1:8080'))
+ self.assertEqual(json.loads(data), dict(pid=os.getpid(), a=1))
+ self.assertEqual(acl, [zc.zk.world_permission()])
+ self.assertEqual(flags, zookeeper.EPHEMERAL)
+
+ self.__zk.register_server('/foo', ('127.0.0.1', 8080), a=1)
+
+ @mock.patch('zookeeper.close')
+ @mock.patch('zookeeper.init')
+ @mock.patch('zookeeper.state')
+ @mock.patch('zookeeper.get_children')
+ def test_children(self, get_children, state, init, close):
+ state.side_effect = self.state_side_effect
+
+ path = '/test'
+ @side_effect(get_children)
+ def _(handle, path_, handler):
+ self.__handler = handler
+ self.assertEqual((handle, path_), (0, path))
+ return data
+
+ # Get the data the first time
+ data = []
+ children = self.__zk.children(path)
+ self.assertEqual(list(children), data)
+
+ # When tree updates, children are updated
+ data = ['a']
+ self.__handler(0, zookeeper.CHILD_EVENT, zookeeper.CONNECTED_STATE,
+ path)
+ self.assertEqual(list(children), data)
+
+ # callbacks are called too:
+ cb = children(mock.Mock())
+ cb.assert_called_with(children)
+ cb.reset_mock()
+ self.assertEqual(len(children.callbacks), 1)
+ data = ['a', 'b']
+ self.__handler(0, zookeeper.CHILD_EVENT, zookeeper.CONNECTED_STATE,
+ path)
+ self.assertEqual(list(children), data)
+ cb.assert_called_with(children)
+
+ # if a callback raises an exception, the exception is logged
+ # and callback is discarded
+ h = zope.testing.loggingsupport.Handler('zc.zk', level=logging.DEBUG)
+ h.install()
+ cb.side_effect = ValueError
+ data = ['a']
+ self.__handler(0, zookeeper.CHILD_EVENT, zookeeper.CONNECTED_STATE,
+ path)
+ self.assertEqual(list(children), data)
+ self.assertEqual(len(children.callbacks), 0)
+ self.assertEqual(h.records[0].name, 'zc.zk')
+ self.assertEqual(h.records[0].levelno, logging.ERROR)
+ h.clear()
+
+ # if a callback raises zc.zk.CancelWatch, the cancel is logged
+ # and callback is discarded
+ cb = children(mock.Mock())
+ self.assertEqual(len(children.callbacks), 1)
+ cb.side_effect = zc.zk.CancelWatch
+ data = []
+ self.__handler(0, zookeeper.CHILD_EVENT, zookeeper.CONNECTED_STATE,
+ path)
+ self.assertEqual(list(children), data)
+ self.assertEqual(len(children.callbacks), 0)
+ self.assertEqual(h.records[0].name, 'zc.zk')
+ self.assertEqual(h.records[0].levelno, logging.DEBUG)
+ h.clear()
+
+ h.uninstall()
+
+ # If a session expires, it will be reestablished with watches intact.
+ cb = children(mock.Mock())
+ self.__session_watcher(
+ 0, zookeeper.SESSION_EVENT, zookeeper.EXPIRED_SESSION_STATE, "")
+ close.assert_called_with(0)
+ self.assertEqual(self.__zk.handle, None)
+ data = ['test']
+ self.__session_watcher(
+ 0, zookeeper.SESSION_EVENT, zookeeper.CONNECTED_STATE, "")
+ self.assertEqual(list(children), data)
+ cb.assert_called_with(children)
+
+
+ @mock.patch('zookeeper.close')
+ @mock.patch('zookeeper.init')
+ @mock.patch('zookeeper.state')
+ @mock.patch('zookeeper.get')
+ def test_get_properties(self, get, state, init, close):
+ state.side_effect = self.state_side_effect
+
+ path = '/test'
+ @side_effect(get)
+ def _(handle, path_, handler):
+ self.__handler = handler
+ self.assertEqual((handle, path_), (0, path))
+ return json.dumps(data), {}
+
+ # Get the data the first time
+ data = {}
+ properties = self.__zk.properties(path)
+ self.assertEqual(dict(properties), data)
+
+ # When node updates, properties are updated
+ data = dict(a=1)
+ self.__handler(0, zookeeper.CHANGED_EVENT, zookeeper.CONNECTED_STATE,
+ path)
+ self.assertEqual(dict(properties), data)
+
+ # callbacks are called too:
+ cb = properties(mock.Mock())
+ cb.assert_called_with(properties)
+ cb.reset_mock()
+ self.assertEqual(len(properties.callbacks), 1)
+ data = dict(a=1, b=2)
+ self.__handler(0, zookeeper.CHANGED_EVENT, zookeeper.CONNECTED_STATE,
+ path)
+ self.assertEqual(dict(properties), data)
+ cb.assert_called_with(properties)
+
+ # if a callback raises an exception, the exception is logged
+ # and callback is discarded
+ h = zope.testing.loggingsupport.Handler('zc.zk', level=logging.DEBUG)
+ h.install()
+ cb.side_effect = ValueError
+ data = dict(a=1)
+ self.__handler(0, zookeeper.CHANGED_EVENT, zookeeper.CONNECTED_STATE,
+ path)
+ self.assertEqual(dict(properties), data)
+ self.assertEqual(len(properties.callbacks), 0)
+ self.assertEqual(h.records[0].name, 'zc.zk')
+ self.assertEqual(h.records[0].levelno, logging.ERROR)
+ h.clear()
+
+ # if a callback raises zc.zk.CancelWatch, the cancel is logged
+ # and callback is discarded
+ cb = properties(mock.Mock())
+ self.assertEqual(len(properties.callbacks), 1)
+ cb.side_effect = zc.zk.CancelWatch
+ data = {}
+ self.__handler(0, zookeeper.CHANGED_EVENT, zookeeper.CONNECTED_STATE,
+ path)
+ self.assertEqual(dict(properties), data)
+ self.assertEqual(len(properties.callbacks), 0)
+ self.assertEqual(h.records[0].name, 'zc.zk')
+ self.assertEqual(h.records[0].levelno, logging.DEBUG)
+ h.clear()
+
+ h.uninstall()
+
+ # If a session expires, it will be reestablished with watches intact.
+ cb = properties(mock.Mock())
+ self.__session_watcher(
+ 0, zookeeper.SESSION_EVENT, zookeeper.EXPIRED_SESSION_STATE, "")
+ close.assert_called_with(0)
+ self.assertEqual(self.__zk.handle, None)
+ data = dict(test=1)
+ self.__session_watcher(
+ 0, zookeeper.SESSION_EVENT, zookeeper.CONNECTED_STATE, "")
+ self.assertEqual(dict(properties), data)
+ cb.assert_called_with(properties)
+
+ @mock.patch('zookeeper.state')
+ @mock.patch('zookeeper.get')
+ @mock.patch('zookeeper.set')
+ def test_set_properties(self, set, get, state):
+ state.side_effect = self.state_side_effect
+
+ path = '/test'
+ @side_effect(get)
+ def _(handle, path_, handler):
+ self.__handler = handler
+ self.assertEqual((handle, path_), (0, path))
+ return json.dumps(data), {}
+
+ data = dict(a=1)
+ properties = self.__zk.properties(path)
+ self.assertEqual(dict(properties), data)
+
+ @side_effect(set)
+ def _(handle, path_, data):
+ self.__set_data = json.loads(data)
+ self.assertEqual((handle, path_), (0, path))
+
+ properties.update(b=2)
+ self.assertEqual(self.__set_data, dict(a=1, b=2))
+ self.assertEqual(dict(properties), self.__set_data)
+
+ properties.set(c=3)
+ self.assertEqual(self.__set_data, dict(c=3))
+ self.assertEqual(dict(properties), self.__set_data)
+
+ @mock.patch('zookeeper.state')
+ @mock.patch('zookeeper.get')
+ @mock.patch('zookeeper.set')
+ def test_special_values(self, set, get, state):
+ state.side_effect = self.state_side_effect
+
+ path = '/test'
+ @side_effect(get)
+ def _(handle, path_, handler):
+ self.__handler = handler
+ self.assertEqual((handle, path_), (0, path))
+ return data, {}
+
+ data = ''
+ properties = self.__zk.properties(path)
+ self.assertEqual(dict(properties), {})
+
+ data = 'xxx'
+ properties = self.__zk.properties(path)
+ self.assertEqual(dict(properties), dict(string_value='xxx'))
+
+ data = '{xxx}'
+ properties = self.__zk.properties(path)
+ self.assertEqual(dict(properties), dict(string_value='{xxx}'))
+
+ data = '\n{xxx}\n'
+ properties = self.__zk.properties(path)
+ self.assertEqual(dict(properties), dict(string_value='\n{xxx}\n'))
+
+ @side_effect(set)
+ def _(handle, path_, data):
+ self.__set_data = data
+ self.assertEqual((handle, path_), (0, path))
+
+ properties.set(b=2)
+ self.assertEqual(self.__set_data, '{"b": 2}')
+ properties.set()
+ self.assertEqual(self.__set_data, '')
+ properties.set(string_value='xxx')
+ self.assertEqual(self.__set_data, 'xxx')
+
+ @mock.patch('zookeeper.state')
+ @mock.patch('zookeeper.get')
+ @mock.patch('zookeeper.get_children')
+ def test_deleted_node_with_watchers(self, get_children, get, state):
+ state.side_effect = self.state_side_effect
+ path = '/test'
+ @side_effect(get)
+ def _(handle, path_, handler):
+ self.__get_handler = handler
+ return '{"a": 1}', {}
+ @side_effect(get_children)
+ def _(handle, path_, handler):
+ self.__child_handler = handler
+ return ['x']
+
+ children = self.__zk.children(path)
+ self.assertEqual(list(children), ['x'])
+ cb = children(mock.Mock())
+ cb.side_effect = lambda x: None
+ ccb = children(mock.Mock())
+ ccb.assert_called_with(children)
+
+ properties = self.__zk.properties(path)
+ self.assertEqual(dict(properties), dict(a=1))
+ cb = properties(mock.Mock())
+ cb.side_effect = lambda x: None
+ pcb = properties(mock.Mock())
+ pcb.assert_called_with(properties)
+
+ self.__get_handler(
+ 0, zookeeper.DELETED_EVENT, zookeeper.CONNECTED_STATE, path)
+ self.assertEqual(dict(properties), {})
+ pcb.assert_called_with()
+
+ self.__child_handler(
+ 0, zookeeper.DELETED_EVENT, zookeeper.CONNECTED_STATE, path)
+ self.assertEqual(list(children), [])
+ ccb.assert_called_with()
+
+def assert_(cond, mess=''):
+ if not cond:
+ print 'assertion failed: ', mess
+
+def setup(test):
+ test.globs['side_effect'] = side_effect
+ test.globs['assert_'] = assert_
+ for name in 'state', 'init', 'create', 'get', 'set', 'get_children':
+ cm = mock.patch('zookeeper.'+name)
+ test.globs[name] = cm.__enter__()
+ zope.testing.setupstack.register(test, cm.__exit__)
+
+def test_suite():
+ return unittest.TestSuite((
+ unittest.makeSuite(LoggingTests),
+ unittest.makeSuite(Tests),
+ doctest.DocFileSuite(
+ 'README.txt',
+ setUp=setup, tearDown=zope.testing.setupstack.tearDown
+ ),
+ ))
Property changes on: Sandbox/J1m/zc.zk/src/zc/zk/tests.py
___________________________________________________________________
Added: svn:keywords
+ Id
Added: svn:eol-style
+ native
More information about the checkins
mailing list