[Checkins] SVN: Sandbox/J1m/zc.zk/ Initial

Jim Fulton jim at zope.com
Sun Nov 27 19:44:54 UTC 2011


Log message for revision 123488:
  Initial
  

Changed:
  U   Sandbox/J1m/zc.zk/buildout.cfg
  U   Sandbox/J1m/zc.zk/setup.py
  A   Sandbox/J1m/zc.zk/src/zc/zk/
  A   Sandbox/J1m/zc.zk/src/zc/zk/README.txt
  A   Sandbox/J1m/zc.zk/src/zc/zk/__init__.py
  A   Sandbox/J1m/zc.zk/src/zc/zk/tests.py

-=-
Modified: Sandbox/J1m/zc.zk/buildout.cfg
===================================================================
--- Sandbox/J1m/zc.zk/buildout.cfg	2011-11-27 18:47:08 UTC (rev 123487)
+++ Sandbox/J1m/zc.zk/buildout.cfg	2011-11-27 19:44:53 UTC (rev 123488)
@@ -1,12 +1,12 @@
 [buildout]
-develop = .
-parts = test py
+develop = . ../thread
+parts = test
 
-[test]
-recipe = zc.recipe.testrunner
-eggs = 
-
 [py]
 recipe = zc.recipe.egg
-eggs = ${test:eggs}
+eggs = zc.zk [test]
 interpreter = py
+
+[test]
+recipe = zc.recipe.testrunner
+eggs = ${py:eggs}

Modified: Sandbox/J1m/zc.zk/setup.py
===================================================================
--- Sandbox/J1m/zc.zk/setup.py	2011-11-27 18:47:08 UTC (rev 123487)
+++ Sandbox/J1m/zc.zk/setup.py	2011-11-27 19:44:53 UTC (rev 123488)
@@ -11,15 +11,18 @@
 # FOR A PARTICULAR PURPOSE.
 #
 ##############################################################################
-name, version = 'zc.', '0'
+name, version = 'zc.zk', '0'
 
-install_requires = ['setuptools']
-extras_require = dict(test=['zope.testing'])
+install_requires = ['setuptools', 'zc.thread']
+extras_require = dict(
+    test=['zope.testing', 'zookeeper-static', 'mock', 'pytest'])
 
 entry_points = """
 """
 
 from setuptools import setup
+import os
+readme = open(os.path.join('src', 'zc', 'zk', 'README.txt')).read()
 
 setup(
     author = 'Jim Fulton',
@@ -27,8 +30,8 @@
     license = 'ZPL 2.1',
 
     name = name, version = version,
-    long_description=open('README.txt').read(),
-    description = open('README.txt').read().strip().split('\n')[0],
+    long_description=readme,
+    description = readme.strip().split('\n')[0],
     packages = [name.split('.')[0], name],
     namespace_packages = [name.split('.')[0]],
     package_dir = {'': 'src'},

Added: Sandbox/J1m/zc.zk/src/zc/zk/README.txt
===================================================================
--- Sandbox/J1m/zc.zk/src/zc/zk/README.txt	                        (rev 0)
+++ Sandbox/J1m/zc.zk/src/zc/zk/README.txt	2011-11-27 19:44:53 UTC (rev 123488)
@@ -0,0 +1,316 @@
+ZooKeeper Helpers
+=================
+
+The zc.zk package provides some high-level interfaces to the low-level
+zookeeper extension.  It's not complete, in that it doesn't try, at
+this time, to be a complete high-level interface. Rather, it provides
+facilities we need to use ZooKeeper to connect services together:
+
+- ZODB database clients and servers
+- HTTP-based clients and services
+- Load balancers
+
+The current (initial) use cases are:
+
+- Register a server providing a service.
+- Get the addresses of servers providing a service.
+- Get and set service configuration data.
+
+This package makes no effort to support Windows.  (Patches to support
+Windows might be accepted if they don't add much complexity.)
+
+.. contents::
+
+Installation
+------------
+
+You can install this as you would any other distribution. Note,
+however, that you must also install the Python ZooKeeper binding
+provided with ZooKeeper.  Because this binding is packaged a number of
+different ways, it isn't listed as a distribution requirement.
+
+Instantiating a ZooKeeper helper
+--------------------------------
+
+To use the helper API, create a ZooKeeper instance:
+
+.. test
+
+    >>> import zookeeper
+    >>> @side_effect(init)
+    ... def _(addr, func):
+    ...     global session_watch
+    ...     session_watch = func
+    ...     func(0, zookeeper.SESSION_EVENT, zookeeper.CONNECTED_STATE, '')
+    ...     assert_(addr=='zookeeper.example.com:2181', addr)
+
+    >>> @side_effect(state)
+    ... def _(handle):
+    ...     assert_(handle==0)
+    ...     return zookeeper.CONNECTED_STATE
+
+::
+
+    >>> import zc.zk
+    >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
+
+The ZooKeeper constructor takes a ZooKeeper connection string, which is a
+comma-separated list of addresses of the form HOST:PORT.  It defaults
+to '127.0.0.1:2181', which is convenient during development.
+
+Register a server providing a service.
+--------------------------------------
+
+To register a server, use the ``register_server`` method, which takes
+a service path and the address a server is listening on:
+
+.. test
+
+    >>> import os, json, zookeeper
+    >>> path = '/fooservice/servers'
+    >>> addrs = []
+    >>> child_handler = None
+    >>> @side_effect(create)
+    ... def _(handle, path_, data, acl, flags):
+    ...     assert_(handle == 0)
+    ...     path_, addr = path_.rsplit('/', 1)
+    ...     assert_(path_ == path)
+    ...     assert_(json.loads(data) == dict(pid=os.getpid()))
+    ...     addrs.append(addr)
+    ...     assert_(acl == [zc.zk.world_permission()])
+    ...     assert_(flags == zookeeper.EPHEMERAL)
+    ...     global child_handler
+    ...     if child_handler is not None:
+    ...         child_handler(handle, zookeeper.CHILD_EVENT,
+    ...                       zookeeper.CONNECTED_STATE, path_)
+    ...         child_handler = None
+
+::
+
+    >>> zk.register_server('/fooservice/servers', ('192.168.0.42', 8080))
+
+
+``register_server`` creates a read-only ephemeral ZooKeeper node as a
+child of the given service path.  The name of the new node is the
+given address. This allows clients to get the list of addresses by
+just getting the list of the names of children of the service path.
+
+Ephemeral nodes have the useful property that they're automatically
+removed when a ZooKeeper session is closed or when the process
+containing it dies.  Deregistration is automatic.
+
+When registering a server, you can optionally provide server (node)
+data as additional keyword arguments to register_server.  By default,
+the process id is set as the ``pid`` server key.  This is useful for
+tracking down the server process.
+
+Get the addresses of servers providing a service.
+-------------------------------------------------
+
+Getting the addresses providing a service is accomplished by getting the
+children of a service node.
+
+.. test
+
+    >>> @side_effect(get_children)
+    ... def _(handle, path, handler):
+    ...     assert_(handle == 0, handle)
+    ...     assert_(path == '/fooservice/servers', path)
+    ...     global child_handler
+    ...     child_handler = handler
+    ...     return addrs
+
+::
+
+    >>> addresses = zk.children('/fooservice/servers')
+    >>> sorted(addresses)
+    ['192.168.0.42:8080']
+
+The ``children`` method returns an iterable of names of child nodes of
+the node specified by the given path.  The iterable is automatically
+updated when new servers are registered::
+
+    >>> zk.register_server('/fooservice/servers', ('192.168.0.42', 8081))
+    >>> sorted(addresses)
+    ['192.168.0.42:8080', '192.168.0.42:8081']
+
+You can call the iterable with a callback function that is called
+whenever the list of children changes::
+
+    >>> @zk.children('/fooservice/servers')
+    ... def addresses_updated(addresses):
+    ...     print 'addresses changed'
+    ...     print sorted(addresses)
+    addresses changed
+    ['192.168.0.42:8080', '192.168.0.42:8081']
+
+The callback is called immediately with the children.  When we add
+another child, it'll be called again::
+
+    >>> zk.register_server('/fooservice/servers', ('192.168.0.42', 8082))
+    addresses changed
+    ['192.168.0.42:8080', '192.168.0.42:8081', '192.168.0.42:8082']
+
+Get service configuration data.
+-------------------------------
+
+You get service configuration data by getting data associated with a
+ZooKeeper node.  The interface for getting data is similar to the
+interface for getting children:
+
+
+.. test
+
+    >>> node_data = json.dumps(dict(
+    ...     database = "/databases/foomain",
+    ...     threads = 1,
+    ...     favorite_color= "red"))
+    >>> @side_effect(get)
+    ... def _(handle, path, handler):
+    ...     assert_(handle == 0)
+    ...     assert_(path == '/fooservice')
+    ...     global get_handler
+    ...     get_handler = handler
+    ...     return node_data, {}
+
+::
+
+    >>> data = zk.properties('/fooservice')
+    >>> data['database']
+    u'/databases/foomain'
+    >>> data['threads']
+    1
+
+The ``properties`` method returns a mapping object that provides access to
+node data.  (ZooKeeper only stores string data for nodes. ``zc.zk``
+provides a higher-level data interface by storing JSON strings.)
+
+The properties objects can be called with callback functions and used
+as function decorators to get update notification::
+
+    >>> @zk.properties('/fooservice')
+    ... def data_updated(data):
+    ...     print 'data updated'
+    ...     for item in sorted(data.items()):
+    ...         print '%s: %r' % item
+    data updated
+    database: u'/databases/foomain'
+    favorite_color: u'red'
+    threads: 1
+
+The callback is called immediately. It'll also be called when data are
+updated.
+
+Updating node data
+------------------
+
+You can't set data properties, but you can update data by calling its
+``update`` method:
+
+.. test
+
+    >>> @side_effect(set)
+    ... def _(handle, path, data):
+    ...     global node_data
+    ...     node_data = data
+    ...     get_handler(handle, zookeeper.CHANGED_EVENT,
+    ...                 zookeeper.CONNECTED_STATE, path)
+
+::
+
+    >>> data.update(threads=2, secret='123')
+    data updated
+    database: u'/databases/foomain'
+    favorite_color: u'red'
+    secret: u'123'
+    threads: 2
+
+or by calling its ``set`` method, which removes keys not listed::
+
+    >>> data.set(threads=3, secret='1234')
+    data updated
+    secret: u'1234'
+    threads: 3
+
+ZooKeeper Session Management
+----------------------------
+
+``zc.zk`` takes care of ZooKeeper session management for you. It
+establishes and, if necessary, reestablishes sessions for you.  In
+particular, it takes care of reestablishing ZooKeeper watches when a
+session is reestablished.
+
+ZooKeeper logging
+-----------------
+
+``zc.zk`` bridges the low-level ZooKeeper logging API and the Python
+logging API.  ZooKeeper log messages are forwarded to the Python
+``'ZooKeeper'`` logger.
+
+``zc.zk.ZooKeeper``
+-------------------
+
+``zc.zk.ZooKeeper(connection_string)``
+    Return a new instance given a ZooKeeper connection string.
+
+``children(path)``
+   Return a `zc.zk.Children`_ for the path.
+
+``properties(path)``
+   Return a `zc.zk.Properties`_ for the path.
+
+``handle``
+    The ZooKeeper session handle
+
+    This attribute can be used to call the lower-level API provided by
+    the ``zookeeper`` extension.
+
+``register_server(path, address, **data)``
+    Register a server at a path with the address.
+
+    An ephemeral child node of ``path`` will be created with name equal
+    to the string representation (HOST:PORT) of the given address.
+
+    ``address`` must be a host and port tuple.
+
+    Optional node properties can be provided as keyword arguments.
+
+zc.zk.Children
+--------------
+
+``__iter__()``
+    Return an iterator over the child names.
+
+``__call__(callable)``
+    Register a callback to be called whenever a child node is added or
+    removed.
+
+    The callback is passed the children instance when a child node is
+    added or removed.
+
+zc.zk.Properties
+----------------
+
+Properties objects provide the usual read-only mapping methods,
+``__getitem__``, ``__len__``, etc.
+
+``set(**properties)``
+   Set the properties for the node, replacing existing data.
+
+``update(**properties)``
+   Update the properties for the node.
+
+``__call__(callable)``
+    Register a callback to be called whenever a node's properties are changed.
+
+    The callback is passed the properties instance when properties are
+    changed.
+
+Node deletion
+-------------
+
+If a node is deleted and ``Children`` or ``Properties`` instances have
+been created for it, the instances' data will be cleared.  Attempts to
+update properties will fail.  If callbacks have been registered, they
+will be called without arguments, if possible.  It would be bad, in
+practice, to remove a node that processes are watching.


Property changes on: Sandbox/J1m/zc.zk/src/zc/zk/README.txt
___________________________________________________________________
Added: svn:eol-style
   + native

Added: Sandbox/J1m/zc.zk/src/zc/zk/__init__.py
===================================================================
--- Sandbox/J1m/zc.zk/src/zc/zk/__init__.py	                        (rev 0)
+++ Sandbox/J1m/zc.zk/src/zc/zk/__init__.py	2011-11-27 19:44:53 UTC (rev 123488)
@@ -0,0 +1,262 @@
+import collections
+import json
+import logging
+import os
+import sys
+import threading
+import zc.thread
+import zookeeper
+
+logger = logging.getLogger(__name__)
+
+@zc.thread.Thread
+def loggingthread():
+    r, w = os.pipe()
+    zookeeper.set_log_stream(os.fdopen(w, 'w'))
+    log = logging.getLogger('ZooKeeper').log
+    f = os.fdopen(r)
+    levels = dict(ZOO_INFO = logging.INFO,
+                  ZOO_WARN = logging.WARNING,
+                  ZOO_ERROR = logging.ERROR,
+                  ZOO_DEBUG = logging.DEBUG,
+                  )
+    while 1:
+        line = f.readline().strip()
+        try:
+            if '@' in line:
+                level, message = line.split('@', 1)
+                level = levels.get(level.split(':')[-1])
+            else:
+                level = None
+
+            if level is None:
+                log(logging.INFO, line)
+            else:
+                log(level, message)
+        except Exception, v:
+            logging.getLogger('ZooKeeper').exception("Logging error: %s", v)
+
+
+def parse_addr(addr):
+    host, port = addr.split(':')
+    return host, int(port)
+
+def world_permission(perms=zookeeper.PERM_READ):
+    return dict(perms=perms, scheme='world', id='anyone')
+
+class CancelWatch(Exception):
+    pass
+
+class ZooKeeper:
+
+    def __init__(self, zkaddr=2181):
+        if isinstance(zkaddr, int):
+            zkaddr = "127.0.0.1:%s" % zkaddr
+        self.zkaddr = zkaddr
+        self.watches = set()
+        self.connected = threading.Event()
+        zookeeper.init(zkaddr, self._watch_session)
+        self.connected.wait()
+
+    handle = None
+    def _watch_session(self, handle, event_type, state, path):
+        assert event_type == zookeeper.SESSION_EVENT
+        assert not path
+        if state == zookeeper.CONNECTED_STATE:
+            if self.handle is None:
+                self.handle = handle
+                if self.watches:
+                    # reestablish after session reestablished
+                    watches = self.watches
+                    self.watches = set()
+                    for watch in watches:
+                        self._watch(watch, False)
+            else:
+                assert handle == self.handle
+            self.connected.set()
+            logger.info('connected %s', handle)
+        elif state == zookeeper.CONNECTING_STATE:
+            self.connected.clear()
+        elif state == zookeeper.EXPIRED_SESSION_STATE:
+            self.connected.clear()
+            zookeeper.close(self.handle)
+            self.handle = None
+            zookeeper.init(self.zkaddr, self._watch_session)
+        else:
+            logger.critical('unexpected session event %s %s', handle, state)
+
+    def register_server(self, path, addr, **kw):
+        kw['pid'] = os.getpid()
+        self.connected.wait()
+        zookeeper.create(self.handle, path + '/%s:%s' % addr, json.dumps(kw),
+                         [world_permission()], zookeeper.EPHEMERAL)
+
+    def _watch(self, watch, wait=True):
+        if wait:
+            self.connected.wait()
+        self.watches.add(watch)
+
+        def handler(h, t, state, p):
+            if watch not in self.watches:
+                return
+            assert h == self.handle
+            assert state == zookeeper.CONNECTED_STATE
+            assert p == watch.path
+            if t == zookeeper.DELETED_EVENT:
+                watch._deleted()
+                self.watches.remove(watch)
+            else:
+                assert t == watch.event_type
+                zkfunc = getattr(zookeeper, watch.zkfunc)
+                watch._notify(zkfunc(self.handle, watch.path, handler))
+
+        handler(self.handle, watch.event_type, self.state, watch.path)
+
+    def children(self, path):
+        return Children(self, path)
+
+    def properties(self, path):
+        return Properties(self, path)
+
+    def _set(self, path, data):
+        self.connected.wait()
+        return zookeeper.set(self.handle, path, data)
+
+    def print_tree(self, path='/', indent=0):
+        self.connected.wait()
+        prefix = ' '*indent
+        print prefix + path.split('/')[-1]+'/'
+        indent += 2
+        prefix += '  '
+        data = zookeeper.get(self.handle, path)[0].strip()
+        if data:
+            if data.startswith('{') and data.endswith('}'):
+                data = json.loads(data)
+                import pprint
+                print prefix+pprint.pformat(data).replace(
+                    '\n', prefix+'\n')
+            else:
+                print prefix + repr(data)
+        for p in zookeeper.get_children(self.handle, path):
+            if not path.endswith('/'):
+                p = '/'+p
+            self.print_tree(path+p, indent)
+
+    def close(self):
+        zookeeper.close(self.handle)
+        del self.handle
+
+    @property
+    def state(self):
+        if self.handle is None:
+            return zookeeper.CONNECTING_STATE
+        return zookeeper.state(self.handle)
+
+
+class NodeInfo:
+
+    def __init__(self, session, path):
+        self.session = session
+        self.path = path
+        self.callbacks = set()
+        session._watch(self)
+
+    def setData(self, data):
+        self.data = data
+
+    deleted = False
+    def _deleted(self):
+        self.deleted = True
+        self.data = {}
+        for callback in self.callbacks:
+            try:
+                callback()
+            except TypeError:
+                pass
+            except:
+                logger.exception('Error %r calling %r', self, callback)
+
+    def __repr__(self):
+        return "%s.%s(%s, %s)" % (
+            self.__class__.__module__, self.__class__.__name__,
+            self.session.handle, self.path)
+
+    def _notify(self, data):
+        self.setData(data)
+        for callback in list(self.callbacks):
+            try:
+                callback(self)
+            except Exception, v:
+                self.callbacks.remove(callback)
+                if isinstance(v, CancelWatch):
+                    logger.debug("cancelled watch(%r, %r)", self, callback)
+                else:
+                    logger.exception("watch(%r, %r)", self, callback)
+
+    def __call__(self, func):
+        func(self)
+        self.callbacks.add(func)
+        return func
+
+    def __iter__(self):
+        return iter(self.data)
+
+class Children(NodeInfo):
+
+    event_type = zookeeper.CHILD_EVENT
+    zkfunc = 'get_children'
+
+class Properties(NodeInfo, collections.Mapping):
+
+    event_type = zookeeper.CHANGED_EVENT
+    zkfunc = 'get'
+
+    def setData(self, data):
+        sdata, self.meta_data = data
+        s = sdata.strip()
+        if not s:
+            data = {}
+        elif s.startswith('{') and s.endswith('}'):
+            try:
+                data = json.loads(s)
+            except:
+                logger.exception('bad json data in node at %r', self.path)
+                data = dict(string_value = sdata)
+        else:
+            data = dict(string_value = sdata)
+
+        self.data = data
+
+    def __getitem__(self, key):
+        return self.data[key]
+
+    def __len__(self):
+        return len(self.data)
+
+    def __contains__(self, key):
+        return key in self.data
+
+    def copy(self):
+        return self.data.copy()
+
+    def _set(self, data):
+        if not data:
+            sdata = ''
+        elif len(data) == 1 and 'string_value' in data:
+            sdata = data['string_value']
+        else:
+            sdata = json.dumps(data)
+        self.data = data
+        zookeeper.set(self.session.handle, self.path, sdata)
+
+    def set(self, **data):
+        self._set(data)
+
+    def update(self, **updates):
+        data = self.data.copy()
+        data.update(updates)
+        self._set(data)
+
+    def __hash__(self):
+        # Gaaaa, collections.Mapping
+        return hash(id(self))


Property changes on: Sandbox/J1m/zc.zk/src/zc/zk/__init__.py
___________________________________________________________________
Added: svn:keywords
   + Id
Added: svn:eol-style
   + native

Added: Sandbox/J1m/zc.zk/src/zc/zk/tests.py
===================================================================
--- Sandbox/J1m/zc.zk/src/zc/zk/tests.py	                        (rev 0)
+++ Sandbox/J1m/zc.zk/src/zc/zk/tests.py	2011-11-27 19:44:53 UTC (rev 123488)
@@ -0,0 +1,361 @@
+import doctest
+import json
+import logging
+import mock
+import os
+import StringIO
+import time
+import zc.zk
+import zc.thread
+import zookeeper
+import zope.testing.loggingsupport
+import zope.testing.setupstack
+import unittest
+
+def wait_until(func, timeout=9):
+    if func():
+        return
+    deadline = time.time()+timeout
+    while not func():
+        time.sleep(.01)
+        if time.time() > deadline:
+            raise AssertionError('timeout')
+
+class LoggingTests(unittest.TestCase):
+
+    def test_logging(self):
+        logger = logging.getLogger('ZooKeeper')
+        f = StringIO.StringIO()
+        h = logging.StreamHandler(f)
+        logger.addHandler(h)
+        logger.setLevel(logging.ERROR)
+        handle = zookeeper.init('zookeeper.example.com:2181')
+        wait_until(lambda : 'error' in f.getvalue())
+        zookeeper.close(handle)
+        logger.setLevel(logging.NOTSET)
+        logger.removeHandler(h)
+
+def side_effect(mock):
+    return lambda func: setattr(mock, 'side_effect', func)
+
+class Tests(unittest.TestCase):
+
+    @mock.patch('zookeeper.init')
+    def setUp(self, init):
+        @zc.thread.Thread
+        def getzk():
+            zk = zc.zk.ZooKeeper()
+            return zk
+
+        wait_until(lambda : init.call_args)
+        (zkaddr, self.__session_watcher), kw = init.call_args
+        self.assertEqual((zkaddr, kw), ('127.0.0.1:2181', {}))
+        self.__session_watcher(
+            0, zookeeper.SESSION_EVENT, zookeeper.CONNECTED_STATE, '')
+        getzk.join(1)
+        self.__zk = getzk.value
+        self.assertEqual(self.__zk.handle, 0)
+
+    def state_side_effect(self, handle):
+        self.assertEqual(handle, self.__zk.handle)
+        return zookeeper.CONNECTED_STATE
+
+    @mock.patch('zookeeper.create')
+    def test_register_server(self, create):
+        @side_effect(create)
+        def _(handle, path_, data, acl, flags):
+            self.assertEqual((handle, path_), (0, '/foo/127.0.0.1:8080'))
+            self.assertEqual(json.loads(data), dict(pid=os.getpid(), a=1))
+            self.assertEqual(acl, [zc.zk.world_permission()])
+            self.assertEqual(flags, zookeeper.EPHEMERAL)
+
+        self.__zk.register_server('/foo', ('127.0.0.1', 8080), a=1)
+
+    @mock.patch('zookeeper.close')
+    @mock.patch('zookeeper.init')
+    @mock.patch('zookeeper.state')
+    @mock.patch('zookeeper.get_children')
+    def test_children(self, get_children, state, init, close):
+        state.side_effect = self.state_side_effect
+
+        path = '/test'
+        @side_effect(get_children)
+        def _(handle, path_, handler):
+            self.__handler = handler
+            self.assertEqual((handle, path_), (0, path))
+            return data
+
+        # Get the data the first time
+        data = []
+        children = self.__zk.children(path)
+        self.assertEqual(list(children), data)
+
+        # When tree updates, children are updated
+        data = ['a']
+        self.__handler(0, zookeeper.CHILD_EVENT, zookeeper.CONNECTED_STATE,
+                       path)
+        self.assertEqual(list(children), data)
+
+        # callbacks are called too:
+        cb = children(mock.Mock())
+        cb.assert_called_with(children)
+        cb.reset_mock()
+        self.assertEqual(len(children.callbacks), 1)
+        data = ['a', 'b']
+        self.__handler(0, zookeeper.CHILD_EVENT, zookeeper.CONNECTED_STATE,
+                       path)
+        self.assertEqual(list(children), data)
+        cb.assert_called_with(children)
+
+        # if a callback raises an exception, the exception is logged
+        # and callback is discarded
+        h = zope.testing.loggingsupport.Handler('zc.zk', level=logging.DEBUG)
+        h.install()
+        cb.side_effect = ValueError
+        data = ['a']
+        self.__handler(0, zookeeper.CHILD_EVENT, zookeeper.CONNECTED_STATE,
+                       path)
+        self.assertEqual(list(children), data)
+        self.assertEqual(len(children.callbacks), 0)
+        self.assertEqual(h.records[0].name, 'zc.zk')
+        self.assertEqual(h.records[0].levelno, logging.ERROR)
+        h.clear()
+
+        # if a callback raises zc.zk.CancelWatch, the cancel is logged
+        # and callback is discarded
+        cb = children(mock.Mock())
+        self.assertEqual(len(children.callbacks), 1)
+        cb.side_effect = zc.zk.CancelWatch
+        data = []
+        self.__handler(0, zookeeper.CHILD_EVENT, zookeeper.CONNECTED_STATE,
+                       path)
+        self.assertEqual(list(children), data)
+        self.assertEqual(len(children.callbacks), 0)
+        self.assertEqual(h.records[0].name, 'zc.zk')
+        self.assertEqual(h.records[0].levelno, logging.DEBUG)
+        h.clear()
+
+        h.uninstall()
+
+        # If a session expires, it will be reestablished with watches intact.
+        cb = children(mock.Mock())
+        self.__session_watcher(
+            0, zookeeper.SESSION_EVENT, zookeeper.EXPIRED_SESSION_STATE, "")
+        close.assert_called_with(0)
+        self.assertEqual(self.__zk.handle, None)
+        data = ['test']
+        self.__session_watcher(
+            0, zookeeper.SESSION_EVENT, zookeeper.CONNECTED_STATE, "")
+        self.assertEqual(list(children), data)
+        cb.assert_called_with(children)
+
+
+    @mock.patch('zookeeper.close')
+    @mock.patch('zookeeper.init')
+    @mock.patch('zookeeper.state')
+    @mock.patch('zookeeper.get')
+    def test_get_properties(self, get, state, init, close):
+        state.side_effect = self.state_side_effect
+
+        path = '/test'
+        @side_effect(get)
+        def _(handle, path_, handler):
+            self.__handler = handler
+            self.assertEqual((handle, path_), (0, path))
+            return json.dumps(data), {}
+
+        # Get the data the first time
+        data = {}
+        properties = self.__zk.properties(path)
+        self.assertEqual(dict(properties), data)
+
+        # When node updates, properties are updated
+        data = dict(a=1)
+        self.__handler(0, zookeeper.CHANGED_EVENT, zookeeper.CONNECTED_STATE,
+                     path)
+        self.assertEqual(dict(properties), data)
+
+        # callbacks are called too:
+        cb = properties(mock.Mock())
+        cb.assert_called_with(properties)
+        cb.reset_mock()
+        self.assertEqual(len(properties.callbacks), 1)
+        data = dict(a=1, b=2)
+        self.__handler(0, zookeeper.CHANGED_EVENT, zookeeper.CONNECTED_STATE,
+                     path)
+        self.assertEqual(dict(properties), data)
+        cb.assert_called_with(properties)
+
+        # if a callback raises an exception, the exception is logged
+        # and callback is discarded
+        h = zope.testing.loggingsupport.Handler('zc.zk', level=logging.DEBUG)
+        h.install()
+        cb.side_effect = ValueError
+        data = dict(a=1)
+        self.__handler(0, zookeeper.CHANGED_EVENT, zookeeper.CONNECTED_STATE,
+                     path)
+        self.assertEqual(dict(properties), data)
+        self.assertEqual(len(properties.callbacks), 0)
+        self.assertEqual(h.records[0].name, 'zc.zk')
+        self.assertEqual(h.records[0].levelno, logging.ERROR)
+        h.clear()
+
+        # if a callback raises zc.zk.CancelWatch, the cancel is logged
+        # and callback is discarded
+        cb = properties(mock.Mock())
+        self.assertEqual(len(properties.callbacks), 1)
+        cb.side_effect = zc.zk.CancelWatch
+        data = {}
+        self.__handler(0, zookeeper.CHANGED_EVENT, zookeeper.CONNECTED_STATE,
+                     path)
+        self.assertEqual(dict(properties), data)
+        self.assertEqual(len(properties.callbacks), 0)
+        self.assertEqual(h.records[0].name, 'zc.zk')
+        self.assertEqual(h.records[0].levelno, logging.DEBUG)
+        h.clear()
+
+        h.uninstall()
+
+        # If a session expires, it will be reestablished with watches intact.
+        cb = properties(mock.Mock())
+        self.__session_watcher(
+            0, zookeeper.SESSION_EVENT, zookeeper.EXPIRED_SESSION_STATE, "")
+        close.assert_called_with(0)
+        self.assertEqual(self.__zk.handle, None)
+        data = dict(test=1)
+        self.__session_watcher(
+            0, zookeeper.SESSION_EVENT, zookeeper.CONNECTED_STATE, "")
+        self.assertEqual(dict(properties), data)
+        cb.assert_called_with(properties)
+
+    @mock.patch('zookeeper.state')
+    @mock.patch('zookeeper.get')
+    @mock.patch('zookeeper.set')
+    def test_set_properties(self, set, get, state):
+        state.side_effect = self.state_side_effect
+
+        path = '/test'
+        @side_effect(get)
+        def _(handle, path_, handler):
+            self.__handler = handler
+            self.assertEqual((handle, path_), (0, path))
+            return json.dumps(data), {}
+
+        data = dict(a=1)
+        properties = self.__zk.properties(path)
+        self.assertEqual(dict(properties), data)
+
+        @side_effect(set)
+        def _(handle, path_, data):
+            self.__set_data = json.loads(data)
+            self.assertEqual((handle, path_), (0, path))
+
+        properties.update(b=2)
+        self.assertEqual(self.__set_data, dict(a=1, b=2))
+        self.assertEqual(dict(properties), self.__set_data)
+
+        properties.set(c=3)
+        self.assertEqual(self.__set_data, dict(c=3))
+        self.assertEqual(dict(properties), self.__set_data)
+
+    @mock.patch('zookeeper.state')
+    @mock.patch('zookeeper.get')
+    @mock.patch('zookeeper.set')
+    def test_special_values(self, set, get, state):
+        state.side_effect = self.state_side_effect
+
+        path = '/test'
+        @side_effect(get)
+        def _(handle, path_, handler):
+            self.__handler = handler
+            self.assertEqual((handle, path_), (0, path))
+            return data, {}
+
+        data = ''
+        properties = self.__zk.properties(path)
+        self.assertEqual(dict(properties), {})
+
+        data = 'xxx'
+        properties = self.__zk.properties(path)
+        self.assertEqual(dict(properties), dict(string_value='xxx'))
+
+        data = '{xxx}'
+        properties = self.__zk.properties(path)
+        self.assertEqual(dict(properties), dict(string_value='{xxx}'))
+
+        data = '\n{xxx}\n'
+        properties = self.__zk.properties(path)
+        self.assertEqual(dict(properties), dict(string_value='\n{xxx}\n'))
+
+        @side_effect(set)
+        def _(handle, path_, data):
+            self.__set_data = data
+            self.assertEqual((handle, path_), (0, path))
+
+        properties.set(b=2)
+        self.assertEqual(self.__set_data, '{"b": 2}')
+        properties.set()
+        self.assertEqual(self.__set_data, '')
+        properties.set(string_value='xxx')
+        self.assertEqual(self.__set_data, 'xxx')
+
+    @mock.patch('zookeeper.state')
+    @mock.patch('zookeeper.get')
+    @mock.patch('zookeeper.get_children')
+    def test_deleted_node_with_watchers(self, get_children, get, state):
+        state.side_effect = self.state_side_effect
+        path = '/test'
+        @side_effect(get)
+        def _(handle, path_, handler):
+            self.__get_handler = handler
+            return '{"a": 1}', {}
+        @side_effect(get_children)
+        def _(handle, path_, handler):
+            self.__child_handler = handler
+            return ['x']
+
+        children = self.__zk.children(path)
+        self.assertEqual(list(children), ['x'])
+        cb = children(mock.Mock())
+        cb.side_effect = lambda x: None
+        ccb = children(mock.Mock())
+        ccb.assert_called_with(children)
+
+        properties = self.__zk.properties(path)
+        self.assertEqual(dict(properties), dict(a=1))
+        cb = properties(mock.Mock())
+        cb.side_effect = lambda x: None
+        pcb = properties(mock.Mock())
+        pcb.assert_called_with(properties)
+
+        self.__get_handler(
+            0, zookeeper.DELETED_EVENT, zookeeper.CONNECTED_STATE, path)
+        self.assertEqual(dict(properties), {})
+        pcb.assert_called_with()
+
+        self.__child_handler(
+            0, zookeeper.DELETED_EVENT, zookeeper.CONNECTED_STATE, path)
+        self.assertEqual(list(children), [])
+        ccb.assert_called_with()
+
+def assert_(cond, mess=''):
+    if not cond:
+        print 'assertion failed: ', mess
+
+def setup(test):
+    test.globs['side_effect'] = side_effect
+    test.globs['assert_'] = assert_
+    for name in 'state', 'init', 'create', 'get', 'set', 'get_children':
+        cm = mock.patch('zookeeper.'+name)
+        test.globs[name] = cm.__enter__()
+        zope.testing.setupstack.register(test, cm.__exit__)
+
+def test_suite():
+    return unittest.TestSuite((
+        unittest.makeSuite(LoggingTests),
+        unittest.makeSuite(Tests),
+        doctest.DocFileSuite(
+            'README.txt',
+            setUp=setup, tearDown=zope.testing.setupstack.tearDown
+            ),
+        ))


Property changes on: Sandbox/J1m/zc.zk/src/zc/zk/tests.py
___________________________________________________________________
Added: svn:keywords
   + Id
Added: svn:eol-style
   + native



More information about the checkins mailing list