This adds the servers search capabilities defined in the OS API v1.1 spec.. and more for admins.

For users, flavor=, image=, status=, and name= can be specified.  name= supports regular expression matching.
Most other options are ignored.  (things outside of the spec like 'recurse_zones' and 'reservation_id' still work, also)

If admin_api is enabled and context is an admin: along with the above, one can specify ip= and ip6= which will do regular expression matching.  Also, any other 'Instance' column name can be specified, so you can do regexp matching there as well.  Unknown Instance columns are ignored.

Also fixes up fixed_ip=, making a 404 returned vs a 500 error... and handling this properly with zone recursion as well.
This commit is contained in:
Chris Behrens
2011-08-09 21:09:10 +00:00
committed by Tarmac
12 changed files with 1111 additions and 136 deletions
+30 -12
View File
@@ -213,8 +213,9 @@ class CloudController(object):
def _get_mpi_data(self, context, project_id):
result = {}
search_opts = {'project_id': project_id}
for instance in self.compute_api.get_all(context,
project_id=project_id):
search_opts=search_opts):
if instance['fixed_ips']:
line = '%s slots=%d' % (instance['fixed_ips'][0]['address'],
instance['vcpus'])
@@ -264,8 +265,13 @@ class CloudController(object):
def get_metadata(self, address):
ctxt = context.get_admin_context()
instance_ref = self.compute_api.get_all(ctxt, fixed_ip=address)
if instance_ref is None:
search_opts = {'fixed_ip': address}
try:
instance_ref = self.compute_api.get_all(ctxt,
search_opts=search_opts)
except exception.NotFound:
instance_ref = None
if not instance_ref:
return None
# This ensures that all attributes of the instance
@@ -1086,11 +1092,16 @@ class CloudController(object):
return result
def describe_instances(self, context, **kwargs):
return self._format_describe_instances(context, **kwargs)
# Optional DescribeInstances argument
instance_id = kwargs.get('instance_id', None)
return self._format_describe_instances(context,
instance_id=instance_id)
def describe_instances_v6(self, context, **kwargs):
kwargs['use_v6'] = True
return self._format_describe_instances(context, **kwargs)
# Optional DescribeInstancesV6 argument
instance_id = kwargs.get('instance_id', None)
return self._format_describe_instances(context,
instance_id=instance_id, use_v6=True)
def _format_describe_instances(self, context, **kwargs):
return {'reservationSet': self._format_instances(context, **kwargs)}
@@ -1152,7 +1163,8 @@ class CloudController(object):
result['groupSet'] = CloudController._convert_to_set(
security_group_names, 'groupId')
def _format_instances(self, context, instance_id=None, **kwargs):
def _format_instances(self, context, instance_id=None, use_v6=False,
**search_opts):
# TODO(termie): this method is poorly named as its name does not imply
# that it will be making a variety of database calls
# rather than simply formatting a bunch of instances that
@@ -1163,11 +1175,17 @@ class CloudController(object):
instances = []
for ec2_id in instance_id:
internal_id = ec2utils.ec2_id_to_id(ec2_id)
instance = self.compute_api.get(context,
instance_id=internal_id)
try:
instance = self.compute_api.get(context, internal_id)
except exception.NotFound:
continue
instances.append(instance)
else:
instances = self.compute_api.get_all(context, **kwargs)
try:
instances = self.compute_api.get_all(context,
search_opts=search_opts)
except exception.NotFound:
instances = []
for instance in instances:
if not context.is_admin:
if instance['image_ref'] == str(FLAGS.vpn_image_id):
@@ -1189,7 +1207,7 @@ class CloudController(object):
fixed_addr = fixed['address']
if fixed['floating_ips']:
floating_addr = fixed['floating_ips'][0]['address']
if fixed['network'] and 'use_v6' in kwargs:
if fixed['network'] and use_v6:
i['dnsNameV6'] = ipv6.to_global(
fixed['network']['cidr_v6'],
fixed['virtual_interface']['address'],
@@ -1326,7 +1344,7 @@ class CloudController(object):
'AvailabilityZone'),
block_device_mapping=kwargs.get('block_device_mapping', {}))
return self._format_run_instances(context,
instances[0]['reservation_id'])
reservation_id=instances[0]['reservation_id'])
def _do_instance(self, action, context, ec2_id):
instance_id = ec2utils.ec2_id_to_id(ec2_id)
+33
View File
@@ -27,6 +27,7 @@ from nova import flags
from nova import log as logging
from nova import quota
from nova.api.openstack import wsgi
from nova.compute import power_state as compute_power_state
LOG = logging.getLogger('nova.api.openstack.common')
@@ -37,6 +38,38 @@ XML_NS_V10 = 'http://docs.rackspacecloud.com/servers/api/v1.0'
XML_NS_V11 = 'http://docs.openstack.org/compute/api/v1.1'
_STATUS_MAP = {
None: 'BUILD',
compute_power_state.NOSTATE: 'BUILD',
compute_power_state.RUNNING: 'ACTIVE',
compute_power_state.BLOCKED: 'ACTIVE',
compute_power_state.SUSPENDED: 'SUSPENDED',
compute_power_state.PAUSED: 'PAUSED',
compute_power_state.SHUTDOWN: 'SHUTDOWN',
compute_power_state.SHUTOFF: 'SHUTOFF',
compute_power_state.CRASHED: 'ERROR',
compute_power_state.FAILED: 'ERROR',
compute_power_state.BUILDING: 'BUILD',
}
def status_from_power_state(power_state):
"""Map the power state to the server status string"""
return _STATUS_MAP[power_state]
def power_states_from_status(status):
"""Map the server status string to a list of power states"""
power_states = []
for power_state, status_map in _STATUS_MAP.iteritems():
# Skip the 'None' state
if power_state is None:
continue
if status.lower() == status_map.lower():
power_states.append(power_state)
return power_states
def get_pagination_params(request):
"""Return marker, limit tuple from request.
+80 -17
View File
@@ -44,7 +44,7 @@ FLAGS = flags.FLAGS
class Controller(object):
""" The Server API controller for the OpenStack API """
""" The Server API base controller class for the OpenStack API """
def __init__(self):
self.compute_api = compute.API()
@@ -53,17 +53,21 @@ class Controller(object):
def index(self, req):
""" Returns a list of server names and ids for a given user """
try:
servers = self._items(req, is_detail=False)
servers = self._get_servers(req, is_detail=False)
except exception.Invalid as err:
return exc.HTTPBadRequest(explanation=str(err))
except exception.NotFound:
return exc.HTTPNotFound()
return servers
def detail(self, req):
""" Returns a list of server details for a given user """
try:
servers = self._items(req, is_detail=True)
servers = self._get_servers(req, is_detail=True)
except exception.Invalid as err:
return exc.HTTPBadRequest(explanation=str(err))
except exception.NotFound as err:
return exc.HTTPNotFound()
return servers
def _build_view(self, req, instance, is_detail=False):
@@ -75,22 +79,55 @@ class Controller(object):
def _action_rebuild(self, info, request, instance_id):
raise NotImplementedError()
def _items(self, req, is_detail):
"""Returns a list of servers for a given user.
builder - the response model builder
def _get_servers(self, req, is_detail):
"""Returns a list of servers, taking into account any search
options specified.
"""
query_str = req.str_GET
reservation_id = query_str.get('reservation_id')
project_id = query_str.get('project_id')
fixed_ip = query_str.get('fixed_ip')
recurse_zones = utils.bool_from_str(query_str.get('recurse_zones'))
search_opts = {}
search_opts.update(req.str_GET)
context = req.environ['nova.context']
remove_invalid_options(context, search_opts,
self._get_server_search_options())
# Convert recurse_zones into a boolean
search_opts['recurse_zones'] = utils.bool_from_str(
search_opts.get('recurse_zones', False))
# If search by 'status', we need to convert it to 'state'
# If the status is unknown, bail.
# Leave 'state' in search_opts so compute can pass it on to
# child zones..
if 'status' in search_opts:
status = search_opts['status']
search_opts['state'] = common.power_states_from_status(status)
if len(search_opts['state']) == 0:
reason = _('Invalid server status: %(status)s') % locals()
LOG.error(reason)
raise exception.InvalidInput(reason=reason)
# By default, compute's get_all() will return deleted instances.
# If an admin hasn't specified a 'deleted' search option, we need
# to filter out deleted instances by setting the filter ourselves.
# ... Unless 'changes-since' is specified, because 'changes-since'
# should return recently deleted images according to the API spec.
if 'deleted' not in search_opts:
# Admin hasn't specified deleted filter
if 'changes-since' not in search_opts:
# No 'changes-since', so we need to find non-deleted servers
search_opts['deleted'] = False
else:
# This is the default, but just in case..
search_opts['deleted'] = True
instance_list = self.compute_api.get_all(
req.environ['nova.context'],
reservation_id=reservation_id,
project_id=project_id,
fixed_ip=fixed_ip,
recurse_zones=recurse_zones)
context, search_opts=search_opts)
# FIXME(comstud): 'changes-since' is not fully implemented. Where
# should this be filtered?
limited_list = self._limit_items(instance_list, req)
servers = [self._build_view(req, inst, is_detail)['server']
for inst in limited_list]
@@ -506,6 +543,7 @@ class Controller(object):
class ControllerV10(Controller):
"""v1.0 OpenStack API controller"""
@scheduler_api.redirect_handler
def delete(self, req, id):
@@ -568,8 +606,13 @@ class ControllerV10(Controller):
""" Determine the admin password for a server on creation """
return self.helper._get_server_admin_password_old_style(server)
def _get_server_search_options(self):
"""Return server search options allowed by non-admin"""
return 'reservation_id', 'fixed_ip', 'name', 'recurse_zones'
class ControllerV11(Controller):
"""v1.1 OpenStack API controller"""
@scheduler_api.redirect_handler
def delete(self, req, id):
@@ -742,6 +785,11 @@ class ControllerV11(Controller):
""" Determine the admin password for a server on creation """
return self.helper._get_server_admin_password_new_style(server)
def _get_server_search_options(self):
"""Return server search options allowed by non-admin"""
return ('reservation_id', 'name', 'recurse_zones',
'status', 'image', 'flavor', 'changes-since')
class HeadersSerializer(wsgi.ResponseHeadersSerializer):
@@ -920,3 +968,18 @@ def create_resource(version='1.0'):
deserializer = wsgi.RequestDeserializer(body_deserializers)
return wsgi.Resource(controller, deserializer, serializer)
def remove_invalid_options(context, search_options, allowed_search_options):
"""Remove search options that are not valid for non-admin API/context"""
if FLAGS.allow_admin_api and context.is_admin:
# Allow all options
return
# Otherwise, strip out all unknown options
unknown_options = [opt for opt in search_options
if opt not in allowed_search_options]
unk_opt_str = ", ".join(unknown_options)
log_msg = _("Removing options '%(unk_opt_str)s' from query") % locals()
LOG.debug(log_msg)
for opt in unknown_options:
search_options.pop(opt, None)
+1 -15
View File
@@ -20,7 +20,6 @@ import hashlib
import os
from nova import exception
from nova.compute import power_state
import nova.compute
import nova.context
from nova.api.openstack import common
@@ -61,24 +60,11 @@ class ViewBuilder(object):
def _build_detail(self, inst):
"""Returns a detailed model of a server."""
power_mapping = {
None: 'BUILD',
power_state.NOSTATE: 'BUILD',
power_state.RUNNING: 'ACTIVE',
power_state.BLOCKED: 'ACTIVE',
power_state.SUSPENDED: 'SUSPENDED',
power_state.PAUSED: 'PAUSED',
power_state.SHUTDOWN: 'SHUTDOWN',
power_state.SHUTOFF: 'SHUTOFF',
power_state.CRASHED: 'ERROR',
power_state.FAILED: 'ERROR',
power_state.BUILDING: 'BUILD',
}
inst_dict = {
'id': inst['id'],
'name': inst['display_name'],
'status': power_mapping[inst.get('state')]}
'status': common.status_from_power_state(inst.get('state'))}
ctxt = nova.context.get_admin_context()
compute_api = nova.compute.API()
+58 -32
View File
@@ -19,6 +19,7 @@
"""Handles all requests relating to instances (guest vms)."""
import eventlet
import novaclient
import re
import time
@@ -712,59 +713,84 @@ class API(base.Base):
"""
return self.get(context, instance_id)
def get_all(self, context, project_id=None, reservation_id=None,
fixed_ip=None, recurse_zones=False):
def get_all(self, context, search_opts=None):
"""Get all instances filtered by one of the given parameters.
If there is no filter and the context is an admin, it will retreive
all instances in the system.
"""
if reservation_id is not None:
recurse_zones = True
instances = self.db.instance_get_all_by_reservation(
context, reservation_id)
elif fixed_ip is not None:
try:
instances = self.db.fixed_ip_get_instance(context, fixed_ip)
except exception.FloatingIpNotFound, e:
if not recurse_zones:
raise
instances = None
elif project_id or not context.is_admin:
if not context.project_id:
instances = self.db.instance_get_all_by_user(
context, context.user_id)
else:
if project_id is None:
project_id = context.project_id
instances = self.db.instance_get_all_by_project(
context, project_id)
else:
instances = self.db.instance_get_all(context)
if search_opts is None:
search_opts = {}
if instances is None:
instances = []
elif not isinstance(instances, list):
instances = [instances]
LOG.debug(_("Searching by: %s") % str(search_opts))
# Fixups for the DB call
filters = {}
def _remap_flavor_filter(flavor_id):
instance_type = self.db.instance_type_get_by_flavor_id(
context, flavor_id)
filters['instance_type_id'] = instance_type['id']
def _remap_fixed_ip_filter(fixed_ip):
# Turn fixed_ip into a regexp match. Since '.' matches
# any character, we need to use regexp escaping for it.
filters['ip'] = '^%s$' % fixed_ip.replace('.', '\\.')
# search_option to filter_name mapping.
filter_mapping = {
'image': 'image_ref',
'name': 'display_name',
'instance_name': 'name',
'recurse_zones': None,
'flavor': _remap_flavor_filter,
'fixed_ip': _remap_fixed_ip_filter}
# copy from search_opts, doing various remappings as necessary
for opt, value in search_opts.iteritems():
# Do remappings.
# Values not in the filter_mapping table are copied as-is.
# If remapping is None, option is not copied
# If the remapping is a string, it is the filter_name to use
try:
remap_object = filter_mapping[opt]
except KeyError:
filters[opt] = value
else:
if remap_object:
if isinstance(remap_object, basestring):
filters[remap_object] = value
else:
remap_object(value)
recurse_zones = search_opts.get('recurse_zones', False)
if 'reservation_id' in filters:
recurse_zones = True
instances = self.db.instance_get_all_by_filters(context, filters)
if not recurse_zones:
return instances
# Recurse zones. Need admin context for this. Send along
# the un-modified search options we received..
admin_context = context.elevated()
children = scheduler_api.call_zone_method(admin_context,
"list",
errors_to_ignore=[novaclient.exceptions.NotFound],
novaclient_collection_name="servers",
reservation_id=reservation_id,
project_id=project_id,
fixed_ip=fixed_ip,
recurse_zones=True)
search_opts=search_opts)
for zone, servers in children:
# 'servers' can be None if a 404 was returned by a zone
if servers is None:
continue
for server in servers:
# Results are ready to send to user. No need to scrub.
server._info['_is_precooked'] = True
instances.append(server._info)
return instances
def _cast_compute_message(self, method, context, instance_id, host=None,
+16 -10
View File
@@ -387,15 +387,6 @@ def fixed_ip_get_by_virtual_interface(context, vif_id):
return IMPL.fixed_ip_get_by_virtual_interface(context, vif_id)
def fixed_ip_get_instance(context, address):
"""Get an instance for a fixed ip by address."""
return IMPL.fixed_ip_get_instance(context, address)
def fixed_ip_get_instance_v6(context, address):
return IMPL.fixed_ip_get_instance_v6(context, address)
def fixed_ip_get_network(context, address):
"""Get a network for a fixed ip by address."""
return IMPL.fixed_ip_get_network(context, address)
@@ -500,6 +491,11 @@ def instance_get_all(context):
return IMPL.instance_get_all(context)
def instance_get_all_by_filters(context, filters):
"""Get all instances that match all filters."""
return IMPL.instance_get_all_by_filters(context, filters)
def instance_get_active_by_window(context, begin, end=None):
"""Get instances active during a certain time window."""
return IMPL.instance_get_active_by_window(context, begin, end)
@@ -521,10 +517,20 @@ def instance_get_all_by_host(context, host):
def instance_get_all_by_reservation(context, reservation_id):
"""Get all instance belonging to a reservation."""
"""Get all instances belonging to a reservation."""
return IMPL.instance_get_all_by_reservation(context, reservation_id)
def instance_get_by_fixed_ip(context, address):
"""Get an instance for a fixed ip by address."""
return IMPL.instance_get_by_fixed_ip(context, address)
def instance_get_by_fixed_ipv6(context, address):
"""Get an instance for a fixed ip by IPv6 address."""
return IMPL.instance_get_by_fixed_ipv6(context, address)
def instance_get_fixed_addresses(context, instance_id):
"""Get the fixed ip address of an instance."""
return IMPL.instance_get_fixed_addresses(context, instance_id)
+149 -44
View File
@@ -18,6 +18,7 @@
"""
Implementation of SQLAlchemy backend.
"""
import re
import warnings
from nova import block_device
@@ -829,28 +830,6 @@ def fixed_ip_get_by_virtual_interface(context, vif_id):
return rv
@require_context
def fixed_ip_get_instance(context, address):
fixed_ip_ref = fixed_ip_get_by_address(context, address)
return fixed_ip_ref.instance
@require_context
def fixed_ip_get_instance_v6(context, address):
session = get_session()
# convert IPv6 address to mac
mac = ipv6.to_mac(address)
# get virtual interface
vif_ref = virtual_interface_get_by_address(context, mac)
# look up instance based on instance_id from vif row
result = session.query(models.Instance).\
filter_by(id=vif_ref['instance_id'])
return result
@require_admin_context
def fixed_ip_get_network(context, address):
fixed_ip_ref = fixed_ip_get_by_address(context, address)
@@ -1169,6 +1148,114 @@ def instance_get_all(context):
all()
@require_context
def instance_get_all_by_filters(context, filters):
"""Return instances that match all filters. Deleted instances
will be returned by default, unless there's a filter that says
otherwise"""
def _regexp_filter_by_ipv6(instance, filter_re):
for interface in instance['virtual_interfaces']:
fixed_ipv6 = interface.get('fixed_ipv6')
if fixed_ipv6 and filter_re.match(fixed_ipv6):
return True
return False
def _regexp_filter_by_ip(instance, filter_re):
for interface in instance['virtual_interfaces']:
for fixed_ip in interface['fixed_ips']:
if not fixed_ip or not fixed_ip['address']:
continue
if filter_re.match(fixed_ip['address']):
return True
for floating_ip in fixed_ip.get('floating_ips', []):
if not floating_ip or not floating_ip['address']:
continue
if filter_re.match(floating_ip['address']):
return True
return False
def _regexp_filter_by_column(instance, filter_name, filter_re):
try:
v = getattr(instance, filter_name)
except AttributeError:
return True
if v and filter_re.match(str(v)):
return True
return False
def _exact_match_filter(query, column, value):
"""Do exact match against a column. value to match can be a list
so you can match any value in the list.
"""
if isinstance(value, list):
column_attr = getattr(models.Instance, column)
return query.filter(column_attr.in_(value))
else:
filter_dict = {}
filter_dict[column] = value
return query.filter_by(**filter_dict)
session = get_session()
query_prefix = session.query(models.Instance).\
options(joinedload_all('fixed_ips.floating_ips')).\
options(joinedload_all('virtual_interfaces.network')).\
options(joinedload_all(
'virtual_interfaces.fixed_ips.floating_ips')).\
options(joinedload('security_groups')).\
options(joinedload_all('fixed_ips.network')).\
options(joinedload('metadata')).\
options(joinedload('instance_type'))
# Make a copy of the filters dictionary to use going forward, as we'll
# be modifying it and we shouldn't affect the caller's use of it.
filters = filters.copy()
if not context.is_admin:
# If we're not admin context, add appropriate filter..
if context.project_id:
filters['project_id'] = context.project_id
else:
filters['user_id'] = context.user_id
# Filters for exact matches that we can do along with the SQL query...
# For other filters that don't match this, we will do regexp matching
exact_match_filter_names = ['project_id', 'user_id', 'image_ref',
'state', 'instance_type_id', 'deleted']
query_filters = [key for key in filters.iterkeys()
if key in exact_match_filter_names]
for filter_name in query_filters:
# Do the matching and remove the filter from the dictionary
# so we don't try it again below..
query_prefix = _exact_match_filter(query_prefix, filter_name,
filters.pop(filter_name))
instances = query_prefix.all()
if not instances:
return []
# Now filter on everything else for regexp matching..
# For filters not in the list, we'll attempt to use the filter_name
# as a column name in Instance..
regexp_filter_funcs = {'ip6': _regexp_filter_by_ipv6,
'ip': _regexp_filter_by_ip}
for filter_name in filters.iterkeys():
filter_func = regexp_filter_funcs.get(filter_name, None)
filter_re = re.compile(str(filters[filter_name]))
if filter_func:
filter_l = lambda instance: filter_func(instance, filter_re)
else:
filter_l = lambda instance: _regexp_filter_by_column(instance,
filter_name, filter_re)
instances = filter(filter_l, instances)
return instances
@require_admin_context
def instance_get_active_by_window(context, begin, end=None):
"""Return instances that were continuously active over the given window"""
@@ -1237,30 +1324,48 @@ def instance_get_all_by_project(context, project_id):
@require_context
def instance_get_all_by_reservation(context, reservation_id):
session = get_session()
query = session.query(models.Instance).\
filter_by(reservation_id=reservation_id).\
options(joinedload_all('fixed_ips.floating_ips')).\
options(joinedload('virtual_interfaces')).\
options(joinedload('security_groups')).\
options(joinedload_all('fixed_ips.network')).\
options(joinedload('metadata')).\
options(joinedload('instance_type'))
if is_admin_context(context):
return session.query(models.Instance).\
options(joinedload_all('fixed_ips.floating_ips')).\
options(joinedload('virtual_interfaces')).\
options(joinedload('security_groups')).\
options(joinedload_all('fixed_ips.network')).\
options(joinedload('metadata')).\
options(joinedload('instance_type')).\
filter_by(reservation_id=reservation_id).\
filter_by(deleted=can_read_deleted(context)).\
all()
return query.\
filter_by(deleted=can_read_deleted(context)).\
all()
elif is_user_context(context):
return session.query(models.Instance).\
options(joinedload_all('fixed_ips.floating_ips')).\
options(joinedload('virtual_interfaces')).\
options(joinedload('security_groups')).\
options(joinedload_all('fixed_ips.network')).\
options(joinedload('metadata')).\
options(joinedload('instance_type')).\
filter_by(project_id=context.project_id).\
filter_by(reservation_id=reservation_id).\
filter_by(deleted=False).\
all()
return query.\
filter_by(project_id=context.project_id).\
filter_by(deleted=False).\
all()
@require_context
def instance_get_by_fixed_ip(context, address):
"""Return instance ref by exact match of FixedIP"""
fixed_ip_ref = fixed_ip_get_by_address(context, address)
return fixed_ip_ref.instance
@require_context
def instance_get_by_fixed_ipv6(context, address):
"""Return instance ref by exact match of IPv6"""
session = get_session()
# convert IPv6 address to mac
mac = ipv6.to_mac(address)
# get virtual interface
vif_ref = virtual_interface_get_by_address(context, mac)
# look up instance based on instance_id from vif row
result = session.query(models.Instance).\
filter_by(id=vif_ref['instance_id'])
return result
@require_admin_context
@@ -1302,7 +1407,7 @@ def instance_get_fixed_addresses_v6(context, instance_id):
network_refs = network_get_all_by_instance(context, instance_id)
# compile a list of cidr_v6 prefixes sorted by network id
prefixes = [ref.cidr_v6 for ref in
sorted(network_refs, key=lambda ref: ref.id)]
sorted(network_refs, key=lambda ref: ref.id)]
# get vifs associated with instance
vif_refs = virtual_interface_get_by_instance(context, instance_ref.id)
# compile list of the mac_addresses for vifs sorted by network id
+1
View File
@@ -180,6 +180,7 @@ class Instance(BASE, NovaBase):
image_ref = Column(String(255))
kernel_id = Column(String(255))
ramdisk_id = Column(String(255))
server_name = Column(String(255))
# image_ref = Column(Integer, ForeignKey('images.id'), nullable=True)
# kernel_id = Column(Integer, ForeignKey('images.id'), nullable=True)
+6 -2
View File
@@ -71,14 +71,18 @@ def fake_wsgi(self, req):
return self.application
def wsgi_app(inner_app10=None, inner_app11=None, fake_auth=True):
def wsgi_app(inner_app10=None, inner_app11=None, fake_auth=True,
fake_auth_context=None):
if not inner_app10:
inner_app10 = openstack.APIRouterV10()
if not inner_app11:
inner_app11 = openstack.APIRouterV11()
if fake_auth:
ctxt = context.RequestContext('fake', 'fake')
if fake_auth_context is not None:
ctxt = fake_auth_context
else:
ctxt = context.RequestContext('fake', 'fake')
api10 = openstack.FaultWrapper(wsgi.InjectContext(ctxt,
limits.RateLimitingMiddleware(inner_app10)))
api11 = openstack.FaultWrapper(wsgi.InjectContext(ctxt,
+275 -2
View File
@@ -236,7 +236,8 @@ class ServersTest(test.TestCase):
fakes.stub_out_key_pair_funcs(self.stubs)
fakes.stub_out_image_service(self.stubs)
self.stubs.Set(utils, 'gen_uuid', fake_gen_uuid)
self.stubs.Set(nova.db.api, 'instance_get_all', return_servers)
self.stubs.Set(nova.db.api, 'instance_get_all_by_filters',
return_servers)
self.stubs.Set(nova.db.api, 'instance_get', return_server_by_id)
self.stubs.Set(nova.db, 'instance_get_by_uuid',
return_server_by_uuid)
@@ -1098,6 +1099,277 @@ class ServersTest(test.TestCase):
self.assertEqual(res.status_int, 400)
self.assertTrue(res.body.find('marker param') > -1)
def test_get_servers_with_bad_option_v1_0(self):
# 1.0 API ignores unknown options
def fake_get_all(compute_self, context, search_opts=None):
return [stub_instance(100)]
self.stubs.Set(nova.compute.API, 'get_all', fake_get_all)
req = webob.Request.blank('/v1.0/servers?unknownoption=whee')
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 200)
servers = json.loads(res.body)['servers']
self.assertEqual(len(servers), 1)
self.assertEqual(servers[0]['id'], 100)
def test_get_servers_with_bad_option_v1_1(self):
# 1.1 API also ignores unknown options
def fake_get_all(compute_self, context, search_opts=None):
return [stub_instance(100)]
self.stubs.Set(nova.compute.API, 'get_all', fake_get_all)
req = webob.Request.blank('/v1.1/servers?unknownoption=whee')
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 200)
servers = json.loads(res.body)['servers']
self.assertEqual(len(servers), 1)
self.assertEqual(servers[0]['id'], 100)
def test_get_servers_allows_image_v1_1(self):
def fake_get_all(compute_self, context, search_opts=None):
self.assertNotEqual(search_opts, None)
self.assertTrue('image' in search_opts)
self.assertEqual(search_opts['image'], '12345')
return [stub_instance(100)]
self.stubs.Set(nova.compute.API, 'get_all', fake_get_all)
self.flags(allow_admin_api=False)
req = webob.Request.blank('/v1.1/servers?image=12345')
res = req.get_response(fakes.wsgi_app())
# The following assert will fail if either of the asserts in
# fake_get_all() fail
self.assertEqual(res.status_int, 200)
servers = json.loads(res.body)['servers']
self.assertEqual(len(servers), 1)
self.assertEqual(servers[0]['id'], 100)
def test_get_servers_allows_flavor_v1_1(self):
def fake_get_all(compute_self, context, search_opts=None):
self.assertNotEqual(search_opts, None)
self.assertTrue('flavor' in search_opts)
# flavor is an integer ID
self.assertEqual(search_opts['flavor'], '12345')
return [stub_instance(100)]
self.stubs.Set(nova.compute.API, 'get_all', fake_get_all)
self.flags(allow_admin_api=False)
req = webob.Request.blank('/v1.1/servers?flavor=12345')
res = req.get_response(fakes.wsgi_app())
# The following assert will fail if either of the asserts in
# fake_get_all() fail
self.assertEqual(res.status_int, 200)
servers = json.loads(res.body)['servers']
self.assertEqual(len(servers), 1)
self.assertEqual(servers[0]['id'], 100)
def test_get_servers_allows_status_v1_1(self):
def fake_get_all(compute_self, context, search_opts=None):
self.assertNotEqual(search_opts, None)
self.assertTrue('state' in search_opts)
self.assertEqual(set(search_opts['state']),
set([power_state.RUNNING, power_state.BLOCKED]))
return [stub_instance(100)]
self.stubs.Set(nova.compute.API, 'get_all', fake_get_all)
self.flags(allow_admin_api=False)
req = webob.Request.blank('/v1.1/servers?status=active')
res = req.get_response(fakes.wsgi_app())
# The following assert will fail if either of the asserts in
# fake_get_all() fail
self.assertEqual(res.status_int, 200)
servers = json.loads(res.body)['servers']
self.assertEqual(len(servers), 1)
self.assertEqual(servers[0]['id'], 100)
def test_get_servers_invalid_status_v1_1(self):
"""Test getting servers by invalid status"""
self.flags(allow_admin_api=False)
req = webob.Request.blank('/v1.1/servers?status=running')
res = req.get_response(fakes.wsgi_app())
# The following assert will fail if either of the asserts in
# fake_get_all() fail
self.assertEqual(res.status_int, 400)
self.assertTrue(res.body.find('Invalid server status') > -1)
def test_get_servers_allows_name_v1_1(self):
def fake_get_all(compute_self, context, search_opts=None):
self.assertNotEqual(search_opts, None)
self.assertTrue('name' in search_opts)
self.assertEqual(search_opts['name'], 'whee.*')
return [stub_instance(100)]
self.stubs.Set(nova.compute.API, 'get_all', fake_get_all)
self.flags(allow_admin_api=False)
req = webob.Request.blank('/v1.1/servers?name=whee.*')
res = req.get_response(fakes.wsgi_app())
# The following assert will fail if either of the asserts in
# fake_get_all() fail
self.assertEqual(res.status_int, 200)
servers = json.loads(res.body)['servers']
self.assertEqual(len(servers), 1)
self.assertEqual(servers[0]['id'], 100)
def test_get_servers_unknown_or_admin_options1(self):
"""Test getting servers by admin-only or unknown options.
This tests when admin_api is off. Make sure the admin and
unknown options are stripped before they get to
compute_api.get_all()
"""
self.flags(allow_admin_api=False)
def fake_get_all(compute_self, context, search_opts=None):
self.assertNotEqual(search_opts, None)
# Allowed by user
self.assertTrue('name' in search_opts)
self.assertTrue('status' in search_opts)
# Allowed only by admins with admin API on
self.assertFalse('ip' in search_opts)
self.assertFalse('unknown_option' in search_opts)
return [stub_instance(100)]
self.stubs.Set(nova.compute.API, 'get_all', fake_get_all)
query_str = "name=foo&ip=10.*&status=active&unknown_option=meow"
req = webob.Request.blank('/v1.1/servers?%s' % query_str)
# Request admin context
context = nova.context.RequestContext('testuser', 'testproject',
is_admin=True)
res = req.get_response(fakes.wsgi_app(fake_auth_context=context))
# The following assert will fail if either of the asserts in
# fake_get_all() fail
self.assertEqual(res.status_int, 200)
servers = json.loads(res.body)['servers']
self.assertEqual(len(servers), 1)
self.assertEqual(servers[0]['id'], 100)
def test_get_servers_unknown_or_admin_options2(self):
"""Test getting servers by admin-only or unknown options.
This tests when admin_api is on, but context is a user.
Make sure the admin and unknown options are stripped before
they get to compute_api.get_all()
"""
self.flags(allow_admin_api=True)
def fake_get_all(compute_self, context, search_opts=None):
self.assertNotEqual(search_opts, None)
# Allowed by user
self.assertTrue('name' in search_opts)
self.assertTrue('status' in search_opts)
# Allowed only by admins with admin API on
self.assertFalse('ip' in search_opts)
self.assertFalse('unknown_option' in search_opts)
return [stub_instance(100)]
self.stubs.Set(nova.compute.API, 'get_all', fake_get_all)
query_str = "name=foo&ip=10.*&status=active&unknown_option=meow"
req = webob.Request.blank('/v1.1/servers?%s' % query_str)
# Request admin context
context = nova.context.RequestContext('testuser', 'testproject',
is_admin=False)
res = req.get_response(fakes.wsgi_app(fake_auth_context=context))
# The following assert will fail if either of the asserts in
# fake_get_all() fail
self.assertEqual(res.status_int, 200)
servers = json.loads(res.body)['servers']
self.assertEqual(len(servers), 1)
self.assertEqual(servers[0]['id'], 100)
def test_get_servers_unknown_or_admin_options3(self):
"""Test getting servers by admin-only or unknown options.
This tests when admin_api is on and context is admin.
All options should be passed through to compute_api.get_all()
"""
self.flags(allow_admin_api=True)
def fake_get_all(compute_self, context, search_opts=None):
self.assertNotEqual(search_opts, None)
# Allowed by user
self.assertTrue('name' in search_opts)
self.assertTrue('status' in search_opts)
# Allowed only by admins with admin API on
self.assertTrue('ip' in search_opts)
self.assertTrue('unknown_option' in search_opts)
return [stub_instance(100)]
self.stubs.Set(nova.compute.API, 'get_all', fake_get_all)
query_str = "name=foo&ip=10.*&status=active&unknown_option=meow"
req = webob.Request.blank('/v1.1/servers?%s' % query_str)
# Request admin context
context = nova.context.RequestContext('testuser', 'testproject',
is_admin=True)
res = req.get_response(fakes.wsgi_app(fake_auth_context=context))
# The following assert will fail if either of the asserts in
# fake_get_all() fail
self.assertEqual(res.status_int, 200)
servers = json.loads(res.body)['servers']
self.assertEqual(len(servers), 1)
self.assertEqual(servers[0]['id'], 100)
def test_get_servers_admin_allows_ip_v1_1(self):
"""Test getting servers by ip with admin_api enabled and
admin context
"""
self.flags(allow_admin_api=True)
def fake_get_all(compute_self, context, search_opts=None):
self.assertNotEqual(search_opts, None)
self.assertTrue('ip' in search_opts)
self.assertEqual(search_opts['ip'], '10\..*')
return [stub_instance(100)]
self.stubs.Set(nova.compute.API, 'get_all', fake_get_all)
req = webob.Request.blank('/v1.1/servers?ip=10\..*')
# Request admin context
context = nova.context.RequestContext('testuser', 'testproject',
is_admin=True)
res = req.get_response(fakes.wsgi_app(fake_auth_context=context))
# The following assert will fail if either of the asserts in
# fake_get_all() fail
self.assertEqual(res.status_int, 200)
servers = json.loads(res.body)['servers']
self.assertEqual(len(servers), 1)
self.assertEqual(servers[0]['id'], 100)
def test_get_servers_admin_allows_ip6_v1_1(self):
"""Test getting servers by ip6 with admin_api enabled and
admin context
"""
self.flags(allow_admin_api=True)
def fake_get_all(compute_self, context, search_opts=None):
self.assertNotEqual(search_opts, None)
self.assertTrue('ip6' in search_opts)
self.assertEqual(search_opts['ip6'], 'ffff.*')
return [stub_instance(100)]
self.stubs.Set(nova.compute.API, 'get_all', fake_get_all)
req = webob.Request.blank('/v1.1/servers?ip6=ffff.*')
# Request admin context
context = nova.context.RequestContext('testuser', 'testproject',
is_admin=True)
res = req.get_response(fakes.wsgi_app(fake_auth_context=context))
# The following assert will fail if either of the asserts in
# fake_get_all() fail
self.assertEqual(res.status_int, 200)
servers = json.loads(res.body)['servers']
self.assertEqual(len(servers), 1)
self.assertEqual(servers[0]['id'], 100)
def _setup_for_create_instance(self):
"""Shared implementation for tests below that create instance"""
def instance_create(context, inst):
@@ -1665,6 +1937,7 @@ class ServersTest(test.TestCase):
def test_get_all_server_details_v1_0(self):
req = webob.Request.blank('/v1.0/servers/detail')
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 200)
res_dict = json.loads(res.body)
for i, s in enumerate(res_dict['servers']):
@@ -1720,7 +1993,7 @@ class ServersTest(test.TestCase):
return [stub_instance(i, 'fake', 'fake', None, None, i % 2)
for i in xrange(5)]
self.stubs.Set(nova.db.api, 'instance_get_all_by_project',
self.stubs.Set(nova.db.api, 'instance_get_all_by_filters',
return_servers_with_host)
req = webob.Request.blank('/v1.0/servers/detail')
+457 -1
View File
@@ -26,6 +26,7 @@ from nova.compute import power_state
from nova import context
from nova import db
from nova.db.sqlalchemy import models
from nova.db.sqlalchemy import api as sqlalchemy_api
from nova import exception
from nova import flags
import nova.image.fake
@@ -73,8 +74,11 @@ class ComputeTestCase(test.TestCase):
self.stubs.Set(nova.image.fake._FakeImageService, 'show', fake_show)
def _create_instance(self, params={}):
def _create_instance(self, params=None):
"""Create a test instance"""
if params is None:
params = {}
inst = {}
inst['image_ref'] = 1
inst['reservation_id'] = 'r-fakeres'
@@ -864,6 +868,458 @@ class ComputeTestCase(test.TestCase):
self.assertEqual(len(instances), 1)
self.assertEqual(power_state.SHUTOFF, instances[0]['state'])
def test_get_all_by_name_regexp(self):
"""Test searching instances by name (display_name)"""
c = context.get_admin_context()
instance_id1 = self._create_instance({'display_name': 'woot'})
instance_id2 = self._create_instance({
'display_name': 'woo',
'id': 20})
instance_id3 = self._create_instance({
'display_name': 'not-woot',
'id': 30})
instances = self.compute_api.get_all(c,
search_opts={'name': 'woo.*'})
self.assertEqual(len(instances), 2)
instance_ids = [instance.id for instance in instances]
self.assertTrue(instance_id1 in instance_ids)
self.assertTrue(instance_id2 in instance_ids)
instances = self.compute_api.get_all(c,
search_opts={'name': 'woot.*'})
instance_ids = [instance.id for instance in instances]
self.assertEqual(len(instances), 1)
self.assertTrue(instance_id1 in instance_ids)
instances = self.compute_api.get_all(c,
search_opts={'name': '.*oot.*'})
self.assertEqual(len(instances), 2)
instance_ids = [instance.id for instance in instances]
self.assertTrue(instance_id1 in instance_ids)
self.assertTrue(instance_id3 in instance_ids)
instances = self.compute_api.get_all(c,
search_opts={'name': 'n.*'})
self.assertEqual(len(instances), 1)
instance_ids = [instance.id for instance in instances]
self.assertTrue(instance_id3 in instance_ids)
instances = self.compute_api.get_all(c,
search_opts={'name': 'noth.*'})
self.assertEqual(len(instances), 0)
db.instance_destroy(c, instance_id1)
db.instance_destroy(c, instance_id2)
db.instance_destroy(c, instance_id3)
def test_get_all_by_instance_name_regexp(self):
"""Test searching instances by name"""
self.flags(instance_name_template='instance-%d')
c = context.get_admin_context()
instance_id1 = self._create_instance()
instance_id2 = self._create_instance({'id': 2})
instance_id3 = self._create_instance({'id': 10})
instances = self.compute_api.get_all(c,
search_opts={'instance_name': 'instance.*'})
self.assertEqual(len(instances), 3)
instances = self.compute_api.get_all(c,
search_opts={'instance_name': '.*\-\d$'})
self.assertEqual(len(instances), 2)
instance_ids = [instance.id for instance in instances]
self.assertTrue(instance_id1 in instance_ids)
self.assertTrue(instance_id2 in instance_ids)
instances = self.compute_api.get_all(c,
search_opts={'instance_name': 'i.*2'})
self.assertEqual(len(instances), 1)
self.assertEqual(instances[0].id, instance_id2)
db.instance_destroy(c, instance_id1)
db.instance_destroy(c, instance_id2)
db.instance_destroy(c, instance_id3)
def test_get_by_fixed_ip(self):
"""Test getting 1 instance by Fixed IP"""
c = context.get_admin_context()
instance_id1 = self._create_instance()
instance_id2 = self._create_instance({'id': 20})
instance_id3 = self._create_instance({'id': 30})
vif_ref1 = db.virtual_interface_create(c,
{'address': '12:34:56:78:90:12',
'instance_id': instance_id1,
'network_id': 1})
vif_ref2 = db.virtual_interface_create(c,
{'address': '90:12:34:56:78:90',
'instance_id': instance_id2,
'network_id': 1})
db.fixed_ip_create(c,
{'address': '1.1.1.1',
'instance_id': instance_id1,
'virtual_interface_id': vif_ref1['id']})
db.fixed_ip_create(c,
{'address': '1.1.2.1',
'instance_id': instance_id2,
'virtual_interface_id': vif_ref2['id']})
# regex not allowed
instances = self.compute_api.get_all(c,
search_opts={'fixed_ip': '.*'})
self.assertEqual(len(instances), 0)
instances = self.compute_api.get_all(c,
search_opts={'fixed_ip': '1.1.3.1'})
self.assertEqual(len(instances), 0)
instances = self.compute_api.get_all(c,
search_opts={'fixed_ip': '1.1.1.1'})
self.assertEqual(len(instances), 1)
self.assertEqual(instances[0].id, instance_id1)
instances = self.compute_api.get_all(c,
search_opts={'fixed_ip': '1.1.2.1'})
self.assertEqual(len(instances), 1)
self.assertEqual(instances[0].id, instance_id2)
db.virtual_interface_delete(c, vif_ref1['id'])
db.virtual_interface_delete(c, vif_ref2['id'])
db.instance_destroy(c, instance_id1)
db.instance_destroy(c, instance_id2)
def test_get_all_by_ip_regexp(self):
"""Test searching by Floating and Fixed IP"""
c = context.get_admin_context()
instance_id1 = self._create_instance({'display_name': 'woot'})
instance_id2 = self._create_instance({
'display_name': 'woo',
'id': 20})
instance_id3 = self._create_instance({
'display_name': 'not-woot',
'id': 30})
vif_ref1 = db.virtual_interface_create(c,
{'address': '12:34:56:78:90:12',
'instance_id': instance_id1,
'network_id': 1})
vif_ref2 = db.virtual_interface_create(c,
{'address': '90:12:34:56:78:90',
'instance_id': instance_id2,
'network_id': 1})
vif_ref3 = db.virtual_interface_create(c,
{'address': '34:56:78:90:12:34',
'instance_id': instance_id3,
'network_id': 1})
db.fixed_ip_create(c,
{'address': '1.1.1.1',
'instance_id': instance_id1,
'virtual_interface_id': vif_ref1['id']})
db.fixed_ip_create(c,
{'address': '1.1.2.1',
'instance_id': instance_id2,
'virtual_interface_id': vif_ref2['id']})
fix_addr = db.fixed_ip_create(c,
{'address': '1.1.3.1',
'instance_id': instance_id3,
'virtual_interface_id': vif_ref3['id']})
fix_ref = db.fixed_ip_get_by_address(c, fix_addr)
flo_ref = db.floating_ip_create(c,
{'address': '10.0.0.2',
'fixed_ip_id': fix_ref['id']})
# ends up matching 2nd octet here.. so all 3 match
instances = self.compute_api.get_all(c,
search_opts={'ip': '.*\.1'})
self.assertEqual(len(instances), 3)
instances = self.compute_api.get_all(c,
search_opts={'ip': '1.*'})
self.assertEqual(len(instances), 3)
instances = self.compute_api.get_all(c,
search_opts={'ip': '.*\.1.\d+$'})
self.assertEqual(len(instances), 1)
instance_ids = [instance.id for instance in instances]
self.assertTrue(instance_id1 in instance_ids)
instances = self.compute_api.get_all(c,
search_opts={'ip': '.*\.2.+'})
self.assertEqual(len(instances), 1)
self.assertEqual(instances[0].id, instance_id2)
instances = self.compute_api.get_all(c,
search_opts={'ip': '10.*'})
self.assertEqual(len(instances), 1)
self.assertEqual(instances[0].id, instance_id3)
db.virtual_interface_delete(c, vif_ref1['id'])
db.virtual_interface_delete(c, vif_ref2['id'])
db.virtual_interface_delete(c, vif_ref3['id'])
db.floating_ip_destroy(c, '10.0.0.2')
db.instance_destroy(c, instance_id1)
db.instance_destroy(c, instance_id2)
db.instance_destroy(c, instance_id3)
def test_get_all_by_ipv6_regexp(self):
"""Test searching by IPv6 address"""
c = context.get_admin_context()
instance_id1 = self._create_instance({'display_name': 'woot'})
instance_id2 = self._create_instance({
'display_name': 'woo',
'id': 20})
instance_id3 = self._create_instance({
'display_name': 'not-woot',
'id': 30})
vif_ref1 = db.virtual_interface_create(c,
{'address': '12:34:56:78:90:12',
'instance_id': instance_id1,
'network_id': 1})
vif_ref2 = db.virtual_interface_create(c,
{'address': '90:12:34:56:78:90',
'instance_id': instance_id2,
'network_id': 1})
vif_ref3 = db.virtual_interface_create(c,
{'address': '34:56:78:90:12:34',
'instance_id': instance_id3,
'network_id': 1})
# This will create IPv6 addresses of:
# 1: fd00::1034:56ff:fe78:9012
# 20: fd00::9212:34ff:fe56:7890
# 30: fd00::3656:78ff:fe90:1234
instances = self.compute_api.get_all(c,
search_opts={'ip6': '.*1034.*'})
self.assertEqual(len(instances), 1)
self.assertEqual(instances[0].id, instance_id1)
instances = self.compute_api.get_all(c,
search_opts={'ip6': '^fd00.*'})
self.assertEqual(len(instances), 3)
instance_ids = [instance.id for instance in instances]
self.assertTrue(instance_id1 in instance_ids)
self.assertTrue(instance_id2 in instance_ids)
self.assertTrue(instance_id3 in instance_ids)
instances = self.compute_api.get_all(c,
search_opts={'ip6': '^.*12.*34.*'})
self.assertEqual(len(instances), 2)
instance_ids = [instance.id for instance in instances]
self.assertTrue(instance_id2 in instance_ids)
self.assertTrue(instance_id3 in instance_ids)
db.virtual_interface_delete(c, vif_ref1['id'])
db.virtual_interface_delete(c, vif_ref2['id'])
db.virtual_interface_delete(c, vif_ref3['id'])
db.instance_destroy(c, instance_id1)
db.instance_destroy(c, instance_id2)
db.instance_destroy(c, instance_id3)
def test_get_all_by_multiple_options_at_once(self):
"""Test searching by multiple options at once"""
c = context.get_admin_context()
instance_id1 = self._create_instance({'display_name': 'woot'})
instance_id2 = self._create_instance({
'display_name': 'woo',
'id': 20})
instance_id3 = self._create_instance({
'display_name': 'not-woot',
'id': 30})
vif_ref1 = db.virtual_interface_create(c,
{'address': '12:34:56:78:90:12',
'instance_id': instance_id1,
'network_id': 1})
vif_ref2 = db.virtual_interface_create(c,
{'address': '90:12:34:56:78:90',
'instance_id': instance_id2,
'network_id': 1})
vif_ref3 = db.virtual_interface_create(c,
{'address': '34:56:78:90:12:34',
'instance_id': instance_id3,
'network_id': 1})
db.fixed_ip_create(c,
{'address': '1.1.1.1',
'instance_id': instance_id1,
'virtual_interface_id': vif_ref1['id']})
db.fixed_ip_create(c,
{'address': '1.1.2.1',
'instance_id': instance_id2,
'virtual_interface_id': vif_ref2['id']})
fix_addr = db.fixed_ip_create(c,
{'address': '1.1.3.1',
'instance_id': instance_id3,
'virtual_interface_id': vif_ref3['id']})
fix_ref = db.fixed_ip_get_by_address(c, fix_addr)
flo_ref = db.floating_ip_create(c,
{'address': '10.0.0.2',
'fixed_ip_id': fix_ref['id']})
# ip ends up matching 2nd octet here.. so all 3 match ip
# but 'name' only matches one
instances = self.compute_api.get_all(c,
search_opts={'ip': '.*\.1', 'name': 'not.*'})
self.assertEqual(len(instances), 1)
self.assertEqual(instances[0].id, instance_id3)
# ip ends up matching any ip with a '2' in it.. so instance
# 2 and 3.. but name should only match #2
# but 'name' only matches one
instances = self.compute_api.get_all(c,
search_opts={'ip': '.*2', 'name': '^woo.*'})
self.assertEqual(len(instances), 1)
self.assertEqual(instances[0].id, instance_id2)
# same as above but no match on name (name matches instance_id1
# but the ip query doesn't
instances = self.compute_api.get_all(c,
search_opts={'ip': '.*2.*', 'name': '^woot.*'})
self.assertEqual(len(instances), 0)
# ip matches all 3... ipv6 matches #2+#3...name matches #3
instances = self.compute_api.get_all(c,
search_opts={'ip': '.*\.1',
'name': 'not.*',
'ip6': '^.*12.*34.*'})
self.assertEqual(len(instances), 1)
self.assertEqual(instances[0].id, instance_id3)
db.virtual_interface_delete(c, vif_ref1['id'])
db.virtual_interface_delete(c, vif_ref2['id'])
db.virtual_interface_delete(c, vif_ref3['id'])
db.floating_ip_destroy(c, '10.0.0.2')
db.instance_destroy(c, instance_id1)
db.instance_destroy(c, instance_id2)
db.instance_destroy(c, instance_id3)
def test_get_all_by_image(self):
"""Test searching instances by image"""
c = context.get_admin_context()
instance_id1 = self._create_instance({'image_ref': '1234'})
instance_id2 = self._create_instance({
'id': 2,
'image_ref': '4567'})
instance_id3 = self._create_instance({
'id': 10,
'image_ref': '4567'})
instances = self.compute_api.get_all(c,
search_opts={'image': '123'})
self.assertEqual(len(instances), 0)
instances = self.compute_api.get_all(c,
search_opts={'image': '1234'})
self.assertEqual(len(instances), 1)
self.assertEqual(instances[0].id, instance_id1)
instances = self.compute_api.get_all(c,
search_opts={'image': '4567'})
self.assertEqual(len(instances), 2)
instance_ids = [instance.id for instance in instances]
self.assertTrue(instance_id2 in instance_ids)
self.assertTrue(instance_id3 in instance_ids)
# Test passing a list as search arg
instances = self.compute_api.get_all(c,
search_opts={'image': ['1234', '4567']})
self.assertEqual(len(instances), 3)
db.instance_destroy(c, instance_id1)
db.instance_destroy(c, instance_id2)
db.instance_destroy(c, instance_id3)
def test_get_all_by_flavor(self):
"""Test searching instances by image"""
c = context.get_admin_context()
instance_id1 = self._create_instance({'instance_type_id': 1})
instance_id2 = self._create_instance({
'id': 2,
'instance_type_id': 2})
instance_id3 = self._create_instance({
'id': 10,
'instance_type_id': 2})
# NOTE(comstud): Migrations set up the instance_types table
# for us. Therefore, we assume the following is true for
# these tests:
# instance_type_id 1 == flavor 3
# instance_type_id 2 == flavor 1
# instance_type_id 3 == flavor 4
# instance_type_id 4 == flavor 5
# instance_type_id 5 == flavor 2
instances = self.compute_api.get_all(c,
search_opts={'flavor': 5})
self.assertEqual(len(instances), 0)
self.assertRaises(exception.FlavorNotFound,
self.compute_api.get_all,
c, search_opts={'flavor': 99})
instances = self.compute_api.get_all(c,
search_opts={'flavor': 3})
self.assertEqual(len(instances), 1)
self.assertEqual(instances[0].id, instance_id1)
instances = self.compute_api.get_all(c,
search_opts={'flavor': 1})
self.assertEqual(len(instances), 2)
instance_ids = [instance.id for instance in instances]
self.assertTrue(instance_id2 in instance_ids)
self.assertTrue(instance_id3 in instance_ids)
db.instance_destroy(c, instance_id1)
db.instance_destroy(c, instance_id2)
db.instance_destroy(c, instance_id3)
def test_get_all_by_state(self):
"""Test searching instances by state"""
c = context.get_admin_context()
instance_id1 = self._create_instance({'state': power_state.SHUTDOWN})
instance_id2 = self._create_instance({
'id': 2,
'state': power_state.RUNNING})
instance_id3 = self._create_instance({
'id': 10,
'state': power_state.RUNNING})
instances = self.compute_api.get_all(c,
search_opts={'state': power_state.SUSPENDED})
self.assertEqual(len(instances), 0)
instances = self.compute_api.get_all(c,
search_opts={'state': power_state.SHUTDOWN})
self.assertEqual(len(instances), 1)
self.assertEqual(instances[0].id, instance_id1)
instances = self.compute_api.get_all(c,
search_opts={'state': power_state.RUNNING})
self.assertEqual(len(instances), 2)
instance_ids = [instance.id for instance in instances]
self.assertTrue(instance_id2 in instance_ids)
self.assertTrue(instance_id3 in instance_ids)
# Test passing a list as search arg
instances = self.compute_api.get_all(c,
search_opts={'state': [power_state.SHUTDOWN,
power_state.RUNNING]})
self.assertEqual(len(instances), 3)
db.instance_destroy(c, instance_id1)
db.instance_destroy(c, instance_id2)
db.instance_destroy(c, instance_id3)
@staticmethod
def _parse_db_block_device_mapping(bdm_ref):
attr_list = ('delete_on_termination', 'device_name', 'no_device',
+5 -1
View File
@@ -43,17 +43,21 @@ class MetadataTestCase(test.TestCase):
'reservation_id': 'r-xxxxxxxx',
'user_data': '',
'image_ref': 7,
'fixed_ips': [],
'root_device_name': '/dev/sda1',
'hostname': 'test'})
def instance_get(*args, **kwargs):
return self.instance
def instance_get_list(*args, **kwargs):
return [self.instance]
def floating_get(*args, **kwargs):
return '99.99.99.99'
self.stubs.Set(api, 'instance_get', instance_get)
self.stubs.Set(api, 'fixed_ip_get_instance', instance_get)
self.stubs.Set(api, 'instance_get_all_by_filters', instance_get_list)
self.stubs.Set(api, 'instance_get_floating_address', floating_get)
self.app = metadatarequesthandler.MetadataRequestHandler()