A few more changes to the smoeketests. Allows smoketests to find the nova package from the checkout. Adds smoketests for security groups. Also fixes a couple of typos.
This commit is contained in:
@@ -19,10 +19,17 @@
|
||||
import os
|
||||
import random
|
||||
import sys
|
||||
import time
|
||||
import unittest
|
||||
import zipfile
|
||||
|
||||
# If ../nova/__init__.py exists, add ../ to Python search path, so that
|
||||
# it will override what happens to be installed in /usr/(local/)lib/python...
|
||||
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
|
||||
os.pardir,
|
||||
os.pardir))
|
||||
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
|
||||
sys.path.insert(0, possible_topdir)
|
||||
|
||||
from nova import adminclient
|
||||
from smoketests import flags
|
||||
from smoketests import base
|
||||
|
||||
@@ -24,6 +24,14 @@ import sys
|
||||
import time
|
||||
import unittest
|
||||
|
||||
# If ../nova/__init__.py exists, add ../ to Python search path, so that
|
||||
# it will override what happens to be installed in /usr/(local/)lib/python...
|
||||
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
|
||||
os.pardir,
|
||||
os.pardir))
|
||||
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
|
||||
sys.path.insert(0, possible_topdir)
|
||||
|
||||
from smoketests import flags
|
||||
from smoketests import base
|
||||
|
||||
@@ -40,6 +48,7 @@ flags.DEFINE_string('bundle_image', 'openwrt-x86-ext2.image',
|
||||
TEST_PREFIX = 'test%s' % int (random.random()*1000000)
|
||||
TEST_BUCKET = '%s_bucket' % TEST_PREFIX
|
||||
TEST_KEY = '%s_key' % TEST_PREFIX
|
||||
TEST_GROUP = '%s_group' % TEST_PREFIX
|
||||
TEST_DATA = {}
|
||||
|
||||
|
||||
@@ -137,7 +146,7 @@ class InstanceTests(UserSmokeTestCase):
|
||||
self.data['instance_id'] = reservation.instances[0].id
|
||||
|
||||
def test_003_instance_runs_within_60_seconds(self):
|
||||
reservations = self.conn.get_all_instances([data['instance_id']])
|
||||
reservations = self.conn.get_all_instances([self.data['instance_id']])
|
||||
instance = reservations[0].instances[0]
|
||||
# allow 60 seconds to exit pending with IP
|
||||
for x in xrange(60):
|
||||
@@ -207,7 +216,7 @@ class InstanceTests(UserSmokeTestCase):
|
||||
def test_999_tearDown(self):
|
||||
self.delete_key_pair(self.conn, TEST_KEY)
|
||||
if self.data.has_key('instance_id'):
|
||||
self.conn.terminate_instances([data['instance_id']])
|
||||
self.conn.terminate_instances([self.data['instance_id']])
|
||||
|
||||
|
||||
class VolumeTests(UserSmokeTestCase):
|
||||
@@ -319,8 +328,80 @@ class VolumeTests(UserSmokeTestCase):
|
||||
self.conn.delete_key_pair(TEST_KEY)
|
||||
|
||||
|
||||
class SecurityGroupTests(UserSmokeTestCase):
|
||||
|
||||
def __public_instance_is_accessible(self):
|
||||
id_url = "latest/meta-data/instance-id"
|
||||
options = "-s --max-time 1"
|
||||
command = "curl %s %s/%s" % (options, self.data['public_ip'], id_url)
|
||||
instance_id = commands.getoutput(command).strip()
|
||||
if not instance_id:
|
||||
return False
|
||||
if instance_id != self.data['instance_id']:
|
||||
raise Exception("Wrong instance id")
|
||||
return True
|
||||
|
||||
def test_001_can_create_security_group(self):
|
||||
self.conn.create_security_group(TEST_GROUP, description='test')
|
||||
|
||||
groups = self.conn.get_all_security_groups()
|
||||
self.assertTrue(TEST_GROUP in [group.name for group in groups])
|
||||
|
||||
def test_002_can_launch_instance_in_security_group(self):
|
||||
self.create_key_pair(self.conn, TEST_KEY)
|
||||
reservation = self.conn.run_instances(FLAGS.test_image,
|
||||
key_name=TEST_KEY,
|
||||
security_groups=[TEST_GROUP],
|
||||
instance_type='m1.tiny')
|
||||
|
||||
self.data['instance_id'] = reservation.instances[0].id
|
||||
|
||||
def test_003_can_authorize_security_group_ingress(self):
|
||||
self.assertTrue(self.conn.authorize_security_group(TEST_GROUP,
|
||||
ip_protocol='tcp',
|
||||
from_port=80,
|
||||
to_port=80))
|
||||
|
||||
def test_004_can_access_instance_over_public_ip(self):
|
||||
result = self.conn.allocate_address()
|
||||
self.assertTrue(hasattr(result, 'public_ip'))
|
||||
self.data['public_ip'] = result.public_ip
|
||||
|
||||
result = self.conn.associate_address(self.data['instance_id'],
|
||||
self.data['public_ip'])
|
||||
start_time = time.time()
|
||||
while not self.__public_instance_is_accessible():
|
||||
# 1 minute to launch
|
||||
if time.time() - start_time > 60:
|
||||
raise Exception("Timeout")
|
||||
time.sleep(1)
|
||||
|
||||
def test_005_can_revoke_security_group_ingress(self):
|
||||
self.assertTrue(self.conn.revoke_security_group(TEST_GROUP,
|
||||
ip_protocol='tcp',
|
||||
from_port=80,
|
||||
to_port=80))
|
||||
start_time = time.time()
|
||||
while self.__public_instance_is_accessible():
|
||||
# 1 minute to teardown
|
||||
if time.time() - start_time > 60:
|
||||
raise Exception("Timeout")
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
def test_999_tearDown(self):
|
||||
self.conn.delete_key_pair(TEST_KEY)
|
||||
self.conn.delete_security_group(TEST_GROUP)
|
||||
groups = self.conn.get_all_security_groups()
|
||||
self.assertFalse(TEST_GROUP in [group.name for group in groups])
|
||||
self.conn.terminate_instances([self.data['instance_id']])
|
||||
self.assertTrue(self.conn.release_address(self.data['public_ip']))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
suites = {'image': unittest.makeSuite(ImageTests),
|
||||
'instance': unittest.makeSuite(InstanceTests),
|
||||
'volume': unittest.makeSuite(VolumeTests)}
|
||||
'security_group': unittest.makeSuite(SecurityGroupTests),
|
||||
'volume': unittest.makeSuite(VolumeTests)
|
||||
}
|
||||
sys.exit(base.run_tests(suites))
|
||||
|
||||
Reference in New Issue
Block a user