[Zope-CVS] SVN: ldapadapter/trunk/ Use simpler fakeldap recoded from the ground up. Improve tests.

Florent Guillaume fg at nuxeo.com
Sat Oct 9 12:54:44 EDT 2004


Log message for revision 27862:
  Use simpler fakeldap recoded from the ground up. Improve tests.


Changed:
  U   ldapadapter/trunk/README.txt
  D   ldapadapter/trunk/tests/FakeLDAP-COPYRIGHT.txt
  D   ldapadapter/trunk/tests/FakeLDAP-LICENSE.txt
  D   ldapadapter/trunk/tests/FakeLDAP.py
  A   ldapadapter/trunk/tests/fakeldap.py
  U   ldapadapter/trunk/tests/test_ldapadapter.py
  U   ldapadapter/trunk/utility.py


-=-
Modified: ldapadapter/trunk/README.txt
===================================================================
--- ldapadapter/trunk/README.txt	2004-10-09 16:19:22 UTC (rev 27861)
+++ ldapadapter/trunk/README.txt	2004-10-09 16:54:44 UTC (rev 27862)
@@ -32,13 +32,23 @@
 The fake implementation of ldap used for the tests returns a simpler
 result than a real server would:
 
-  >>> conn.search('o=test,dc=org', 'sub')
-  [(u'cn=yo', {'cn': [u'yo']})]
+  >>> conn.search('cn=yo,o=test,dc=org', 'sub')
+  [(u'cn=yo,o=test,dc=org', {'cn': [u'yo']})]
 
 Modify it.
 
-  XXX>>> conn.modify('cn=yo,o=test,dc=org', {'givenName': ['bob']})
+  >>> conn.modify('cn=yo,o=test,dc=org', {'givenName': ['bob']})
+  >>> conn.search('cn=yo,o=test,dc=org', 'sub')[0][1]['givenName']
+  [u'bob']
+  >>> conn.modify('cn=yo,o=test,dc=org', {'givenName': ['bob', 'john']})
+  >>> conn.search('cn=yo,o=test,dc=org', 'sub')[0][1]['givenName']
+  [u'bob', u'john']
+  >>> conn.modify('cn=yo,o=test,dc=org', {'givenName': []})
+  >>> conn.search('cn=yo,o=test,dc=org', 'sub')
+  [(u'cn=yo,o=test,dc=org', {'cn': [u'yo']})]
 
 Delete it.
 
   >>> conn.delete('cn=yo,o=test,dc=org')
+  >>> conn.search('cn=yo,o=test,dc=org', 'sub')
+  []

Deleted: ldapadapter/trunk/tests/FakeLDAP-COPYRIGHT.txt
===================================================================
--- ldapadapter/trunk/tests/FakeLDAP-COPYRIGHT.txt	2004-10-09 16:19:22 UTC (rev 27861)
+++ ldapadapter/trunk/tests/FakeLDAP-COPYRIGHT.txt	2004-10-09 16:54:44 UTC (rev 27862)
@@ -1,9 +0,0 @@
-Copyright (c) 2004 Jens Vagelpohl.
-All Rights Reserved.
-
-This software is subject to the provisions of the Zope Public License,
-Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
-THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-FOR A PARTICULAR PURPOSE.

Deleted: ldapadapter/trunk/tests/FakeLDAP-LICENSE.txt
===================================================================
--- ldapadapter/trunk/tests/FakeLDAP-LICENSE.txt	2004-10-09 16:19:22 UTC (rev 27861)
+++ ldapadapter/trunk/tests/FakeLDAP-LICENSE.txt	2004-10-09 16:54:44 UTC (rev 27862)
@@ -1,54 +0,0 @@
-Zope Public License (ZPL) Version 2.1
-
-A copyright notice accompanies this license document that
-identifies the copyright holders.
-
-This license has been certified as open source. It has also
-been designated as GPL compatible by the Free Software
-Foundation (FSF).
-
-Redistribution and use in source and binary forms, with or
-without modification, are permitted provided that the
-following conditions are met:
-
-1. Redistributions in source code must retain the
-   accompanying copyright notice, this list of conditions,
-   and the following disclaimer.
-
-2. Redistributions in binary form must reproduce the accompanying
-   copyright notice, this list of conditions, and the
-   following disclaimer in the documentation and/or other
-   materials provided with the distribution.
-
-3. Names of the copyright holders must not be used to
-   endorse or promote products derived from this software
-   without prior written permission from the copyright
-   holders.
-
-4. The right to distribute this software or to use it for
-   any purpose does not give you the right to use
-   Servicemarks (sm) or Trademarks (tm) of the copyright
-   holders. Use of them is covered by separate agreement
-   with the copyright holders.
-
-5. If any files are modified, you must cause the modified
-   files to carry prominent notices stating that you changed
-   the files and the date of any change.
-
-Disclaimer
-
-  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS''
-  AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
-  NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
-  AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN
-  NO EVENT SHALL THE COPYRIGHT HOLDERS BE
-  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
-  EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
-  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
-  OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-  SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
-  DAMAGE.
-

Deleted: ldapadapter/trunk/tests/FakeLDAP.py
===================================================================
--- ldapadapter/trunk/tests/FakeLDAP.py	2004-10-09 16:19:22 UTC (rev 27861)
+++ ldapadapter/trunk/tests/FakeLDAP.py	2004-10-09 16:54:44 UTC (rev 27862)
@@ -1,266 +0,0 @@
-#####################################################################
-#
-# FakeLDAP      Fake LDAP interface to test LDAP functionality
-#               independently of a running LDAP server
-#
-# This software is governed by a license. See
-# FakeLDAP-LICENSE.txt, which is a copy of the LDAPUserFolder
-# LICENSE.txt,  for the terms of this license. 
-#
-#####################################################################
-__version__='$Revision: 1.7 $'[11:-2]
-
-import ldap, sha, base64, copy
-
-# Module-level stuff
-__version__ = '2.fake'
-
-SCOPE_BASE = getattr(ldap, 'SCOPE_BASE')
-SCOPE_ONELEVEL = getattr(ldap, 'SCOPE_ONELEVEL')
-SCOPE_SUBTREE = getattr(ldap, 'SCOPE_SUBTREE')
-
-MOD_ADD = getattr(ldap, 'MOD_ADD')
-MOD_REPLACE = getattr(ldap, 'MOD_REPLACE')
-MOD_DELETE = getattr(ldap, 'MOD_DELETE')
-
-OPT_PROTOCOL_VERSION = None
-OPT_REFERRALS = None
-VERSION2 = None
-VERSION3 = None
-
-class LDAPError(Exception): pass
-class SERVER_DOWN(Exception): pass
-class PROTOCOL_ERROR(Exception): pass
-class NO_SUCH_OBJECT(Exception): pass
-class INVALID_CREDENTIALS(Exception): pass
-class ALREADY_EXISTS(Exception): pass
-class SIZELIMIT_EXCEEDED(Exception): pass
-class PARTIAL_RESULTS(Exception): pass
-
-REFERRAL = None
-
-TREE = {}
-
-
-def initialize(conn_str):
-    """ Initialize a new connection """
-    return FakeLDAPConnection()
-
-def explode_dn(dn, *ign, **ignored):
-    """ Get a DN's elements """
-    return [x.strip() for x in dn.split(',')]
-
-def clearTree():
-    TREE.clear()
-
-def addTreeItems(dn):
-    """ Add structure directly to the tree given a DN """
-    elems = explode_dn(dn)
-    elems.reverse()
-    tree_pos = TREE
-
-    for elem in elems:
-        if not tree_pos.has_key(elem):
-            tree_pos[elem] = {}
-
-        tree_pos = tree_pos[elem]
-
-
-class FakeLDAPConnection:
-    
-    def __init__(self):
-        pass
-
-    def set_option(self, option, value):
-        pass
-
-    def simple_bind_s(self, binduid, bindpwd):
-        if binduid.find('Manager') != -1:
-            return 1
-
-        if bindpwd == '':
-            # Emulate LDAP mis-behavior
-            return 1
-
-        sha_obj = sha.new(bindpwd)
-        sha_dig = sha_obj.digest()
-        enc_bindpwd = '{SHA}%s' % base64.encodestring(sha_dig) 
-        enc_bindpwd = enc_bindpwd.strip()
-        rec = self.search_s(binduid)
-        rec_pwd = ''
-        for key, val_list in rec:
-            if key == 'userPassword':
-                rec_pwd = val_list[0]
-                break
-
-        if not rec_pwd:
-            raise INVALID_CREDENTIALS
-
-        if enc_bindpwd == rec_pwd:
-            return 1
-        else:
-            raise INVALID_CREDENTIALS
-
-
-    def search_s(self, base, scope=SCOPE_SUBTREE, query='(objectClass=*)', attrs=[]):
-        elems = explode_dn(base)
-        elems.reverse()
-        tree_pos = TREE
-
-        for elem in elems:
-            if tree_pos.has_key(elem):
-                tree_pos = tree_pos[elem]
-
-        if query == '(objectClass=*)':
-            if scope == SCOPE_BASE and tree_pos.get('dn', '') == base:
-                return (([base, tree_pos],))
-            else:
-                return tree_pos.items()
-
-        if query.find('objectClass=groupOfUniqueNames') != -1:
-            res = []
-            if query.find('uniqueMember=') == -1:
-                for key, vals in tree_pos.items():
-                    res.append(('%s,%s' % (key, base), vals))
-
-            else:
-                q_start = query.find('uniqueMember=') + 13
-                q_end = query.find(')', q_start)
-                q_val = query[q_start:q_end]
-                
-                for key, val in tree_pos.items():
-                    if ( val.has_key('uniqueMember') and 
-                         q_val in val['uniqueMember'] ):
-                        res.append(('%s,%s' % (key, base), val))
-
-            return res
-
-        elif query.find('unique') != -1:
-            res = []
-            if query.find('*') != -1:
-                for key, vals in tree_pos.items():
-                    res.append(('%s,%s' % (key, base), vals))
-            else:
-                q_start = query.lower().find('uniquemember=') + 13
-                q_end = query.find(')', q_start)
-                q_val = query[q_start:q_end]
-
-                for key, val in tree_pos.items():
-                    if ( val.has_key('uniqueMember') and
-                         q_val in val['uniqueMember'] ):
-                        res.append(('%s,%s' % (key, base), val))
-
-            return res
- 
-
-        else:
-            res = []
-            if query.startswith('('):
-                query = query[1:]
-
-            if query.endswith(')'):
-                query = query[:-1]
-
-            if query.startswith('&'):
-               # Convoluted query, gotta take it apart
-               query = query[2:]
-               query = query[:-1]
-               query_elems = query.split(')(')
-               query = query_elems[0]
-               query_elems.remove(query)
-
-            q_key, q_val = query.split('=')
-            for key, val in tree_pos.items():
-                if val.has_key(q_key) and q_val in val[q_key]:
-                    res.append(('%s,%s' % (key, base), val))
-
-            return res
-
-    def add_s(self, dn, attr_list):
-        elems = explode_dn(dn)
-        elems.reverse()
-        rdn = elems[-1]
-        base = elems[:-1]
-        tree_pos = TREE
-
-        for elem in base:
-            if tree_pos.has_key(elem):
-                tree_pos = tree_pos[elem]
-
-        if tree_pos.has_key(rdn):
-            raise ALREADY_EXISTS
-        else:
-            tree_pos[rdn] = {}
-            rec = tree_pos[rdn]
-
-            for key, val in attr_list:
-                rec[key] = val
-
-    def delete_s(self, dn):
-        elems = explode_dn(dn)
-        elems.reverse()
-        rdn = elems[-1]
-        base = elems[:-1]
-        tree_pos = TREE
-
-        for elem in base:
-            if tree_pos.has_key(elem):
-                tree_pos = tree_pos[elem] 
-
-        if tree_pos.has_key(rdn):
-            del tree_pos[rdn]
-
-    def modify_s(self, dn, mod_list):
-        elems = explode_dn(dn)
-        elems.reverse()
-        rdn = elems[-1]
-        base = elems[:-1]
-        tree_pos = TREE
-
-        for elem in base:
-            if tree_pos.has_key(elem):
-                tree_pos = tree_pos[elem]        
-
-        rec = copy.deepcopy(tree_pos.get(rdn))
-
-        for mod in mod_list:
-            if mod[0] == MOD_REPLACE:
-                rec[mod[1]] = mod[2]
-            elif mod[0] == MOD_ADD:
-                cur_val = rec[mod[1]]
-                cur_val.extend(mod[2])
-                rec[mod[1]] = cur_val
-            else:
-                if rec.has_key(mod[1]):
-                    cur_vals = rec[mod[1]]
-                    for removed in mod[2]:
-                        if removed in cur_vals:
-                            cur_vals.remove(removed)
-
-                    rec[mod[1]] = cur_vals
-
-        tree_pos[rdn] = rec
-
-    def modrdn_s(self, dn, new_rdn, *ign):
-        elems = explode_dn(dn)
-        elems.reverse()
-        rdn = elems[-1]
-        base = elems[:-1]
-        tree_pos = TREE
-
-        for elem in base:
-            if tree_pos.has_key(elem):
-                tree_pos = tree_pos[elem]
-
-        rec = tree_pos.get(rdn) 
-        
-        del tree_pos[rdn]
-        tree_pos[new_rdn] = rec
-
-
-class ldapobject:
-    class ReconnectLDAPObject(FakeLDAPConnection):
-        def __init__(self, *ignored):
-            pass
-
-

Added: ldapadapter/trunk/tests/fakeldap.py
===================================================================
--- ldapadapter/trunk/tests/fakeldap.py	2004-10-09 16:19:22 UTC (rev 27861)
+++ ldapadapter/trunk/tests/fakeldap.py	2004-10-09 16:54:44 UTC (rev 27862)
@@ -0,0 +1,199 @@
+##############################################################################
+#
+# Copyright (c) 2004 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Fake LDAP module to test simple functionality.
+
+$Id$
+"""
+
+import sha
+import base64
+from copy import deepcopy
+
+import ldap
+from ldap import OPT_PROTOCOL_VERSION
+from ldap import VERSION3
+from ldap import SCOPE_BASE
+from ldap import SCOPE_ONELEVEL
+from ldap import SCOPE_SUBTREE
+from ldap import MOD_ADD
+from ldap import MOD_DELETE
+from ldap import MOD_REPLACE
+
+class INVALID_CREDENTIALS(Exception): pass
+class ALREADY_EXISTS(Exception): pass
+class NO_SUCH_OBJECT(Exception): pass
+#class SERVER_DOWN(Exception): pass
+#class SIZELIMIT_EXCEEDED(Exception): pass
+
+
+the_data = {}
+# Data is a mapping of {dnl -> entry}, dnl is a tuple of rdns
+
+explode_dn = ldap.explode_dn
+
+
+class FakeLDAPObject(object):
+
+    def __init__(self):
+        pass
+
+    def set_option(self, option, value):
+        pass
+
+    def simple_bind_s(self, dn, password):
+        if dn.find('Manager') >= 0:
+            return 1
+
+        results = self.search_s(dn)
+        pwd = None
+        for key, values in results:
+            if key == 'userPassword':
+                pwd = values[0]
+                break
+
+        if pwd is None:
+            raise INVALID_CREDENTIALS
+
+        # Check password encoding
+        if pwd.startswith('{SHA}'):
+            s = sha.new(password)
+            enc = '{SHA}' + base64.encodestring(s.digest())
+        else:
+            enc = password
+
+        if pwd != enc:
+            raise INVALID_CREDENTIALS
+
+        return 1
+
+    def add_s(self, dn, attr_list):
+        dnl = tuple(dn.split(','))
+        if dnl in the_data:
+            raise ALREADY_EXISTS
+        entry = {}
+        for key, values in attr_list:
+            entry[key] = values
+        the_data[dnl] = entry
+
+    def delete_s(self, dn):
+        dnl = tuple(dn.split(','))
+        if dnl not in the_data:
+            raise NO_SUCH_OBJECT
+        del the_data[dnl]
+
+    def search_s(self, base, scope=SCOPE_SUBTREE, filter='(objectClass=*)',
+                 attrs=[]):
+        basel = tuple(base.split(','))
+        basellen = len(basel)
+
+        # Analyze the filter, assume simple query
+        orig_filter = filter
+        if not filter.startswith('(') or not filter.endswith(')'):
+            raise ValueError("Illegal filter syntax %s" % orig_filter)
+        filter = filter[1:-1]
+
+        # Assume simple filter
+        fkey, fval = filter.split('=')
+        if not fval:
+            raise ValueError("Illegal filter syntax %s" % orig_filter)
+
+        # Make objectclass=* work even on simple debug objects.
+        match_all = (filter.lower() == 'objectclass=*')
+
+        # Iterate on all entries and do the search
+        res = []
+        #import pdb; pdb.set_trace()
+        for dnl, entry in the_data.iteritems():
+            if dnl[-basellen:] != basel:
+                continue
+            dnllen = len(dnl)
+            if not ((scope == SCOPE_SUBTREE) or
+                    (scope == SCOPE_ONELEVEL and dnllen == basellen+1) or
+                    (scope == SCOPE_BASE and dnllen == basellen)):
+                continue
+            # Check filter match.
+            if not entry.has_key(fkey) and not match_all:
+                continue
+            values = entry.get(fkey, [])
+            ok = 0
+            if fval == '*' or match_all:
+                ok = 1
+            elif fval[0] == '*' and fval[-1] == '*':
+                fval = fval[1:-1]
+                for v in values:
+                    if fval in v:
+                        ok = 1
+                        break
+            elif fval[0] == '*':
+                fval = fval[1:]
+                for v in values:
+                    if v.endswith(fval):
+                        ok = 1
+                        break
+            elif fval[-1] == '*':
+                fval = fval[:-1]
+                for v in values:
+                    if v.startswith(fval):
+                        ok = 1
+                        break
+            else:
+                for v in values:
+                    if fval == v:
+                        ok = 1
+                        break
+            if not ok:
+                continue
+            dn = ','.join(dnl)
+            res.append((dn, deepcopy(entry)))
+        return res
+
+    def modify_s(self, dn, mod_list):
+        dnl = tuple(dn.split(','))
+        entry = the_data.get(dnl)
+        if entry is None:
+            raise NO_SUCH_OBJECT
+        for op, key, values in mod_list:
+            if op == MOD_ADD:
+                if not entry.has_key(key):
+                    if not values:
+                        raise ValueError("Illegal MOD_ADD of nothing")
+                    entry[key] = []
+                cur = entry[key]
+                for v in values:
+                    if v not in cur:
+                        cur.append(v)
+            elif op == MOD_DELETE:
+                if not entry.has_key(key):
+                    raise NO_SUCH_OBJECT # FIXME find real exception
+                if not values:
+                    cur = []
+                else:
+                    cur = entry[key]
+                    for v in values:
+                        try:
+                            cur.remove(v)
+                        except ValueError:
+                            raise NO_SUCH_OBJECT # FIXME
+                if not cur:
+                    del entry[key]
+            else: # op == MOD_REPLACE
+                if values:
+                    entry[key] = values
+                elif entry.has_key(key):
+                    del entry[key]
+
+def initialize(conn_str):
+    """Initialize."""
+    return FakeLDAPObject()
+

Modified: ldapadapter/trunk/tests/test_ldapadapter.py
===================================================================
--- ldapadapter/trunk/tests/test_ldapadapter.py	2004-10-09 16:19:22 UTC (rev 27861)
+++ ldapadapter/trunk/tests/test_ldapadapter.py	2004-10-09 16:54:44 UTC (rev 27862)
@@ -1,6 +1,6 @@
 ##############################################################################
 #
-# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# Copyright (c) 2004 Zope Corporation and Contributors.
 # All Rights Reserved.
 #
 # This software is subject to the provisions of the Zope Public License,
@@ -23,12 +23,8 @@
 from zope.app.event.tests.placelesssetup import getEvents
 
 
-def populateFake():
-    import FakeLDAP
-    FakeLDAP.addTreeItems('o=test,dc=org')
-
 def setUp(test):
-    import FakeLDAP
+    import fakeldap
     if sys.modules.has_key('_ldap'):
         test.old_uldap = sys.modules['_ldap']
         del sys.modules['_ldap']
@@ -39,9 +35,8 @@
         del sys.modules['ldap']
     else:
         test.old_ldap = None
-    sys.modules['ldap'] = FakeLDAP
+    sys.modules['ldap'] = fakeldap
     import ldap
-    populateFake()
 
 def tearDown(test):
     del sys.modules['ldap']

Modified: ldapadapter/trunk/utility.py
===================================================================
--- ldapadapter/trunk/utility.py	2004-10-09 16:19:22 UTC (rev 27861)
+++ ldapadapter/trunk/utility.py	2004-10-09 16:54:44 UTC (rev 27862)
@@ -134,7 +134,6 @@
                 values[:] = [unicode(v, 'utf-8') for v in values]
             results.append((dn, entry))
         return results
-
 
 
 class ManageableLDAPAdapter(LDAPAdapter, Persistent, Contained):



More information about the Zope-CVS mailing list