Merge "Limit parallel live migrations in progress"
This commit is contained in:
+48
-16
@@ -128,6 +128,14 @@ compute_opts = [
|
||||
cfg.IntOpt('max_concurrent_builds',
|
||||
default=10,
|
||||
help='Maximum number of instance builds to run concurrently'),
|
||||
cfg.IntOpt('max_concurrent_live_migrations',
|
||||
default=1,
|
||||
help='Maximum number of live migrations to run concurrently. '
|
||||
'This limit is enforced to avoid outbound live migrations '
|
||||
'overwhelming the host/network and causing failures. It '
|
||||
'is not recommended that you change this unless you are '
|
||||
'very sure that doing so is safe and stable in your '
|
||||
'environment.'),
|
||||
cfg.IntOpt('block_device_allocate_retries',
|
||||
default=60,
|
||||
help='Number of times to retry block device'
|
||||
@@ -686,6 +694,11 @@ class ComputeManager(manager.Manager):
|
||||
CONF.max_concurrent_builds)
|
||||
else:
|
||||
self._build_semaphore = compute_utils.UnlimitedSemaphore()
|
||||
if max(CONF.max_concurrent_live_migrations, 0) != 0:
|
||||
self._live_migration_semaphore = eventlet.semaphore.Semaphore(
|
||||
CONF.max_concurrent_live_migrations)
|
||||
else:
|
||||
self._live_migration_semaphore = compute_utils.UnlimitedSemaphore()
|
||||
|
||||
super(ComputeManager, self).__init__(service_name="compute",
|
||||
*args, **kwargs)
|
||||
@@ -4915,22 +4928,8 @@ class ComputeManager(manager.Manager):
|
||||
|
||||
return pre_live_migration_data
|
||||
|
||||
@wrap_exception()
|
||||
@wrap_instance_event
|
||||
@wrap_instance_fault
|
||||
def live_migration(self, context, dest, instance, block_migration,
|
||||
migration, migrate_data):
|
||||
"""Executing live migration.
|
||||
|
||||
:param context: security context
|
||||
:param instance: a nova.objects.instance.Instance object
|
||||
:param dest: destination host
|
||||
:param block_migration: if true, prepare for block migration
|
||||
:param migration: an nova.objects.Migration object
|
||||
:param migrate_data: implementation specific params
|
||||
|
||||
"""
|
||||
|
||||
def _do_live_migration(self, context, dest, instance, block_migration,
|
||||
migration, migrate_data):
|
||||
# NOTE(danms): Remove these guards in v5.0 of the RPC API
|
||||
if migration:
|
||||
# NOTE(danms): We should enhance the RT to account for migrations
|
||||
@@ -4985,6 +4984,39 @@ class ComputeManager(manager.Manager):
|
||||
migration.status = 'failed'
|
||||
migration.save()
|
||||
|
||||
@wrap_exception()
|
||||
@wrap_instance_event
|
||||
@wrap_instance_fault
|
||||
def live_migration(self, context, dest, instance, block_migration,
|
||||
migration, migrate_data):
|
||||
"""Executing live migration.
|
||||
|
||||
:param context: security context
|
||||
:param dest: destination host
|
||||
:param instance: a nova.objects.instance.Instance object
|
||||
:param block_migration: if true, prepare for block migration
|
||||
:param migration: an nova.objects.Migration object
|
||||
:param migrate_data: implementation specific params
|
||||
|
||||
"""
|
||||
|
||||
# NOTE(danms): Remove these guards in v5.0 of the RPC API
|
||||
if migration:
|
||||
migration.status = 'queued'
|
||||
migration.save()
|
||||
|
||||
def dispatch_live_migration(*args, **kwargs):
|
||||
with self._live_migration_semaphore:
|
||||
self._do_live_migration(*args, **kwargs)
|
||||
|
||||
# NOTE(danms): We spawn here to return the RPC worker thread back to
|
||||
# the pool. Since what follows could take a really long time, we don't
|
||||
# want to tie up RPC workers.
|
||||
utils.spawn_n(dispatch_live_migration,
|
||||
context, dest, instance,
|
||||
block_migration, migration,
|
||||
migrate_data)
|
||||
|
||||
def _live_migration_cleanup_flags(self, block_migration, migrate_data):
|
||||
"""Determine whether disks or instance path need to be cleaned up after
|
||||
live migration (at source on success, at destination on rollback)
|
||||
|
||||
@@ -3867,3 +3867,56 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase):
|
||||
self.flags(enabled=True, group=console)
|
||||
self.assertTrue(self.compute._consoles_enabled())
|
||||
self.flags(enabled=False, group=console)
|
||||
|
||||
@mock.patch('nova.utils.spawn_n')
|
||||
@mock.patch('nova.compute.manager.ComputeManager.'
|
||||
'_do_live_migration')
|
||||
def _test_max_concurrent_live(self, mock_lm, mock_spawn):
|
||||
mock_spawn.side_effect = lambda f, *a, **k: f(*a, **k)
|
||||
|
||||
@mock.patch('nova.objects.Migration.save')
|
||||
def _do_it(mock_mig_save):
|
||||
instance = objects.Instance(uuid=str(uuid.uuid4()))
|
||||
migration = objects.Migration()
|
||||
self.compute.live_migration(self.context,
|
||||
mock.sentinel.dest,
|
||||
instance,
|
||||
mock.sentinel.block_migration,
|
||||
migration,
|
||||
mock.sentinel.migrate_data)
|
||||
self.assertEqual('queued', migration.status)
|
||||
migration.save.assert_called_once_with()
|
||||
|
||||
with mock.patch.object(self.compute,
|
||||
'_live_migration_semaphore') as mock_sem:
|
||||
for i in (1, 2, 3):
|
||||
_do_it()
|
||||
self.assertEqual(3, mock_sem.__enter__.call_count)
|
||||
|
||||
def test_max_concurrent_live_limited(self):
|
||||
self.flags(max_concurrent_live_migrations=2)
|
||||
self._test_max_concurrent_live()
|
||||
|
||||
def test_max_concurrent_live_unlimited(self):
|
||||
self.flags(max_concurrent_live_migrations=0)
|
||||
self._test_max_concurrent_live()
|
||||
|
||||
def test_max_concurrent_live_semaphore_limited(self):
|
||||
self.flags(max_concurrent_live_migrations=123)
|
||||
self.assertEqual(
|
||||
123,
|
||||
manager.ComputeManager()._live_migration_semaphore.balance)
|
||||
|
||||
def test_max_concurrent_live_semaphore_unlimited(self):
|
||||
self.flags(max_concurrent_live_migrations=0)
|
||||
compute = manager.ComputeManager()
|
||||
self.assertEqual(0, compute._live_migration_semaphore.balance)
|
||||
self.assertIsInstance(compute._live_migration_semaphore,
|
||||
compute_utils.UnlimitedSemaphore)
|
||||
|
||||
def test_max_concurrent_live_semaphore_negative(self):
|
||||
self.flags(max_concurrent_live_migrations=-2)
|
||||
compute = manager.ComputeManager()
|
||||
self.assertEqual(0, compute._live_migration_semaphore.balance)
|
||||
self.assertIsInstance(compute._live_migration_semaphore,
|
||||
compute_utils.UnlimitedSemaphore)
|
||||
|
||||
@@ -760,7 +760,9 @@ class Domain(object):
|
||||
return 0
|
||||
|
||||
def jobInfo(self):
|
||||
return []
|
||||
# NOTE(danms): This is an array of 12 integers, so just report
|
||||
# something to avoid an IndexError if we look at this
|
||||
return [0] * 12
|
||||
|
||||
def jobStats(self, flags=0):
|
||||
return {}
|
||||
|
||||
@@ -11664,18 +11664,18 @@ class LibvirtConnTestCase(test.NoDBTestCase):
|
||||
dstfile, "qcow2")
|
||||
mock_define.assert_called_once_with(xmldoc)
|
||||
|
||||
@mock.patch.object(utils, "spawn")
|
||||
def test_live_migration_hostname_valid(self, mock_spawn):
|
||||
@mock.patch.object(libvirt_driver.LibvirtDriver, "_live_migration")
|
||||
def test_live_migration_hostname_valid(self, mock_lm):
|
||||
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
|
||||
drvr.live_migration(self.context, self.test_instance,
|
||||
"host1.example.com",
|
||||
lambda x: x,
|
||||
lambda x: x)
|
||||
self.assertEqual(1, mock_spawn.call_count)
|
||||
self.assertEqual(1, mock_lm.call_count)
|
||||
|
||||
@mock.patch.object(utils, "spawn")
|
||||
@mock.patch.object(libvirt_driver.LibvirtDriver, "_live_migration")
|
||||
@mock.patch.object(fake_libvirt_utils, "is_valid_hostname")
|
||||
def test_live_migration_hostname_invalid(self, mock_hostname, mock_spawn):
|
||||
def test_live_migration_hostname_invalid(self, mock_hostname, mock_lm):
|
||||
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
|
||||
mock_hostname.return_value = False
|
||||
self.assertRaises(exception.InvalidHostname,
|
||||
|
||||
@@ -5475,9 +5475,9 @@ class LibvirtDriver(driver.ComputeDriver):
|
||||
if not libvirt_utils.is_valid_hostname(dest):
|
||||
raise exception.InvalidHostname(hostname=dest)
|
||||
|
||||
utils.spawn(self._live_migration, context, instance, dest,
|
||||
post_method, recover_method, block_migration,
|
||||
migrate_data)
|
||||
self._live_migration(context, instance, dest,
|
||||
post_method, recover_method, block_migration,
|
||||
migrate_data)
|
||||
|
||||
def _update_xml(self, xml_str, volume, listen_addrs):
|
||||
xml_doc = etree.fromstring(xml_str)
|
||||
|
||||
Reference in New Issue
Block a user