From 48ddc7c4d13028a0ebcbabfa75a19828129d0d0b Mon Sep 17 00:00:00 2001 From: Kamil Sambor Date: Mon, 5 May 2025 12:46:43 +0200 Subject: [PATCH] Replace eventlet.event.Event with threading.Event MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit As part of removing Eventlet from the codebase, this patch replaces eventlet.event.Event with the standard library's threading.Event. To maintain the existing interface behavior, a helper class ThreadingEventWithResult is introduced. This class mimics Eventlet's Event by supporting result passing and retrieval, which threading.Event does not natively support. The interface between eventlet.event.Event and ThreadingEventWithResult maps as follows: send(value) → set() wait() → wait() ready() → is_set() Change-Id: I469ca9592a5c6d1f7ea1f54e4d34546224ce7ada Signed-off-by: Kamil Sambor --- nova/compute/manager.py | 90 ++++--- nova/exception.py | 5 + nova/tests/unit/compute/test_compute_mgr.py | 249 +++++++++++++++++- nova/tests/unit/compute/test_shelve.py | 5 +- nova/tests/unit/compute/test_virtapi.py | 32 +-- nova/tests/unit/virt/libvirt/test_driver.py | 60 ++--- .../unit/virt/libvirt/volume/test_mount.py | 4 +- nova/virt/libvirt/driver.py | 9 +- nova/virt/zvm/driver.py | 3 +- 9 files changed, 353 insertions(+), 104 deletions(-) diff --git a/nova/compute/manager.py b/nova/compute/manager.py index 283a488a49..7b7da4fd56 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -40,8 +40,6 @@ import typing as ty from cinderclient import exceptions as cinder_exception from cursive import exception as cursive_exception -import eventlet.event -import eventlet.timeout import futurist from keystoneauth1 import exceptions as keystone_exception from openstack import exceptions as sdk_exc @@ -239,9 +237,34 @@ def delete_image_on_error(function): return decorated_function +class ThreadingEventWithResult(threading.Event): + + UNSET_SENTINEL = object() + FAILED_SENTINEL = object() + + def __init__(self): + super().__init__() + self._result = self.UNSET_SENTINEL + self._lock = threading.Lock() + + def set(self, result=None): + with self._lock: + if super().is_set() and result != self._result: + raise ValueError('Cannot change the result once it is set') + self._result = result + super().set() + + def wait(self, timeout=None): + succeeded = super().wait(timeout) + if succeeded: + return self._result + else: + return self.FAILED_SENTINEL + + # Each collection of events is a dict of eventlet Events keyed by a tuple of # event name and associated tag -_InstanceEvents = ty.Dict[ty.Tuple[str, str], eventlet.event.Event] +_InstanceEvents = ty.Dict[ty.Tuple[str, str], ThreadingEventWithResult] class InstanceEvents(object): @@ -257,12 +280,12 @@ class InstanceEvents(object): instance: 'objects.Instance', name: str, tag: str, - ) -> eventlet.event.Event: + ) -> ThreadingEventWithResult: """Prepare to receive an event for an instance. This will register an event for the given instance that we will wait on later. This should be called before initiating whatever - action will trigger the event. The resulting eventlet.event.Event + action will trigger the event. The resulting ThreadingEventWithResult object should be wait()'d on to ensure completion. :param instance: the instance for which the event will be generated @@ -280,7 +303,7 @@ class InstanceEvents(object): instance_events = self._events.setdefault(instance.uuid, {}) return instance_events.setdefault((name, tag), - eventlet.event.Event()) + ThreadingEventWithResult()) LOG.debug('Preparing to wait for external event %(name)s-%(tag)s', {'name': name, 'tag': tag}, instance=instance) return _create_or_get_event() @@ -294,7 +317,7 @@ class InstanceEvents(object): :param instance: the instance for which the event was generated :param event: the nova.objects.external_event.InstanceExternalEvent that describes the event - :returns: the eventlet.event.Event object on which the waiters + :returns: the ThreadingEventWithResult object on which the waiters are blocked """ no_events_sentinel = object() @@ -344,7 +367,7 @@ class InstanceEvents(object): and return them (indexed by event name). :param instance: the instance for which events should be purged - :returns: a dictionary of {event_name: eventlet.event.Event} + :returns: a dictionary of {event_name: ThreadingEventWithResult} """ @utils.synchronized(self._lock_name(instance)) def _clear_events(): @@ -378,7 +401,7 @@ class InstanceEvents(object): instance_uuid=instance_uuid, name=name, status='failed', tag=tag, data={}) - eventlet_event.send(event) + eventlet_event.set(event) class ComputeVirtAPI(virtapi.VirtAPI): @@ -414,7 +437,7 @@ class ComputeVirtAPI(virtapi.VirtAPI): TIMED_OUT = "timed out" RECEIVED_NOT_PROCESSED = "received but not processed" - def __init__(self, name: str, event: eventlet.event.Event) -> None: + def __init__(self, name: str, event: ThreadingEventWithResult) -> None: self.name = name self.event = event self.status = self.EXPECTED @@ -427,19 +450,17 @@ class ComputeVirtAPI(virtapi.VirtAPI): return self.status == self.RECEIVED_EARLY def _update_status_no_wait(self): - if self.status == self.EXPECTED and self.event.ready(): + if self.status == self.EXPECTED and self.event.is_set(): self.status = self.RECEIVED_NOT_PROCESSED - def wait(self) -> 'objects.InstanceExternalEvent': + def wait(self, timeout) -> 'objects.InstanceExternalEvent': self.status = self.WAITING - try: - with timeutils.StopWatch() as sw: - instance_event = self.event.wait() - except eventlet.timeout.Timeout: + with timeutils.StopWatch() as sw: + instance_event = self.event.wait(timeout) + if instance_event is ThreadingEventWithResult.FAILED_SENTINEL: self.status = self.TIMED_OUT self.wait_time = sw.elapsed() - - raise + raise exception.InstanceEventTimeout() self.status = self.RECEIVED self.wait_time = sw.elapsed() @@ -464,14 +485,18 @@ class ComputeVirtAPI(virtapi.VirtAPI): instance: 'objects.Instance', events: dict, error_callback: ty.Callable, + timeout: int, ) -> None: + deadline = time.monotonic() + timeout for event_name, event in events.items(): if event.is_received_early(): continue - else: - actual_event = event.wait() - if actual_event.status == 'completed': - continue + remaining_time = deadline - time.monotonic() + if remaining_time <= 0: + raise exception.InstanceEventTimeout() + actual_event = event.wait(timeout=remaining_time) + if actual_event.status == 'completed': + continue # If we get here, we have an event that was not completed, # nor skipped via exit_wait_early(). Decide whether to # keep waiting by calling the error_callback() hook. @@ -488,12 +513,12 @@ class ComputeVirtAPI(virtapi.VirtAPI): provided event_names, yield, and then wait for all the scheduled events to complete. - Note that this uses an eventlet.timeout.Timeout to bound the + Note that this uses an InstanceEventTimeout to bound the operation, so callers should be prepared to catch that failure and handle that situation appropriately. If the event is not received by the specified timeout deadline, - eventlet.timeout.Timeout is raised. + InstanceEventTimeout is raised. If the event is received but did not have a 'completed' status, a NovaException is raised. If an error_callback is @@ -556,10 +581,9 @@ class ComputeVirtAPI(virtapi.VirtAPI): sw = timeutils.StopWatch() sw.start() try: - with eventlet.timeout.Timeout(deadline): - self._wait_for_instance_events( - instance, events, error_callback) - except eventlet.timeout.Timeout: + self._wait_for_instance_events( + instance, events, error_callback, timeout=deadline) + except exception.InstanceEventTimeout: LOG.warning( 'Timeout waiting for %(events)s for instance with ' 'vm_state %(vm_state)s and task_state %(task_state)s. ' @@ -2830,7 +2854,7 @@ class ComputeManager(manager.Manager): arqs, requested_networks) LOG.debug("ARQs for spec:%s, ARQs for network:%s", spec_arqs, network_arqs) - except (Exception, eventlet.timeout.Timeout) as exc: + except (Exception, exception.InstanceEventTimeout) as exc: LOG.exception(exc) # ARQs created for instance or ports. # The port binding isn't done yet. @@ -3724,7 +3748,7 @@ class ComputeManager(manager.Manager): try: accel_info = self._get_bound_arq_resources( context, instance, accel_uuids or []) - except (Exception, eventlet.timeout.Timeout) as exc: + except (Exception, exception.InstanceEventTimeout) as exc: LOG.exception(exc) self._build_resources_cleanup(instance, network_info) msg = _('Failure getting accelerator resources.') @@ -7681,7 +7705,7 @@ class ComputeManager(manager.Manager): try: accel_info = self._get_bound_arq_resources( context, instance, accel_uuids) - except (Exception, eventlet.timeout.Timeout) as exc: + except (Exception, exception.InstanceEventTimeout) as exc: LOG.exception('Failure getting accelerator requests ' 'with the exception: %s', exc, instance=instance) @@ -9542,7 +9566,7 @@ class ComputeManager(manager.Manager): self._cleanup_pre_live_migration( context, dest, instance, migration, migrate_data, source_bdms) - except eventlet.timeout.Timeout: + except exception.InstanceEventTimeout: # We only get here if wait_for_vif_plugged is True which means # live_migration_wait_for_vif_plug=True on the destination host. msg = ( @@ -11535,7 +11559,7 @@ class ComputeManager(manager.Manager): if _event: LOG.debug('Processing event %(event)s', {'event': event.key}, instance=instance) - _event.send(event) + _event.set(event) else: # If it's a network-vif-unplugged event and the instance is being # deleted or live migrated then we don't need to make this a diff --git a/nova/exception.py b/nova/exception.py index 34b4ff2718..b7a01300c4 100644 --- a/nova/exception.py +++ b/nova/exception.py @@ -2661,3 +2661,8 @@ class EphemeralEncryptionCleanupFailed(NovaException): class HostConflict(Exception): pass + + +class InstanceEventTimeout(Exception): + """A custom timeout exception to replace eventlet.timeout.Timeout.""" + pass diff --git a/nova/tests/unit/compute/test_compute_mgr.py b/nova/tests/unit/compute/test_compute_mgr.py index dc0301a7d4..fc3acc437d 100644 --- a/nova/tests/unit/compute/test_compute_mgr.py +++ b/nova/tests/unit/compute/test_compute_mgr.py @@ -16,14 +16,13 @@ import contextlib import copy import datetime import fixtures as std_fixtures +import threading import time from unittest import mock from cinderclient import exceptions as cinder_exception from cursive import exception as cursive_exception import ddt -from eventlet import event as eventlet_event -from eventlet import timeout as eventlet_timeout from keystoneauth1 import exceptions as keystone_exception import netaddr from openstack import exceptions as sdk_exc @@ -5491,12 +5490,12 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase, result, self.compute.instance_events._events[uuids.instance] [('test-event', None)]) - self.assertTrue(hasattr(result, 'send')) + self.assertTrue(hasattr(result, 'set')) lock_name_mock.assert_called_once_with(inst_obj) @mock.patch('nova.compute.manager.InstanceEvents._lock_name') def test_pop_instance_event(self, lock_name_mock): - event = eventlet_event.Event() + event = manager.ThreadingEventWithResult() self.compute.instance_events._events = { uuids.instance: { ('network-vif-plugged', None): event, @@ -5512,7 +5511,7 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase, @mock.patch('nova.compute.manager.InstanceEvents._lock_name') def test_clear_events_for_instance(self, lock_name_mock): - event = eventlet_event.Event() + event = manager.ThreadingEventWithResult() self.compute.instance_events._events = { uuids.instance: { ('test-event', None): event, @@ -5544,10 +5543,10 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase, result, self.compute.instance_events._events[uuids.instance] [('test-event', None)]) - self.assertTrue(hasattr(result, 'send')) + self.assertTrue(hasattr(result, 'set')) def test_process_instance_event(self): - event = eventlet_event.Event() + event = manager.ThreadingEventWithResult() self.compute.instance_events._events = { uuids.instance: { ('network-vif-plugged', None): event, @@ -5557,7 +5556,7 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase, event_obj = objects.InstanceExternalEvent(name='network-vif-plugged', tag=None) self.compute._process_instance_event(inst_obj, event_obj) - self.assertTrue(event.ready()) + self.assertTrue(event.is_set()) self.assertEqual(event_obj, event.wait()) self.assertEqual({}, self.compute.instance_events._events) @@ -6033,8 +6032,8 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase, self.compute.instance_events.cancel_all_events() # call it again to make sure we handle that gracefully self.compute.instance_events.cancel_all_events() - self.assertTrue(fake_eventlet_event.send.called) - event = fake_eventlet_event.send.call_args_list[0][0][0] + self.assertTrue(fake_eventlet_event.set.called) + event = fake_eventlet_event.set.call_args_list[0][0][0] self.assertEqual('network-vif-plugged', event.name) self.assertEqual(uuids.portid, event.tag) self.assertEqual('failed', event.status) @@ -8607,9 +8606,9 @@ class ComputeManagerBuildInstanceTestCase(test.NoDBTestCase): arq_uuids = [arq['uuid'] for arq in arq_list] mock_get_arqs.return_value = arq_list - mock_wait_inst_ev.side_effect = eventlet_timeout.Timeout + mock_wait_inst_ev.side_effect = exception.InstanceEventTimeout - self.assertRaises(eventlet_timeout.Timeout, + self.assertRaises(exception.InstanceEventTimeout, self.compute._get_bound_arq_resources, self.context, self.instance, arq_uuids) @@ -12032,7 +12031,7 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase, self.compute.virtapi, 'wait_for_instance_event') as wait_for_event: wait_for_event.return_value.__enter__.side_effect = ( - eventlet_timeout.Timeout()) + exception.InstanceEventTimeout()) ex = self.assertRaises( exception.MigrationError, self.compute._do_live_migration, self.context, 'dest-host', self.instance, None, @@ -12070,7 +12069,7 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase, self.compute.virtapi, 'wait_for_instance_event') as wait_for_event: wait_for_event.return_value.__enter__.side_effect = ( - eventlet_timeout.Timeout()) + exception.InstanceEventTimeout()) self.compute._do_live_migration( self.context, 'dest-host', self.instance, None, self.migration, migrate_data) @@ -15112,3 +15111,225 @@ class ComputeManagerBDMUpdateTestCase(test.TestCase): mock_bdm_destroy.assert_called_once() self.instance.get_bdms.assert_called_once() + + +class ThreadingEventWithResultTestCase(test.NoDBTestCase): + """Test case for ThreadingEventWithResult class.""" + + def test_init(self): + event = manager.ThreadingEventWithResult() + self.assertEqual( + event._result, + manager.ThreadingEventWithResult.UNSET_SENTINEL) + self.assertFalse(event.is_set()) + + def test_set_with_result(self): + event = manager.ThreadingEventWithResult() + result = "test_result" + event.set(result) + self.assertEqual(result, event._result) + self.assertTrue(event.is_set()) + + def test_set_without_result(self): + event = manager.ThreadingEventWithResult() + event.set() + self.assertIsNone(event._result) + self.assertTrue(event.is_set()) + + def test_wait_with_result(self): + event = manager.ThreadingEventWithResult() + result = "test_result" + event.set(result) + self.assertEqual(result, event.wait()) + + def test_wait_timeout(self): + event = manager.ThreadingEventWithResult() + self.assertEqual(event.wait(timeout=0.01), + manager.ThreadingEventWithResult.FAILED_SENTINEL) + + def test_change_result_raises_value_error(self): + event = manager.ThreadingEventWithResult() + event.set("original_result") + with self.assertRaisesRegex(ValueError, + 'Cannot change the result once it is set'): + event.set("new_result") + with self.assertRaisesRegex(ValueError, + 'Cannot change the result once it is set'): + event.set(None) + + def test_set_same_result_again(self): + event = manager.ThreadingEventWithResult() + result = "test_result" + event.set(result) + # Setting the same result again should not raise an error + event.set(result) + self.assertEqual(result, event._result) + self.assertTrue(event.is_set()) + + def test_multiple_wait_calls(self): + event = manager.ThreadingEventWithResult() + result = "test_result" + event.set(result) + self.assertEqual(result, event.wait()) + self.assertEqual(result, event.wait()) + self.assertEqual(result, event.wait(timeout=0.01)) + + def test_wait_in_another_thread(self): + event = manager.ThreadingEventWithResult() + result_from_thread = [] + expected_result = "data from main thread" + + def _wait_for_event(): + res = event.wait() + result_from_thread.append(res) + + waiter_thread = threading.Thread(target=_wait_for_event) + waiter_thread.start() + + # Give the waiter thread a moment to start and block on wait() + time.sleep(0.05) + self.assertTrue(waiter_thread.is_alive()) + self.assertFalse(event.is_set()) + + # Set the event from the main thread, which should unblock the waiter + event.set(expected_result) + + # The waiter thread should finish promptly + waiter_thread.join(timeout=1) + self.assertFalse( + waiter_thread.is_alive(), + "Waiter thread should have finished.") + + # Verify the result was received correctly + self.assertEqual(len(result_from_thread), 1) + self.assertEqual(result_from_thread[0], expected_result) + self.assertEqual(event._result, expected_result) + + def test_set_race_condition(self): + event = manager.ThreadingEventWithResult() + outcomes = [] + num_threads = 20 + # A barrier synchronizes threads, making a race condition more likely + barrier = threading.Barrier(num_threads) + + def _set_in_thread(result_to_set): + try: + # All threads wait here until all are ready + barrier.wait() + event.set(result_to_set) + outcomes.append("success") + except ValueError: + outcomes.append("failure") + except Exception as e: + outcomes.append(e) + + threads = [] + possible_results = [ + f"result_from_thread_{i}" for i in range(num_threads)] + + for i in range(num_threads): + thread = threading.Thread( + target=_set_in_thread, + args=(possible_results[i],)) + threads.append(thread) + thread.start() + + # Wait for all threads to complete + for thread in threads: + thread.join() + + # 1. Verify that the event was ultimately set + self.assertTrue(event.is_set()) + + # 2. Verify its result is one of the candidates + self.assertIn(event._result, possible_results) + + # 3. Verify that exactly ONE thread succeeded and the rest failed + success_count = outcomes.count("success") + failure_count = outcomes.count("failure") + + self.assertEqual( + success_count, + 1, f"Expected 1 success, but got {success_count}.") + self.assertEqual( + failure_count, + num_threads - 1, f"Expected {num_threads - 1} failures.") + + +class TestWaitForInstanceEvents(test.NoDBTestCase): + def setUp(self): + super().setUp() + self.instance = mock.Mock() + self.event1 = mock.Mock() + self.event2 = mock.Mock() + self.error_callback = mock.Mock() + self.timeout = 1 + + def test_all_events_completed(self): + self.event1.is_received_early.return_value = False + self.event1.wait.return_value = mock.Mock(status='completed') + self.event2.is_received_early.return_value = False + self.event2.wait.return_value = mock.Mock(status='completed') + events = {'event1': self.event1, 'event2': self.event2} + manager.ComputeVirtAPI._wait_for_instance_events( + self.instance, events, self.error_callback, self.timeout) + self.event1.wait.assert_called() + self.event2.wait.assert_called() + self.error_callback.assert_not_called() + + def test_event_received_early(self): + self.event1.is_received_early.return_value = True + self.event2.is_received_early.return_value = False + self.event2.wait.return_value = mock.Mock(status='completed') + events = {'event1': self.event1, 'event2': self.event2} + manager.ComputeVirtAPI._wait_for_instance_events( + self.instance, events, self.error_callback, self.timeout) + self.event1.wait.assert_not_called() + self.event2.wait.assert_called() + self.error_callback.assert_not_called() + + def test_event_timeout(self): + self.event1.is_received_early.return_value = False + self.event1.wait.side_effect = exception.InstanceEventTimeout + events = {'event1': self.event1} + with self.assertRaisesRegex(exception.InstanceEventTimeout, ""): + manager.ComputeVirtAPI._wait_for_instance_events( + self.instance, events, self.error_callback, self.timeout) + + def test_event_wait_returns_failed_sentinel(self): + self.event1.is_received_early.return_value = False + self.event1.wait.return_value = mock.Mock(status='completed') + + events = {'event1': self.event1} + x = 100 + with mock.patch('time.monotonic', side_effect=[x, x + 1]): + with self.assertRaisesRegex(exception.InstanceEventTimeout, ""): + manager.ComputeVirtAPI._wait_for_instance_events( + self.instance, events, self.error_callback, self.timeout) + + self.event1.wait.assert_not_called() + + def test_multiple_events_some_early_some_completed(self): + self.event1.is_received_early.return_value = True + self.event2.is_received_early.return_value = False + self.event2.wait.return_value = mock.Mock(status='completed') + self.event3 = mock.Mock() + self.event3.is_received_early.return_value = False + self.event3.wait.return_value = mock.Mock(status='completed') + events = { + 'event1': self.event1, + 'event2': self.event2, + 'event3': self.event3 + } + manager.ComputeVirtAPI._wait_for_instance_events( + self.instance, events, self.error_callback, self.timeout) + self.event1.wait.assert_not_called() + self.event2.wait.assert_called() + self.event3.wait.assert_called() + self.error_callback.assert_not_called() + + def test_no_events(self): + events = {} + manager.ComputeVirtAPI._wait_for_instance_events( + self.instance, events, self.error_callback, self.timeout) + self.error_callback.assert_not_called() diff --git a/nova/tests/unit/compute/test_shelve.py b/nova/tests/unit/compute/test_shelve.py index 42fef1f7c5..4b39f646e1 100644 --- a/nova/tests/unit/compute/test_shelve.py +++ b/nova/tests/unit/compute/test_shelve.py @@ -12,7 +12,6 @@ from unittest import mock -import eventlet from oslo_utils import fixture as utils_fixture from oslo_utils.fixture import uuidsentinel as uuids from oslo_utils import timeutils @@ -448,9 +447,9 @@ class ShelveComputeManagerTestCase(test_compute.BaseTestCase): mock_bdms, mock_nil): instance = self._create_fake_instance_obj() - mock_get_arqs.side_effect = eventlet.timeout.Timeout() + mock_get_arqs.side_effect = exception.InstanceEventTimeout() - self.assertRaises(eventlet.timeout.Timeout, + self.assertRaises(exception.InstanceEventTimeout, self.test_unshelve, accel_uuids=[uuids.fake], instance=instance) diff --git a/nova/tests/unit/compute/test_virtapi.py b/nova/tests/unit/compute/test_virtapi.py index 71c9097525..af254d541f 100644 --- a/nova/tests/unit/compute/test_virtapi.py +++ b/nova/tests/unit/compute/test_virtapi.py @@ -15,7 +15,6 @@ import collections from unittest import mock -import eventlet.timeout import os_traits from oslo_utils.fixture import uuidsentinel as uuids @@ -102,7 +101,7 @@ class FakeCompute(object): self, context, rp_uuid, traits, generation=None): self.provider_traits[rp_uuid] = traits - def _event_waiter(self): + def _event_waiter(self, *args, **kwargs): event = mock.MagicMock() event.status = 'completed' return event @@ -151,13 +150,13 @@ class ComputeVirtAPITest(VirtAPIBaseTest): for event in self.compute._events: self.assertEqual('instance', event.instance) self.assertIn((event.name, event.tag), events.keys()) - event.wait.assert_called_once_with() + event.wait.assert_called_once() mock_log.debug.assert_called_once_with( 'Instance event wait completed in %i seconds for %s', mock.ANY, 'event1,event2', instance=event.instance) def test_wait_for_instance_event_failed(self): - def _failer(): + def _failer(*args, **kwargs): event = mock.MagicMock() event.status = 'failed' return event @@ -171,7 +170,7 @@ class ComputeVirtAPITest(VirtAPIBaseTest): self.assertRaises(exception.NovaException, do_test) def test_wait_for_instance_event_failed_callback(self): - def _failer(): + def _failer(*args, **kwargs): event = mock.MagicMock() event.status = 'failed' return event @@ -198,14 +197,17 @@ class ComputeVirtAPITest(VirtAPIBaseTest): mock_log = mock.Mock() @mock.patch.object(compute_manager, 'LOG', new=mock_log) - @mock.patch.object(self.virtapi._compute, '_event_waiter', - side_effect=eventlet.timeout.Timeout()) + @mock.patch.object( + self.virtapi._compute, + '_event_waiter', + return_value=compute_manager. + ThreadingEventWithResult.FAILED_SENTINEL) def do_test(mock_waiter): with self.virtapi.wait_for_instance_event( instance, [('foo', 'bar')]): pass - self.assertRaises(eventlet.timeout.Timeout, do_test) + self.assertRaises(exception.InstanceEventTimeout, do_test) mock_log.warning.assert_called_once_with( 'Timeout waiting for %(events)s for instance with vm_state ' '%(vm_state)s and task_state %(task_state)s. ' @@ -238,7 +240,7 @@ class ComputeVirtAPITest(VirtAPIBaseTest): event = mock.Mock(status="completed") return event else: - raise eventlet.timeout.Timeout() + return compute_manager.ThreadingEventWithResult.FAILED_SENTINEL @mock.patch.object(compute_manager, 'LOG', new=mock_log) @mock.patch.object(self.virtapi._compute, '_event_waiter', @@ -248,7 +250,7 @@ class ComputeVirtAPITest(VirtAPIBaseTest): instance, [('foo', 'bar'), ('missing', 'event')]): pass - self.assertRaises(eventlet.timeout.Timeout, do_test) + self.assertRaises(exception.InstanceEventTimeout, do_test) mock_log.warning.assert_called_once_with( 'Timeout waiting for %(events)s for instance with vm_state ' '%(vm_state)s and task_state %(task_state)s. ' @@ -282,7 +284,7 @@ class ComputeVirtAPITest(VirtAPIBaseTest): event = mock.Mock(status="completed") return event else: - raise eventlet.timeout.Timeout() + return compute_manager.ThreadingEventWithResult.FAILED_SENTINEL def fake_prepare_for_instance_event(instance, name, tag): m = mock.MagicMock() @@ -292,9 +294,9 @@ class ComputeVirtAPITest(VirtAPIBaseTest): m.event_name = '%s-%s' % (name, tag) m.wait.side_effect = fake_event_waiter if name == 'received-but-not-waited': - m.ready.return_value = True + m.is_set.return_value = True if name == 'missing-but-not-waited': - m.ready.return_value = False + m.is_set.return_value = False return m self.virtapi._compute.instance_events.prepare_for_instance_event.\ @@ -314,7 +316,7 @@ class ComputeVirtAPITest(VirtAPIBaseTest): ): self.virtapi.exit_wait_early([('early', 'event')]) - self.assertRaises(eventlet.timeout.Timeout, do_test) + self.assertRaises(exception.InstanceEventTimeout, do_test) mock_log.warning.assert_called_once_with( 'Timeout waiting for %(events)s for instance with vm_state ' '%(vm_state)s and task_state %(task_state)s. ' @@ -353,7 +355,7 @@ class ComputeVirtAPITest(VirtAPIBaseTest): self.assertEqual(2, len(self.compute._events)) for event in self.compute._events: if event.tag == 'bar': - event.wait.assert_called_once_with() + event.wait.assert_called_once() else: event.wait.assert_not_called() diff --git a/nova/tests/unit/virt/libvirt/test_driver.py b/nova/tests/unit/virt/libvirt/test_driver.py index 044c8fb013..04430688c5 100644 --- a/nova/tests/unit/virt/libvirt/test_driver.py +++ b/nova/tests/unit/virt/libvirt/test_driver.py @@ -423,9 +423,9 @@ def get_injection_info(network_info=None, admin_pass=None, files=None): def _concurrency(signal, wait, done, target, is_block_dev=False): - signal.send() + signal.set() wait.wait() - done.send() + done.set() class FakeVirtDomain(object): @@ -565,9 +565,9 @@ class CacheConcurrencyTestCase(test.NoDBTestCase): uuid = uuids.fake backend = imagebackend.Backend(False) - wait1 = eventlet.event.Event() - done1 = eventlet.event.Event() - sig1 = eventlet.event.Event() + wait1 = threading.Event() + done1 = threading.Event() + sig1 = threading.Event() thr1 = eventlet.spawn(backend.by_name(self._fake_instance(uuid), 'name').cache, _concurrency, 'fname', None, @@ -576,23 +576,23 @@ class CacheConcurrencyTestCase(test.NoDBTestCase): # Thread 1 should run before thread 2. sig1.wait() - wait2 = eventlet.event.Event() - done2 = eventlet.event.Event() - sig2 = eventlet.event.Event() + wait2 = threading.Event() + done2 = threading.Event() + sig2 = threading.Event() thr2 = eventlet.spawn(backend.by_name(self._fake_instance(uuid), 'name').cache, _concurrency, 'fname', None, signal=sig2, wait=wait2, done=done2) - wait2.send() + wait2.set() utils.cooperative_yield() try: - self.assertFalse(done2.ready()) + self.assertFalse(done2.is_set()) finally: - wait1.send() + wait1.set() done1.wait() utils.cooperative_yield() - self.assertTrue(done2.ready()) + self.assertTrue(done2.is_set()) # Wait on greenthreads to assert they didn't raise exceptions # during execution thr1.wait() @@ -603,9 +603,9 @@ class CacheConcurrencyTestCase(test.NoDBTestCase): uuid = uuids.fake backend = imagebackend.Backend(False) - wait1 = eventlet.event.Event() - done1 = eventlet.event.Event() - sig1 = eventlet.event.Event() + wait1 = threading.Event() + done1 = threading.Event() + sig1 = threading.Event() thr1 = eventlet.spawn(backend.by_name(self._fake_instance(uuid), 'name').cache, _concurrency, 'fname2', None, @@ -614,9 +614,9 @@ class CacheConcurrencyTestCase(test.NoDBTestCase): # Thread 1 should run before thread 2. sig1.wait() - wait2 = eventlet.event.Event() - done2 = eventlet.event.Event() - sig2 = eventlet.event.Event() + wait2 = threading.Event() + done2 = threading.Event() + sig2 = threading.Event() thr2 = eventlet.spawn(backend.by_name(self._fake_instance(uuid), 'name').cache, _concurrency, 'fname1', None, @@ -625,15 +625,15 @@ class CacheConcurrencyTestCase(test.NoDBTestCase): # Wait for thread 2 to start. sig2.wait() - wait2.send() + wait2.set() tries = 0 - while not done2.ready() and tries < 10: + while not done2.is_set() and tries < 10: utils.cooperative_yield() tries += 1 try: - self.assertTrue(done2.ready()) + self.assertTrue(done2.is_set()) finally: - wait1.send() + wait1.set() utils.cooperative_yield() # Wait on greenthreads to assert they didn't raise exceptions # during execution @@ -14158,7 +14158,7 @@ class LibvirtConnTestCase(test.NoDBTestCase, drvr.active_migrations[instance.uuid] = collections.deque() dom = fakelibvirt.Domain(drvr._get_connection(), "", True) guest = libvirt_guest.Guest(dom) - finish_event = eventlet.event.Event() + finish_event = threading.Event() def fake_job_info(): while True: @@ -14167,7 +14167,7 @@ class LibvirtConnTestCase(test.NoDBTestCase, if type(rec) is str: if rec == "thread-finish": - finish_event.send() + finish_event.set() elif rec == "domain-stop": dom.destroy() elif rec == "force_complete": @@ -14839,9 +14839,9 @@ class LibvirtConnTestCase(test.NoDBTestCase, mock_copy_disk_path.assert_called_once_with(self.context, instance, guest) - class AnyEventletEvent(object): + class AnyEvent(object): def __eq__(self, other): - return type(other) is eventlet.event.Event + return type(other) is threading.Event mock_thread.assert_called_once_with( drvr._live_migration_operation, @@ -14850,7 +14850,7 @@ class LibvirtConnTestCase(test.NoDBTestCase, mock_monitor.assert_called_once_with( self.context, instance, guest, "fakehost", fake_post, fake_recover, True, - migrate_data, AnyEventletEvent(), disks_to_copy[0]) + migrate_data, AnyEvent(), disks_to_copy[0]) def test_live_migration_main(self): self._test_live_migration_main() @@ -21062,10 +21062,10 @@ class LibvirtConnTestCase(test.NoDBTestCase, ): generated_events = [] - def wait_timeout(): + def wait_timeout(*args, **kwargs): event = mock.MagicMock() if neutron_failure == 'timeout': - raise eventlet.timeout.Timeout() + raise exception.InstanceEventTimeout() elif neutron_failure == 'error': event.status = 'failed' else: @@ -21122,7 +21122,7 @@ class LibvirtConnTestCase(test.NoDBTestCase, self.assertEqual(0, event.call_count) elif (neutron_failure == 'error' and not CONF.vif_plugging_is_fatal): - event.wait.assert_called_once_with() + event.wait.assert_called_once() else: self.assertEqual(0, prepare.call_count) diff --git a/nova/tests/unit/virt/libvirt/volume/test_mount.py b/nova/tests/unit/virt/libvirt/volume/test_mount.py index 5af0d21fe9..8d12b6f8a1 100644 --- a/nova/tests/unit/virt/libvirt/volume/test_mount.py +++ b/nova/tests/unit/virt/libvirt/volume/test_mount.py @@ -82,7 +82,7 @@ class ThreadController(object): # The last epoch we waited at self.last_epoch = 0 - self.start_event = eventlet.event.Event() + self.start_event = threading.Event() self.running = False self.complete = False @@ -107,7 +107,7 @@ class ThreadController(object): def _ensure_running(self): if not self.running: self.running = True - self.start_event.send() + self.start_event.set() def waitpoint(self, name): """Called by the test thread. Wait at a waitpoint called name""" diff --git a/nova/virt/libvirt/driver.py b/nova/virt/libvirt/driver.py index 17e6b3fc76..de8e2a06d6 100644 --- a/nova/virt/libvirt/driver.py +++ b/nova/virt/libvirt/driver.py @@ -48,7 +48,6 @@ import uuid from castellan import key_manager from copy import deepcopy -import eventlet from eventlet import tpool from lxml import etree from os_brick import encryptors @@ -8252,7 +8251,7 @@ class LibvirtDriver(driver.ComputeDriver): context, xml, instance, pause=pause, power_on=power_on, post_xml_callback=post_xml_callback) - except eventlet.timeout.Timeout: + except exception.InstanceEventTimeout: # We did not receive all expected events from Neutron, a warning # has already been logged by wait_for_instance_event, but we need # to decide if the issue is fatal. @@ -11138,7 +11137,7 @@ class LibvirtDriver(driver.ComputeDriver): if info.type == libvirt.VIR_DOMAIN_JOB_NONE: # Either still running, or failed or completed, # lets untangle the mess - if not finish_event.ready(): + if not finish_event.is_set(): LOG.debug("Operation thread is still running", instance=instance) else: @@ -11325,14 +11324,14 @@ class LibvirtDriver(driver.ComputeDriver): migrate_data, guest, device_names) - finish_event = eventlet.event.Event() + finish_event = threading.Event() self.active_migrations[instance.uuid] = deque() def thread_finished(_): LOG.debug("Migration operation thread notification", instance=instance) - finish_event.send() + finish_event.set() future.add_done_callback(thread_finished) # Let eventlet schedule the new thread right away diff --git a/nova/virt/zvm/driver.py b/nova/virt/zvm/driver.py index 274967f0de..4803c18ef8 100644 --- a/nova/virt/zvm/driver.py +++ b/nova/virt/zvm/driver.py @@ -12,7 +12,6 @@ # License for the specific language governing permissions and limitations # under the License. -import eventlet import os import time @@ -289,7 +288,7 @@ class ZVMDriver(driver.ComputeDriver): instance, event, deadline=timeout, error_callback=self._neutron_failed_callback): self._setup_network(vm_name, os_distro, network_info, instance) - except eventlet.timeout.Timeout: + except exception.InstanceEventTimeout: LOG.warning("Timeout waiting for vif plugging callback.", instance=instance) if CONF.vif_plugging_is_fatal: