ensure correct cleanup of multi-attach volumes
If a host has multiple instances with the same shared multi-attach
volume and you delete them in parallel, Nova needs to correctly clean
up the volume connection on the host when the last instance is removed.
Currently we do not have a volume-level lock to guard the critical
section that determines if the current disconnect is removing the final
usage of the volume. This can lead to leaking the volume or other
issues as noted in bug #2048837.

This change introduces a FairLockGuard to ensure we acquire and release
the locks in a fair and ordered manner. The FairLockGuard is used to
lock the server delete with one lock per multi-attach volume. This
ensures that disconnects of different volumes can happen in parallel,
but if we are disconnecting the same volume in multiple greenthreads
concurrently they will be serialised.

Assisted-By: Cursor Auto
Closes-Bug: #2048837
Change-Id: I67e10cace451259127a5d7da8fbdf7739afe3e51
Signed-off-by: Sean Mooney <work@seanmooney.info>
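For illustration, the locking pattern this change relies on can be sketched
outside of nova as follows. This is a minimal sketch, not nova's code:
`NamedLockGuard` and `_LOCKS` are hypothetical stand-ins for the change's
`FairLockGuard` and `NOVA_FAIR_LOCKS`, which build on oslo.concurrency's
fair reader/writer locks and so additionally provide FIFO fairness, unlike
the plain `threading.Lock` used here.

```python
import threading
from collections import defaultdict

# Hypothetical stand-in for nova's NOVA_FAIR_LOCKS registry: one lock
# per name, created on first use.
_LOCKS = defaultdict(threading.Lock)


class NamedLockGuard:
    """Acquire a set of named locks in sorted order; release on exit."""

    def __init__(self, names):
        # Sorting gives every caller the same acquisition order, which
        # avoids lock-ordering deadlocks when two callers hold
        # overlapping sets of locks.
        self._names = sorted(names)
        self._held = []

    def __enter__(self):
        for name in self._names:
            lock = _LOCKS[name]
            lock.acquire()
            self._held.append(lock)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Release everything, even if the body raised.
        while self._held:
            self._held.pop().release()


# Usage mirroring the server-delete path: one lock per multi-attach
# volume, so deletes touching different volumes run in parallel while
# deletes sharing a volume are serialised.
volume_ids = ["vol-a", "vol-b"]
with NamedLockGuard([f"multi_attach_volume_{v}" for v in volume_ids]):
    pass  # shut down the instance and clean up volume connections here
```

The key design choice, mirrored from the change itself, is sorting the lock
names so every caller acquires them in the same global order.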
@@ -22,7 +22,6 @@ handles RPC calls relating to creating instances. It is responsible for
 building a disk image, launching it via the underlying virtualization driver,
 responding to calls to check its state, attaching persistent storage, and
 terminating it.
 
 """
 
 import base64
@@ -3328,6 +3327,21 @@ class ComputeManager(manager.Manager):
         if original_exception is not None and raise_exc:
             raise original_exception
 
+    def _get_multiattach_volume_lock_names_bdms(
+            self, bdms: objects.BlockDeviceMappingList) -> ty.List[str]:
+        """Get the lock names for multiattach volumes.
+
+        :param bdms: BlockDeviceMappingList object
+        :return: List of lock names
+        """
+        if not bdms:
+            return []
+        return [
+            f"multi_attach_volume_{bdm.volume_id}"
+            for bdm in bdms
+            if bdm.is_multiattach
+        ]
+
     def _delete_instance(self, context, instance, bdms):
         """Delete an instance on this host.
 
@@ -3345,35 +3359,38 @@ class ComputeManager(manager.Manager):
         compute_utils.notify_about_instance_action(context, instance,
                 self.host, action=fields.NotificationAction.DELETE,
                 phase=fields.NotificationPhase.START, bdms=bdms)
-
-        self._shutdown_instance(context, instance, bdms)
-
-        # NOTE(vish): We have already deleted the instance, so we have
-        #             to ignore problems cleaning up the volumes. It
-        #             would be nice to let the user know somehow that
-        #             the volume deletion failed, but it is not
-        #             acceptable to have an instance that can not be
-        #             deleted. Perhaps this could be reworked in the
-        #             future to set an instance fault the first time
-        #             and to only ignore the failure if the instance
-        #             is already in ERROR.
-
-        # NOTE(ameeda): The volumes have already been detached during
-        #               the above _shutdown_instance() call and this is
-        #               why detach is not requested from
-        #               _cleanup_volumes() in this case
-
-        self._cleanup_volumes(context, instance, bdms,
-                              raise_exc=False, detach=False)
+        with utils.FairLockGuard(
+            self._get_multiattach_volume_lock_names_bdms(bdms)
+        ):
+            self._shutdown_instance(context, instance, bdms)
+
+            # NOTE(vish): We have already deleted the instance, so we have
+            #             to ignore problems cleaning up the volumes. It
+            #             would be nice to let the user know somehow that
+            #             the volume deletion failed, but it is not
+            #             acceptable to have an instance that can not be
+            #             deleted. Perhaps this could be reworked in the
+            #             future to set an instance fault the first time
+            #             and to only ignore the failure if the instance
+            #             is already in ERROR.
+
+            # NOTE(ameeda): The volumes have already been detached during
+            #               the above _shutdown_instance() call and this is
+            #               why detach is not requested from
+            #               _cleanup_volumes() in this case
+            self._cleanup_volumes(context, instance, bdms,
+                                  raise_exc=False, detach=False)
+
+        # if a delete task succeeded, always update vm state and task
+        # state without expecting task state to be DELETING
+        instance.vm_state = vm_states.DELETED
+        instance.task_state = None
+        instance.power_state = power_state.NOSTATE
+        instance.terminated_at = timeutils.utcnow()
+        instance.save()
 
         # Delete Cyborg ARQs if the instance has a device profile.
         compute_utils.delete_arqs_if_needed(context, instance)
-        # if a delete task succeeded, always update vm state and task
-        # state without expecting task state to be DELETING
-        instance.vm_state = vm_states.DELETED
-        instance.task_state = None
-        instance.power_state = power_state.NOSTATE
-        instance.terminated_at = timeutils.utcnow()
-        instance.save()
 
         self._complete_deletion(context, instance)
         # only destroy the instance in the db if the _complete_deletion
@@ -15,6 +15,7 @@
 from oslo_db import api as oslo_db_api
 from oslo_db.sqlalchemy import update_match
 from oslo_log import log as logging
+from oslo_serialization import jsonutils
 from oslo_utils import uuidutils
 from oslo_utils import versionutils
 
@@ -316,6 +317,26 @@ class BlockDeviceMapping(base.NovaPersistentObject, base.NovaObject,
         return (self.destination_type ==
                 fields.BlockDeviceDestinationType.VOLUME)
 
+    @property
+    def is_multiattach(self):
+        """Return whether the volume is multiattach.
+
+        Note that this property is only valid for volumes that are attached to
+        an instance. If the volume is not attached to an instance, this
+        property will return False.
+
+        :returns: True if the volume is multiattach, False otherwise
+        """
+
+        if not self.is_volume:
+            return False
+        if 'connection_info' not in self:
+            return False
+        if not self.connection_info:
+            return False
+        info = jsonutils.loads(self.connection_info)
+        return info.get("multiattach", False)
+
     @property
     def is_image(self):
         return self.source_type == fields.BlockDeviceSourceType.IMAGE
@@ -11,13 +11,11 @@
 # under the License.
 
 import fixtures
-import threading
-import time
+from oslo_concurrency import lockutils
 
 from unittest import mock
 
-from nova import context as nova_context
-from nova import objects
 from nova.tests.functional.libvirt import base
 from nova.virt import libvirt
 
@@ -66,15 +64,22 @@ class TestConcurrentMultiAttachCleanup(base.ServersTestBase):
         self.server_b = self._create_server(networks='none')
         self.notifier.wait_for_versioned_notifications('instance.create.end')
         self._attach_volume(self.server_b, self.volume_id)
-        self.lock = threading.Lock()
+        # we use a reader writer lock because they allow any number of
+        # readers to progress as long as no one holds the write lock.
+        self.lock = lockutils.ReaderWriterLock()
         # run periodics to allow async tasks to complete
         self._run_periodics()
 
     def _should_disconnect(self, *args, **kwargs):
-        with self.lock:
+        # use a read lock to not block concurrent requests
+        # when the write lock is not held.
+        self.lock.acquire_read_lock()
+        try:
             result = self._orgi_should_disconnect(
                 self.compute_manager.driver, *args, **kwargs)
             return result
+        finally:
+            self.lock.release_read_lock()
 
     def test_serial_server_delete(self):
         # Now that we have 2 vms both using the same multi attach volume
@@ -83,10 +88,12 @@ class TestConcurrentMultiAttachCleanup(base.ServersTestBase):
         self.disconnect_volume_mock.assert_not_called()
 
         self._delete_server(self.server_a)
-        self.should_disconnect_mock.assert_called()
+        self.should_disconnect_mock.assert_called_once()
+        self.should_disconnect_mock.reset_mock()
         self.disconnect_volume_mock.assert_not_called()
         self._delete_server(self.server_b)
         self.disconnect_volume_mock.assert_called()
+        self.should_disconnect_mock.assert_called_once()
 
     def test_concurrent_server_delete(self):
         # Now that we have 2 vms both using the same multi attach volume
@@ -94,33 +101,20 @@ class TestConcurrentMultiAttachCleanup(base.ServersTestBase):
         # cleaning up
         self.should_disconnect_mock.assert_not_called()
         self.disconnect_volume_mock.assert_not_called()
-        # emulate concurrent delete
-        context = nova_context.get_admin_context()
-        servers_this_host = objects.InstanceList.get_uuids_by_host(
-            context, self.hostname)
-        with mock.patch('nova.objects.InstanceList.get_uuids_by_host',
-                        return_value=servers_this_host):
-            self.lock.acquire()
-            self.api.delete_server(self.server_a['id'])
-            self.api.delete_server(self.server_b['id'])
-            self.disconnect_volume_mock.assert_not_called()
-            # this mostly stabilizes the test but it may not be 100% reliable.
-            # locally with time.sleep(1) it passed 42 back to back executions
-            # I'm not sure why this is required given the lock but it is likely
-            # due to the lock being released before a background task is
-            # completed, i.e. the conductor or resource tracker updating state.
-            time.sleep(1)
-            self.lock.release()
+        # emulate concurrent delete by acquiring the lock to prevent
+        # the deletes from progressing too far. We want to pause
+        # at the call to _should_disconnect so we acquire the write
+        # lock to block all readers.
+        self.lock.acquire_write_lock()
+        self.api.delete_server(self.server_a['id'])
+        self.api.delete_server(self.server_b['id'])
+        self.disconnect_volume_mock.assert_not_called()
+        # now that both deletes are submitted and we are stopped at
+        # nova.virt.libvirt.LibvirtDriver._should_disconnect_target
+        # we can release the lock and allow the deletes to complete.
+        self.lock.release_write_lock()
         self._wait_until_deleted(self.server_a)
         self._wait_until_deleted(self.server_b)
-        self.should_disconnect_mock.assert_called()
-        # Fixme(sean-k-mooney): this is bug 2048837
-        try:
-            self.disconnect_volume_mock.assert_not_called()
-        except AssertionError:
-            # NOTE(sean-k-mooney): this reproducer is not 100%
-            # reliable so we convert a failure to a skip to avoid
-            # gating issues. the bug is addressed in the follow up patch
-            # and that test is stable so it is not worth fixing this test
-            # beyond the time.sleep(1) above.
-            self.skipTest("Bug 2048837: volume disconnect not called")
+        self.assertEqual(2, len(self.should_disconnect_mock.call_args_list))
+        # this validates bug 2048837
+        self.disconnect_volume_mock.assert_called_once()
@@ -398,6 +398,26 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase,
             startup=False,
         )
 
+    def test__get_multiattach_volume_lock_names_bdms(self):
+        bdms = objects.BlockDeviceMappingList(
+            objects=[
+                objects.BlockDeviceMapping(
+                    volume_id=uuids.volume1,
+                    destination_type='volume',
+                    instance_uuid=uuids.instance,
+                    attachment_id=uuids.attachment1),
+                objects.BlockDeviceMapping(
+                    volume_id=uuids.volume2,
+                    destination_type='volume',
+                    instance_uuid=uuids.instance,
+                    attachment_id=uuids.attachment2,
+                    connection_info='{"multiattach": true}'),
+            ]
+        )
+        lock_names = self.compute._get_multiattach_volume_lock_names_bdms(bdms)
+        expected_lock_names = [f"multi_attach_volume_{uuids.volume2}"]
+        self.assertEqual(expected_lock_names, lock_names)
+
     @mock.patch('nova.compute.manager.LOG')
     def test_update_available_resource_for_node_reshape_failed(self, log_mock):
         """ReshapeFailed logs and reraises."""
@@ -251,6 +251,19 @@ class _TestBlockDeviceMappingObject(object):
             destination_type='local')
         self.assertFalse(bdm.is_volume)
 
+    def test_is_multiattach_true(self):
+        bdm = objects.BlockDeviceMapping(
+            context=self.context, volume_id=uuids.volume_id,
+            destination_type='volume',
+            connection_info='{"multiattach": true}')
+        self.assertTrue(bdm.is_multiattach)
+
+    def test_is_multiattach_false(self):
+        bdm = objects.BlockDeviceMapping(
+            context=self.context, destination_type='volume',
+            volume_id=uuids.volume_id)
+        self.assertFalse(bdm.is_multiattach)
+
     def test_is_local(self):
         self.assertTrue(
             objects.BlockDeviceMapping(
@@ -15,7 +15,9 @@
 import datetime
 import hashlib
 import os
+import os.path
 import threading
+import time
 from unittest import mock
 
 import fixtures
@@ -1705,3 +1707,261 @@ class OsloServiceBackendSelectionTestCase(test.NoDBTestCase):
         ex = self.assertRaises(ValueError, monkey_patch.patch, backend='foo')
         self.assertEqual(
             "the backend can only be 'eventlet' or 'threading'", str(ex))
+
+
+class TestFairLockGuard(test.NoDBTestCase):
+
+    def test_acquire_single_lock(self):
+        lock_name = 'test_acquire_single_lock'
+        test_lock = utils.NOVA_FAIR_LOCKS.get(lock_name)
+        self.assertFalse(test_lock.has_pending_writers)
+        lock_guard = utils.FairLockGuard([lock_name])
+        with lock_guard:
+            self.assertTrue(lock_guard.is_locked())
+            self.assertIs(lock_guard.locks[0], test_lock)
+            self.assertTrue(test_lock.is_writer())
+
+    def test_acquire_multiple_locks(self):
+        lock_names = ['test_acquire_multiple_locks1',
+                      'test_acquire_multiple_locks2']
+        test_locks = [utils.NOVA_FAIR_LOCKS.get(
+            lock_name) for lock_name in lock_names]
+        for lock in test_locks:
+            self.assertFalse(lock.has_pending_writers)
+        lock_guard = utils.FairLockGuard(lock_names)
+        with lock_guard:
+            self.assertTrue(lock_guard.is_locked())
+            for i, lock in enumerate(lock_guard.locks):
+                self.assertIs(lock, test_locks[i])
+                self.assertTrue(lock_guard.locks[i].is_writer())
+
+    def test_acquire_multiple_locks_sorted(self):
+        lock_names = ['test_acquire_multiple_locks_sorted2',
+                      'test_acquire_multiple_locks_sorted1']
+        self.assertNotEqual(lock_names, sorted(lock_names))
+        test_locks = [utils.NOVA_FAIR_LOCKS.get(
+            lock_name) for lock_name in lock_names]
+        for lock in test_locks:
+            self.assertFalse(lock.has_pending_writers)
+        lock_guard = utils.FairLockGuard(lock_names)
+        with lock_guard:
+            self.assertTrue(lock_guard.is_locked())
+            for i, name in enumerate(sorted(lock_names)):
+                self.assertIs(lock_guard.locks[i],
+                              utils.NOVA_FAIR_LOCKS.get(name))
+                self.assertTrue(lock_guard.locks[i].is_writer())
+
+    def test_locks_are_released_on_exception(self):
+        lock_names = ['test_locks_are_released_on_exception1',
+                      'test_locks_are_released_on_exception2']
+        test_locks = [utils.NOVA_FAIR_LOCKS.get(
+            lock_name) for lock_name in lock_names]
+        lock_guard = utils.FairLockGuard(lock_names)
+        try:
+            with lock_guard:
+                self.assertTrue(lock_guard.is_locked())
+                raise ValueError()
+        except ValueError:
+            pass
+        self.assertFalse(lock_guard.is_locked())
+        for lock in test_locks:
+            self.assertFalse(lock.is_writer())
+
+    def test_partial_acquire(self):
+        lock_names = ['test_partial_acquire1', 'test_partial_acquire2']
+        test_locks = [utils.NOVA_FAIR_LOCKS.get(
+            lock_name) for lock_name in lock_names]
+
+        lock_acquired = threading.Event()
+        lock_released = threading.Event()
+        worker_continue = threading.Event()
+
+        def worker():
+            test_locks[0].acquire_write_lock()
+            lock_acquired.set()
+            worker_continue.wait()
+            time.sleep(1)
+            test_locks[0].release_write_lock()
+            lock_released.set()
+
+        thread = threading.Thread(target=worker)
+        thread.start()
+        lock_guard = utils.FairLockGuard(lock_names)
+        self.assertFalse(lock_guard.is_locked())
+        lock_acquired.wait()
+        # we should not have acquired the lock yet
+        # as the worker thread is still holding it
+        self.assertFalse(test_locks[0].is_writer())
+        # Note the first lock is still held by the worker thread
+        # so this will block until the worker releases it
+        worker_continue.set()
+        with lock_guard:
+            self.assertTrue(lock_guard.is_locked())
+            self.assertTrue(test_locks[0].is_writer())
+            self.assertTrue(test_locks[1].is_writer())
+            self.assertTrue(lock_released.is_set())
+        thread.join()
+        self.assertFalse(lock_guard.is_locked())
+        self.assertFalse(test_locks[0].is_writer())
+        self.assertFalse(test_locks[1].is_writer())
+
+    def test_thread_ordering_subset_locks(self):
+        """Test that a thread requesting a subset of locks waits for the
+        first thread that requested all locks to complete first.
+        """
+        lock_names_t1 = ['test_thread_ordering_A',
+                         'test_thread_ordering_B',
+                         'test_thread_ordering_C']
+        lock_names_t2 = ['test_thread_ordering_A',
+                         'test_thread_ordering_B']
+        test_locks = [utils.NOVA_FAIR_LOCKS.get(
+            lock_name) for lock_name in lock_names_t1]
+
+        t1_acquired = threading.Event()
+        t1_acquired_time = None
+        t1_released = threading.Event()
+        t2_started = threading.Event()
+        t2_acquired = threading.Event()
+        t2_acquired_time = None
+        t1_continue = threading.Event()
+
+        def thread1():
+            lock_guard = utils.FairLockGuard(lock_names_t1)
+            with lock_guard:
+                nonlocal t1_acquired_time
+                t1_acquired_time = time.time()
+                t1_acquired.set()
+                # Wait for T2 to start trying to acquire locks
+                t2_started.wait()
+                # Give T2 time to block on the locks
+                time.sleep(0.1)
+                t1_continue.wait()
+                # Hold locks for a bit to ensure T2 is waiting
+                time.sleep(0.5)
+            t1_released.set()
+
+        def thread2():
+            t2_started.set()
+            lock_guard = utils.FairLockGuard(lock_names_t2)
+            # This should block until T1 releases all locks
+            with lock_guard:
+                nonlocal t2_acquired_time
+                t2_acquired_time = time.time()
+                t2_acquired.set()
+
+        t1 = threading.Thread(target=thread1)
+        t2 = threading.Thread(target=thread2)
+
+        t1.start()
+        # Wait for T1 to acquire all locks
+        self.assertTrue(t1_acquired.wait(timeout=2))
+        self.assertIsNotNone(t1_acquired_time)
+        t2.start()
+        # Wait for T2 to start trying to acquire locks
+        self.assertTrue(t2_started.wait(timeout=2))
+        # T2 should not have acquired locks yet
+        self.assertIsNone(t2_acquired_time)
+        t1_continue.set()
+        t1.join()
+        t2.join()
+
+        # T1 should have released all locks
+        self.assertTrue(t1_released.is_set())
+        # T2 should have acquired its subset of locks
+        self.assertTrue(t2_acquired.is_set())
+        self.assertIsNotNone(t2_acquired_time)
+        # T1 should have acquired its locks before T2
+        self.assertLess(t1_acquired_time, t2_acquired_time)
+
+        # and since all threads are now finished,
+        # all locks should be released
+        for lock in test_locks:
+            self.assertFalse(lock.is_writer())
+
+    def test_context_manager_shared_between_threads(self):
+        """Test that a context manager can be shared between threads
+        and that the locks are acquired in the correct order.
+
+        Do not do this in production code; while it works, this is not
+        the intended usage pattern.
+        """
+        lock_names = ['test_context_manager_shared_between_threads1',
+                      'test_context_manager_shared_between_threads2']
+        lock_guard = utils.FairLockGuard(lock_names)
+        total = 0
+        started = threading.Event()
+
+        def worker(worker_id, lock_guard):
+            started.wait()
+            nonlocal total
+            with lock_guard:
+                total += worker_id
+
+        thread1 = threading.Thread(target=worker, args=(1, lock_guard))
+        thread2 = threading.Thread(target=worker, args=(2, lock_guard))
+        thread1.start()
+        thread2.start()
+        started.set()
+        thread1.join()
+        thread2.join()
+        self.assertEqual(3, total)
+
+    def test_context_manager_can_be_acquired_multiple_times(self):
+        lock_names = ['test_context_manager_can_be_acquired_multiple_times1',
+                      'test_context_manager_can_be_acquired_multiple_times2']
+        test_locks = [utils.NOVA_FAIR_LOCKS.get(
+            lock_name) for lock_name in lock_names]
+        lock_guard = utils.FairLockGuard(lock_names)
+        with lock_guard:
+            self.assertTrue(lock_guard.is_locked())
+            self.assertTrue(test_locks[0].is_writer())
+            self.assertTrue(test_locks[1].is_writer())
+
+        self.assertFalse(lock_guard.is_locked())
+        for lock in test_locks:
+            self.assertFalse(lock.is_writer())
+        with lock_guard:
+            self.assertTrue(lock_guard.is_locked())
+            self.assertTrue(test_locks[0].is_writer())
+            self.assertTrue(test_locks[1].is_writer())
+
+    def test_nested_context_managers(self):
+        lock_names = ['test_nested_context_managers1',
+                      'test_nested_context_managers2']
+        test_locks = [utils.NOVA_FAIR_LOCKS.get(
+            lock_name) for lock_name in lock_names]
+        lock_guard = utils.FairLockGuard(lock_names)
+        lock_guard2 = utils.FairLockGuard(lock_names)
+
+        # on any one thread we can acquire the same lock
+        # multiple times provided we use separate context managers.
+        # In other words the locks are reentrant.
+        # This is an implementation detail of the ReaderWriterLock
+        # provided by fasteners. if this test fails the api behavior
+        # of the FairLock has changed.
+        with lock_guard:
+            self.assertTrue(lock_guard.is_locked())
+            self.assertTrue(test_locks[0].is_writer())
+            self.assertTrue(test_locks[1].is_writer())
+            with lock_guard2:
+                self.assertTrue(lock_guard2.is_locked())
+                self.assertTrue(test_locks[0].is_writer())
+                self.assertTrue(test_locks[1].is_writer())
+            self.assertTrue(lock_guard.is_locked())
+            self.assertTrue(test_locks[0].is_writer())
+            self.assertTrue(test_locks[1].is_writer())
+
+        # attempting to nest the same context manager instance
+        # should raise a TypeError.
+        with lock_guard:
+            self.assertTrue(lock_guard.is_locked())
+            self.assertTrue(test_locks[0].is_writer())
+            self.assertTrue(test_locks[1].is_writer())
+            with self.assertRaisesRegex(
+                    TypeError,
+                    "Cannot enter FairLockGuard while it is already active."):
+                with lock_guard:
+                    pass
+            # after the TypeError, the outer context should still
+            # be active.
+            self.assertTrue(lock_guard.is_locked())
+            self.assertTrue(test_locks[0].is_writer())
+            self.assertTrue(test_locks[1].is_writer())
@@ -59,6 +59,7 @@ CONF = nova.conf.CONF
 LOG = logging.getLogger(__name__)
 
 synchronized = lockutils.synchronized_with_prefix('nova-')
+NOVA_FAIR_LOCKS = lockutils.FairLocks()
 
 SM_IMAGE_PROP_PREFIX = "image_"
 SM_INHERITABLE_KEYS = (
@@ -1171,6 +1172,107 @@ def latch_error_on_raise(retryable=(_SentinelException,)):
     return outer_wrapper
 
 
+class FairLockGuard:
+    """A lock guard context manager
+
+    This class supports acquiring multiple locks safely by name
+    and releasing them via the context manager protocol
+    i.e. with FairLockGuard([list of lock names]):
+
+    The intended usage model for this context manager is to
+    mediate access to a set of locks by name, not by pre-creating
+    the locks and passing them in. Lock creation and management
+    is handled entirely internally.
+
+    If you are using this between threads, Thread-A and Thread-B
+    should both create their own context manager instead of sharing
+    a single context manager between threads.
+
+    Nesting is supported by creating a new context manager instance
+    for each nested context with the same or different lock names
+    as the outer context. Attempting to nest the same context manager
+    instance will raise a TypeError.
+
+    Example Valid Usage:
+    ```
+    with FairLockGuard(['lock1', 'lock2']) as lock_guard:
+        with FairLockGuard(['lock1', 'lock2']) as lock_guard2:
+            pass
+    ```
+
+    Example Invalid Usage:
+    ```
+    with FairLockGuard(['lock1', 'lock2']) as lock_guard:
+        with lock_guard:
+            pass
+    ```
+    This will raise a TypeError because the same context manager instance
+    is being nested.
+
+    In general you should avoid naming the context manager instance
+    and only construct it in the with statement to avoid incorrect usage.
+    """
+
+    def __init__(self, names):
+        # we need to sort the locks to ensure we acquire
+        # them in the same order. This ensures we do not
+        # fall afoul of the Dining philosophers problem
+        # https://en.wikipedia.org/wiki/Dining_philosophers_problem
+        # by implementing the simplest ordered solution
+        # https://howardhinnant.github.io/dining_philosophers.html
+        self.names = sorted(names)
+        self.locks = []
+        # NOTE(sean-k-mooney): this is technically not required
+        # but it protects the internal state of the self.locks list
+        # from incorrect usage where a context manager instance is
+        # shared between threads. We use a reader-writer lock to
+        # allow concurrent reads in is_locked() while ensuring
+        # exclusive access for state modifications.
+        self.locks_lock = lockutils.ReaderWriterLock()
+        self._active = False
+
+    def __enter__(self):
+        with self.locks_lock.write_lock():
+            if self._active:
+                raise TypeError(
+                    "Cannot enter FairLockGuard while it is already active. "
+                    "Create a new instance for nested usage or wait for the "
+                    "current context to exit.")
+            for name in self.names:
+                named_lock = NOVA_FAIR_LOCKS.get(name)
+                self.locks.append(named_lock)
+                # we ensure we add it to the list
+                # before we acquire the lock to make sure
+                # we release it if there is an exception
+                named_lock.acquire_write_lock()
+            self._active = True
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        with self.locks_lock.write_lock():
+            for lock in self.locks:
+                # This should always be true but since
+                # we add the lock to the list before we
+                # acquire it, check anyway.
+                if lock.is_writer():
+                    lock.release_write_lock()
+            self.locks = []
+            self._active = False
+
+    def is_locked(self):
+        # NOTE(sean-k-mooney): LockGuards exist in several programming
+        # languages such as c++, in general a LockGuard can support a
+        # single lock or many but by convention a LockGuard is only
+        # considered locked if all locks are acquired. FairLocks in
+        # oslo are implemented as reader/writer locks. That means
+        # that we hold the lock if we hold the writer lock, aka if
+        # we are the writer. We use a read lock here
+        # to allow concurrent reads while state modifications use write
+        # locks.
+        with self.locks_lock.read_lock():
+            return (
+                self.locks and all(lock.is_writer() for lock in self.locks))
+
+
 def concurrency_mode_threading():
     """Returns true if the service is running in threading mode, false if
     running in Eventlet mode
@@ -0,0 +1,11 @@
+---
+fixes:
+  - |
+    When deleting multiple instances that share the same multi-attach volume
+    on the same compute host in parallel, the volume connection could be left
+    attached to the host even after all instances were deleted, preventing the
+    volume from being properly detached and reused. Nova now correctly cleans
+    up multi-attach volume connections when deleting instances, including when
+    multiple instances sharing the same volume are deleted concurrently. For
+    more details, see `bug 2048837 <https://bugs.launchpad.net/nova/+bug/2048837>`__.