Fixed bugs in bug fix (plugin call)

Checked for pep8 errors

Tested in several 'live' failure scenarios
This commit is contained in:
Salvatore Orlando
2011-03-15 17:24:16 +00:00
25 changed files with 2830 additions and 29 deletions
+88
View File
@@ -560,6 +560,40 @@ class NetworkCommands(object):
db.network_delete_safe(context.get_admin_context(), network.id)
class VmCommands(object):
"""Class for mangaging VM instances."""
def live_migration(self, ec2_id, dest):
"""Migrates a running instance to a new machine.
:param ec2_id: instance id which comes from euca-describe-instance.
:param dest: destination host name.
"""
ctxt = context.get_admin_context()
instance_id = ec2_id_to_id(ec2_id)
if FLAGS.connection_type != 'libvirt':
msg = _('Only KVM is supported for now. Sorry!')
raise exception.Error(msg)
if (FLAGS.volume_driver != 'nova.volume.driver.AOEDriver' and \
FLAGS.volume_driver != 'nova.volume.driver.ISCSIDriver'):
msg = _("Support only AOEDriver and ISCSIDriver. Sorry!")
raise exception.Error(msg)
rpc.call(ctxt,
FLAGS.scheduler_topic,
{"method": "live_migration",
"args": {"instance_id": instance_id,
"dest": dest,
"topic": FLAGS.compute_topic}})
print _('Migration of %s initiated.'
'Check its progress using euca-describe-instances.') % ec2_id
class ServiceCommands(object):
"""Enable and disable running services"""
@@ -604,6 +638,59 @@ class ServiceCommands(object):
return
db.service_update(ctxt, svc['id'], {'disabled': True})
def describe_resource(self, host):
"""Describes cpu/memory/hdd info for host.
:param host: hostname.
"""
result = rpc.call(context.get_admin_context(),
FLAGS.scheduler_topic,
{"method": "show_host_resources",
"args": {"host": host}})
if type(result) != dict:
print _('An unexpected error has occurred.')
print _('[Result]'), result
else:
cpu = result['resource']['vcpus']
mem = result['resource']['memory_mb']
hdd = result['resource']['local_gb']
cpu_u = result['resource']['vcpus_used']
mem_u = result['resource']['memory_mb_used']
hdd_u = result['resource']['local_gb_used']
print 'HOST\t\t\tPROJECT\t\tcpu\tmem(mb)\tdisk(gb)'
print '%s(total)\t\t\t%s\t%s\t%s' % (host, cpu, mem, hdd)
print '%s(used)\t\t\t%s\t%s\t%s' % (host, cpu_u, mem_u, hdd_u)
for p_id, val in result['usage'].items():
print '%s\t\t%s\t\t%s\t%s\t%s' % (host,
p_id,
val['vcpus'],
val['memory_mb'],
val['local_gb'])
def update_resource(self, host):
"""Updates available vcpu/memory/disk info for host.
:param host: hostname.
"""
ctxt = context.get_admin_context()
service_refs = db.service_get_all_by_host(ctxt, host)
if len(service_refs) <= 0:
raise exception.Invalid(_('%s does not exist.') % host)
service_refs = [s for s in service_refs if s['topic'] == 'compute']
if len(service_refs) <= 0:
raise exception.Invalid(_('%s is not compute node.') % host)
rpc.call(ctxt,
db.queue_get_for(ctxt, FLAGS.compute_topic, host),
{"method": "update_available_resource"})
class LogCommands(object):
def request(self, request_id, logfile='/var/log/nova.log'):
@@ -908,6 +995,7 @@ CATEGORIES = [
('fixed', FixedIpCommands),
('floating', FloatingIpCommands),
('network', NetworkCommands),
('vm', VmCommands),
('service', ServiceCommands),
('log', LogCommands),
('db', DbCommands),
+1
View File
@@ -76,6 +76,7 @@ if [ "$CMD" == "install" ]; then
sudo apt-get install -y python-migrate python-eventlet python-gflags python-ipy python-tempita
sudo apt-get install -y python-libvirt python-libxml2 python-routes python-cheetah
sudo apt-get install -y python-netaddr python-paste python-pastedeploy python-glance
sudo apt-get install -y python-multiprocessing
if [ "$USE_IPV6" == 1 ]; then
sudo apt-get install -y radvd
+7 -4
View File
@@ -147,8 +147,6 @@ class CloudController(object):
instance_ref['id'])
ec2_id = ec2utils.id_to_ec2_id(instance_ref['id'])
image_ec2_id = self._image_ec2_id(instance_ref['image_id'], 'machine')
k_ec2_id = self._image_ec2_id(instance_ref['kernel_id'], 'kernel')
r_ec2_id = self._image_ec2_id(instance_ref['ramdisk_id'], 'ramdisk')
data = {
'user-data': base64.b64decode(instance_ref['user_data']),
'meta-data': {
@@ -167,8 +165,6 @@ class CloudController(object):
'instance-type': instance_ref['instance_type'],
'local-hostname': hostname,
'local-ipv4': address,
'kernel-id': k_ec2_id,
'ramdisk-id': r_ec2_id,
'placement': {'availability-zone': availability_zone},
'public-hostname': hostname,
'public-ipv4': floating_ip or '',
@@ -176,6 +172,13 @@ class CloudController(object):
'reservation-id': instance_ref['reservation_id'],
'security-groups': '',
'mpi': mpi}}
for image_type in ['kernel', 'ramdisk']:
if '%s_id' % image_type in instance_ref:
ec2_id = self._image_ec2_id(instance_ref['%s_id' % image_type],
image_type)
data['meta-data']['%s-id' % image_type] = ec2_id
if False: # TODO(vish): store ancestor ids
data['ancestor-ami-ids'] = []
if False: # TODO(vish): store product codes
+252 -1
View File
@@ -36,9 +36,12 @@ terminating it.
import base64
import datetime
import os
import random
import string
import socket
import tempfile
import time
import functools
from nova import exception
@@ -61,6 +64,9 @@ flags.DEFINE_integer('password_length', 12,
flags.DEFINE_string('console_host', socket.gethostname(),
'Console proxy host to use to connect to instances on'
'this host.')
flags.DEFINE_integer('live_migration_retry_count', 30,
("Retry count needed in live_migration."
" sleep 1 sec for each count"))
LOG = logging.getLogger('nova.compute.manager')
@@ -181,7 +187,7 @@ class ComputeManager(manager.Manager):
context=context)
self.db.instance_update(context,
instance_id,
{'host': self.host})
{'host': self.host, 'launched_on': self.host})
self.db.instance_set_state(context,
instance_id,
@@ -723,3 +729,248 @@ class ComputeManager(manager.Manager):
self.volume_manager.remove_compute_volume(context, volume_id)
self.db.volume_detached(context, volume_id)
return True
@exception.wrap_exception
def compare_cpu(self, context, cpu_info):
"""Checks the host cpu is compatible to a cpu given by xml.
:param context: security context
:param cpu_info: json string obtained from virConnect.getCapabilities
:returns: See driver.compare_cpu
"""
return self.driver.compare_cpu(cpu_info)
@exception.wrap_exception
def create_shared_storage_test_file(self, context):
"""Makes tmpfile under FLAGS.instance_path.
This method enables compute nodes to recognize that they mounts
same shared storage. (create|check|creanup)_shared_storage_test_file()
is a pair.
:param context: security context
:returns: tmpfile name(basename)
"""
dirpath = FLAGS.instances_path
fd, tmp_file = tempfile.mkstemp(dir=dirpath)
LOG.debug(_("Creating tmpfile %s to notify to other "
"compute nodes that they should mount "
"the same storage.") % tmp_file)
os.close(fd)
return os.path.basename(tmp_file)
@exception.wrap_exception
def check_shared_storage_test_file(self, context, filename):
"""Confirms existence of the tmpfile under FLAGS.instances_path.
:param context: security context
:param filename: confirm existence of FLAGS.instances_path/thisfile
"""
tmp_file = os.path.join(FLAGS.instances_path, filename)
if not os.path.exists(tmp_file):
raise exception.NotFound(_('%s not found') % tmp_file)
@exception.wrap_exception
def cleanup_shared_storage_test_file(self, context, filename):
"""Removes existence of the tmpfile under FLAGS.instances_path.
:param context: security context
:param filename: remove existence of FLAGS.instances_path/thisfile
"""
tmp_file = os.path.join(FLAGS.instances_path, filename)
os.remove(tmp_file)
@exception.wrap_exception
def update_available_resource(self, context):
"""See comments update_resource_info.
:param context: security context
:returns: See driver.update_available_resource()
"""
return self.driver.update_available_resource(context, self.host)
def pre_live_migration(self, context, instance_id):
"""Preparations for live migration at dest host.
:param context: security context
:param instance_id: nova.db.sqlalchemy.models.Instance.Id
"""
# Getting instance info
instance_ref = self.db.instance_get(context, instance_id)
ec2_id = instance_ref['hostname']
# Getting fixed ips
fixed_ip = self.db.instance_get_fixed_address(context, instance_id)
if not fixed_ip:
msg = _("%(instance_id)s(%(ec2_id)s) does not have fixed_ip.")
raise exception.NotFound(msg % locals())
# If any volume is mounted, prepare here.
if not instance_ref['volumes']:
LOG.info(_("%s has no volume."), ec2_id)
else:
for v in instance_ref['volumes']:
self.volume_manager.setup_compute_volume(context, v['id'])
# Bridge settings.
# Call this method prior to ensure_filtering_rules_for_instance,
# since bridge is not set up, ensure_filtering_rules_for instance
# fails.
#
# Retry operation is necessary because continuously request comes,
# concorrent request occurs to iptables, then it complains.
max_retry = FLAGS.live_migration_retry_count
for cnt in range(max_retry):
try:
self.network_manager.setup_compute_network(context,
instance_id)
break
except exception.ProcessExecutionError:
if cnt == max_retry - 1:
raise
else:
LOG.warn(_("setup_compute_network() failed %(cnt)d."
"Retry up to %(max_retry)d for %(ec2_id)s.")
% locals())
time.sleep(1)
# Creating filters to hypervisors and firewalls.
# An example is that nova-instance-instance-xxx,
# which is written to libvirt.xml(Check "virsh nwfilter-list")
# This nwfilter is necessary on the destination host.
# In addition, this method is creating filtering rule
# onto destination host.
self.driver.ensure_filtering_rules_for_instance(instance_ref)
def live_migration(self, context, instance_id, dest):
"""Executing live migration.
:param context: security context
:param instance_id: nova.db.sqlalchemy.models.Instance.Id
:param dest: destination host
"""
# Get instance for error handling.
instance_ref = self.db.instance_get(context, instance_id)
i_name = instance_ref.name
try:
# Checking volume node is working correctly when any volumes
# are attached to instances.
if instance_ref['volumes']:
rpc.call(context,
FLAGS.volume_topic,
{"method": "check_for_export",
"args": {'instance_id': instance_id}})
# Asking dest host to preparing live migration.
rpc.call(context,
self.db.queue_get_for(context, FLAGS.compute_topic, dest),
{"method": "pre_live_migration",
"args": {'instance_id': instance_id}})
except Exception:
msg = _("Pre live migration for %(i_name)s failed at %(dest)s")
LOG.error(msg % locals())
self.recover_live_migration(context, instance_ref)
raise
# Executing live migration
# live_migration might raises exceptions, but
# nothing must be recovered in this version.
self.driver.live_migration(context, instance_ref, dest,
self.post_live_migration,
self.recover_live_migration)
def post_live_migration(self, ctxt, instance_ref, dest):
"""Post operations for live migration.
This method is called from live_migration
and mainly updating database record.
:param ctxt: security context
:param instance_id: nova.db.sqlalchemy.models.Instance.Id
:param dest: destination host
"""
LOG.info(_('post_live_migration() is started..'))
instance_id = instance_ref['id']
# Detaching volumes.
try:
for vol in self.db.volume_get_all_by_instance(ctxt, instance_id):
self.volume_manager.remove_compute_volume(ctxt, vol['id'])
except exception.NotFound:
pass
# Releasing vlan.
# (not necessary in current implementation?)
# Releasing security group ingress rule.
self.driver.unfilter_instance(instance_ref)
# Database updating.
i_name = instance_ref.name
try:
# Not return if floating_ip is not found, otherwise,
# instance never be accessible..
floating_ip = self.db.instance_get_floating_address(ctxt,
instance_id)
if not floating_ip:
LOG.info(_('No floating_ip is found for %s.'), i_name)
else:
floating_ip_ref = self.db.floating_ip_get_by_address(ctxt,
floating_ip)
self.db.floating_ip_update(ctxt,
floating_ip_ref['address'],
{'host': dest})
except exception.NotFound:
LOG.info(_('No floating_ip is found for %s.'), i_name)
except:
LOG.error(_("Live migration: Unexpected error:"
"%s cannot inherit floating ip..") % i_name)
# Restore instance/volume state
self.recover_live_migration(ctxt, instance_ref, dest)
LOG.info(_('Migrating %(i_name)s to %(dest)s finished successfully.')
% locals())
LOG.info(_("You may see the error \"libvirt: QEMU error: "
"Domain not found: no domain with matching name.\" "
"This error can be safely ignored."))
def recover_live_migration(self, ctxt, instance_ref, host=None):
"""Recovers Instance/volume state from migrating -> running.
:param ctxt: security context
:param instance_id: nova.db.sqlalchemy.models.Instance.Id
:param host:
DB column value is updated by this hostname.
if none, the host instance currently running is selected.
"""
if not host:
host = instance_ref['host']
self.db.instance_update(ctxt,
instance_ref['id'],
{'state_description': 'running',
'state': power_state.RUNNING,
'host': host})
for volume in instance_ref['volumes']:
self.db.volume_update(ctxt, volume['id'], {'status': 'in-use'})
+59
View File
@@ -104,6 +104,11 @@ def service_get_all_by_host(context, host):
return IMPL.service_get_all_by_host(context, host)
def service_get_all_compute_by_host(context, host):
"""Get all compute services for a given host."""
return IMPL.service_get_all_compute_by_host(context, host)
def service_get_all_compute_sorted(context):
"""Get all compute services sorted by instance count.
@@ -153,6 +158,29 @@ def service_update(context, service_id, values):
###################
def compute_node_get(context, compute_id, session=None):
"""Get an computeNode or raise if it does not exist."""
return IMPL.compute_node_get(context, compute_id)
def compute_node_create(context, values):
"""Create a computeNode from the values dictionary."""
return IMPL.compute_node_create(context, values)
def compute_node_update(context, compute_id, values):
"""Set the given properties on an computeNode and update it.
Raises NotFound if computeNode does not exist.
"""
return IMPL.compute_node_update(context, compute_id, values)
###################
def certificate_create(context, values):
"""Create a certificate from the values dictionary."""
return IMPL.certificate_create(context, values)
@@ -257,6 +285,11 @@ def floating_ip_get_by_address(context, address):
return IMPL.floating_ip_get_by_address(context, address)
def floating_ip_update(context, address, values):
"""Update a floating ip by address or raise if it doesn't exist."""
return IMPL.floating_ip_update(context, address, values)
####################
def migration_update(context, id, values):
@@ -441,6 +474,27 @@ def instance_add_security_group(context, instance_id, security_group_id):
security_group_id)
def instance_get_vcpu_sum_by_host_and_project(context, hostname, proj_id):
"""Get instances.vcpus by host and project."""
return IMPL.instance_get_vcpu_sum_by_host_and_project(context,
hostname,
proj_id)
def instance_get_memory_sum_by_host_and_project(context, hostname, proj_id):
"""Get amount of memory by host and project."""
return IMPL.instance_get_memory_sum_by_host_and_project(context,
hostname,
proj_id)
def instance_get_disk_sum_by_host_and_project(context, hostname, proj_id):
"""Get total amount of disk by host and project."""
return IMPL.instance_get_disk_sum_by_host_and_project(context,
hostname,
proj_id)
def instance_action_create(context, values):
"""Create an instance action from the values dictionary."""
return IMPL.instance_action_create(context, values)
@@ -765,6 +819,11 @@ def volume_get_all_by_host(context, host):
return IMPL.volume_get_all_by_host(context, host)
def volume_get_all_by_instance(context, instance_id):
"""Get all volumes belonging to a instance."""
return IMPL.volume_get_all_by_instance(context, instance_id)
def volume_get_all_by_project(context, project_id):
"""Get all volumes belonging to a project."""
return IMPL.volume_get_all_by_project(context, project_id)
+121
View File
@@ -119,6 +119,11 @@ def service_destroy(context, service_id):
service_ref = service_get(context, service_id, session=session)
service_ref.delete(session=session)
if service_ref.topic == 'compute' and \
len(service_ref.compute_node) != 0:
for c in service_ref.compute_node:
c.delete(session=session)
@require_admin_context
def service_get(context, service_id, session=None):
@@ -126,6 +131,7 @@ def service_get(context, service_id, session=None):
session = get_session()
result = session.query(models.Service).\
options(joinedload('compute_node')).\
filter_by(id=service_id).\
filter_by(deleted=can_read_deleted(context)).\
first()
@@ -175,6 +181,24 @@ def service_get_all_by_host(context, host):
all()
@require_admin_context
def service_get_all_compute_by_host(context, host):
topic = 'compute'
session = get_session()
result = session.query(models.Service).\
options(joinedload('compute_node')).\
filter_by(deleted=False).\
filter_by(host=host).\
filter_by(topic=topic).\
all()
if not result:
raise exception.NotFound(_("%s does not exist or is not "
"a compute node.") % host)
return result
@require_admin_context
def _service_get_all_topic_subquery(context, session, topic, subq, label):
sort_value = getattr(subq.c, label)
@@ -285,6 +309,42 @@ def service_update(context, service_id, values):
###################
@require_admin_context
def compute_node_get(context, compute_id, session=None):
if not session:
session = get_session()
result = session.query(models.ComputeNode).\
filter_by(id=compute_id).\
filter_by(deleted=can_read_deleted(context)).\
first()
if not result:
raise exception.NotFound(_('No computeNode for id %s') % compute_id)
return result
@require_admin_context
def compute_node_create(context, values):
compute_node_ref = models.ComputeNode()
compute_node_ref.update(values)
compute_node_ref.save()
return compute_node_ref
@require_admin_context
def compute_node_update(context, compute_id, values):
session = get_session()
with session.begin():
compute_ref = compute_node_get(context, compute_id, session=session)
compute_ref.update(values)
compute_ref.save(session=session)
###################
@require_admin_context
def certificate_get(context, certificate_id, session=None):
if not session:
@@ -506,6 +566,16 @@ def floating_ip_get_by_address(context, address, session=None):
return result
@require_context
def floating_ip_update(context, address, values):
session = get_session()
with session.begin():
floating_ip_ref = floating_ip_get_by_address(context, address, session)
for (key, value) in values.iteritems():
floating_ip_ref[key] = value
floating_ip_ref.save(session=session)
###################
@@ -906,6 +976,45 @@ def instance_add_security_group(context, instance_id, security_group_id):
instance_ref.save(session=session)
@require_context
def instance_get_vcpu_sum_by_host_and_project(context, hostname, proj_id):
session = get_session()
result = session.query(models.Instance).\
filter_by(host=hostname).\
filter_by(project_id=proj_id).\
filter_by(deleted=False).\
value(func.sum(models.Instance.vcpus))
if not result:
return 0
return result
@require_context
def instance_get_memory_sum_by_host_and_project(context, hostname, proj_id):
session = get_session()
result = session.query(models.Instance).\
filter_by(host=hostname).\
filter_by(project_id=proj_id).\
filter_by(deleted=False).\
value(func.sum(models.Instance.memory_mb))
if not result:
return 0
return result
@require_context
def instance_get_disk_sum_by_host_and_project(context, hostname, proj_id):
session = get_session()
result = session.query(models.Instance).\
filter_by(host=hostname).\
filter_by(project_id=proj_id).\
filter_by(deleted=False).\
value(func.sum(models.Instance.local_gb))
if not result:
return 0
return result
@require_context
def instance_action_create(context, values):
"""Create an instance action from the values dictionary."""
@@ -1530,6 +1639,18 @@ def volume_get_all_by_host(context, host):
all()
@require_admin_context
def volume_get_all_by_instance(context, instance_id):
session = get_session()
result = session.query(models.Volume).\
filter_by(instance_id=instance_id).\
filter_by(deleted=False).\
all()
if not result:
raise exception.NotFound(_('No volume for instance %s') % instance_id)
return result
@require_context
def volume_get_all_by_project(context, project_id):
authorize_project_context(context, project_id)
@@ -0,0 +1,83 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from migrate import *
from nova import log as logging
from sqlalchemy import *
meta = MetaData()
instances = Table('instances', meta,
Column('id', Integer(), primary_key=True, nullable=False),
)
#
# New Tables
#
compute_nodes = Table('compute_nodes', meta,
Column('created_at', DateTime(timezone=False)),
Column('updated_at', DateTime(timezone=False)),
Column('deleted_at', DateTime(timezone=False)),
Column('deleted', Boolean(create_constraint=True, name=None)),
Column('id', Integer(), primary_key=True, nullable=False),
Column('service_id', Integer(), nullable=False),
Column('vcpus', Integer(), nullable=False),
Column('memory_mb', Integer(), nullable=False),
Column('local_gb', Integer(), nullable=False),
Column('vcpus_used', Integer(), nullable=False),
Column('memory_mb_used', Integer(), nullable=False),
Column('local_gb_used', Integer(), nullable=False),
Column('hypervisor_type',
Text(convert_unicode=False, assert_unicode=None,
unicode_error=None, _warn_on_bytestring=False),
nullable=False),
Column('hypervisor_version', Integer(), nullable=False),
Column('cpu_info',
Text(convert_unicode=False, assert_unicode=None,
unicode_error=None, _warn_on_bytestring=False),
nullable=False),
)
#
# Tables to alter
#
instances_launched_on = Column(
'launched_on',
Text(convert_unicode=False, assert_unicode=None,
unicode_error=None, _warn_on_bytestring=False),
nullable=True)
def upgrade(migrate_engine):
# Upgrade operations go here. Don't create your own engine;
# bind migrate_engine to your metadata
meta.bind = migrate_engine
try:
compute_nodes.create()
except Exception:
logging.info(repr(compute_nodes))
logging.exception('Exception while creating table')
meta.drop_all(tables=[compute_nodes])
raise
instances.create_column(instances_launched_on)
+38
View File
@@ -113,6 +113,41 @@ class Service(BASE, NovaBase):
availability_zone = Column(String(255), default='nova')
class ComputeNode(BASE, NovaBase):
"""Represents a running compute service on a host."""
__tablename__ = 'compute_nodes'
id = Column(Integer, primary_key=True)
service_id = Column(Integer, ForeignKey('services.id'), nullable=True)
service = relationship(Service,
backref=backref('compute_node'),
foreign_keys=service_id,
primaryjoin='and_('
'ComputeNode.service_id == Service.id,'
'ComputeNode.deleted == False)')
vcpus = Column(Integer, nullable=True)
memory_mb = Column(Integer, nullable=True)
local_gb = Column(Integer, nullable=True)
vcpus_used = Column(Integer, nullable=True)
memory_mb_used = Column(Integer, nullable=True)
local_gb_used = Column(Integer, nullable=True)
hypervisor_type = Column(Text, nullable=True)
hypervisor_version = Column(Integer, nullable=True)
# Note(masumotok): Expected Strings example:
#
# '{"arch":"x86_64",
# "model":"Nehalem",
# "topology":{"sockets":1, "threads":2, "cores":3},
# "features":["tdtscp", "xtpr"]}'
#
# Points are "json translatable" and it must have all dictionary keys
# above, since it is copied from <cpu> tag of getCapabilities()
# (See libvirt.virtConnection).
cpu_info = Column(Text, nullable=True)
class Certificate(BASE, NovaBase):
"""Represents a an x509 certificate"""
__tablename__ = 'certificates'
@@ -191,6 +226,9 @@ class Instance(BASE, NovaBase):
display_name = Column(String(255))
display_description = Column(String(255))
# To remember on which host a instance booted.
# An instance may have moved to another host by live migraiton.
launched_on = Column(Text)
locked = Column(Boolean)
os_type = Column(String(255))
+237
View File
@@ -26,10 +26,14 @@ import datetime
from nova import db
from nova import exception
from nova import flags
from nova import log as logging
from nova import rpc
from nova.compute import power_state
FLAGS = flags.FLAGS
flags.DEFINE_integer('service_down_time', 60,
'maximum time since last checkin for up service')
flags.DECLARE('instances_path', 'nova.compute.manager')
class NoValidHost(exception.Error):
@@ -64,3 +68,236 @@ class Scheduler(object):
def schedule(self, context, topic, *_args, **_kwargs):
"""Must override at least this method for scheduler to work."""
raise NotImplementedError(_("Must implement a fallback schedule"))
def schedule_live_migration(self, context, instance_id, dest):
"""Live migration scheduling method.
:param context:
:param instance_id:
:param dest: destination host
:return:
The host where instance is running currently.
Then scheduler send request that host.
"""
# Whether instance exists and is running.
instance_ref = db.instance_get(context, instance_id)
# Checking instance.
self._live_migration_src_check(context, instance_ref)
# Checking destination host.
self._live_migration_dest_check(context, instance_ref, dest)
# Common checking.
self._live_migration_common_check(context, instance_ref, dest)
# Changing instance_state.
db.instance_set_state(context,
instance_id,
power_state.PAUSED,
'migrating')
# Changing volume state
for volume_ref in instance_ref['volumes']:
db.volume_update(context,
volume_ref['id'],
{'status': 'migrating'})
# Return value is necessary to send request to src
# Check _schedule() in detail.
src = instance_ref['host']
return src
def _live_migration_src_check(self, context, instance_ref):
"""Live migration check routine (for src host).
:param context: security context
:param instance_ref: nova.db.sqlalchemy.models.Instance object
"""
# Checking instance is running.
if (power_state.RUNNING != instance_ref['state'] or \
'running' != instance_ref['state_description']):
ec2_id = instance_ref['hostname']
raise exception.Invalid(_('Instance(%s) is not running') % ec2_id)
# Checing volume node is running when any volumes are mounted
# to the instance.
if len(instance_ref['volumes']) != 0:
services = db.service_get_all_by_topic(context, 'volume')
if len(services) < 1 or not self.service_is_up(services[0]):
raise exception.Invalid(_("volume node is not alive"
"(time synchronize problem?)"))
# Checking src host exists and compute node
src = instance_ref['host']
services = db.service_get_all_compute_by_host(context, src)
# Checking src host is alive.
if not self.service_is_up(services[0]):
raise exception.Invalid(_("%s is not alive(time "
"synchronize problem?)") % src)
def _live_migration_dest_check(self, context, instance_ref, dest):
"""Live migration check routine (for destination host).
:param context: security context
:param instance_ref: nova.db.sqlalchemy.models.Instance object
:param dest: destination host
"""
# Checking dest exists and compute node.
dservice_refs = db.service_get_all_compute_by_host(context, dest)
dservice_ref = dservice_refs[0]
# Checking dest host is alive.
if not self.service_is_up(dservice_ref):
raise exception.Invalid(_("%s is not alive(time "
"synchronize problem?)") % dest)
# Checking whether The host where instance is running
# and dest is not same.
src = instance_ref['host']
if dest == src:
ec2_id = instance_ref['hostname']
raise exception.Invalid(_("%(dest)s is where %(ec2_id)s is "
"running now. choose other host.")
% locals())
# Checking dst host still has enough capacities.
self.assert_compute_node_has_enough_resources(context,
instance_ref,
dest)
def _live_migration_common_check(self, context, instance_ref, dest):
"""Live migration common check routine.
Below checkings are followed by
http://wiki.libvirt.org/page/TodoPreMigrationChecks
:param context: security context
:param instance_ref: nova.db.sqlalchemy.models.Instance object
:param dest: destination host
"""
# Checking shared storage connectivity
self.mounted_on_same_shared_storage(context, instance_ref, dest)
# Checking dest exists.
dservice_refs = db.service_get_all_compute_by_host(context, dest)
dservice_ref = dservice_refs[0]['compute_node'][0]
# Checking original host( where instance was launched at) exists.
try:
oservice_refs = db.service_get_all_compute_by_host(context,
instance_ref['launched_on'])
except exception.NotFound:
raise exception.Invalid(_("host %s where instance was launched "
"does not exist.")
% instance_ref['launched_on'])
oservice_ref = oservice_refs[0]['compute_node'][0]
# Checking hypervisor is same.
orig_hypervisor = oservice_ref['hypervisor_type']
dest_hypervisor = dservice_ref['hypervisor_type']
if orig_hypervisor != dest_hypervisor:
raise exception.Invalid(_("Different hypervisor type"
"(%(orig_hypervisor)s->"
"%(dest_hypervisor)s)')" % locals()))
# Checkng hypervisor version.
orig_hypervisor = oservice_ref['hypervisor_version']
dest_hypervisor = dservice_ref['hypervisor_version']
if orig_hypervisor > dest_hypervisor:
raise exception.Invalid(_("Older hypervisor version"
"(%(orig_hypervisor)s->"
"%(dest_hypervisor)s)") % locals())
# Checking cpuinfo.
try:
rpc.call(context,
db.queue_get_for(context, FLAGS.compute_topic, dest),
{"method": 'compare_cpu',
"args": {'cpu_info': oservice_ref['cpu_info']}})
except rpc.RemoteError:
src = instance_ref['host']
logging.exception(_("host %(dest)s is not compatible with "
"original host %(src)s.") % locals())
raise
def assert_compute_node_has_enough_resources(self, context,
instance_ref, dest):
"""Checks if destination host has enough resource for live migration.
Currently, only memory checking has been done.
If storage migration(block migration, meaning live-migration
without any shared storage) will be available, local storage
checking is also necessary.
:param context: security context
:param instance_ref: nova.db.sqlalchemy.models.Instance object
:param dest: destination host
"""
# Getting instance information
ec2_id = instance_ref['hostname']
# Getting host information
service_refs = db.service_get_all_compute_by_host(context, dest)
compute_node_ref = service_refs[0]['compute_node'][0]
mem_total = int(compute_node_ref['memory_mb'])
mem_used = int(compute_node_ref['memory_mb_used'])
mem_avail = mem_total - mem_used
mem_inst = instance_ref['memory_mb']
if mem_avail <= mem_inst:
raise exception.NotEmpty(_("Unable to migrate %(ec2_id)s "
"to destination: %(dest)s "
"(host:%(mem_avail)s "
"<= instance:%(mem_inst)s)")
% locals())
def mounted_on_same_shared_storage(self, context, instance_ref, dest):
"""Check if the src and dest host mount same shared storage.
At first, dest host creates temp file, and src host can see
it if they mounts same shared storage. Then src host erase it.
:param context: security context
:param instance_ref: nova.db.sqlalchemy.models.Instance object
:param dest: destination host
"""
src = instance_ref['host']
dst_t = db.queue_get_for(context, FLAGS.compute_topic, dest)
src_t = db.queue_get_for(context, FLAGS.compute_topic, src)
try:
# create tmpfile at dest host
filename = rpc.call(context, dst_t,
{"method": 'create_shared_storage_test_file'})
# make sure existence at src host.
rpc.call(context, src_t,
{"method": 'check_shared_storage_test_file',
"args": {'filename': filename}})
except rpc.RemoteError:
ipath = FLAGS.instances_path
logging.error(_("Cannot confirm tmpfile at %(ipath)s is on "
"same shared storage between %(src)s "
"and %(dest)s.") % locals())
raise
finally:
rpc.call(context, dst_t,
{"method": 'cleanup_shared_storage_test_file',
"args": {'filename': filename}})
+52
View File
@@ -77,3 +77,55 @@ class SchedulerManager(manager.Manager):
{"method": method,
"args": kwargs})
LOG.debug(_("Casting to %(topic)s %(host)s for %(method)s") % locals())
# NOTE (masumotok) : This method should be moved to nova.api.ec2.admin.
# Based on bexar design summit discussion,
# just put this here for bexar release.
def show_host_resources(self, context, host, *args):
"""Shows the physical/usage resource given by hosts.
:param context: security context
:param host: hostname
:returns:
example format is below.
{'resource':D, 'usage':{proj_id1:D, proj_id2:D}}
D: {'vcpus':3, 'memory_mb':2048, 'local_gb':2048}
"""
compute_ref = db.service_get_all_compute_by_host(context, host)
compute_ref = compute_ref[0]
# Getting physical resource information
compute_node_ref = compute_ref['compute_node'][0]
resource = {'vcpus': compute_node_ref['vcpus'],
'memory_mb': compute_node_ref['memory_mb'],
'local_gb': compute_node_ref['local_gb'],
'vcpus_used': compute_node_ref['vcpus_used'],
'memory_mb_used': compute_node_ref['memory_mb_used'],
'local_gb_used': compute_node_ref['local_gb_used']}
# Getting usage resource information
usage = {}
instance_refs = db.instance_get_all_by_host(context,
compute_ref['host'])
if not instance_refs:
return {'resource': resource, 'usage': usage}
project_ids = [i['project_id'] for i in instance_refs]
project_ids = list(set(project_ids))
for project_id in project_ids:
vcpus = db.instance_get_vcpu_sum_by_host_and_project(context,
host,
project_id)
mem = db.instance_get_memory_sum_by_host_and_project(context,
host,
project_id)
hdd = db.instance_get_disk_sum_by_host_and_project(context,
host,
project_id)
usage[project_id] = {'vcpus': int(vcpus),
'memory_mb': int(mem),
'local_gb': int(hdd)}
return {'resource': resource, 'usage': usage}
+3
View File
@@ -92,6 +92,9 @@ class Service(object):
except exception.NotFound:
self._create_service_ref(ctxt)
if 'nova-compute' == self.binary:
self.manager.update_available_resource(ctxt)
conn1 = rpc.Connection.instance(new=True)
conn2 = rpc.Connection.instance(new=True)
if self.report_interval:
+294
View File
@@ -20,6 +20,7 @@ Tests For Compute
"""
import datetime
import mox
from nova import compute
from nova import context
@@ -27,15 +28,20 @@ from nova import db
from nova import exception
from nova import flags
from nova import log as logging
from nova import rpc
from nova import test
from nova import utils
from nova.auth import manager
from nova.compute import instance_types
from nova.compute import manager as compute_manager
from nova.compute import power_state
from nova.db.sqlalchemy import models
from nova.image import local
LOG = logging.getLogger('nova.tests.compute')
FLAGS = flags.FLAGS
flags.DECLARE('stub_network', 'nova.compute.manager')
flags.DECLARE('live_migration_retry_count', 'nova.compute.manager')
class ComputeTestCase(test.TestCase):
@@ -83,6 +89,41 @@ class ComputeTestCase(test.TestCase):
'project_id': self.project.id}
return db.security_group_create(self.context, values)
def _get_dummy_instance(self):
"""Get mock-return-value instance object
Use this when any testcase executed later than test_run_terminate
"""
vol1 = models.Volume()
vol1['id'] = 1
vol2 = models.Volume()
vol2['id'] = 2
instance_ref = models.Instance()
instance_ref['id'] = 1
instance_ref['volumes'] = [vol1, vol2]
instance_ref['hostname'] = 'i-00000001'
instance_ref['host'] = 'dummy'
return instance_ref
def test_create_instance_defaults_display_name(self):
"""Verify that an instance cannot be created without a display_name."""
cases = [dict(), dict(display_name=None)]
for instance in cases:
ref = self.compute_api.create(self.context,
FLAGS.default_instance_type, None, **instance)
try:
self.assertNotEqual(ref[0]['display_name'], None)
finally:
db.instance_destroy(self.context, ref[0]['id'])
def test_create_instance_associates_security_groups(self):
"""Make sure create associates security groups"""
group = self._create_group()
instance_ref = models.Instance()
instance_ref['id'] = 1
instance_ref['volumes'] = [{'id': 1}, {'id': 2}]
instance_ref['hostname'] = 'i-00000001'
return instance_ref
def test_create_instance_defaults_display_name(self):
"""Verify that an instance cannot be created without a display_name."""
cases = [dict(), dict(display_name=None)]
@@ -301,3 +342,256 @@ class ComputeTestCase(test.TestCase):
self.compute.terminate_instance(self.context, instance_id)
type = instance_types.get_by_flavor_id("1")
self.assertEqual(type, 'm1.tiny')
def _setup_other_managers(self):
self.volume_manager = utils.import_object(FLAGS.volume_manager)
self.network_manager = utils.import_object(FLAGS.network_manager)
self.compute_driver = utils.import_object(FLAGS.compute_driver)
def test_pre_live_migration_instance_has_no_fixed_ip(self):
"""Confirm raising exception if instance doesn't have fixed_ip."""
instance_ref = self._get_dummy_instance()
c = context.get_admin_context()
i_id = instance_ref['id']
dbmock = self.mox.CreateMock(db)
dbmock.instance_get(c, i_id).AndReturn(instance_ref)
dbmock.instance_get_fixed_address(c, i_id).AndReturn(None)
self.compute.db = dbmock
self.mox.ReplayAll()
self.assertRaises(exception.NotFound,
self.compute.pre_live_migration,
c, instance_ref['id'])
def test_pre_live_migration_instance_has_volume(self):
"""Confirm setup_compute_volume is called when volume is mounted."""
i_ref = self._get_dummy_instance()
c = context.get_admin_context()
self._setup_other_managers()
dbmock = self.mox.CreateMock(db)
volmock = self.mox.CreateMock(self.volume_manager)
netmock = self.mox.CreateMock(self.network_manager)
drivermock = self.mox.CreateMock(self.compute_driver)
dbmock.instance_get(c, i_ref['id']).AndReturn(i_ref)
dbmock.instance_get_fixed_address(c, i_ref['id']).AndReturn('dummy')
for i in range(len(i_ref['volumes'])):
vid = i_ref['volumes'][i]['id']
volmock.setup_compute_volume(c, vid).InAnyOrder('g1')
netmock.setup_compute_network(c, i_ref['id'])
drivermock.ensure_filtering_rules_for_instance(i_ref)
self.compute.db = dbmock
self.compute.volume_manager = volmock
self.compute.network_manager = netmock
self.compute.driver = drivermock
self.mox.ReplayAll()
ret = self.compute.pre_live_migration(c, i_ref['id'])
self.assertEqual(ret, None)
def test_pre_live_migration_instance_has_no_volume(self):
"""Confirm log meg when instance doesn't mount any volumes."""
i_ref = self._get_dummy_instance()
i_ref['volumes'] = []
c = context.get_admin_context()
self._setup_other_managers()
dbmock = self.mox.CreateMock(db)
netmock = self.mox.CreateMock(self.network_manager)
drivermock = self.mox.CreateMock(self.compute_driver)
dbmock.instance_get(c, i_ref['id']).AndReturn(i_ref)
dbmock.instance_get_fixed_address(c, i_ref['id']).AndReturn('dummy')
self.mox.StubOutWithMock(compute_manager.LOG, 'info')
compute_manager.LOG.info(_("%s has no volume."), i_ref['hostname'])
netmock.setup_compute_network(c, i_ref['id'])
drivermock.ensure_filtering_rules_for_instance(i_ref)
self.compute.db = dbmock
self.compute.network_manager = netmock
self.compute.driver = drivermock
self.mox.ReplayAll()
ret = self.compute.pre_live_migration(c, i_ref['id'])
self.assertEqual(ret, None)
def test_pre_live_migration_setup_compute_node_fail(self):
"""Confirm operation setup_compute_network() fails.
It retries and raise exception when timeout exceeded.
"""
i_ref = self._get_dummy_instance()
c = context.get_admin_context()
self._setup_other_managers()
dbmock = self.mox.CreateMock(db)
netmock = self.mox.CreateMock(self.network_manager)
volmock = self.mox.CreateMock(self.volume_manager)
dbmock.instance_get(c, i_ref['id']).AndReturn(i_ref)
dbmock.instance_get_fixed_address(c, i_ref['id']).AndReturn('dummy')
for i in range(len(i_ref['volumes'])):
volmock.setup_compute_volume(c, i_ref['volumes'][i]['id'])
for i in range(FLAGS.live_migration_retry_count):
netmock.setup_compute_network(c, i_ref['id']).\
AndRaise(exception.ProcessExecutionError())
self.compute.db = dbmock
self.compute.network_manager = netmock
self.compute.volume_manager = volmock
self.mox.ReplayAll()
self.assertRaises(exception.ProcessExecutionError,
self.compute.pre_live_migration,
c, i_ref['id'])
def test_live_migration_works_correctly_with_volume(self):
"""Confirm check_for_export to confirm volume health check."""
i_ref = self._get_dummy_instance()
c = context.get_admin_context()
topic = db.queue_get_for(c, FLAGS.compute_topic, i_ref['host'])
dbmock = self.mox.CreateMock(db)
dbmock.instance_get(c, i_ref['id']).AndReturn(i_ref)
self.mox.StubOutWithMock(rpc, 'call')
rpc.call(c, FLAGS.volume_topic, {"method": "check_for_export",
"args": {'instance_id': i_ref['id']}})
dbmock.queue_get_for(c, FLAGS.compute_topic, i_ref['host']).\
AndReturn(topic)
rpc.call(c, topic, {"method": "pre_live_migration",
"args": {'instance_id': i_ref['id']}})
self.mox.StubOutWithMock(self.compute.driver, 'live_migration')
self.compute.driver.live_migration(c, i_ref, i_ref['host'],
self.compute.post_live_migration,
self.compute.recover_live_migration)
self.compute.db = dbmock
self.mox.ReplayAll()
ret = self.compute.live_migration(c, i_ref['id'], i_ref['host'])
self.assertEqual(ret, None)
def test_live_migration_dest_raises_exception(self):
"""Confirm exception when pre_live_migration fails."""
i_ref = self._get_dummy_instance()
c = context.get_admin_context()
topic = db.queue_get_for(c, FLAGS.compute_topic, i_ref['host'])
dbmock = self.mox.CreateMock(db)
dbmock.instance_get(c, i_ref['id']).AndReturn(i_ref)
self.mox.StubOutWithMock(rpc, 'call')
rpc.call(c, FLAGS.volume_topic, {"method": "check_for_export",
"args": {'instance_id': i_ref['id']}})
dbmock.queue_get_for(c, FLAGS.compute_topic, i_ref['host']).\
AndReturn(topic)
rpc.call(c, topic, {"method": "pre_live_migration",
"args": {'instance_id': i_ref['id']}}).\
AndRaise(rpc.RemoteError('', '', ''))
dbmock.instance_update(c, i_ref['id'], {'state_description': 'running',
'state': power_state.RUNNING,
'host': i_ref['host']})
for v in i_ref['volumes']:
dbmock.volume_update(c, v['id'], {'status': 'in-use'})
self.compute.db = dbmock
self.mox.ReplayAll()
self.assertRaises(rpc.RemoteError,
self.compute.live_migration,
c, i_ref['id'], i_ref['host'])
def test_live_migration_dest_raises_exception_no_volume(self):
"""Same as above test(input pattern is different) """
i_ref = self._get_dummy_instance()
i_ref['volumes'] = []
c = context.get_admin_context()
topic = db.queue_get_for(c, FLAGS.compute_topic, i_ref['host'])
dbmock = self.mox.CreateMock(db)
dbmock.instance_get(c, i_ref['id']).AndReturn(i_ref)
dbmock.queue_get_for(c, FLAGS.compute_topic, i_ref['host']).\
AndReturn(topic)
self.mox.StubOutWithMock(rpc, 'call')
rpc.call(c, topic, {"method": "pre_live_migration",
"args": {'instance_id': i_ref['id']}}).\
AndRaise(rpc.RemoteError('', '', ''))
dbmock.instance_update(c, i_ref['id'], {'state_description': 'running',
'state': power_state.RUNNING,
'host': i_ref['host']})
self.compute.db = dbmock
self.mox.ReplayAll()
self.assertRaises(rpc.RemoteError,
self.compute.live_migration,
c, i_ref['id'], i_ref['host'])
def test_live_migration_works_correctly_no_volume(self):
"""Confirm live_migration() works as expected correctly."""
i_ref = self._get_dummy_instance()
i_ref['volumes'] = []
c = context.get_admin_context()
topic = db.queue_get_for(c, FLAGS.compute_topic, i_ref['host'])
dbmock = self.mox.CreateMock(db)
dbmock.instance_get(c, i_ref['id']).AndReturn(i_ref)
self.mox.StubOutWithMock(rpc, 'call')
dbmock.queue_get_for(c, FLAGS.compute_topic, i_ref['host']).\
AndReturn(topic)
rpc.call(c, topic, {"method": "pre_live_migration",
"args": {'instance_id': i_ref['id']}})
self.mox.StubOutWithMock(self.compute.driver, 'live_migration')
self.compute.driver.live_migration(c, i_ref, i_ref['host'],
self.compute.post_live_migration,
self.compute.recover_live_migration)
self.compute.db = dbmock
self.mox.ReplayAll()
ret = self.compute.live_migration(c, i_ref['id'], i_ref['host'])
self.assertEqual(ret, None)
def test_post_live_migration_working_correctly(self):
"""Confirm post_live_migration() works as expected correctly."""
dest = 'desthost'
flo_addr = '1.2.1.2'
# Preparing datas
c = context.get_admin_context()
instance_id = self._create_instance()
i_ref = db.instance_get(c, instance_id)
db.instance_update(c, i_ref['id'], {'state_description': 'migrating',
'state': power_state.PAUSED})
v_ref = db.volume_create(c, {'size': 1, 'instance_id': instance_id})
fix_addr = db.fixed_ip_create(c, {'address': '1.1.1.1',
'instance_id': instance_id})
fix_ref = db.fixed_ip_get_by_address(c, fix_addr)
flo_ref = db.floating_ip_create(c, {'address': flo_addr,
'fixed_ip_id': fix_ref['id']})
# reload is necessary before setting mocks
i_ref = db.instance_get(c, instance_id)
# Preparing mocks
self.mox.StubOutWithMock(self.compute.volume_manager,
'remove_compute_volume')
for v in i_ref['volumes']:
self.compute.volume_manager.remove_compute_volume(c, v['id'])
self.mox.StubOutWithMock(self.compute.driver, 'unfilter_instance')
self.compute.driver.unfilter_instance(i_ref)
# executing
self.mox.ReplayAll()
ret = self.compute.post_live_migration(c, i_ref, dest)
# make sure every data is rewritten to dest
i_ref = db.instance_get(c, i_ref['id'])
c1 = (i_ref['host'] == dest)
flo_refs = db.floating_ip_get_all_by_host(c, dest)
c2 = (len(flo_refs) != 0 and flo_refs[0]['address'] == flo_addr)
# post operaton
self.assertTrue(c1 and c2)
db.instance_destroy(c, instance_id)
db.volume_destroy(c, v_ref['id'])
db.floating_ip_destroy(c, flo_addr)
+622 -1
View File
@@ -20,10 +20,12 @@ Tests For Scheduler
"""
import datetime
import mox
from mox import IgnoreArg
from nova import context
from nova import db
from nova import exception
from nova import flags
from nova import service
from nova import test
@@ -32,11 +34,14 @@ from nova import utils
from nova.auth import manager as auth_manager
from nova.scheduler import manager
from nova.scheduler import driver
from nova.compute import power_state
from nova.db.sqlalchemy import models
FLAGS = flags.FLAGS
flags.DECLARE('max_cores', 'nova.scheduler.simple')
flags.DECLARE('stub_network', 'nova.compute.manager')
flags.DECLARE('instances_path', 'nova.compute.manager')
class TestDriver(driver.Scheduler):
@@ -54,6 +59,34 @@ class SchedulerTestCase(test.TestCase):
super(SchedulerTestCase, self).setUp()
self.flags(scheduler_driver='nova.tests.test_scheduler.TestDriver')
def _create_compute_service(self):
"""Create compute-manager(ComputeNode and Service record)."""
ctxt = context.get_admin_context()
dic = {'host': 'dummy', 'binary': 'nova-compute', 'topic': 'compute',
'report_count': 0, 'availability_zone': 'dummyzone'}
s_ref = db.service_create(ctxt, dic)
dic = {'service_id': s_ref['id'],
'vcpus': 16, 'memory_mb': 32, 'local_gb': 100,
'vcpus_used': 16, 'memory_mb_used': 32, 'local_gb_used': 10,
'hypervisor_type': 'qemu', 'hypervisor_version': 12003,
'cpu_info': ''}
db.compute_node_create(ctxt, dic)
return db.service_get(ctxt, s_ref['id'])
def _create_instance(self, **kwargs):
"""Create a test instance"""
ctxt = context.get_admin_context()
inst = {}
inst['user_id'] = 'admin'
inst['project_id'] = kwargs.get('project_id', 'fake')
inst['host'] = kwargs.get('host', 'dummy')
inst['vcpus'] = kwargs.get('vcpus', 1)
inst['memory_mb'] = kwargs.get('memory_mb', 10)
inst['local_gb'] = kwargs.get('local_gb', 20)
return db.instance_create(ctxt, inst)
def test_fallback(self):
scheduler = manager.SchedulerManager()
self.mox.StubOutWithMock(rpc, 'cast', use_mock_anything=True)
@@ -76,6 +109,73 @@ class SchedulerTestCase(test.TestCase):
self.mox.ReplayAll()
scheduler.named_method(ctxt, 'topic', num=7)
def test_show_host_resources_host_not_exit(self):
"""A host given as an argument does not exists."""
scheduler = manager.SchedulerManager()
dest = 'dummydest'
ctxt = context.get_admin_context()
try:
scheduler.show_host_resources(ctxt, dest)
except exception.NotFound, e:
c1 = (e.message.find(_("does not exist or is not a "
"compute node.")) >= 0)
self.assertTrue(c1)
def _dic_is_equal(self, dic1, dic2, keys=None):
"""Compares 2 dictionary contents(Helper method)"""
if not keys:
keys = ['vcpus', 'memory_mb', 'local_gb',
'vcpus_used', 'memory_mb_used', 'local_gb_used']
for key in keys:
if not (dic1[key] == dic2[key]):
return False
return True
def test_show_host_resources_no_project(self):
"""No instance are running on the given host."""
scheduler = manager.SchedulerManager()
ctxt = context.get_admin_context()
s_ref = self._create_compute_service()
result = scheduler.show_host_resources(ctxt, s_ref['host'])
# result checking
c1 = ('resource' in result and 'usage' in result)
compute_node = s_ref['compute_node'][0]
c2 = self._dic_is_equal(result['resource'], compute_node)
c3 = result['usage'] == {}
self.assertTrue(c1 and c2 and c3)
db.service_destroy(ctxt, s_ref['id'])
def test_show_host_resources_works_correctly(self):
"""Show_host_resources() works correctly as expected."""
scheduler = manager.SchedulerManager()
ctxt = context.get_admin_context()
s_ref = self._create_compute_service()
i_ref1 = self._create_instance(project_id='p-01', host=s_ref['host'])
i_ref2 = self._create_instance(project_id='p-02', vcpus=3,
host=s_ref['host'])
result = scheduler.show_host_resources(ctxt, s_ref['host'])
c1 = ('resource' in result and 'usage' in result)
compute_node = s_ref['compute_node'][0]
c2 = self._dic_is_equal(result['resource'], compute_node)
c3 = result['usage'].keys() == ['p-01', 'p-02']
keys = ['vcpus', 'memory_mb', 'local_gb']
c4 = self._dic_is_equal(result['usage']['p-01'], i_ref1, keys)
c5 = self._dic_is_equal(result['usage']['p-02'], i_ref2, keys)
self.assertTrue(c1 and c2 and c3 and c4 and c5)
db.service_destroy(ctxt, s_ref['id'])
db.instance_destroy(ctxt, i_ref1['id'])
db.instance_destroy(ctxt, i_ref2['id'])
class ZoneSchedulerTestCase(test.TestCase):
"""Test case for zone scheduler"""
@@ -161,9 +261,15 @@ class SimpleDriverTestCase(test.TestCase):
inst['project_id'] = self.project.id
inst['instance_type'] = 'm1.tiny'
inst['mac_address'] = utils.generate_mac()
inst['vcpus'] = kwargs.get('vcpus', 1)
inst['ami_launch_index'] = 0
inst['vcpus'] = 1
inst['availability_zone'] = kwargs.get('availability_zone', None)
inst['host'] = kwargs.get('host', 'dummy')
inst['memory_mb'] = kwargs.get('memory_mb', 20)
inst['local_gb'] = kwargs.get('local_gb', 30)
inst['launched_on'] = kwargs.get('launghed_on', 'dummy')
inst['state_description'] = kwargs.get('state_description', 'running')
inst['state'] = kwargs.get('state', power_state.RUNNING)
return db.instance_create(self.context, inst)['id']
def _create_volume(self):
@@ -173,6 +279,211 @@ class SimpleDriverTestCase(test.TestCase):
vol['availability_zone'] = 'test'
return db.volume_create(self.context, vol)['id']
def _create_compute_service(self, **kwargs):
"""Create a compute service."""
dic = {'binary': 'nova-compute', 'topic': 'compute',
'report_count': 0, 'availability_zone': 'dummyzone'}
dic['host'] = kwargs.get('host', 'dummy')
s_ref = db.service_create(self.context, dic)
if 'created_at' in kwargs.keys() or 'updated_at' in kwargs.keys():
t = datetime.datetime.utcnow() - datetime.timedelta(0)
dic['created_at'] = kwargs.get('created_at', t)
dic['updated_at'] = kwargs.get('updated_at', t)
db.service_update(self.context, s_ref['id'], dic)
dic = {'service_id': s_ref['id'],
'vcpus': 16, 'memory_mb': 32, 'local_gb': 100,
'vcpus_used': 16, 'local_gb_used': 10,
'hypervisor_type': 'qemu', 'hypervisor_version': 12003,
'cpu_info': ''}
dic['memory_mb_used'] = kwargs.get('memory_mb_used', 32)
dic['hypervisor_type'] = kwargs.get('hypervisor_type', 'qemu')
dic['hypervisor_version'] = kwargs.get('hypervisor_version', 12003)
db.compute_node_create(self.context, dic)
return db.service_get(self.context, s_ref['id'])
def test_doesnt_report_disabled_hosts_as_up(self):
"""Ensures driver doesn't find hosts before they are enabled"""
# NOTE(vish): constructing service without create method
# because we are going to use it without queue
compute1 = service.Service('host1',
'nova-compute',
'compute',
FLAGS.compute_manager)
compute1.start()
compute2 = service.Service('host2',
'nova-compute',
'compute',
FLAGS.compute_manager)
compute2.start()
s1 = db.service_get_by_args(self.context, 'host1', 'nova-compute')
s2 = db.service_get_by_args(self.context, 'host2', 'nova-compute')
db.service_update(self.context, s1['id'], {'disabled': True})
db.service_update(self.context, s2['id'], {'disabled': True})
hosts = self.scheduler.driver.hosts_up(self.context, 'compute')
self.assertEqual(0, len(hosts))
compute1.kill()
compute2.kill()
def test_reports_enabled_hosts_as_up(self):
"""Ensures driver can find the hosts that are up"""
# NOTE(vish): constructing service without create method
# because we are going to use it without queue
compute1 = service.Service('host1',
'nova-compute',
'compute',
FLAGS.compute_manager)
compute1.start()
compute2 = service.Service('host2',
'nova-compute',
'compute',
FLAGS.compute_manager)
compute2.start()
hosts = self.scheduler.driver.hosts_up(self.context, 'compute')
self.assertEqual(2, len(hosts))
compute1.kill()
compute2.kill()
def test_least_busy_host_gets_instance(self):
"""Ensures the host with less cores gets the next one"""
compute1 = service.Service('host1',
'nova-compute',
'compute',
FLAGS.compute_manager)
compute1.start()
compute2 = service.Service('host2',
'nova-compute',
'compute',
FLAGS.compute_manager)
compute2.start()
instance_id1 = self._create_instance()
compute1.run_instance(self.context, instance_id1)
instance_id2 = self._create_instance()
host = self.scheduler.driver.schedule_run_instance(self.context,
instance_id2)
self.assertEqual(host, 'host2')
compute1.terminate_instance(self.context, instance_id1)
db.instance_destroy(self.context, instance_id2)
compute1.kill()
compute2.kill()
def test_specific_host_gets_instance(self):
"""Ensures if you set availability_zone it launches on that zone"""
compute1 = service.Service('host1',
'nova-compute',
'compute',
FLAGS.compute_manager)
compute1.start()
compute2 = service.Service('host2',
'nova-compute',
'compute',
FLAGS.compute_manager)
compute2.start()
instance_id1 = self._create_instance()
compute1.run_instance(self.context, instance_id1)
instance_id2 = self._create_instance(availability_zone='nova:host1')
host = self.scheduler.driver.schedule_run_instance(self.context,
instance_id2)
self.assertEqual('host1', host)
compute1.terminate_instance(self.context, instance_id1)
db.instance_destroy(self.context, instance_id2)
compute1.kill()
compute2.kill()
def test_wont_sechedule_if_specified_host_is_down(self):
compute1 = service.Service('host1',
'nova-compute',
'compute',
FLAGS.compute_manager)
compute1.start()
s1 = db.service_get_by_args(self.context, 'host1', 'nova-compute')
now = datetime.datetime.utcnow()
delta = datetime.timedelta(seconds=FLAGS.service_down_time * 2)
past = now - delta
db.service_update(self.context, s1['id'], {'updated_at': past})
instance_id2 = self._create_instance(availability_zone='nova:host1')
self.assertRaises(driver.WillNotSchedule,
self.scheduler.driver.schedule_run_instance,
self.context,
instance_id2)
db.instance_destroy(self.context, instance_id2)
compute1.kill()
def test_will_schedule_on_disabled_host_if_specified(self):
compute1 = service.Service('host1',
'nova-compute',
'compute',
FLAGS.compute_manager)
compute1.start()
s1 = db.service_get_by_args(self.context, 'host1', 'nova-compute')
db.service_update(self.context, s1['id'], {'disabled': True})
instance_id2 = self._create_instance(availability_zone='nova:host1')
host = self.scheduler.driver.schedule_run_instance(self.context,
instance_id2)
self.assertEqual('host1', host)
db.instance_destroy(self.context, instance_id2)
compute1.kill()
def test_too_many_cores(self):
"""Ensures we don't go over max cores"""
compute1 = service.Service('host1',
'nova-compute',
'compute',
FLAGS.compute_manager)
compute1.start()
compute2 = service.Service('host2',
'nova-compute',
'compute',
FLAGS.compute_manager)
compute2.start()
instance_ids1 = []
instance_ids2 = []
for index in xrange(FLAGS.max_cores):
instance_id = self._create_instance()
compute1.run_instance(self.context, instance_id)
instance_ids1.append(instance_id)
instance_id = self._create_instance()
compute2.run_instance(self.context, instance_id)
instance_ids2.append(instance_id)
instance_id = self._create_instance()
self.assertRaises(driver.NoValidHost,
self.scheduler.driver.schedule_run_instance,
self.context,
instance_id)
for instance_id in instance_ids1:
compute1.terminate_instance(self.context, instance_id)
for instance_id in instance_ids2:
compute2.terminate_instance(self.context, instance_id)
compute1.kill()
compute2.kill()
def test_least_busy_host_gets_volume(self):
"""Ensures the host with less gigabytes gets the next one"""
volume1 = service.Service('host1',
'nova-volume',
'volume',
FLAGS.volume_manager)
volume1.start()
volume2 = service.Service('host2',
'nova-volume',
'volume',
FLAGS.volume_manager)
volume2.start()
volume_id1 = self._create_volume()
volume1.create_volume(self.context, volume_id1)
volume_id2 = self._create_volume()
host = self.scheduler.driver.schedule_create_volume(self.context,
volume_id2)
self.assertEqual(host, 'host2')
volume1.delete_volume(self.context, volume_id1)
db.volume_destroy(self.context, volume_id2)
dic = {'service_id': s_ref['id'],
'vcpus': 16, 'memory_mb': 32, 'local_gb': 100,
'vcpus_used': 16, 'memory_mb_used': 12, 'local_gb_used': 10,
'hypervisor_type': 'qemu', 'hypervisor_version': 12003,
'cpu_info': ''}
def test_doesnt_report_disabled_hosts_as_up(self):
"""Ensures driver doesn't find hosts before they are enabled"""
compute1 = self.start_service('compute', host='host1')
@@ -316,3 +627,313 @@ class SimpleDriverTestCase(test.TestCase):
volume2.delete_volume(self.context, volume_id)
volume1.kill()
volume2.kill()
def test_scheduler_live_migration_with_volume(self):
"""scheduler_live_migration() works correctly as expected.
Also, checks instance state is changed from 'running' -> 'migrating'.
"""
instance_id = self._create_instance()
i_ref = db.instance_get(self.context, instance_id)
dic = {'instance_id': instance_id, 'size': 1}
v_ref = db.volume_create(self.context, dic)
# cannot check 2nd argument b/c the addresses of instance object
# is different.
driver_i = self.scheduler.driver
nocare = mox.IgnoreArg()
self.mox.StubOutWithMock(driver_i, '_live_migration_src_check')
self.mox.StubOutWithMock(driver_i, '_live_migration_dest_check')
self.mox.StubOutWithMock(driver_i, '_live_migration_common_check')
driver_i._live_migration_src_check(nocare, nocare)
driver_i._live_migration_dest_check(nocare, nocare, i_ref['host'])
driver_i._live_migration_common_check(nocare, nocare, i_ref['host'])
self.mox.StubOutWithMock(rpc, 'cast', use_mock_anything=True)
kwargs = {'instance_id': instance_id, 'dest': i_ref['host']}
rpc.cast(self.context,
db.queue_get_for(nocare, FLAGS.compute_topic, i_ref['host']),
{"method": 'live_migration', "args": kwargs})
self.mox.ReplayAll()
self.scheduler.live_migration(self.context, FLAGS.compute_topic,
instance_id=instance_id,
dest=i_ref['host'])
i_ref = db.instance_get(self.context, instance_id)
self.assertTrue(i_ref['state_description'] == 'migrating')
db.instance_destroy(self.context, instance_id)
db.volume_destroy(self.context, v_ref['id'])
def test_live_migration_src_check_instance_not_running(self):
"""The instance given by instance_id is not running."""
instance_id = self._create_instance(state_description='migrating')
i_ref = db.instance_get(self.context, instance_id)
try:
self.scheduler.driver._live_migration_src_check(self.context,
i_ref)
except exception.Invalid, e:
c = (e.message.find('is not running') > 0)
self.assertTrue(c)
db.instance_destroy(self.context, instance_id)
def test_live_migration_src_check_volume_node_not_alive(self):
"""Raise exception when volume node is not alive."""
instance_id = self._create_instance()
i_ref = db.instance_get(self.context, instance_id)
dic = {'instance_id': instance_id, 'size': 1}
v_ref = db.volume_create(self.context, {'instance_id': instance_id,
'size': 1})
t1 = datetime.datetime.utcnow() - datetime.timedelta(1)
dic = {'created_at': t1, 'updated_at': t1, 'binary': 'nova-volume',
'topic': 'volume', 'report_count': 0}
s_ref = db.service_create(self.context, dic)
try:
self.scheduler.driver.schedule_live_migration(self.context,
instance_id,
i_ref['host'])
except exception.Invalid, e:
c = (e.message.find('volume node is not alive') >= 0)
self.assertTrue(c)
db.instance_destroy(self.context, instance_id)
db.service_destroy(self.context, s_ref['id'])
db.volume_destroy(self.context, v_ref['id'])
def test_live_migration_src_check_compute_node_not_alive(self):
"""Confirms src-compute node is alive."""
instance_id = self._create_instance()
i_ref = db.instance_get(self.context, instance_id)
t = datetime.datetime.utcnow() - datetime.timedelta(10)
s_ref = self._create_compute_service(created_at=t, updated_at=t,
host=i_ref['host'])
try:
self.scheduler.driver._live_migration_src_check(self.context,
i_ref)
except exception.Invalid, e:
c = (e.message.find('is not alive') >= 0)
self.assertTrue(c)
db.instance_destroy(self.context, instance_id)
db.service_destroy(self.context, s_ref['id'])
def test_live_migration_src_check_works_correctly(self):
"""Confirms this method finishes with no error."""
instance_id = self._create_instance()
i_ref = db.instance_get(self.context, instance_id)
s_ref = self._create_compute_service(host=i_ref['host'])
ret = self.scheduler.driver._live_migration_src_check(self.context,
i_ref)
self.assertTrue(ret == None)
db.instance_destroy(self.context, instance_id)
db.service_destroy(self.context, s_ref['id'])
def test_live_migration_dest_check_not_alive(self):
"""Confirms exception raises in case dest host does not exist."""
instance_id = self._create_instance()
i_ref = db.instance_get(self.context, instance_id)
t = datetime.datetime.utcnow() - datetime.timedelta(10)
s_ref = self._create_compute_service(created_at=t, updated_at=t,
host=i_ref['host'])
try:
self.scheduler.driver._live_migration_dest_check(self.context,
i_ref,
i_ref['host'])
except exception.Invalid, e:
c = (e.message.find('is not alive') >= 0)
self.assertTrue(c)
db.instance_destroy(self.context, instance_id)
db.service_destroy(self.context, s_ref['id'])
def test_live_migration_dest_check_service_same_host(self):
"""Confirms exceptioin raises in case dest and src is same host."""
instance_id = self._create_instance()
i_ref = db.instance_get(self.context, instance_id)
s_ref = self._create_compute_service(host=i_ref['host'])
try:
self.scheduler.driver._live_migration_dest_check(self.context,
i_ref,
i_ref['host'])
except exception.Invalid, e:
c = (e.message.find('choose other host') >= 0)
self.assertTrue(c)
db.instance_destroy(self.context, instance_id)
db.service_destroy(self.context, s_ref['id'])
def test_live_migration_dest_check_service_lack_memory(self):
"""Confirms exception raises when dest doesn't have enough memory."""
instance_id = self._create_instance()
i_ref = db.instance_get(self.context, instance_id)
s_ref = self._create_compute_service(host='somewhere',
memory_mb_used=12)
try:
self.scheduler.driver._live_migration_dest_check(self.context,
i_ref,
'somewhere')
except exception.NotEmpty, e:
c = (e.message.find('Unable to migrate') >= 0)
self.assertTrue(c)
db.instance_destroy(self.context, instance_id)
db.service_destroy(self.context, s_ref['id'])
def test_live_migration_dest_check_service_works_correctly(self):
"""Confirms method finishes with no error."""
instance_id = self._create_instance()
i_ref = db.instance_get(self.context, instance_id)
s_ref = self._create_compute_service(host='somewhere',
memory_mb_used=5)
ret = self.scheduler.driver._live_migration_dest_check(self.context,
i_ref,
'somewhere')
self.assertTrue(ret == None)
db.instance_destroy(self.context, instance_id)
db.service_destroy(self.context, s_ref['id'])
def test_live_migration_common_check_service_orig_not_exists(self):
"""Destination host does not exist."""
dest = 'dummydest'
# mocks for live_migration_common_check()
instance_id = self._create_instance()
i_ref = db.instance_get(self.context, instance_id)
t1 = datetime.datetime.utcnow() - datetime.timedelta(10)
s_ref = self._create_compute_service(created_at=t1, updated_at=t1,
host=dest)
# mocks for mounted_on_same_shared_storage()
fpath = '/test/20110127120000'
self.mox.StubOutWithMock(driver, 'rpc', use_mock_anything=True)
topic = FLAGS.compute_topic
driver.rpc.call(mox.IgnoreArg(),
db.queue_get_for(self.context, topic, dest),
{"method": 'create_shared_storage_test_file'}).AndReturn(fpath)
driver.rpc.call(mox.IgnoreArg(),
db.queue_get_for(mox.IgnoreArg(), topic, i_ref['host']),
{"method": 'check_shared_storage_test_file',
"args": {'filename': fpath}})
driver.rpc.call(mox.IgnoreArg(),
db.queue_get_for(mox.IgnoreArg(), topic, dest),
{"method": 'cleanup_shared_storage_test_file',
"args": {'filename': fpath}})
self.mox.ReplayAll()
try:
self.scheduler.driver._live_migration_common_check(self.context,
i_ref,
dest)
except exception.Invalid, e:
c = (e.message.find('does not exist') >= 0)
self.assertTrue(c)
db.instance_destroy(self.context, instance_id)
db.service_destroy(self.context, s_ref['id'])
def test_live_migration_common_check_service_different_hypervisor(self):
"""Original host and dest host has different hypervisor type."""
dest = 'dummydest'
instance_id = self._create_instance()
i_ref = db.instance_get(self.context, instance_id)
# compute service for destination
s_ref = self._create_compute_service(host=i_ref['host'])
# compute service for original host
s_ref2 = self._create_compute_service(host=dest, hypervisor_type='xen')
# mocks
driver = self.scheduler.driver
self.mox.StubOutWithMock(driver, 'mounted_on_same_shared_storage')
driver.mounted_on_same_shared_storage(mox.IgnoreArg(), i_ref, dest)
self.mox.ReplayAll()
try:
self.scheduler.driver._live_migration_common_check(self.context,
i_ref,
dest)
except exception.Invalid, e:
c = (e.message.find(_('Different hypervisor type')) >= 0)
self.assertTrue(c)
db.instance_destroy(self.context, instance_id)
db.service_destroy(self.context, s_ref['id'])
db.service_destroy(self.context, s_ref2['id'])
def test_live_migration_common_check_service_different_version(self):
"""Original host and dest host has different hypervisor version."""
dest = 'dummydest'
instance_id = self._create_instance()
i_ref = db.instance_get(self.context, instance_id)
# compute service for destination
s_ref = self._create_compute_service(host=i_ref['host'])
# compute service for original host
s_ref2 = self._create_compute_service(host=dest,
hypervisor_version=12002)
# mocks
driver = self.scheduler.driver
self.mox.StubOutWithMock(driver, 'mounted_on_same_shared_storage')
driver.mounted_on_same_shared_storage(mox.IgnoreArg(), i_ref, dest)
self.mox.ReplayAll()
try:
self.scheduler.driver._live_migration_common_check(self.context,
i_ref,
dest)
except exception.Invalid, e:
c = (e.message.find(_('Older hypervisor version')) >= 0)
self.assertTrue(c)
db.instance_destroy(self.context, instance_id)
db.service_destroy(self.context, s_ref['id'])
db.service_destroy(self.context, s_ref2['id'])
def test_live_migration_common_check_checking_cpuinfo_fail(self):
"""Raise excetion when original host doen't have compatible cpu."""
dest = 'dummydest'
instance_id = self._create_instance()
i_ref = db.instance_get(self.context, instance_id)
# compute service for destination
s_ref = self._create_compute_service(host=i_ref['host'])
# compute service for original host
s_ref2 = self._create_compute_service(host=dest)
# mocks
driver = self.scheduler.driver
self.mox.StubOutWithMock(driver, 'mounted_on_same_shared_storage')
driver.mounted_on_same_shared_storage(mox.IgnoreArg(), i_ref, dest)
self.mox.StubOutWithMock(rpc, 'call', use_mock_anything=True)
rpc.call(mox.IgnoreArg(), mox.IgnoreArg(),
{"method": 'compare_cpu',
"args": {'cpu_info': s_ref2['compute_node'][0]['cpu_info']}}).\
AndRaise(rpc.RemoteError("doesn't have compatibility to", "", ""))
self.mox.ReplayAll()
try:
self.scheduler.driver._live_migration_common_check(self.context,
i_ref,
dest)
except rpc.RemoteError, e:
c = (e.message.find(_("doesn't have compatibility to")) >= 0)
self.assertTrue(c)
db.instance_destroy(self.context, instance_id)
db.service_destroy(self.context, s_ref['id'])
db.service_destroy(self.context, s_ref2['id'])
+41
View File
@@ -30,6 +30,7 @@ from nova import rpc
from nova import test
from nova import service
from nova import manager
from nova.compute import manager as compute_manager
FLAGS = flags.FLAGS
flags.DEFINE_string("fake_manager", "nova.tests.test_service.FakeManager",
@@ -251,3 +252,43 @@ class ServiceTestCase(test.TestCase):
serv.report_state()
self.assert_(not serv.model_disconnected)
def test_compute_can_update_available_resource(self):
"""Confirm compute updates their record of compute-service table."""
host = 'foo'
binary = 'nova-compute'
topic = 'compute'
# Any mocks are not working without UnsetStubs() here.
self.mox.UnsetStubs()
ctxt = context.get_admin_context()
service_ref = db.service_create(ctxt, {'host': host,
'binary': binary,
'topic': topic})
serv = service.Service(host,
binary,
topic,
'nova.compute.manager.ComputeManager')
# This testcase want to test calling update_available_resource.
# No need to call periodic call, then below variable must be set 0.
serv.report_interval = 0
serv.periodic_interval = 0
# Creating mocks
self.mox.StubOutWithMock(service.rpc.Connection, 'instance')
service.rpc.Connection.instance(new=mox.IgnoreArg())
service.rpc.Connection.instance(new=mox.IgnoreArg())
self.mox.StubOutWithMock(serv.manager.driver,
'update_available_resource')
serv.manager.driver.update_available_resource(mox.IgnoreArg(), host)
# Just doing start()-stop(), not confirm new db record is created,
# because update_available_resource() works only in
# libvirt environment. This testcase confirms
# update_available_resource() is called. Otherwise, mox complains.
self.mox.ReplayAll()
serv.start()
serv.stop()
db.service_destroy(ctxt, service_ref['id'])
+225 -5
View File
@@ -14,22 +14,29 @@
# License for the specific language governing permissions and limitations
# under the License.
import re
import os
import eventlet
import mox
import os
import re
import sys
from xml.etree.ElementTree import fromstring as xml_to_tree
from xml.dom.minidom import parseString as xml_to_dom
from nova import context
from nova import db
from nova import exception
from nova import flags
from nova import test
from nova import utils
from nova.api.ec2 import cloud
from nova.auth import manager
from nova.compute import manager as compute_manager
from nova.compute import power_state
from nova.db.sqlalchemy import models
from nova.virt import libvirt_conn
libvirt = None
FLAGS = flags.FLAGS
flags.DECLARE('instances_path', 'nova.compute.manager')
@@ -104,11 +111,28 @@ class LibvirtConnTestCase(test.TestCase):
libvirt_conn._late_load_cheetah()
self.flags(fake_call=True)
self.manager = manager.AuthManager()
try:
pjs = self.manager.get_projects()
pjs = [p for p in pjs if p.name == 'fake']
if 0 != len(pjs):
self.manager.delete_project(pjs[0])
users = self.manager.get_users()
users = [u for u in users if u.name == 'fake']
if 0 != len(users):
self.manager.delete_user(users[0])
except Exception, e:
pass
users = self.manager.get_users()
self.user = self.manager.create_user('fake', 'fake', 'fake',
admin=True)
self.project = self.manager.create_project('fake', 'fake', 'fake')
self.network = utils.import_object(FLAGS.network_manager)
self.context = context.get_admin_context()
FLAGS.instances_path = ''
self.call_libvirt_dependant_setup = False
test_ip = '10.11.12.13'
test_instance = {'memory_kb': '1024000',
@@ -120,6 +144,58 @@ class LibvirtConnTestCase(test.TestCase):
'bridge': 'br101',
'instance_type': 'm1.small'}
def lazy_load_library_exists(self):
"""check if libvirt is available."""
# try to connect libvirt. if fail, skip test.
try:
import libvirt
import libxml2
except ImportError:
return False
global libvirt
libvirt = __import__('libvirt')
libvirt_conn.libvirt = __import__('libvirt')
libvirt_conn.libxml2 = __import__('libxml2')
return True
def create_fake_libvirt_mock(self, **kwargs):
"""Defining mocks for LibvirtConnection(libvirt is not used)."""
# A fake libvirt.virConnect
class FakeLibvirtConnection(object):
pass
# A fake libvirt_conn.IptablesFirewallDriver
class FakeIptablesFirewallDriver(object):
def __init__(self, **kwargs):
pass
def setattr(self, key, val):
self.__setattr__(key, val)
# Creating mocks
fake = FakeLibvirtConnection()
fakeip = FakeIptablesFirewallDriver
# Customizing above fake if necessary
for key, val in kwargs.items():
fake.__setattr__(key, val)
# Inevitable mocks for libvirt_conn.LibvirtConnection
self.mox.StubOutWithMock(libvirt_conn.utils, 'import_class')
libvirt_conn.utils.import_class(mox.IgnoreArg()).AndReturn(fakeip)
self.mox.StubOutWithMock(libvirt_conn.LibvirtConnection, '_conn')
libvirt_conn.LibvirtConnection._conn = fake
def create_service(self, **kwargs):
service_ref = {'host': kwargs.get('host', 'dummy'),
'binary': 'nova-compute',
'topic': 'compute',
'report_count': 0,
'availability_zone': 'zone'}
return db.service_create(context.get_admin_context(), service_ref)
def test_xml_and_uri_no_ramdisk_no_kernel(self):
instance_data = dict(self.test_instance)
self._check_xml_and_uri(instance_data,
@@ -259,8 +335,8 @@ class LibvirtConnTestCase(test.TestCase):
expected_result,
'%s failed common check %d' % (xml, i))
# This test is supposed to make sure we don't override a specifically
# set uri
# This test is supposed to make sure we don't
# override a specifically set uri
#
# Deliberately not just assigning this string to FLAGS.libvirt_uri and
# checking against that later on. This way we make sure the
@@ -274,6 +350,150 @@ class LibvirtConnTestCase(test.TestCase):
self.assertEquals(uri, testuri)
db.instance_destroy(user_context, instance_ref['id'])
def test_update_available_resource_works_correctly(self):
"""Confirm compute_node table is updated successfully."""
org_path = FLAGS.instances_path = ''
FLAGS.instances_path = '.'
# Prepare mocks
def getVersion():
return 12003
def getType():
return 'qemu'
def listDomainsID():
return []
service_ref = self.create_service(host='dummy')
self.create_fake_libvirt_mock(getVersion=getVersion,
getType=getType,
listDomainsID=listDomainsID)
self.mox.StubOutWithMock(libvirt_conn.LibvirtConnection,
'get_cpu_info')
libvirt_conn.LibvirtConnection.get_cpu_info().AndReturn('cpuinfo')
# Start test
self.mox.ReplayAll()
conn = libvirt_conn.LibvirtConnection(False)
conn.update_available_resource(self.context, 'dummy')
service_ref = db.service_get(self.context, service_ref['id'])
compute_node = service_ref['compute_node'][0]
if sys.platform.upper() == 'LINUX2':
self.assertTrue(compute_node['vcpus'] >= 0)
self.assertTrue(compute_node['memory_mb'] > 0)
self.assertTrue(compute_node['local_gb'] > 0)
self.assertTrue(compute_node['vcpus_used'] == 0)
self.assertTrue(compute_node['memory_mb_used'] > 0)
self.assertTrue(compute_node['local_gb_used'] > 0)
self.assertTrue(len(compute_node['hypervisor_type']) > 0)
self.assertTrue(compute_node['hypervisor_version'] > 0)
else:
self.assertTrue(compute_node['vcpus'] >= 0)
self.assertTrue(compute_node['memory_mb'] == 0)
self.assertTrue(compute_node['local_gb'] > 0)
self.assertTrue(compute_node['vcpus_used'] == 0)
self.assertTrue(compute_node['memory_mb_used'] == 0)
self.assertTrue(compute_node['local_gb_used'] > 0)
self.assertTrue(len(compute_node['hypervisor_type']) > 0)
self.assertTrue(compute_node['hypervisor_version'] > 0)
db.service_destroy(self.context, service_ref['id'])
FLAGS.instances_path = org_path
def test_update_resource_info_no_compute_record_found(self):
"""Raise exception if no recorde found on services table."""
org_path = FLAGS.instances_path = ''
FLAGS.instances_path = '.'
self.create_fake_libvirt_mock()
self.mox.ReplayAll()
conn = libvirt_conn.LibvirtConnection(False)
self.assertRaises(exception.Invalid,
conn.update_available_resource,
self.context, 'dummy')
FLAGS.instances_path = org_path
def test_ensure_filtering_rules_for_instance_timeout(self):
"""ensure_filtering_fules_for_instance() finishes with timeout."""
# Skip if non-libvirt environment
if not self.lazy_load_library_exists():
return
# Preparing mocks
def fake_none(self):
return
def fake_raise(self):
raise libvirt.libvirtError('ERR')
self.create_fake_libvirt_mock(nwfilterLookupByName=fake_raise)
instance_ref = db.instance_create(self.context, self.test_instance)
# Start test
self.mox.ReplayAll()
try:
conn = libvirt_conn.LibvirtConnection(False)
conn.firewall_driver.setattr('setup_basic_filtering', fake_none)
conn.firewall_driver.setattr('prepare_instance_filter', fake_none)
conn.ensure_filtering_rules_for_instance(instance_ref)
except exception.Error, e:
c1 = (0 <= e.message.find('Timeout migrating for'))
self.assertTrue(c1)
db.instance_destroy(self.context, instance_ref['id'])
def test_live_migration_raises_exception(self):
"""Confirms recover method is called when exceptions are raised."""
# Skip if non-libvirt environment
if not self.lazy_load_library_exists():
return
# Preparing data
self.compute = utils.import_object(FLAGS.compute_manager)
instance_dict = {'host': 'fake', 'state': power_state.RUNNING,
'state_description': 'running'}
instance_ref = db.instance_create(self.context, self.test_instance)
instance_ref = db.instance_update(self.context, instance_ref['id'],
instance_dict)
vol_dict = {'status': 'migrating', 'size': 1}
volume_ref = db.volume_create(self.context, vol_dict)
db.volume_attached(self.context, volume_ref['id'], instance_ref['id'],
'/dev/fake')
# Preparing mocks
vdmock = self.mox.CreateMock(libvirt.virDomain)
self.mox.StubOutWithMock(vdmock, "migrateToURI")
vdmock.migrateToURI(FLAGS.live_migration_uri % 'dest',
mox.IgnoreArg(),
None, FLAGS.live_migration_bandwidth).\
AndRaise(libvirt.libvirtError('ERR'))
def fake_lookup(instance_name):
if instance_name == instance_ref.name:
return vdmock
self.create_fake_libvirt_mock(lookupByName=fake_lookup)
# Start test
self.mox.ReplayAll()
conn = libvirt_conn.LibvirtConnection(False)
self.assertRaises(libvirt.libvirtError,
conn._live_migration,
self.context, instance_ref, 'dest', '',
self.compute.recover_live_migration)
instance_ref = db.instance_get(self.context, instance_ref['id'])
self.assertTrue(instance_ref['state_description'] == 'running')
self.assertTrue(instance_ref['state'] == power_state.RUNNING)
volume_ref = db.volume_get(self.context, volume_ref['id'])
self.assertTrue(volume_ref['status'] == 'in-use')
db.volume_destroy(self.context, volume_ref['id'])
db.instance_destroy(self.context, instance_ref['id'])
def tearDown(self):
self.manager.delete_project(self.project)
self.manager.delete_user(self.user)
+195
View File
@@ -20,6 +20,8 @@ Tests for Volume Code.
"""
import cStringIO
from nova import context
from nova import exception
from nova import db
@@ -173,3 +175,196 @@ class VolumeTestCase(test.TestCase):
# each of them having a different FLAG for storage_node
# This will allow us to test cross-node interactions
pass
class DriverTestCase(test.TestCase):
"""Base Test class for Drivers."""
driver_name = "nova.volume.driver.FakeAOEDriver"
def setUp(self):
super(DriverTestCase, self).setUp()
self.flags(volume_driver=self.driver_name,
logging_default_format_string="%(message)s")
self.volume = utils.import_object(FLAGS.volume_manager)
self.context = context.get_admin_context()
self.output = ""
def _fake_execute(_command, *_args, **_kwargs):
"""Fake _execute."""
return self.output, None
self.volume.driver._execute = _fake_execute
self.volume.driver._sync_execute = _fake_execute
log = logging.getLogger()
self.stream = cStringIO.StringIO()
log.addHandler(logging.StreamHandler(self.stream))
inst = {}
self.instance_id = db.instance_create(self.context, inst)['id']
def tearDown(self):
super(DriverTestCase, self).tearDown()
def _attach_volume(self):
"""Attach volumes to an instance. This function also sets
a fake log message."""
return []
def _detach_volume(self, volume_id_list):
"""Detach volumes from an instance."""
for volume_id in volume_id_list:
db.volume_detached(self.context, volume_id)
self.volume.delete_volume(self.context, volume_id)
class AOETestCase(DriverTestCase):
"""Test Case for AOEDriver"""
driver_name = "nova.volume.driver.AOEDriver"
def setUp(self):
super(AOETestCase, self).setUp()
def tearDown(self):
super(AOETestCase, self).tearDown()
def _attach_volume(self):
"""Attach volumes to an instance. This function also sets
a fake log message."""
volume_id_list = []
for index in xrange(3):
vol = {}
vol['size'] = 0
volume_id = db.volume_create(self.context,
vol)['id']
self.volume.create_volume(self.context, volume_id)
# each volume has a different mountpoint
mountpoint = "/dev/sd" + chr((ord('b') + index))
db.volume_attached(self.context, volume_id, self.instance_id,
mountpoint)
(shelf_id, blade_id) = db.volume_get_shelf_and_blade(self.context,
volume_id)
self.output += "%s %s eth0 /dev/nova-volumes/vol-foo auto run\n" \
% (shelf_id, blade_id)
volume_id_list.append(volume_id)
return volume_id_list
def test_check_for_export_with_no_volume(self):
"""No log message when no volume is attached to an instance."""
self.stream.truncate(0)
self.volume.check_for_export(self.context, self.instance_id)
self.assertEqual(self.stream.getvalue(), '')
def test_check_for_export_with_all_vblade_processes(self):
"""No log message when all the vblade processes are running."""
volume_id_list = self._attach_volume()
self.stream.truncate(0)
self.volume.check_for_export(self.context, self.instance_id)
self.assertEqual(self.stream.getvalue(), '')
self._detach_volume(volume_id_list)
def test_check_for_export_with_vblade_process_missing(self):
"""Output a warning message when some vblade processes aren't
running."""
volume_id_list = self._attach_volume()
# the first vblade process isn't running
self.output = self.output.replace("run", "down", 1)
(shelf_id, blade_id) = db.volume_get_shelf_and_blade(self.context,
volume_id_list[0])
msg_is_match = False
self.stream.truncate(0)
try:
self.volume.check_for_export(self.context, self.instance_id)
except exception.ProcessExecutionError, e:
volume_id = volume_id_list[0]
msg = _("Cannot confirm exported volume id:%(volume_id)s. "
"vblade process for e%(shelf_id)s.%(blade_id)s "
"isn't running.") % locals()
msg_is_match = (0 <= e.message.find(msg))
self.assertTrue(msg_is_match)
self._detach_volume(volume_id_list)
class ISCSITestCase(DriverTestCase):
"""Test Case for ISCSIDriver"""
driver_name = "nova.volume.driver.ISCSIDriver"
def setUp(self):
super(ISCSITestCase, self).setUp()
def tearDown(self):
super(ISCSITestCase, self).tearDown()
def _attach_volume(self):
"""Attach volumes to an instance. This function also sets
a fake log message."""
volume_id_list = []
for index in xrange(3):
vol = {}
vol['size'] = 0
vol_ref = db.volume_create(self.context, vol)
self.volume.create_volume(self.context, vol_ref['id'])
vol_ref = db.volume_get(self.context, vol_ref['id'])
# each volume has a different mountpoint
mountpoint = "/dev/sd" + chr((ord('b') + index))
db.volume_attached(self.context, vol_ref['id'], self.instance_id,
mountpoint)
volume_id_list.append(vol_ref['id'])
return volume_id_list
def test_check_for_export_with_no_volume(self):
"""No log message when no volume is attached to an instance."""
self.stream.truncate(0)
self.volume.check_for_export(self.context, self.instance_id)
self.assertEqual(self.stream.getvalue(), '')
def test_check_for_export_with_all_volume_exported(self):
"""No log message when all the vblade processes are running."""
volume_id_list = self._attach_volume()
self.mox.StubOutWithMock(self.volume.driver, '_execute')
for i in volume_id_list:
tid = db.volume_get_iscsi_target_num(self.context, i)
self.volume.driver._execute("sudo ietadm --op show --tid=%(tid)d"
% locals())
self.stream.truncate(0)
self.mox.ReplayAll()
self.volume.check_for_export(self.context, self.instance_id)
self.assertEqual(self.stream.getvalue(), '')
self.mox.UnsetStubs()
self._detach_volume(volume_id_list)
def test_check_for_export_with_some_volume_missing(self):
"""Output a warning message when some volumes are not recognied
by ietd."""
volume_id_list = self._attach_volume()
# the first vblade process isn't running
tid = db.volume_get_iscsi_target_num(self.context, volume_id_list[0])
self.mox.StubOutWithMock(self.volume.driver, '_execute')
self.volume.driver._execute("sudo ietadm --op show --tid=%(tid)d"
% locals()).AndRaise(exception.ProcessExecutionError())
self.mox.ReplayAll()
self.assertRaises(exception.ProcessExecutionError,
self.volume.check_for_export,
self.context,
self.instance_id)
msg = _("Cannot confirm exported volume id:%s.") % volume_id_list[0]
self.assertTrue(0 <= self.stream.getvalue().find(msg))
self.mox.UnsetStubs()
self._detach_volume(volume_id_list)
+9
View File
@@ -0,0 +1,9 @@
<cpu>
<arch>$arch</arch>
<model>$model</model>
<vendor>$vendor</vendor>
<topology sockets="$topology.sockets" cores="$topology.cores" threads="$topology.threads"/>
#for $var in $features
<features name="$var" />
#end for
</cpu>
+21
View File
@@ -407,6 +407,27 @@ class FakeConnection(object):
"""
return True
def update_available_resource(self, ctxt, host):
"""This method is supported only by libvirt."""
return
def compare_cpu(self, xml):
"""This method is supported only by libvirt."""
raise NotImplementedError('This method is supported only by libvirt.')
def ensure_filtering_rules_for_instance(self, instance_ref):
"""This method is supported only by libvirt."""
raise NotImplementedError('This method is supported only by libvirt.')
def live_migration(self, context, instance_ref, dest,
post_method, recover_method):
"""This method is supported only by libvirt."""
return
def unfilter_instance(self, instance_ref):
"""This method is supported only by libvirt."""
raise NotImplementedError('This method is supported only by libvirt.')
class FakeInstance(object):
+369
View File
@@ -36,10 +36,13 @@ Supports KVM, QEMU, UML, and XEN.
"""
import multiprocessing
import os
import shutil
import sys
import random
import subprocess
import time
import uuid
from xml.dom import minidom
@@ -69,6 +72,7 @@ Template = None
LOG = logging.getLogger('nova.virt.libvirt_conn')
FLAGS = flags.FLAGS
flags.DECLARE('live_migration_retry_count', 'nova.compute.manager')
# TODO(vish): These flags should probably go into a shared location
flags.DEFINE_string('rescue_image_id', 'ami-rescue', 'Rescue ami image')
flags.DEFINE_string('rescue_kernel_id', 'aki-rescue', 'Rescue aki image')
@@ -99,6 +103,17 @@ flags.DEFINE_string('ajaxterm_portrange',
flags.DEFINE_string('firewall_driver',
'nova.virt.libvirt_conn.IptablesFirewallDriver',
'Firewall driver (defaults to iptables)')
flags.DEFINE_string('cpuinfo_xml_template',
utils.abspath('virt/cpuinfo.xml.template'),
'CpuInfo XML Template (Used only live migration now)')
flags.DEFINE_string('live_migration_uri',
"qemu+tcp://%s/system",
'Define protocol used by live_migration feature')
flags.DEFINE_string('live_migration_flag',
"VIR_MIGRATE_UNDEFINE_SOURCE, VIR_MIGRATE_PEER2PEER",
'Define live migration behavior.')
flags.DEFINE_integer('live_migration_bandwidth', 0,
'Define live migration behavior')
def get_connection(read_only):
@@ -145,6 +160,7 @@ class LibvirtConnection(object):
self.libvirt_uri = self.get_uri()
self.libvirt_xml = open(FLAGS.libvirt_xml_template).read()
self.cpuinfo_xml = open(FLAGS.cpuinfo_xml_template).read()
self._wrapped_conn = None
self.read_only = read_only
@@ -850,6 +866,158 @@ class LibvirtConnection(object):
return interfaces
def get_vcpu_total(self):
"""Get vcpu number of physical computer.
:returns: the number of cpu core.
"""
# On certain platforms, this will raise a NotImplementedError.
try:
return multiprocessing.cpu_count()
except NotImplementedError:
LOG.warn(_("Cannot get the number of cpu, because this "
"function is not implemented for this platform. "
"This error can be safely ignored for now."))
return 0
def get_memory_mb_total(self):
"""Get the total memory size(MB) of physical computer.
:returns: the total amount of memory(MB).
"""
if sys.platform.upper() != 'LINUX2':
return 0
meminfo = open('/proc/meminfo').read().split()
idx = meminfo.index('MemTotal:')
# transforming kb to mb.
return int(meminfo[idx + 1]) / 1024
def get_local_gb_total(self):
"""Get the total hdd size(GB) of physical computer.
:returns:
The total amount of HDD(GB).
Note that this value shows a partition where
NOVA-INST-DIR/instances mounts.
"""
hddinfo = os.statvfs(FLAGS.instances_path)
return hddinfo.f_frsize * hddinfo.f_blocks / 1024 / 1024 / 1024
def get_vcpu_used(self):
""" Get vcpu usage number of physical computer.
:returns: The total number of vcpu that currently used.
"""
total = 0
for dom_id in self._conn.listDomainsID():
dom = self._conn.lookupByID(dom_id)
total += len(dom.vcpus()[1])
return total
def get_memory_mb_used(self):
"""Get the free memory size(MB) of physical computer.
:returns: the total usage of memory(MB).
"""
if sys.platform.upper() != 'LINUX2':
return 0
m = open('/proc/meminfo').read().split()
idx1 = m.index('MemFree:')
idx2 = m.index('Buffers:')
idx3 = m.index('Cached:')
avail = (int(m[idx1 + 1]) + int(m[idx2 + 1]) + int(m[idx3 + 1])) / 1024
return self.get_memory_mb_total() - avail
def get_local_gb_used(self):
"""Get the free hdd size(GB) of physical computer.
:returns:
The total usage of HDD(GB).
Note that this value shows a partition where
NOVA-INST-DIR/instances mounts.
"""
hddinfo = os.statvfs(FLAGS.instances_path)
avail = hddinfo.f_frsize * hddinfo.f_bavail / 1024 / 1024 / 1024
return self.get_local_gb_total() - avail
def get_hypervisor_type(self):
"""Get hypervisor type.
:returns: hypervisor type (ex. qemu)
"""
return self._conn.getType()
def get_hypervisor_version(self):
"""Get hypervisor version.
:returns: hypervisor version (ex. 12003)
"""
return self._conn.getVersion()
def get_cpu_info(self):
"""Get cpuinfo information.
Obtains cpu feature from virConnect.getCapabilities,
and returns as a json string.
:return: see above description
"""
xml = self._conn.getCapabilities()
xml = libxml2.parseDoc(xml)
nodes = xml.xpathEval('//cpu')
if len(nodes) != 1:
raise exception.Invalid(_("Invalid xml. '<cpu>' must be 1,"
"but %d\n") % len(nodes)
+ xml.serialize())
cpu_info = dict()
cpu_info['arch'] = xml.xpathEval('//cpu/arch')[0].getContent()
cpu_info['model'] = xml.xpathEval('//cpu/model')[0].getContent()
cpu_info['vendor'] = xml.xpathEval('//cpu/vendor')[0].getContent()
topology_node = xml.xpathEval('//cpu/topology')[0].get_properties()
topology = dict()
while topology_node != None:
name = topology_node.get_name()
topology[name] = topology_node.getContent()
topology_node = topology_node.get_next()
keys = ['cores', 'sockets', 'threads']
tkeys = topology.keys()
if list(set(tkeys)) != list(set(keys)):
ks = ', '.join(keys)
raise exception.Invalid(_("Invalid xml: topology(%(topology)s) "
"must have %(ks)s") % locals())
feature_nodes = xml.xpathEval('//cpu/feature')
features = list()
for nodes in feature_nodes:
features.append(nodes.get_properties().getContent())
cpu_info['topology'] = topology
cpu_info['features'] = features
return utils.dumps(cpu_info)
def block_stats(self, instance_name, disk):
"""
Note that this function takes an instance name, not an Instance, so
@@ -880,6 +1048,207 @@ class LibvirtConnection(object):
def refresh_security_group_members(self, security_group_id):
self.firewall_driver.refresh_security_group_members(security_group_id)
def update_available_resource(self, ctxt, host):
"""Updates compute manager resource info on ComputeNode table.
This method is called when nova-coompute launches, and
whenever admin executes "nova-manage service update_resource".
:param ctxt: security context
:param host: hostname that compute manager is currently running
"""
try:
service_ref = db.service_get_all_compute_by_host(ctxt, host)[0]
except exception.NotFound:
raise exception.Invalid(_("Cannot update compute manager "
"specific info, because no service "
"record was found."))
# Updating host information
dic = {'vcpus': self.get_vcpu_total(),
'memory_mb': self.get_memory_mb_total(),
'local_gb': self.get_local_gb_total(),
'vcpus_used': self.get_vcpu_used(),
'memory_mb_used': self.get_memory_mb_used(),
'local_gb_used': self.get_local_gb_used(),
'hypervisor_type': self.get_hypervisor_type(),
'hypervisor_version': self.get_hypervisor_version(),
'cpu_info': self.get_cpu_info()}
compute_node_ref = service_ref['compute_node']
if not compute_node_ref:
LOG.info(_('Compute_service record created for %s ') % host)
dic['service_id'] = service_ref['id']
db.compute_node_create(ctxt, dic)
else:
LOG.info(_('Compute_service record updated for %s ') % host)
db.compute_node_update(ctxt, compute_node_ref[0]['id'], dic)
def compare_cpu(self, cpu_info):
"""Checks the host cpu is compatible to a cpu given by xml.
"xml" must be a part of libvirt.openReadonly().getCapabilities().
return values follows by virCPUCompareResult.
if 0 > return value, do live migration.
'http://libvirt.org/html/libvirt-libvirt.html#virCPUCompareResult'
:param cpu_info: json string that shows cpu feature(see get_cpu_info())
:returns:
None. if given cpu info is not compatible to this server,
raise exception.
"""
LOG.info(_('Instance launched has CPU info:\n%s') % cpu_info)
dic = utils.loads(cpu_info)
xml = str(Template(self.cpuinfo_xml, searchList=dic))
LOG.info(_('to xml...\n:%s ' % xml))
u = "http://libvirt.org/html/libvirt-libvirt.html#virCPUCompareResult"
m = _("CPU doesn't have compatibility.\n\n%(ret)s\n\nRefer to %(u)s")
# unknown character exists in xml, then libvirt complains
try:
ret = self._conn.compareCPU(xml, 0)
except libvirt.libvirtError, e:
ret = e.message
LOG.error(m % locals())
raise
if ret <= 0:
raise exception.Invalid(m % locals())
return
def ensure_filtering_rules_for_instance(self, instance_ref):
"""Setting up filtering rules and waiting for its completion.
To migrate an instance, filtering rules to hypervisors
and firewalls are inevitable on destination host.
( Waiting only for filterling rules to hypervisor,
since filtering rules to firewall rules can be set faster).
Concretely, the below method must be called.
- setup_basic_filtering (for nova-basic, etc.)
- prepare_instance_filter(for nova-instance-instance-xxx, etc.)
to_xml may have to be called since it defines PROJNET, PROJMASK.
but libvirt migrates those value through migrateToURI(),
so , no need to be called.
Don't use thread for this method since migration should
not be started when setting-up filtering rules operations
are not completed.
:params instance_ref: nova.db.sqlalchemy.models.Instance object
"""
# If any instances never launch at destination host,
# basic-filtering must be set here.
self.firewall_driver.setup_basic_filtering(instance_ref)
# setting up n)ova-instance-instance-xx mainly.
self.firewall_driver.prepare_instance_filter(instance_ref)
# wait for completion
timeout_count = range(FLAGS.live_migration_retry_count)
while timeout_count:
try:
filter_name = 'nova-instance-%s' % instance_ref.name
self._conn.nwfilterLookupByName(filter_name)
break
except libvirt.libvirtError:
timeout_count.pop()
if len(timeout_count) == 0:
ec2_id = instance_ref['hostname']
iname = instance_ref.name
msg = _('Timeout migrating for %(ec2_id)s(%(iname)s)')
raise exception.Error(msg % locals())
time.sleep(1)
def live_migration(self, ctxt, instance_ref, dest,
post_method, recover_method):
"""Spawning live_migration operation for distributing high-load.
:params ctxt: security context
:params instance_ref:
nova.db.sqlalchemy.models.Instance object
instance object that is migrated.
:params dest: destination host
:params post_method:
post operation method.
expected nova.compute.manager.post_live_migration.
:params recover_method:
recovery method when any exception occurs.
expected nova.compute.manager.recover_live_migration.
"""
greenthread.spawn(self._live_migration, ctxt, instance_ref, dest,
post_method, recover_method)
def _live_migration(self, ctxt, instance_ref, dest,
post_method, recover_method):
"""Do live migration.
:params ctxt: security context
:params instance_ref:
nova.db.sqlalchemy.models.Instance object
instance object that is migrated.
:params dest: destination host
:params post_method:
post operation method.
expected nova.compute.manager.post_live_migration.
:params recover_method:
recovery method when any exception occurs.
expected nova.compute.manager.recover_live_migration.
"""
# Do live migration.
try:
flaglist = FLAGS.live_migration_flag.split(',')
flagvals = [getattr(libvirt, x.strip()) for x in flaglist]
logical_sum = reduce(lambda x, y: x | y, flagvals)
if self.read_only:
tmpconn = self._connect(self.libvirt_uri, False)
dom = tmpconn.lookupByName(instance_ref.name)
dom.migrateToURI(FLAGS.live_migration_uri % dest,
logical_sum,
None,
FLAGS.live_migration_bandwidth)
tmpconn.close()
else:
dom = self._conn.lookupByName(instance_ref.name)
dom.migrateToURI(FLAGS.live_migration_uri % dest,
logical_sum,
None,
FLAGS.live_migration_bandwidth)
except Exception:
recover_method(ctxt, instance_ref)
raise
# Waiting for completion of live_migration.
timer = utils.LoopingCall(f=None)
def wait_for_live_migration():
"""waiting for live migration completion"""
try:
self.get_info(instance_ref.name)['state']
except exception.NotFound:
timer.stop()
post_method(ctxt, instance_ref, dest)
timer.f = wait_for_live_migration
timer.start(interval=0.5, now=True)
def unfilter_instance(self, instance_ref):
"""See comments of same method in firewall_driver."""
self.firewall_driver.unfilter_instance(instance_ref)
class FirewallDriver(object):
def prepare_instance_filter(self, instance):
+16 -2
View File
@@ -20,6 +20,7 @@ their attributes like VDIs, VIFs, as well as their lookup functions.
"""
import os
import sys
import pickle
import re
import time
@@ -428,6 +429,8 @@ class VMHelper(HelperBase):
% locals())
return vdi_uuid
except BaseException as e:
LOG.exception(_("instance %s: Failed to fetch glance image"),
instance_id, exc_info=sys.exc_info())
try:
vdi_uuid = session.get_xenapi().VDI.get_uuid(vdi)
e.args = e.args + ({image_type: vdi_uuid},)
@@ -490,12 +493,23 @@ class VMHelper(HelperBase):
else:
return session.get_xenapi().VDI.get_uuid(vdi_ref)
except BaseException as e:
LOG.exception(_("instance %s: Failed to fetch glance image"),
instance_id, exc_info=sys.exc_info())
if vdi_ref:
try:
vdi_uuid = session.get_xenapi().VDI.get_uuid(vdi_ref)
e.args = e.args + ({image_type: vdi_uuid},)
except:
pass # ignore failures in retrieving VDI
if filename:
try:
splits = filename.split("/")
if len(splits) > 0:
vdi_uuid = splits[len(splits) - 1]
e.args = e.args + ({image_type: vdi_uuid},)
except:
pass # ignore errors parsing file name
raise e
@classmethod
@@ -1022,8 +1036,8 @@ def _write_partition(virtual_size, dev):
def execute(*cmd, **kwargs):
return utils.execute(*cmd, **kwargs)
execute('parted', '--script', dest, 'mklabel', 'msdos')
execute('parted', '--script', dest, 'mkpart', 'primary',
execute('sudo', 'parted', '--script', dest, 'mklabel', 'msdos')
execute('sudo', 'parted', '--script', dest, 'mkpart', 'primary',
'%ds' % primary_first,
'%ds' % primary_last)
+12 -5
View File
@@ -93,7 +93,10 @@ class VMOps(object):
instance.id, exc_info=sys.exc_info())
LOG.debug(_('Instance %s failed to spawn - performing clean-up'),
instance.id)
vdis = {}
vdis = {
ImageType.KERNEL: None,
ImageType.RAMDISK: None,
}
if vdi_uuid:
vdis[disk_image_type] = vdi_uuid
#extract VDI uuid from spawn error
@@ -121,7 +124,8 @@ class VMOps(object):
if remove_from_dom0:
LOG.debug(_("Removing kernel/ramdisk files from dom0"))
self._destroy_kernel_ramdisk_plugin_call(
vdis[ImageType.KERNEL], vdis[ImageType.RAMDISK])
vdis[ImageType.KERNEL], vdis[ImageType.RAMDISK],
False)
#re-throw the error
raise spawn_error
@@ -540,12 +544,15 @@ class VMOps(object):
except self.XenAPI.Failure, exc:
LOG.exception(exc)
def _destroy_kernel_ramdisk_plugin_call(self, kernel, ramdisk):
def _destroy_kernel_ramdisk_plugin_call(self, kernel, ramdisk,
filenames=True):
args = {}
kernel_arg_name = "kernel-" + (filenames and "file" or "uuid")
ramdisk_arg_name = "ramdisk-" + (filenames and "file" or "uuid")
if kernel:
args['kernel-uuid'] = kernel
args[kernel_arg_name] = kernel
if ramdisk:
args['ramdisk-uuid'] = ramdisk
args[ramdisk_arg_name] = ramdisk
task = self._session.async_call_plugin(
'glance', 'remove_kernel_ramdisk', args)
self._session.wait_for_task(task)
+25 -5
View File
@@ -269,6 +269,27 @@ class XenAPIConnection(object):
'username': FLAGS.xenapi_connection_username,
'password': FLAGS.xenapi_connection_password}
def update_available_resource(self, ctxt, host):
"""This method is supported only by libvirt."""
return
def compare_cpu(self, xml):
"""This method is supported only by libvirt."""
raise NotImplementedError('This method is supported only by libvirt.')
def ensure_filtering_rules_for_instance(self, instance_ref):
"""This method is supported only libvirt."""
return
def live_migration(self, context, instance_ref, dest,
post_method, recover_method):
"""This method is supported only by libvirt."""
return
def unfilter_instance(self, instance_ref):
"""This method is supported only by libvirt."""
raise NotImplementedError('This method is supported only by libvirt.')
class XenAPISession(object):
"""The session to invoke XenAPI SDK calls"""
@@ -339,11 +360,10 @@ class XenAPISession(object):
try:
name = self._session.xenapi.task.get_name_label(task)
status = self._session.xenapi.task.get_status(task)
if id:
action = dict(
instance_id=int(id),
action=name[0:255], # Ensure action is never > 255
error=None)
action = dict(
action=name[0:255], # Ensure action is never > 255
instance_id=id and int(id) or None,
error=None)
if status == "pending":
return
elif status == "success":
+52 -4
View File
@@ -143,6 +143,10 @@ class VolumeDriver(object):
"""Undiscover volume on a remote host."""
raise NotImplementedError()
def check_for_export(self, context, volume_id):
"""Make sure volume is exported."""
raise NotImplementedError()
class AOEDriver(VolumeDriver):
"""Implements AOE specific volume commands."""
@@ -198,15 +202,45 @@ class AOEDriver(VolumeDriver):
self._try_execute('sudo', 'vblade-persist', 'destroy',
shelf_id, blade_id)
def discover_volume(self, _volume):
def discover_volume(self, context, _volume):
"""Discover volume on a remote host."""
self._execute('sudo', 'aoe-discover')
self._execute('sudo', 'aoe-stat', check_exit_code=False)
(shelf_id,
blade_id) = self.db.volume_get_shelf_and_blade(context,
_volume['id'])
self._execute("sudo aoe-discover")
out, err = self._execute("sudo aoe-stat", check_exit_code=False)
device_path = 'e%(shelf_id)d.%(blade_id)d' % locals()
if out.find(device_path) >= 0:
return "/dev/etherd/%s" % device_path
else:
return
def undiscover_volume(self, _volume):
"""Undiscover volume on a remote host."""
pass
def check_for_export(self, context, volume_id):
"""Make sure volume is exported."""
(shelf_id,
blade_id) = self.db.volume_get_shelf_and_blade(context,
volume_id)
cmd = "sudo vblade-persist ls --no-header"
out, _err = self._execute(cmd)
exported = False
for line in out.split('\n'):
param = line.split(' ')
if len(param) == 6 and param[0] == str(shelf_id) \
and param[1] == str(blade_id) and param[-1] == "run":
exported = True
break
if not exported:
# Instance will be terminated in this case.
desc = _("Cannot confirm exported volume id:%(volume_id)s. "
"vblade process for e%(shelf_id)s.%(blade_id)s "
"isn't running.") % locals()
raise exception.ProcessExecutionError(out, _err, cmd=cmd,
description=desc)
class FakeAOEDriver(AOEDriver):
"""Logs calls instead of executing."""
@@ -402,7 +436,7 @@ class ISCSIDriver(VolumeDriver):
(property_key, property_value))
return self._run_iscsiadm(iscsi_properties, iscsi_command)
def discover_volume(self, volume):
def discover_volume(self, context, volume):
"""Discover volume on a remote host."""
iscsi_properties = self._get_iscsi_properties(volume)
@@ -461,6 +495,20 @@ class ISCSIDriver(VolumeDriver):
self._run_iscsiadm(iscsi_properties, "--logout")
self._run_iscsiadm(iscsi_properties, "--op delete")
def check_for_export(self, context, volume_id):
"""Make sure volume is exported."""
tid = self.db.volume_get_iscsi_target_num(context, volume_id)
try:
self._execute("sudo ietadm --op show --tid=%(tid)d" % locals())
except exception.ProcessExecutionError, e:
# Instances remount read-only in this case.
# /etc/init.d/iscsitarget restart and rebooting nova-volume
# is better since ensure_export() works at boot time.
logging.error(_("Cannot confirm exported volume "
"id:%(volume_id)s.") % locals())
raise
class FakeISCSIDriver(ISCSIDriver):
"""Logs calls instead of executing."""
+7 -1
View File
@@ -160,7 +160,7 @@ class VolumeManager(manager.Manager):
if volume_ref['host'] == self.host and FLAGS.use_local_volumes:
path = self.driver.local_path(volume_ref)
else:
path = self.driver.discover_volume(volume_ref)
path = self.driver.discover_volume(context, volume_ref)
return path
def remove_compute_volume(self, context, volume_id):
@@ -171,3 +171,9 @@ class VolumeManager(manager.Manager):
return True
else:
self.driver.undiscover_volume(volume_ref)
def check_for_export(self, context, instance_id):
"""Make sure whether volume is exported."""
instance_ref = self.db.instance_get(context, instance_id)
for volume in instance_ref['volumes']:
self.driver.check_for_export(context, volume['id'])
@@ -216,7 +216,7 @@ def _upload_tarball(staging_path, image_id, glance_host, glance_port, os_type):
'x-image-meta-status': 'queued',
'x-image-meta-disk-format': 'vhd',
'x-image-meta-container-format': 'ovf',
'x-image-meta-property-os-type': os_type
'x-image-meta-property-os-type': os_type,
}
for header, value in headers.iteritems():