[Zope-Checkins] CVS: ZODB3/ZEO/tests - auth_plaintext.py:1.2.6.2 testAuth.py:1.3.6.2

Jeremy Hylton jeremy at zope.com
Fri Sep 19 15:01:59 EDT 2003


Update of /cvs-repository/ZODB3/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv25006/ZEO/tests

Modified Files:
      Tag: Zope-2_7-branch
	auth_plaintext.py testAuth.py 
Log Message:
Fix security problem in ZEO authentication code.

There were two serious bugs:

- The smac layer would accept a message without a MAC even after the
  session key was established.

- The client never initialized its session key, so it never checked
  incoming messages or created MACs for outgoing messages.

Fixed both, but still need to change the smac layer so that it has
separate HMAC objects for each end of the connection.


=== ZODB3/ZEO/tests/auth_plaintext.py 1.2.6.1 => 1.2.6.2 ===
--- ZODB3/ZEO/tests/auth_plaintext.py:1.2.6.1	Mon Sep 15 17:26:54 2003
+++ ZODB3/ZEO/tests/auth_plaintext.py	Fri Sep 19 15:01:28 2003
@@ -16,8 +16,8 @@
 password, and the SHA hashing is done on the server side.
 
 This mechanism offers *no network security at all*; the only security
-is provided by not storing plaintext passwords on disk.  (See the
-auth_srp module for a secure mechanism)"""
+is provided by not storing plaintext passwords on disk.
+"""
 
 import sha
 
@@ -25,6 +25,9 @@
 from ZEO.auth import register_module
 from ZEO.auth.base import Client, Database
 
+def session_key(username, realm, password):
+    return sha.new("%s:%s:%s" % (username, realm, password)).hexdigest()
+
 class StorageClass(ZEOStorage):
     def auth(self, username, password):
         try:
@@ -32,13 +35,21 @@
         except LookupError:
             return 0
 
-        password = sha.new(password).hexdigest()
-        return self.finish_auth(dbpw == password)
+        password_dig = sha.new(password).hexdigest()
+        if dbpw == password_dig:
+            self.connection.setSessionKey(session_key(username,
+                                                      self.database.realm,
+                                                      password))
+        return self.finish_auth(dbpw == password_dig)
+            
 
 class PlaintextClient(Client):
     extensions = ["auth"]
 
     def start(self, username, realm, password):
-        return self.stub.auth(username, password)
+        if self.stub.auth(username, password):
+            return session_key(username, realm, password)
+        else:
+            return None
 
 register_module("plaintext", StorageClass, PlaintextClient, Database)


=== ZODB3/ZEO/tests/testAuth.py 1.3.6.1 => 1.3.6.2 ===
--- ZODB3/ZEO/tests/testAuth.py:1.3.6.1	Mon Sep 15 17:26:54 2003
+++ ZODB3/ZEO/tests/testAuth.py	Fri Sep 19 15:01:28 2003
@@ -18,8 +18,11 @@
 import time
 import unittest
 
+import zLOG
+
 from ThreadedAsync import LoopCallback
 from ZEO.ClientStorage import ClientStorage
+from ZEO.Exceptions import ClientDisconnected
 from ZEO.StorageServer import StorageServer
 from ZEO.tests.ConnectionTests import CommonSetupTearDown
 
@@ -74,6 +77,8 @@
         self.assert_(self._storage._connection)
         self._storage._connection.poll()
         self.assert_(self._storage.is_connected())
+        # Make a call to make sure the mechanism is working
+        self._storage.versions()
 
     def testNOK(self):
         self._storage = self.openClientStorage(wait=0, username="foo",
@@ -82,6 +87,24 @@
         self.wait()
         # If the test established a connection, then it failed.
         self.failIf(self._storage._connection)
+
+    def testUnauthenticatedMessage(self):
+        # Test that an unauthenticated message is rejected by the server
+        # if it was sent after the connection was authenticated.
+        # Sleep for 0.2 seconds to give the server some time to start up
+        # seems to be needed before and after creating the storage
+        self._storage = self.openClientStorage(wait=0, username="foo",
+                                              password="bar", realm=self.realm)
+        self.wait()
+        self._storage.versions()
+        # Manually clear the state of the hmac connection
+        self._storage._connection._SizedMessageAsyncConnection__hmac = None
+        # Once the client stops using the hmac, it should be disconnected.
+        self.assertRaises(ClientDisconnected, self._storage.versions)
+
+    # Also need test to provoke message handling in different orders
+    # by client and server to demonstrate need for separate hmacs.
+        
 
 class PlainTextAuth(AuthTest):
     import ZEO.tests.auth_plaintext




More information about the Zope-Checkins mailing list