[Checkins] SVN: Sandbox/J1m/awsrecipes/src/zc/awsrecipes/ added recipe for creating volumes
jim
cvs-admin at zope.org
Fri Jul 6 20:11:09 UTC 2012
Log message for revision 127277:
added recipe for creating volumes
Changed:
A Sandbox/J1m/awsrecipes/src/zc/awsrecipes/
A Sandbox/J1m/awsrecipes/src/zc/awsrecipes/__init__.py
A Sandbox/J1m/awsrecipes/src/zc/awsrecipes/ebs.test
A Sandbox/J1m/awsrecipes/src/zc/awsrecipes/tests.py
-=-
Added: Sandbox/J1m/awsrecipes/src/zc/awsrecipes/__init__.py
===================================================================
--- Sandbox/J1m/awsrecipes/src/zc/awsrecipes/__init__.py (rev 0)
+++ Sandbox/J1m/awsrecipes/src/zc/awsrecipes/__init__.py 2012-07-06 20:11:04 UTC (rev 127277)
@@ -0,0 +1,21 @@
+import boto.ec2.connection
+
+
+class EBS:
+ # Recipe object that creates one EBS volume and tags it with a Name.
+ # It is constructed with (buildout, name, options); only ``options``
+ # is read here.
+ def __init__(self, buildout, name, options):
+ # 'size', 'zone', 'name' and 'region' are all read with options[...],
+ # so each of them must be present.
+ self.size = options['size']
+ self.zone = options['zone']
+ self.vol_name = options['name']
+
+ # The connection object is created here, at construction time.
+ # NOTE(review): 'region' is passed through exactly as given in the
+ # options -- confirm EC2Connection accepts that value as-is.
+ self.conn = boto.ec2.connection.EC2Connection(
+ region=options['region']
+ )
+
+ def install(self):
+ '''Create an EBS volume and tag it with the configured name.
+
+ The size option is converted with int() before the call.
+ Returns an empty tuple.
+ '''
+ vol = self.conn.create_volume(int(self.size), self.zone)
+ self.conn.create_tags([vol.id], dict(Name=self.vol_name))
+ return ()
+
+ # update is the same function as install, so calling it also creates
+ # and tags a new volume.
+ update = install
Added: Sandbox/J1m/awsrecipes/src/zc/awsrecipes/ebs.test
===================================================================
--- Sandbox/J1m/awsrecipes/src/zc/awsrecipes/ebs.test (rev 0)
+++ Sandbox/J1m/awsrecipes/src/zc/awsrecipes/ebs.test 2012-07-06 20:11:04 UTC (rev 127277)
@@ -0,0 +1,44 @@
+Recipe to create ebs volumes
+============================
+
+The recipe takes the following options:
+
+region
+ AWS Region
+
+zone
+ The name of an aws zone.
+
+size
+ Size in GB
+
+name
+ A logical name for the volumes.
+
+Note that a volume is identified by name and an index.
+
+ >>> import zc.awsrecipes
+ >>> zc.awsrecipes.EBS({}, 'myvolume', dict(
+ ... region = 'funky',
+ ... zone = 'us-east-1b',
+ ... size = '50',
+ ... name = 'data',
+ ... )).install()
+ ()
+
+ >>> import boto.ec2.connection
+ >>> boto.ec2.connection.EC2Connection.assert_called_with(region='funky')
+ >>> conn = boto.ec2.connection.EC2Connection.return_value
+
+ >>> conn.create_volume.assert_called_with(50, 'us-east-1b')
+
+Now, if we look through our ebs volumes, we should see one with a name of
+data:
+
+ >>> import boto.ec2.connection
+ >>> conn = conn = boto.ec2.connection.EC2Connection()
+ >>> for v in conn.get_all_volumes():
+ ... if v.tags['Name'] == 'data':
+ ... print v.size
+ 50
+
Added: Sandbox/J1m/awsrecipes/src/zc/awsrecipes/tests.py
===================================================================
--- Sandbox/J1m/awsrecipes/src/zc/awsrecipes/tests.py (rev 0)
+++ Sandbox/J1m/awsrecipes/src/zc/awsrecipes/tests.py 2012-07-06 20:11:04 UTC (rev 127277)
@@ -0,0 +1,66 @@
+##############################################################################
+#
+# Copyright (c) Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+import doctest
+import unittest
+import mock
+from zope.testing import setupstack
+
+def side_effect(m, f=None):
+ # Set ``f`` as the ``side_effect`` attribute of ``m``.
+ #
+ # Called with only ``m``, this returns a one-argument callable that
+ # performs the assignment, so it can be used as ``@side_effect(m)``.
+ # The assignment path returns None, so a name decorated this way is
+ # bound to None; the function stays reachable only through ``m``.
+ if f is None:
+ return lambda f: side_effect(m, f)
+ m.side_effect = f
+
+def ebssetup(test):
+ # Doctest setUp: patch boto.ec2.connection.EC2Connection with a mock and
+ # back create_volume, create_tags and get_all_volumes on its
+ # return_value with the in-memory ``volumes`` list below.
+ volumes = []
+ # presumably setupstack.context_manager enters the patch, arranges for it
+ # to be undone at tearDown and returns the mock -- TODO confirm.
+ EC2Connection = setupstack.context_manager(
+ test, mock.patch('boto.ec2.connection.EC2Connection'))
+
+ # Minimal stand-in for a volume object: id, size, zone and a tags dict.
+ class Volume:
+ def __init__(self, id, size, zone):
+ self.id = id
+ self.size = size
+ self.zone = zone
+ self.tags = {}
+
+ # Each decorator below installs the function as the side_effect of the
+ # matching method on EC2Connection.return_value.
+ @side_effect(EC2Connection.return_value.create_volume)
+ def create_volume(size, zone):
+ # The id is the number of volumes stored so far, as a string.
+ volume = Volume(str(len(volumes)), size, zone)
+ volumes.append(volume)
+ return volume
+
+ @side_effect(EC2Connection.return_value.create_tags)
+ def create_tags(ids, tags):
+ # Merge ``tags`` into every stored volume whose id is in ``ids``.
+ for v in volumes:
+ if v.id in ids:
+ v.tags.update(tags)
+
+ @side_effect(EC2Connection.return_value.get_all_volumes)
+ def get_all_volumes(ids=None):
+ # No ids: a copy of the whole list; otherwise only matching volumes.
+ if ids is None:
+ return list(volumes)
+ else:
+ return [v for v in volumes if v.id in ids]
+
+
+def test_suite():
+ # Build the suite: the 'ebs.test' doctest file, run with ebssetup as its
+ # setUp and setupstack.tearDown as its tearDown.
+ return unittest.TestSuite((
+ doctest.DocFileSuite(
+ 'ebs.test',
+ setUp=ebssetup, tearDown=setupstack.tearDown),
+ ))
+
+# When executed as a script, hand 'test_suite' to unittest.main as the
+# default test to run.
+if __name__ == '__main__':
+ unittest.main(defaultTest='test_suite')
+
More information about the checkins
mailing list