Add support for EBS volumes to the live migration feature.
Currently, only AoE is supported.
This commit is contained in:
@@ -26,6 +26,7 @@ Kei Masumoto <masumotok@nttdata.co.jp>
|
||||
Matt Dietz <matt.dietz@rackspace.com>
|
||||
Michael Gundlach <michael.gundlach@rackspace.com>
|
||||
Monty Taylor <mordred@inaugust.com>
|
||||
Muneyuki Noguchi <noguchimn@nttdata.co.jp>
|
||||
Paul Voccio <paul@openstack.org>
|
||||
Rick Clark <rick@openstack.org>
|
||||
Rick Harris <rconradharris@gmail.com>
|
||||
|
||||
+5
-4
@@ -83,8 +83,6 @@ from nova import rpc
|
||||
from nova.cloudpipe import pipelib
|
||||
from nova.api.ec2 import cloud
|
||||
|
||||
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
flags.DECLARE('fixed_range', 'nova.network.manager')
|
||||
flags.DECLARE('num_networks', 'nova.network.manager')
|
||||
@@ -462,6 +460,10 @@ class InstanceCommands(object):
|
||||
def live_migration(self, ec2_id, dest):
|
||||
"""live_migration"""
|
||||
|
||||
if FLAGS.volume_driver != 'nova.volume.driver.AOEDriver':
|
||||
raise exception.Error('Only AOEDriver is supported for now. '
|
||||
'Sorry.')
|
||||
|
||||
logging.basicConfig()
|
||||
ctxt = context.get_admin_context()
|
||||
|
||||
@@ -491,7 +493,6 @@ class InstanceCommands(object):
|
||||
class HostCommands(object):
|
||||
"""Class for mangaging host(physical nodes)."""
|
||||
|
||||
|
||||
def list(self):
|
||||
"""describe host list."""
|
||||
|
||||
@@ -502,7 +503,6 @@ class HostCommands(object):
|
||||
for host_ref in host_refs:
|
||||
print host_ref['name']
|
||||
|
||||
|
||||
def show(self, host):
|
||||
"""describe cpu/memory/hdd info for host."""
|
||||
|
||||
@@ -546,6 +546,7 @@ CATEGORIES = [
|
||||
('instance', InstanceCommands),
|
||||
('host', HostCommands)]
|
||||
|
||||
|
||||
def lazy_match(name, key_value_tuples):
|
||||
"""Finds all objects that have a key that case insensitively contains
|
||||
[name] key_value_tuples is a list of tuples of the form (key, value)
|
||||
|
||||
+2
-11
@@ -679,22 +679,13 @@ class CloudController(object):
|
||||
ec2_id = None
|
||||
if (floating_ip_ref['fixed_ip']
|
||||
and floating_ip_ref['fixed_ip']['instance']):
|
||||
# modified by masumotok
|
||||
internal_id = \
|
||||
floating_ip_ref['fixed_ip']['instance']['internal_id']
|
||||
internal_id = floating_ip_ref['fixed_ip']['instance']
|
||||
ec2_id = internal_id_to_ec2_id(internal_id)
|
||||
address_rv = {'public_ip': address,
|
||||
'instance_id': ec2_id}
|
||||
if context.user.is_admin():
|
||||
# modified by masumotok- b/c proj_id is never inserted
|
||||
#details = "%s (%s)" % (address_rv['instance_id'],
|
||||
# floating_ip_ref['project_id'])
|
||||
if None != address_rv['instance_id']:
|
||||
status = 'reserved'
|
||||
else:
|
||||
status = None
|
||||
details = "%s (%s)" % (address_rv['instance_id'],
|
||||
status)
|
||||
floating_ip_ref['project_id'])
|
||||
address_rv['instance_id'] = details
|
||||
addresses.append(address_rv)
|
||||
return {'addressesSet': addresses}
|
||||
|
||||
+2
-1
@@ -233,7 +233,8 @@ class ComputeAPI(base.Base):
|
||||
terminated_at=datetime.datetime.utcnow())
|
||||
|
||||
host = instance['host']
|
||||
logging.error('terminate %s %s %s %s',context, FLAGS.compute_topic, host, self.db.queue_get_for(context, FLAGS.compute_topic, host))
|
||||
logging.error('terminate %s %s %s %s', context, FLAGS.compute_topic,
|
||||
host, self.db.queue_get_for(context, FLAGS.compute_topic, host))
|
||||
if host:
|
||||
rpc.cast(context,
|
||||
self.db.queue_get_for(context, FLAGS.compute_topic, host),
|
||||
|
||||
+31
-22
@@ -122,7 +122,7 @@ class ComputeManager(manager.Manager):
|
||||
raise exception.Error(_("Instance has already been created"))
|
||||
self.db.instance_update(context,
|
||||
instance_id,
|
||||
{'host': self.host, 'launched_on':self.host})
|
||||
{'host': self.host, 'launched_on': self.host})
|
||||
|
||||
self.db.instance_set_state(context,
|
||||
instance_id,
|
||||
@@ -443,21 +443,10 @@ class ComputeManager(manager.Manager):
|
||||
def pre_live_migration(self, context, instance_id, dest):
|
||||
"""Any preparation for live migration at dst host."""
|
||||
|
||||
# Getting volume info ( shlf/slot number )
|
||||
# Getting volume info
|
||||
instance_ref = db.instance_get(context, instance_id)
|
||||
ec2_id = instance_ref['hostname']
|
||||
|
||||
volumes = []
|
||||
try:
|
||||
volumes = db.volume_get_by_ec2_id(context, ec2_id)
|
||||
except exception.NotFound:
|
||||
logging.info(_('%s has no volume.'), ec2_id)
|
||||
|
||||
shelf_slots = {}
|
||||
for vol in volumes:
|
||||
shelf, slot = db.volume_get_shelf_and_blade(context, vol['id'])
|
||||
shelf_slots[vol.id] = (shelf, slot)
|
||||
|
||||
# Getting fixed ips
|
||||
fixed_ip = db.instance_get_fixed_address(context, instance_id)
|
||||
if None == fixed_ip:
|
||||
@@ -466,18 +455,22 @@ class ComputeManager(manager.Manager):
|
||||
tb = ''.join(traceback.format_tb(sys.exc_info()[2]))
|
||||
raise rpc.RemoteError(exc_type, val, tb)
|
||||
|
||||
# If any volume is mounted, prepare here.
|
||||
if 0 != len(shelf_slots):
|
||||
pass
|
||||
# if any volume is mounted, prepare here.
|
||||
try:
|
||||
for vol in db.volume_get_all_by_instance(context, instance_id):
|
||||
self.volume_manager.setup_compute_volume(context, vol['id'])
|
||||
except exception.NotFound:
|
||||
logging.info(_("%s has no volume.") % ec2_id)
|
||||
|
||||
# Creating nova-instance-instance-xxx, this is written to libvirt.xml,
|
||||
# and can be seen when executin "virsh nwfiter-list" On destination host,
|
||||
# this nwfilter is necessary.
|
||||
# In addition this method is creating security rule ingress rule onto
|
||||
# destination host.
|
||||
# Creating nova-instance-instance-xxx,
|
||||
# this is written to libvirt.xml,
|
||||
# and can be seen when executin "virsh nwfiter-list"
|
||||
# On destination host, this nwfilter is necessary.
|
||||
# In addition this method is creating security rule ingress rule
|
||||
# onto destination host.
|
||||
self.driver.setup_nwfilters_for_instance(instance_ref)
|
||||
|
||||
# 5. bridge settings
|
||||
# bridge settings
|
||||
self.network_manager.setup_compute_network(context, instance_id)
|
||||
return True
|
||||
|
||||
@@ -497,12 +490,23 @@ class ComputeManager(manager.Manager):
|
||||
"args": {'instance_id': instance_id,
|
||||
'dest': dest}})
|
||||
|
||||
instance_ref = db.instance_get(context, instance_id)
|
||||
ec2_id = instance_ref['hostname']
|
||||
if True != ret:
|
||||
logging.error(_('Pre live migration failed(err at %s)'), dest)
|
||||
db.instance_set_state(context,
|
||||
instance_id,
|
||||
power_state.RUNNING,
|
||||
'running')
|
||||
|
||||
try:
|
||||
for vol in db.volume_get_all_by_instance(context, instance_id):
|
||||
db.volume_update(context,
|
||||
vol['id'],
|
||||
{'status': 'in-use'})
|
||||
except exception.NotFound:
|
||||
pass
|
||||
|
||||
return
|
||||
|
||||
# Waiting for setting up nwfilter such as, nova-instance-instance-xxx.
|
||||
@@ -523,6 +527,11 @@ class ComputeManager(manager.Manager):
|
||||
logging.error(_('Timeout for pre_live_migration at %s'), dest)
|
||||
return
|
||||
|
||||
rpc.call(context,
|
||||
FLAGS.volume_topic,
|
||||
{"method": "check_for_export",
|
||||
"args": {'instance_id': instance_id}})
|
||||
|
||||
# Executing live migration
|
||||
# live_migration might raises ProcessExecution error, but
|
||||
# nothing must be recovered in this version.
|
||||
|
||||
@@ -711,6 +711,11 @@ def volume_get_by_ec2_id(context, ec2_id):
|
||||
return IMPL.volume_get_by_ec2_id(context, ec2_id)
|
||||
|
||||
|
||||
def volume_get_all_by_instance(context, instance_id):
|
||||
"""Get all volumes by instance id or raise if it does not exist."""
|
||||
return IMPL.volume_get_all_by_instance(context, instance_id)
|
||||
|
||||
|
||||
def volume_get_instance(context, volume_id):
|
||||
"""Get the instance that a volume is attached to."""
|
||||
return IMPL.volume_get_instance(context, volume_id)
|
||||
|
||||
@@ -900,6 +900,8 @@ def instance_get_memory_sum_by_host_and_project(context, hostname, proj_id):
|
||||
def instance_get_disk_sum_by_host_and_project(context, hostname, proj_id):
|
||||
return _instance_get_sum_by_host_and_project(context, 'local_gb',
|
||||
hostname, proj_id)
|
||||
|
||||
|
||||
@require_context
|
||||
def instance_action_create(context, values):
|
||||
"""Create an instance action from the values dictionary."""
|
||||
@@ -1497,6 +1499,18 @@ def volume_get_by_ec2_id(context, ec2_id):
|
||||
return result
|
||||
|
||||
|
||||
@require_admin_context
|
||||
def volume_get_all_by_instance(context, instance_id):
|
||||
session = get_session()
|
||||
result = session.query(models.Volume).\
|
||||
filter_by(instance_id=instance_id).\
|
||||
filter_by(deleted=False).\
|
||||
all()
|
||||
if not result:
|
||||
raise exception.NotFound(_('No volume for instance %s') % instance_id)
|
||||
return result
|
||||
|
||||
|
||||
@require_context
|
||||
def volume_ec2_id_exists(context, ec2_id, session=None):
|
||||
if not session:
|
||||
|
||||
@@ -138,7 +138,6 @@ class NovaBase(object):
|
||||
# __tablename__ = 'hosts'
|
||||
# id = Column(String(255), primary_key=True)
|
||||
|
||||
# this class is created by masumotok
|
||||
class Host(BASE, NovaBase):
|
||||
"""Represents a host where services are running"""
|
||||
__tablename__ = 'hosts'
|
||||
|
||||
@@ -170,7 +170,8 @@ class ComputeTestFunctions(unittest.TestCase):
|
||||
# mocks for pre_live_migration
|
||||
self.ctxt = context.get_admin_context()
|
||||
db.instance_get = Mock(return_value=self.instance1)
|
||||
db.volume_get_by_ec2_id = Mock(return_value=[self.vol1, self.vol2])
|
||||
db.volume_get_all_by_instance \
|
||||
= Mock(return_value=[self.vol1, self.vol2])
|
||||
db.volume_get_shelf_and_blade = Mock(return_value=(3, 4))
|
||||
db.instance_get_fixed_address = Mock(return_value=self.fixed_ip1)
|
||||
db.security_group_get_by_instance \
|
||||
@@ -199,7 +200,7 @@ class ComputeTestFunctions(unittest.TestCase):
|
||||
def test02(self):
|
||||
"""02: NotAuthrized occurs on finding volume on DB. """
|
||||
|
||||
db.volume_get_by_ec2_id \
|
||||
db.volume_get_all_by_instance \
|
||||
= Mock(side_effect=exception.NotAuthorized('ERR'))
|
||||
|
||||
self.assertRaises(exception.NotAuthorized,
|
||||
@@ -211,7 +212,7 @@ class ComputeTestFunctions(unittest.TestCase):
|
||||
def test03(self):
|
||||
"""03: Unexpected exception occurs on finding volume on DB. """
|
||||
|
||||
db.volume_get_by_ec2_id = Mock(side_effect=TypeError('ERR'))
|
||||
db.volume_get_all_by_instance = Mock(side_effect=TypeError('ERR'))
|
||||
|
||||
self.assertRaises(TypeError,
|
||||
self.manager.pre_live_migration,
|
||||
@@ -222,7 +223,6 @@ class ComputeTestFunctions(unittest.TestCase):
|
||||
def test04(self):
|
||||
"""04: no volume and fixed ip found on DB, """
|
||||
|
||||
db.volume_get_by_ec2_id = Mock(side_effect=exception.NotFound('ERR'))
|
||||
db.instance_get_fixed_address = Mock(return_value=None)
|
||||
|
||||
self.assertRaises(rpc.RemoteError,
|
||||
@@ -230,10 +230,6 @@ class ComputeTestFunctions(unittest.TestCase):
|
||||
self.ctxt,
|
||||
'dummy_ec2_id',
|
||||
'host2')
|
||||
|
||||
c1 = (0 <= sys.stderr.buffer.find('has no volume'))
|
||||
|
||||
self.assertEqual(c1, True)
|
||||
|
||||
def test05(self):
|
||||
"""05: volume found and no fixed_ip found on DB. """
|
||||
|
||||
@@ -97,6 +97,9 @@ class NovaManageTestFunctions(unittest.TestCase):
|
||||
# prepare test data
|
||||
self.setTestData()
|
||||
|
||||
# only AoE is supported for now
|
||||
FLAGS.volume_driver = 'nova.volume.driver.AOEDriver'
|
||||
|
||||
|
||||
def setTestData(self):
|
||||
import bin.novamanagetest
|
||||
|
||||
@@ -474,7 +474,6 @@ class VlanManager(NetworkManager):
|
||||
"""Returns a fixed ip to the pool."""
|
||||
self.db.fixed_ip_update(context, address, {'allocated': False})
|
||||
|
||||
#def setup_compute_network(self, context, instance_id):
|
||||
def setup_compute_network(self, context, instance_id, network_ref=None):
|
||||
"""Sets up matching network for compute hosts."""
|
||||
if network_ref is None:
|
||||
|
||||
@@ -160,6 +160,15 @@ class Scheduler(object):
|
||||
power_state.PAUSED,
|
||||
'migrating')
|
||||
|
||||
# Changing volume state
|
||||
try:
|
||||
for vol in db.volume_get_all_by_instance(context, instance_id):
|
||||
db.volume_update(context,
|
||||
vol['id'],
|
||||
{'status': 'migrating'})
|
||||
except exception.NotFound:
|
||||
pass
|
||||
|
||||
# Requesting live migration.
|
||||
return src
|
||||
|
||||
|
||||
+1
-1
@@ -141,7 +141,7 @@ class Service(object):
|
||||
'local_gb': local_gb,
|
||||
'hypervisor_type': hypervisor,
|
||||
'hypervisor_version': version,
|
||||
'cpu_info':cpu_xml })
|
||||
'cpu_info': cpu_xml})
|
||||
return host_ref
|
||||
|
||||
def __getattr__(self, key):
|
||||
|
||||
@@ -1,222 +0,0 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2010 United States Government as represented by the
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
A service is a very thin wrapper around a Manager object. It exposes the
|
||||
manager's public methods to other components of the system via rpc. It will
|
||||
report state periodically to the database and is responsible for initiating
|
||||
any periodic tasts that need to be executed on a given host.
|
||||
|
||||
This module contains Service, a generic baseclass for all workers.
|
||||
"""
|
||||
|
||||
import inspect
|
||||
import logging
|
||||
import os
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet import task
|
||||
from twisted.application import service
|
||||
|
||||
from nova import context
|
||||
from nova import db
|
||||
from nova import exception
|
||||
from nova import flags
|
||||
from nova import rpc
|
||||
from nova import utils
|
||||
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
flags.DEFINE_integer('report_interval', 10,
|
||||
'seconds between nodes reporting state to datastore',
|
||||
lower_bound=1)
|
||||
|
||||
flags.DEFINE_integer('periodic_interval', 60,
|
||||
'seconds between running periodic tasks',
|
||||
lower_bound=1)
|
||||
|
||||
|
||||
class Service(object, service.Service):
|
||||
"""Base class for workers that run on hosts."""
|
||||
|
||||
def __init__(self, host, binary, topic, manager, report_interval=None,
|
||||
periodic_interval=None, *args, **kwargs):
|
||||
self.host = host
|
||||
self.binary = binary
|
||||
self.topic = topic
|
||||
self.manager_class_name = manager
|
||||
self.report_interval = report_interval
|
||||
self.periodic_interval = periodic_interval
|
||||
super(Service, self).__init__(*args, **kwargs)
|
||||
self.saved_args, self.saved_kwargs = args, kwargs
|
||||
|
||||
def startService(self): # pylint: disable-msg C0103
|
||||
manager_class = utils.import_class(self.manager_class_name)
|
||||
self.manager = manager_class(host=self.host, *self.saved_args,
|
||||
**self.saved_kwargs)
|
||||
self.manager.init_host()
|
||||
self.model_disconnected = False
|
||||
ctxt = context.get_admin_context()
|
||||
|
||||
try:
|
||||
host_ref = db.host_get_by_name(ctxt, self.host)
|
||||
except exception.NotFound:
|
||||
host_ref = db.host_create(ctxt, {'name': self.host})
|
||||
host_ref = self._update_host_ref(ctxt, host_ref)
|
||||
|
||||
try:
|
||||
service_ref = db.service_get_by_args(ctxt,
|
||||
self.host,
|
||||
self.binary)
|
||||
self.service_id = service_ref['id']
|
||||
except exception.NotFound:
|
||||
self._create_service_ref(ctxt)
|
||||
|
||||
conn = rpc.Connection.instance()
|
||||
if self.report_interval:
|
||||
consumer_all = rpc.AdapterConsumer(
|
||||
connection=conn,
|
||||
topic=self.topic,
|
||||
proxy=self)
|
||||
consumer_node = rpc.AdapterConsumer(
|
||||
connection=conn,
|
||||
topic='%s.%s' % (self.topic, self.host),
|
||||
proxy=self)
|
||||
|
||||
consumer_all.attach_to_twisted()
|
||||
consumer_node.attach_to_twisted()
|
||||
|
||||
pulse = task.LoopingCall(self.report_state)
|
||||
pulse.start(interval=self.report_interval, now=False)
|
||||
|
||||
if self.periodic_interval:
|
||||
pulse = task.LoopingCall(self.periodic_tasks)
|
||||
pulse.start(interval=self.periodic_interval, now=False)
|
||||
|
||||
def _create_service_ref(self, context):
|
||||
service_ref = db.service_create(context,
|
||||
{'host': self.host,
|
||||
'binary': self.binary,
|
||||
'topic': self.topic,
|
||||
'report_count': 0})
|
||||
self.service_id = service_ref['id']
|
||||
|
||||
def _update_host_ref(self, context, host_ref):
|
||||
|
||||
if 0 <= self.manager_class_name.find('ComputeManager'):
|
||||
vcpu = self.manager.driver.get_vcpu_number()
|
||||
memory_mb = self.manager.get_memory_mb()
|
||||
local_gb = self.manager.get_local_gb()
|
||||
hypervisor = self.manager.driver.get_hypervisor_type()
|
||||
version = self.manager.driver.get_hypervisor_version()
|
||||
cpu_xml = self.manager.driver.get_cpu_xml()
|
||||
|
||||
db.host_update(context,
|
||||
host_ref['id'],
|
||||
{'vcpus': vcpu,
|
||||
'memory_mb': memory_mb,
|
||||
'local_gb': local_gb,
|
||||
'hypervisor_type': hypervisor,
|
||||
'hypervisor_version': version,
|
||||
'cpu_info':cpu_xml })
|
||||
return host_ref
|
||||
|
||||
def __getattr__(self, key):
|
||||
manager = self.__dict__.get('manager', None)
|
||||
return getattr(manager, key)
|
||||
|
||||
@classmethod
|
||||
def create(cls,
|
||||
host=None,
|
||||
binary=None,
|
||||
topic=None,
|
||||
manager=None,
|
||||
report_interval=None,
|
||||
periodic_interval=None):
|
||||
"""Instantiates class and passes back application object.
|
||||
|
||||
Args:
|
||||
host, defaults to FLAGS.host
|
||||
binary, defaults to basename of executable
|
||||
topic, defaults to bin_name - "nova-" part
|
||||
manager, defaults to FLAGS.<topic>_manager
|
||||
report_interval, defaults to FLAGS.report_interval
|
||||
periodic_interval, defaults to FLAGS.periodic_interval
|
||||
"""
|
||||
if not host:
|
||||
host = FLAGS.host
|
||||
if not binary:
|
||||
binary = os.path.basename(inspect.stack()[-1][1])
|
||||
if not topic:
|
||||
topic = binary.rpartition("nova-")[2]
|
||||
if not manager:
|
||||
manager = FLAGS.get('%s_manager' % topic, None)
|
||||
if not report_interval:
|
||||
report_interval = FLAGS.report_interval
|
||||
if not periodic_interval:
|
||||
periodic_interval = FLAGS.periodic_interval
|
||||
logging.warn("Starting %s node", topic)
|
||||
service_obj = cls(host, binary, topic, manager,
|
||||
report_interval, periodic_interval)
|
||||
|
||||
# This is the parent service that twistd will be looking for when it
|
||||
# parses this file, return it so that we can get it into globals.
|
||||
application = service.Application(binary)
|
||||
service_obj.setServiceParent(application)
|
||||
return application
|
||||
|
||||
def kill(self):
|
||||
"""Destroy the service object in the datastore"""
|
||||
try:
|
||||
db.service_destroy(context.get_admin_context(), self.service_id)
|
||||
except exception.NotFound:
|
||||
logging.warn("Service killed that has no database entry")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def periodic_tasks(self):
|
||||
"""Tasks to be run at a periodic interval"""
|
||||
yield self.manager.periodic_tasks(context.get_admin_context())
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def report_state(self):
|
||||
"""Update the state of this service in the datastore."""
|
||||
ctxt = context.get_admin_context()
|
||||
try:
|
||||
try:
|
||||
service_ref = db.service_get(ctxt, self.service_id)
|
||||
except exception.NotFound:
|
||||
logging.debug("The service database object disappeared, "
|
||||
"Recreating it.")
|
||||
self._create_service_ref(ctxt)
|
||||
service_ref = db.service_get(ctxt, self.service_id)
|
||||
|
||||
db.service_update(ctxt,
|
||||
self.service_id,
|
||||
{'report_count': service_ref['report_count'] + 1})
|
||||
|
||||
# TODO(termie): make this pattern be more elegant.
|
||||
if getattr(self, "model_disconnected", False):
|
||||
self.model_disconnected = False
|
||||
logging.error("Recovered model server connection!")
|
||||
|
||||
# TODO(vish): this should probably only catch connection errors
|
||||
except Exception: # pylint: disable-msg=W0702
|
||||
if not getattr(self, "model_disconnected", False):
|
||||
self.model_disconnected = True
|
||||
logging.exception("model server went away")
|
||||
yield
|
||||
+31
-14
@@ -695,8 +695,9 @@ class LibvirtConnection(object):
|
||||
xmlstr = self._conn.getCapabilities()
|
||||
xml = libxml2.parseDoc(xmlstr)
|
||||
nodes = xml.xpathEval('//cpu')
|
||||
if 1 != len(nodes):
|
||||
msg = 'Unexpected xml format. tag "cpu" must be 1, but %d.' % len(nodes)
|
||||
if 1 != len(nodes):
|
||||
msg = 'Unexpected xml format. tag "cpu" must be 1, but %d.' \
|
||||
% len(nodes)
|
||||
msg += '\n' + xml.serialize()
|
||||
raise exception.Invalid(_(msg))
|
||||
cpuxmlstr = re.sub("\n|[ ]+", ' ', nodes[0].serialize())
|
||||
@@ -735,8 +736,8 @@ class LibvirtConnection(object):
|
||||
except libvirt.libvirtError:
|
||||
return False
|
||||
|
||||
def compareCPU(self, xml):
|
||||
"""
|
||||
def compareCPU(self, xml):
|
||||
"""
|
||||
Check the host cpu is compatible to a cpu given by xml.
|
||||
"xml" must be a part of libvirt.openReadonly().getCapabilities().
|
||||
return values follows by virCPUCompareResult.
|
||||
@@ -747,9 +748,9 @@ class LibvirtConnection(object):
|
||||
return self._conn.compareCPU(xml, 0)
|
||||
|
||||
def live_migration(self, context, instance_ref, dest):
|
||||
"""
|
||||
Just spawning live_migration operation for
|
||||
distributing high-load.
|
||||
"""
|
||||
Just spawning live_migration operation for
|
||||
distributing high-load.
|
||||
"""
|
||||
greenthread.spawn(self._live_migration, context, instance_ref, dest)
|
||||
|
||||
@@ -757,14 +758,21 @@ class LibvirtConnection(object):
|
||||
""" Do live migration."""
|
||||
|
||||
# Do live migration.
|
||||
try:
|
||||
try:
|
||||
uri = FLAGS.live_migration_uri % dest
|
||||
out, err = utils.execute("sudo virsh migrate --live %s %s"
|
||||
% (instance_ref.name, uri))
|
||||
except exception.ProcessExecutionError:
|
||||
except exception.ProcessExecutionError:
|
||||
id = instance_ref['id']
|
||||
db.instance_set_state(context, id, power_state.RUNNING, 'running')
|
||||
raise
|
||||
try:
|
||||
for volume in db.volume_get_all_by_instance(context, id):
|
||||
db.volume_update(context,
|
||||
volume['id'],
|
||||
{'status': 'in-use'})
|
||||
except exception.NotFound:
|
||||
pass
|
||||
raise exception.ProcessExecutionError
|
||||
|
||||
# Waiting for completion of live_migration.
|
||||
timer = utils.LoopingCall(f=None)
|
||||
@@ -781,7 +789,7 @@ class LibvirtConnection(object):
|
||||
timer.start(interval=0.5, now=True)
|
||||
|
||||
def _post_live_migration(self, context, instance_ref, dest):
|
||||
"""
|
||||
"""
|
||||
Post operations for live migration.
|
||||
Mainly, database updating.
|
||||
"""
|
||||
@@ -808,13 +816,14 @@ class LibvirtConnection(object):
|
||||
db.network_update(context, network_ref['id'], {'host': dest})
|
||||
|
||||
try:
|
||||
floating_ip = db.instance_get_floating_address(context, instance_id)
|
||||
floating_ip \
|
||||
= db.instance_get_floating_address(context, instance_id)
|
||||
# Not return if floating_ip is not found, otherwise,
|
||||
# instance never be accessible..
|
||||
if None == floating_ip:
|
||||
logging.error('floating_ip is not found for %s ' % ec2_id)
|
||||
else:
|
||||
floating_ip_ref = db.floating_ip_get_by_address(context,
|
||||
else:
|
||||
floating_ip_ref = db.floating_ip_get_by_address(context,
|
||||
floating_ip)
|
||||
db.floating_ip_update(context,
|
||||
floating_ip_ref['address'],
|
||||
@@ -832,6 +841,14 @@ class LibvirtConnection(object):
|
||||
'state': power_state.RUNNING,
|
||||
'host': dest})
|
||||
|
||||
try:
|
||||
for volume in db.volume_get_all_by_instance(context, instance_id):
|
||||
db.volume_update(context,
|
||||
volume['id'],
|
||||
{'status': 'in-use'})
|
||||
except exception.NotFound:
|
||||
pass
|
||||
|
||||
logging.info(_('Live migrating %s to %s finishes successfully')
|
||||
% (ec2_id, dest))
|
||||
|
||||
|
||||
+23
-3
@@ -118,7 +118,7 @@ class VolumeDriver(object):
|
||||
"""Removes an export for a logical volume."""
|
||||
raise NotImplementedError()
|
||||
|
||||
def discover_volume(self, volume):
|
||||
def discover_volume(self, context, volume):
|
||||
"""Discover volume on a remote host."""
|
||||
raise NotImplementedError()
|
||||
|
||||
@@ -180,15 +180,35 @@ class AOEDriver(VolumeDriver):
|
||||
self._try_execute("sudo vblade-persist destroy %s %s" %
|
||||
(shelf_id, blade_id))
|
||||
|
||||
def discover_volume(self, _volume):
|
||||
def discover_volume(self, context, volume):
|
||||
"""Discover volume on a remote host."""
|
||||
self._execute("sudo aoe-discover")
|
||||
self._execute("sudo aoe-stat", check_exit_code=False)
|
||||
shelf_id, blade_id = self.db.volume_get_shelf_and_blade(context,
|
||||
volume['id'])
|
||||
return "/dev/etherd/e%s.%s" % (shelf_id, blade_id)
|
||||
|
||||
def undiscover_volume(self, _volume):
|
||||
"""Undiscover volume on a remote host."""
|
||||
pass
|
||||
|
||||
def check_for_export(self, context, volume_id):
|
||||
"""Make sure whether volume is exported."""
|
||||
(shelf_id,
|
||||
blade_id) = self.db.volume_get_shelf_and_blade(context,
|
||||
volume_id)
|
||||
(out, _err) = self._execute("sudo vblade-persist ls --no-header")
|
||||
exists = False
|
||||
for line in out.split('\n'):
|
||||
param = line.split(' ')
|
||||
if len(param) == 6 and param[0] == str(shelf_id) \
|
||||
and param[1] == str(blade_id) and param[-1] == "run":
|
||||
exists = True
|
||||
break
|
||||
if not exists:
|
||||
logging.warning(_("vblade process for e%s.%s isn't running.")
|
||||
% (shelf_id, blade_id))
|
||||
|
||||
|
||||
class FakeAOEDriver(AOEDriver):
|
||||
"""Logs calls instead of executing."""
|
||||
@@ -272,7 +292,7 @@ class ISCSIDriver(VolumeDriver):
|
||||
iscsi_portal = location.split(",")[0]
|
||||
return (iscsi_name, iscsi_portal)
|
||||
|
||||
def discover_volume(self, volume):
|
||||
def discover_volume(self, _context, volume):
|
||||
"""Discover volume on a remote host."""
|
||||
iscsi_name, iscsi_portal = self._get_name_and_portal(volume['name'],
|
||||
volume['host'])
|
||||
|
||||
+11
-1
@@ -137,7 +137,7 @@ class VolumeManager(manager.Manager):
|
||||
if volume_ref['host'] == self.host and FLAGS.use_local_volumes:
|
||||
path = self.driver.local_path(volume_ref)
|
||||
else:
|
||||
path = self.driver.discover_volume(volume_ref)
|
||||
path = self.driver.discover_volume(context, volume_ref)
|
||||
return path
|
||||
|
||||
def remove_compute_volume(self, context, volume_id):
|
||||
@@ -148,3 +148,13 @@ class VolumeManager(manager.Manager):
|
||||
return True
|
||||
else:
|
||||
self.driver.undiscover_volume(volume_ref)
|
||||
|
||||
def check_for_export(self, context, instance_id):
|
||||
"""Make sure whether volume is exported."""
|
||||
if FLAGS.volume_driver == 'nova.volume.driver.AOEDriver':
|
||||
try:
|
||||
for vol in self.db.volume_get_all_by_instance(context,
|
||||
instance_id):
|
||||
self.driver.check_for_export(context, vol['id'])
|
||||
except exception.NotFound:
|
||||
pass
|
||||
|
||||
Reference in New Issue
Block a user