Replace eventlet.event.Event with threading.Event

As part of removing Eventlet from the codebase,
this patch replaces eventlet.event.Event with the standard
library's threading.Event.

To maintain the existing interface behavior,
a helper class ThreadingEventWithResult is introduced.
This class mimics Eventlet's Event by supporting result
passing and retrieval, which threading.Event does
not natively support.

The interface between eventlet.event.Event and
ThreadingEventWithResult maps as follows:
send(value) → set()
wait() → wait()
ready() → is_set()

Change-Id: I469ca9592a5c6d1f7ea1f54e4d34546224ce7ada
Signed-off-by: Kamil Sambor <kamil.sambor@gmail.com>
This commit is contained in:
Kamil Sambor
2025-05-05 12:46:43 +02:00
parent 940d85fc6b
commit 48ddc7c4d1
9 changed files with 353 additions and 104 deletions
+57 -33
View File
@@ -40,8 +40,6 @@ import typing as ty
from cinderclient import exceptions as cinder_exception
from cursive import exception as cursive_exception
import eventlet.event
import eventlet.timeout
import futurist
from keystoneauth1 import exceptions as keystone_exception
from openstack import exceptions as sdk_exc
@@ -239,9 +237,34 @@ def delete_image_on_error(function):
return decorated_function
class ThreadingEventWithResult(threading.Event):
UNSET_SENTINEL = object()
FAILED_SENTINEL = object()
def __init__(self):
super().__init__()
self._result = self.UNSET_SENTINEL
self._lock = threading.Lock()
def set(self, result=None):
with self._lock:
if super().is_set() and result != self._result:
raise ValueError('Cannot change the result once it is set')
self._result = result
super().set()
def wait(self, timeout=None):
succeeded = super().wait(timeout)
if succeeded:
return self._result
else:
return self.FAILED_SENTINEL
# Each collection of events is a dict of eventlet Events keyed by a tuple of
# event name and associated tag
_InstanceEvents = ty.Dict[ty.Tuple[str, str], eventlet.event.Event]
_InstanceEvents = ty.Dict[ty.Tuple[str, str], ThreadingEventWithResult]
class InstanceEvents(object):
@@ -257,12 +280,12 @@ class InstanceEvents(object):
instance: 'objects.Instance',
name: str,
tag: str,
) -> eventlet.event.Event:
) -> ThreadingEventWithResult:
"""Prepare to receive an event for an instance.
This will register an event for the given instance that we will
wait on later. This should be called before initiating whatever
action will trigger the event. The resulting eventlet.event.Event
action will trigger the event. The resulting ThreadingEventWithResult
object should be wait()'d on to ensure completion.
:param instance: the instance for which the event will be generated
@@ -280,7 +303,7 @@ class InstanceEvents(object):
instance_events = self._events.setdefault(instance.uuid, {})
return instance_events.setdefault((name, tag),
eventlet.event.Event())
ThreadingEventWithResult())
LOG.debug('Preparing to wait for external event %(name)s-%(tag)s',
{'name': name, 'tag': tag}, instance=instance)
return _create_or_get_event()
@@ -294,7 +317,7 @@ class InstanceEvents(object):
:param instance: the instance for which the event was generated
:param event: the nova.objects.external_event.InstanceExternalEvent
that describes the event
:returns: the eventlet.event.Event object on which the waiters
:returns: the ThreadingEventWithResult object on which the waiters
are blocked
"""
no_events_sentinel = object()
@@ -344,7 +367,7 @@ class InstanceEvents(object):
and return them (indexed by event name).
:param instance: the instance for which events should be purged
:returns: a dictionary of {event_name: eventlet.event.Event}
:returns: a dictionary of {event_name: ThreadingEventWithResult}
"""
@utils.synchronized(self._lock_name(instance))
def _clear_events():
@@ -378,7 +401,7 @@ class InstanceEvents(object):
instance_uuid=instance_uuid,
name=name, status='failed',
tag=tag, data={})
eventlet_event.send(event)
eventlet_event.set(event)
class ComputeVirtAPI(virtapi.VirtAPI):
@@ -414,7 +437,7 @@ class ComputeVirtAPI(virtapi.VirtAPI):
TIMED_OUT = "timed out"
RECEIVED_NOT_PROCESSED = "received but not processed"
def __init__(self, name: str, event: eventlet.event.Event) -> None:
def __init__(self, name: str, event: ThreadingEventWithResult) -> None:
self.name = name
self.event = event
self.status = self.EXPECTED
@@ -427,19 +450,17 @@ class ComputeVirtAPI(virtapi.VirtAPI):
return self.status == self.RECEIVED_EARLY
def _update_status_no_wait(self):
if self.status == self.EXPECTED and self.event.ready():
if self.status == self.EXPECTED and self.event.is_set():
self.status = self.RECEIVED_NOT_PROCESSED
def wait(self) -> 'objects.InstanceExternalEvent':
def wait(self, timeout) -> 'objects.InstanceExternalEvent':
self.status = self.WAITING
try:
with timeutils.StopWatch() as sw:
instance_event = self.event.wait()
except eventlet.timeout.Timeout:
with timeutils.StopWatch() as sw:
instance_event = self.event.wait(timeout)
if instance_event is ThreadingEventWithResult.FAILED_SENTINEL:
self.status = self.TIMED_OUT
self.wait_time = sw.elapsed()
raise
raise exception.InstanceEventTimeout()
self.status = self.RECEIVED
self.wait_time = sw.elapsed()
@@ -464,14 +485,18 @@ class ComputeVirtAPI(virtapi.VirtAPI):
instance: 'objects.Instance',
events: dict,
error_callback: ty.Callable,
timeout: int,
) -> None:
deadline = time.monotonic() + timeout
for event_name, event in events.items():
if event.is_received_early():
continue
else:
actual_event = event.wait()
if actual_event.status == 'completed':
continue
remaining_time = deadline - time.monotonic()
if remaining_time <= 0:
raise exception.InstanceEventTimeout()
actual_event = event.wait(timeout=remaining_time)
if actual_event.status == 'completed':
continue
# If we get here, we have an event that was not completed,
# nor skipped via exit_wait_early(). Decide whether to
# keep waiting by calling the error_callback() hook.
@@ -488,12 +513,12 @@ class ComputeVirtAPI(virtapi.VirtAPI):
provided event_names, yield, and then wait for all the scheduled
events to complete.
Note that this uses an eventlet.timeout.Timeout to bound the
Note that this uses an InstanceEventTimeout to bound the
operation, so callers should be prepared to catch that
failure and handle that situation appropriately.
If the event is not received by the specified timeout deadline,
eventlet.timeout.Timeout is raised.
InstanceEventTimeout is raised.
If the event is received but did not have a 'completed'
status, a NovaException is raised. If an error_callback is
@@ -556,10 +581,9 @@ class ComputeVirtAPI(virtapi.VirtAPI):
sw = timeutils.StopWatch()
sw.start()
try:
with eventlet.timeout.Timeout(deadline):
self._wait_for_instance_events(
instance, events, error_callback)
except eventlet.timeout.Timeout:
self._wait_for_instance_events(
instance, events, error_callback, timeout=deadline)
except exception.InstanceEventTimeout:
LOG.warning(
'Timeout waiting for %(events)s for instance with '
'vm_state %(vm_state)s and task_state %(task_state)s. '
@@ -2830,7 +2854,7 @@ class ComputeManager(manager.Manager):
arqs, requested_networks)
LOG.debug("ARQs for spec:%s, ARQs for network:%s",
spec_arqs, network_arqs)
except (Exception, eventlet.timeout.Timeout) as exc:
except (Exception, exception.InstanceEventTimeout) as exc:
LOG.exception(exc)
# ARQs created for instance or ports.
# The port binding isn't done yet.
@@ -3724,7 +3748,7 @@ class ComputeManager(manager.Manager):
try:
accel_info = self._get_bound_arq_resources(
context, instance, accel_uuids or [])
except (Exception, eventlet.timeout.Timeout) as exc:
except (Exception, exception.InstanceEventTimeout) as exc:
LOG.exception(exc)
self._build_resources_cleanup(instance, network_info)
msg = _('Failure getting accelerator resources.')
@@ -7681,7 +7705,7 @@ class ComputeManager(manager.Manager):
try:
accel_info = self._get_bound_arq_resources(
context, instance, accel_uuids)
except (Exception, eventlet.timeout.Timeout) as exc:
except (Exception, exception.InstanceEventTimeout) as exc:
LOG.exception('Failure getting accelerator requests '
'with the exception: %s', exc,
instance=instance)
@@ -9542,7 +9566,7 @@ class ComputeManager(manager.Manager):
self._cleanup_pre_live_migration(
context, dest, instance, migration, migrate_data,
source_bdms)
except eventlet.timeout.Timeout:
except exception.InstanceEventTimeout:
# We only get here if wait_for_vif_plugged is True which means
# live_migration_wait_for_vif_plug=True on the destination host.
msg = (
@@ -11535,7 +11559,7 @@ class ComputeManager(manager.Manager):
if _event:
LOG.debug('Processing event %(event)s',
{'event': event.key}, instance=instance)
_event.send(event)
_event.set(event)
else:
# If it's a network-vif-unplugged event and the instance is being
# deleted or live migrated then we don't need to make this a
+5
View File
@@ -2661,3 +2661,8 @@ class EphemeralEncryptionCleanupFailed(NovaException):
class HostConflict(Exception):
pass
class InstanceEventTimeout(Exception):
"""A custom timeout exception to replace eventlet.timeout.Timeout."""
pass
+235 -14
View File
@@ -16,14 +16,13 @@ import contextlib
import copy
import datetime
import fixtures as std_fixtures
import threading
import time
from unittest import mock
from cinderclient import exceptions as cinder_exception
from cursive import exception as cursive_exception
import ddt
from eventlet import event as eventlet_event
from eventlet import timeout as eventlet_timeout
from keystoneauth1 import exceptions as keystone_exception
import netaddr
from openstack import exceptions as sdk_exc
@@ -5491,12 +5490,12 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase,
result,
self.compute.instance_events._events[uuids.instance]
[('test-event', None)])
self.assertTrue(hasattr(result, 'send'))
self.assertTrue(hasattr(result, 'set'))
lock_name_mock.assert_called_once_with(inst_obj)
@mock.patch('nova.compute.manager.InstanceEvents._lock_name')
def test_pop_instance_event(self, lock_name_mock):
event = eventlet_event.Event()
event = manager.ThreadingEventWithResult()
self.compute.instance_events._events = {
uuids.instance: {
('network-vif-plugged', None): event,
@@ -5512,7 +5511,7 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase,
@mock.patch('nova.compute.manager.InstanceEvents._lock_name')
def test_clear_events_for_instance(self, lock_name_mock):
event = eventlet_event.Event()
event = manager.ThreadingEventWithResult()
self.compute.instance_events._events = {
uuids.instance: {
('test-event', None): event,
@@ -5544,10 +5543,10 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase,
result,
self.compute.instance_events._events[uuids.instance]
[('test-event', None)])
self.assertTrue(hasattr(result, 'send'))
self.assertTrue(hasattr(result, 'set'))
def test_process_instance_event(self):
event = eventlet_event.Event()
event = manager.ThreadingEventWithResult()
self.compute.instance_events._events = {
uuids.instance: {
('network-vif-plugged', None): event,
@@ -5557,7 +5556,7 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase,
event_obj = objects.InstanceExternalEvent(name='network-vif-plugged',
tag=None)
self.compute._process_instance_event(inst_obj, event_obj)
self.assertTrue(event.ready())
self.assertTrue(event.is_set())
self.assertEqual(event_obj, event.wait())
self.assertEqual({}, self.compute.instance_events._events)
@@ -6033,8 +6032,8 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase,
self.compute.instance_events.cancel_all_events()
# call it again to make sure we handle that gracefully
self.compute.instance_events.cancel_all_events()
self.assertTrue(fake_eventlet_event.send.called)
event = fake_eventlet_event.send.call_args_list[0][0][0]
self.assertTrue(fake_eventlet_event.set.called)
event = fake_eventlet_event.set.call_args_list[0][0][0]
self.assertEqual('network-vif-plugged', event.name)
self.assertEqual(uuids.portid, event.tag)
self.assertEqual('failed', event.status)
@@ -8607,9 +8606,9 @@ class ComputeManagerBuildInstanceTestCase(test.NoDBTestCase):
arq_uuids = [arq['uuid'] for arq in arq_list]
mock_get_arqs.return_value = arq_list
mock_wait_inst_ev.side_effect = eventlet_timeout.Timeout
mock_wait_inst_ev.side_effect = exception.InstanceEventTimeout
self.assertRaises(eventlet_timeout.Timeout,
self.assertRaises(exception.InstanceEventTimeout,
self.compute._get_bound_arq_resources,
self.context, self.instance, arq_uuids)
@@ -12032,7 +12031,7 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase,
self.compute.virtapi,
'wait_for_instance_event') as wait_for_event:
wait_for_event.return_value.__enter__.side_effect = (
eventlet_timeout.Timeout())
exception.InstanceEventTimeout())
ex = self.assertRaises(
exception.MigrationError, self.compute._do_live_migration,
self.context, 'dest-host', self.instance, None,
@@ -12070,7 +12069,7 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase,
self.compute.virtapi,
'wait_for_instance_event') as wait_for_event:
wait_for_event.return_value.__enter__.side_effect = (
eventlet_timeout.Timeout())
exception.InstanceEventTimeout())
self.compute._do_live_migration(
self.context, 'dest-host', self.instance, None,
self.migration, migrate_data)
@@ -15112,3 +15111,225 @@ class ComputeManagerBDMUpdateTestCase(test.TestCase):
mock_bdm_destroy.assert_called_once()
self.instance.get_bdms.assert_called_once()
class ThreadingEventWithResultTestCase(test.NoDBTestCase):
"""Test case for ThreadingEventWithResult class."""
def test_init(self):
event = manager.ThreadingEventWithResult()
self.assertEqual(
event._result,
manager.ThreadingEventWithResult.UNSET_SENTINEL)
self.assertFalse(event.is_set())
def test_set_with_result(self):
event = manager.ThreadingEventWithResult()
result = "test_result"
event.set(result)
self.assertEqual(result, event._result)
self.assertTrue(event.is_set())
def test_set_without_result(self):
event = manager.ThreadingEventWithResult()
event.set()
self.assertIsNone(event._result)
self.assertTrue(event.is_set())
def test_wait_with_result(self):
event = manager.ThreadingEventWithResult()
result = "test_result"
event.set(result)
self.assertEqual(result, event.wait())
def test_wait_timeout(self):
event = manager.ThreadingEventWithResult()
self.assertEqual(event.wait(timeout=0.01),
manager.ThreadingEventWithResult.FAILED_SENTINEL)
def test_change_result_raises_value_error(self):
event = manager.ThreadingEventWithResult()
event.set("original_result")
with self.assertRaisesRegex(ValueError,
'Cannot change the result once it is set'):
event.set("new_result")
with self.assertRaisesRegex(ValueError,
'Cannot change the result once it is set'):
event.set(None)
def test_set_same_result_again(self):
event = manager.ThreadingEventWithResult()
result = "test_result"
event.set(result)
# Setting the same result again should not raise an error
event.set(result)
self.assertEqual(result, event._result)
self.assertTrue(event.is_set())
def test_multiple_wait_calls(self):
event = manager.ThreadingEventWithResult()
result = "test_result"
event.set(result)
self.assertEqual(result, event.wait())
self.assertEqual(result, event.wait())
self.assertEqual(result, event.wait(timeout=0.01))
def test_wait_in_another_thread(self):
event = manager.ThreadingEventWithResult()
result_from_thread = []
expected_result = "data from main thread"
def _wait_for_event():
res = event.wait()
result_from_thread.append(res)
waiter_thread = threading.Thread(target=_wait_for_event)
waiter_thread.start()
# Give the waiter thread a moment to start and block on wait()
time.sleep(0.05)
self.assertTrue(waiter_thread.is_alive())
self.assertFalse(event.is_set())
# Set the event from the main thread, which should unblock the waiter
event.set(expected_result)
# The waiter thread should finish promptly
waiter_thread.join(timeout=1)
self.assertFalse(
waiter_thread.is_alive(),
"Waiter thread should have finished.")
# Verify the result was received correctly
self.assertEqual(len(result_from_thread), 1)
self.assertEqual(result_from_thread[0], expected_result)
self.assertEqual(event._result, expected_result)
def test_set_race_condition(self):
event = manager.ThreadingEventWithResult()
outcomes = []
num_threads = 20
# A barrier synchronizes threads, making a race condition more likely
barrier = threading.Barrier(num_threads)
def _set_in_thread(result_to_set):
try:
# All threads wait here until all are ready
barrier.wait()
event.set(result_to_set)
outcomes.append("success")
except ValueError:
outcomes.append("failure")
except Exception as e:
outcomes.append(e)
threads = []
possible_results = [
f"result_from_thread_{i}" for i in range(num_threads)]
for i in range(num_threads):
thread = threading.Thread(
target=_set_in_thread,
args=(possible_results[i],))
threads.append(thread)
thread.start()
# Wait for all threads to complete
for thread in threads:
thread.join()
# 1. Verify that the event was ultimately set
self.assertTrue(event.is_set())
# 2. Verify its result is one of the candidates
self.assertIn(event._result, possible_results)
# 3. Verify that exactly ONE thread succeeded and the rest failed
success_count = outcomes.count("success")
failure_count = outcomes.count("failure")
self.assertEqual(
success_count,
1, f"Expected 1 success, but got {success_count}.")
self.assertEqual(
failure_count,
num_threads - 1, f"Expected {num_threads - 1} failures.")
class TestWaitForInstanceEvents(test.NoDBTestCase):
def setUp(self):
super().setUp()
self.instance = mock.Mock()
self.event1 = mock.Mock()
self.event2 = mock.Mock()
self.error_callback = mock.Mock()
self.timeout = 1
def test_all_events_completed(self):
self.event1.is_received_early.return_value = False
self.event1.wait.return_value = mock.Mock(status='completed')
self.event2.is_received_early.return_value = False
self.event2.wait.return_value = mock.Mock(status='completed')
events = {'event1': self.event1, 'event2': self.event2}
manager.ComputeVirtAPI._wait_for_instance_events(
self.instance, events, self.error_callback, self.timeout)
self.event1.wait.assert_called()
self.event2.wait.assert_called()
self.error_callback.assert_not_called()
def test_event_received_early(self):
self.event1.is_received_early.return_value = True
self.event2.is_received_early.return_value = False
self.event2.wait.return_value = mock.Mock(status='completed')
events = {'event1': self.event1, 'event2': self.event2}
manager.ComputeVirtAPI._wait_for_instance_events(
self.instance, events, self.error_callback, self.timeout)
self.event1.wait.assert_not_called()
self.event2.wait.assert_called()
self.error_callback.assert_not_called()
def test_event_timeout(self):
self.event1.is_received_early.return_value = False
self.event1.wait.side_effect = exception.InstanceEventTimeout
events = {'event1': self.event1}
with self.assertRaisesRegex(exception.InstanceEventTimeout, ""):
manager.ComputeVirtAPI._wait_for_instance_events(
self.instance, events, self.error_callback, self.timeout)
def test_event_wait_returns_failed_sentinel(self):
self.event1.is_received_early.return_value = False
self.event1.wait.return_value = mock.Mock(status='completed')
events = {'event1': self.event1}
x = 100
with mock.patch('time.monotonic', side_effect=[x, x + 1]):
with self.assertRaisesRegex(exception.InstanceEventTimeout, ""):
manager.ComputeVirtAPI._wait_for_instance_events(
self.instance, events, self.error_callback, self.timeout)
self.event1.wait.assert_not_called()
def test_multiple_events_some_early_some_completed(self):
self.event1.is_received_early.return_value = True
self.event2.is_received_early.return_value = False
self.event2.wait.return_value = mock.Mock(status='completed')
self.event3 = mock.Mock()
self.event3.is_received_early.return_value = False
self.event3.wait.return_value = mock.Mock(status='completed')
events = {
'event1': self.event1,
'event2': self.event2,
'event3': self.event3
}
manager.ComputeVirtAPI._wait_for_instance_events(
self.instance, events, self.error_callback, self.timeout)
self.event1.wait.assert_not_called()
self.event2.wait.assert_called()
self.event3.wait.assert_called()
self.error_callback.assert_not_called()
def test_no_events(self):
events = {}
manager.ComputeVirtAPI._wait_for_instance_events(
self.instance, events, self.error_callback, self.timeout)
self.error_callback.assert_not_called()
+2 -3
View File
@@ -12,7 +12,6 @@
from unittest import mock
import eventlet
from oslo_utils import fixture as utils_fixture
from oslo_utils.fixture import uuidsentinel as uuids
from oslo_utils import timeutils
@@ -448,9 +447,9 @@ class ShelveComputeManagerTestCase(test_compute.BaseTestCase):
mock_bdms, mock_nil):
instance = self._create_fake_instance_obj()
mock_get_arqs.side_effect = eventlet.timeout.Timeout()
mock_get_arqs.side_effect = exception.InstanceEventTimeout()
self.assertRaises(eventlet.timeout.Timeout,
self.assertRaises(exception.InstanceEventTimeout,
self.test_unshelve,
accel_uuids=[uuids.fake],
instance=instance)
+17 -15
View File
@@ -15,7 +15,6 @@
import collections
from unittest import mock
import eventlet.timeout
import os_traits
from oslo_utils.fixture import uuidsentinel as uuids
@@ -102,7 +101,7 @@ class FakeCompute(object):
self, context, rp_uuid, traits, generation=None):
self.provider_traits[rp_uuid] = traits
def _event_waiter(self):
def _event_waiter(self, *args, **kwargs):
event = mock.MagicMock()
event.status = 'completed'
return event
@@ -151,13 +150,13 @@ class ComputeVirtAPITest(VirtAPIBaseTest):
for event in self.compute._events:
self.assertEqual('instance', event.instance)
self.assertIn((event.name, event.tag), events.keys())
event.wait.assert_called_once_with()
event.wait.assert_called_once()
mock_log.debug.assert_called_once_with(
'Instance event wait completed in %i seconds for %s',
mock.ANY, 'event1,event2', instance=event.instance)
def test_wait_for_instance_event_failed(self):
def _failer():
def _failer(*args, **kwargs):
event = mock.MagicMock()
event.status = 'failed'
return event
@@ -171,7 +170,7 @@ class ComputeVirtAPITest(VirtAPIBaseTest):
self.assertRaises(exception.NovaException, do_test)
def test_wait_for_instance_event_failed_callback(self):
def _failer():
def _failer(*args, **kwargs):
event = mock.MagicMock()
event.status = 'failed'
return event
@@ -198,14 +197,17 @@ class ComputeVirtAPITest(VirtAPIBaseTest):
mock_log = mock.Mock()
@mock.patch.object(compute_manager, 'LOG', new=mock_log)
@mock.patch.object(self.virtapi._compute, '_event_waiter',
side_effect=eventlet.timeout.Timeout())
@mock.patch.object(
self.virtapi._compute,
'_event_waiter',
return_value=compute_manager.
ThreadingEventWithResult.FAILED_SENTINEL)
def do_test(mock_waiter):
with self.virtapi.wait_for_instance_event(
instance, [('foo', 'bar')]):
pass
self.assertRaises(eventlet.timeout.Timeout, do_test)
self.assertRaises(exception.InstanceEventTimeout, do_test)
mock_log.warning.assert_called_once_with(
'Timeout waiting for %(events)s for instance with vm_state '
'%(vm_state)s and task_state %(task_state)s. '
@@ -238,7 +240,7 @@ class ComputeVirtAPITest(VirtAPIBaseTest):
event = mock.Mock(status="completed")
return event
else:
raise eventlet.timeout.Timeout()
return compute_manager.ThreadingEventWithResult.FAILED_SENTINEL
@mock.patch.object(compute_manager, 'LOG', new=mock_log)
@mock.patch.object(self.virtapi._compute, '_event_waiter',
@@ -248,7 +250,7 @@ class ComputeVirtAPITest(VirtAPIBaseTest):
instance, [('foo', 'bar'), ('missing', 'event')]):
pass
self.assertRaises(eventlet.timeout.Timeout, do_test)
self.assertRaises(exception.InstanceEventTimeout, do_test)
mock_log.warning.assert_called_once_with(
'Timeout waiting for %(events)s for instance with vm_state '
'%(vm_state)s and task_state %(task_state)s. '
@@ -282,7 +284,7 @@ class ComputeVirtAPITest(VirtAPIBaseTest):
event = mock.Mock(status="completed")
return event
else:
raise eventlet.timeout.Timeout()
return compute_manager.ThreadingEventWithResult.FAILED_SENTINEL
def fake_prepare_for_instance_event(instance, name, tag):
m = mock.MagicMock()
@@ -292,9 +294,9 @@ class ComputeVirtAPITest(VirtAPIBaseTest):
m.event_name = '%s-%s' % (name, tag)
m.wait.side_effect = fake_event_waiter
if name == 'received-but-not-waited':
m.ready.return_value = True
m.is_set.return_value = True
if name == 'missing-but-not-waited':
m.ready.return_value = False
m.is_set.return_value = False
return m
self.virtapi._compute.instance_events.prepare_for_instance_event.\
@@ -314,7 +316,7 @@ class ComputeVirtAPITest(VirtAPIBaseTest):
):
self.virtapi.exit_wait_early([('early', 'event')])
self.assertRaises(eventlet.timeout.Timeout, do_test)
self.assertRaises(exception.InstanceEventTimeout, do_test)
mock_log.warning.assert_called_once_with(
'Timeout waiting for %(events)s for instance with vm_state '
'%(vm_state)s and task_state %(task_state)s. '
@@ -353,7 +355,7 @@ class ComputeVirtAPITest(VirtAPIBaseTest):
self.assertEqual(2, len(self.compute._events))
for event in self.compute._events:
if event.tag == 'bar':
event.wait.assert_called_once_with()
event.wait.assert_called_once()
else:
event.wait.assert_not_called()
+30 -30
View File
@@ -423,9 +423,9 @@ def get_injection_info(network_info=None, admin_pass=None, files=None):
def _concurrency(signal, wait, done, target, is_block_dev=False):
signal.send()
signal.set()
wait.wait()
done.send()
done.set()
class FakeVirtDomain(object):
@@ -565,9 +565,9 @@ class CacheConcurrencyTestCase(test.NoDBTestCase):
uuid = uuids.fake
backend = imagebackend.Backend(False)
wait1 = eventlet.event.Event()
done1 = eventlet.event.Event()
sig1 = eventlet.event.Event()
wait1 = threading.Event()
done1 = threading.Event()
sig1 = threading.Event()
thr1 = eventlet.spawn(backend.by_name(self._fake_instance(uuid),
'name').cache,
_concurrency, 'fname', None,
@@ -576,23 +576,23 @@ class CacheConcurrencyTestCase(test.NoDBTestCase):
# Thread 1 should run before thread 2.
sig1.wait()
wait2 = eventlet.event.Event()
done2 = eventlet.event.Event()
sig2 = eventlet.event.Event()
wait2 = threading.Event()
done2 = threading.Event()
sig2 = threading.Event()
thr2 = eventlet.spawn(backend.by_name(self._fake_instance(uuid),
'name').cache,
_concurrency, 'fname', None,
signal=sig2, wait=wait2, done=done2)
wait2.send()
wait2.set()
utils.cooperative_yield()
try:
self.assertFalse(done2.ready())
self.assertFalse(done2.is_set())
finally:
wait1.send()
wait1.set()
done1.wait()
utils.cooperative_yield()
self.assertTrue(done2.ready())
self.assertTrue(done2.is_set())
# Wait on greenthreads to assert they didn't raise exceptions
# during execution
thr1.wait()
@@ -603,9 +603,9 @@ class CacheConcurrencyTestCase(test.NoDBTestCase):
uuid = uuids.fake
backend = imagebackend.Backend(False)
wait1 = eventlet.event.Event()
done1 = eventlet.event.Event()
sig1 = eventlet.event.Event()
wait1 = threading.Event()
done1 = threading.Event()
sig1 = threading.Event()
thr1 = eventlet.spawn(backend.by_name(self._fake_instance(uuid),
'name').cache,
_concurrency, 'fname2', None,
@@ -614,9 +614,9 @@ class CacheConcurrencyTestCase(test.NoDBTestCase):
# Thread 1 should run before thread 2.
sig1.wait()
wait2 = eventlet.event.Event()
done2 = eventlet.event.Event()
sig2 = eventlet.event.Event()
wait2 = threading.Event()
done2 = threading.Event()
sig2 = threading.Event()
thr2 = eventlet.spawn(backend.by_name(self._fake_instance(uuid),
'name').cache,
_concurrency, 'fname1', None,
@@ -625,15 +625,15 @@ class CacheConcurrencyTestCase(test.NoDBTestCase):
# Wait for thread 2 to start.
sig2.wait()
wait2.send()
wait2.set()
tries = 0
while not done2.ready() and tries < 10:
while not done2.is_set() and tries < 10:
utils.cooperative_yield()
tries += 1
try:
self.assertTrue(done2.ready())
self.assertTrue(done2.is_set())
finally:
wait1.send()
wait1.set()
utils.cooperative_yield()
# Wait on greenthreads to assert they didn't raise exceptions
# during execution
@@ -14158,7 +14158,7 @@ class LibvirtConnTestCase(test.NoDBTestCase,
drvr.active_migrations[instance.uuid] = collections.deque()
dom = fakelibvirt.Domain(drvr._get_connection(), "<domain/>", True)
guest = libvirt_guest.Guest(dom)
finish_event = eventlet.event.Event()
finish_event = threading.Event()
def fake_job_info():
while True:
@@ -14167,7 +14167,7 @@ class LibvirtConnTestCase(test.NoDBTestCase,
if type(rec) is str:
if rec == "thread-finish":
finish_event.send()
finish_event.set()
elif rec == "domain-stop":
dom.destroy()
elif rec == "force_complete":
@@ -14839,9 +14839,9 @@ class LibvirtConnTestCase(test.NoDBTestCase,
mock_copy_disk_path.assert_called_once_with(self.context, instance,
guest)
class AnyEventletEvent(object):
class AnyEvent(object):
def __eq__(self, other):
return type(other) is eventlet.event.Event
return type(other) is threading.Event
mock_thread.assert_called_once_with(
drvr._live_migration_operation,
@@ -14850,7 +14850,7 @@ class LibvirtConnTestCase(test.NoDBTestCase,
mock_monitor.assert_called_once_with(
self.context, instance, guest, "fakehost",
fake_post, fake_recover, True,
migrate_data, AnyEventletEvent(), disks_to_copy[0])
migrate_data, AnyEvent(), disks_to_copy[0])
def test_live_migration_main(self):
self._test_live_migration_main()
@@ -21062,10 +21062,10 @@ class LibvirtConnTestCase(test.NoDBTestCase,
):
generated_events = []
def wait_timeout():
def wait_timeout(*args, **kwargs):
event = mock.MagicMock()
if neutron_failure == 'timeout':
raise eventlet.timeout.Timeout()
raise exception.InstanceEventTimeout()
elif neutron_failure == 'error':
event.status = 'failed'
else:
@@ -21122,7 +21122,7 @@ class LibvirtConnTestCase(test.NoDBTestCase,
self.assertEqual(0, event.call_count)
elif (neutron_failure == 'error' and
not CONF.vif_plugging_is_fatal):
event.wait.assert_called_once_with()
event.wait.assert_called_once()
else:
self.assertEqual(0, prepare.call_count)
@@ -82,7 +82,7 @@ class ThreadController(object):
# The last epoch we waited at
self.last_epoch = 0
self.start_event = eventlet.event.Event()
self.start_event = threading.Event()
self.running = False
self.complete = False
@@ -107,7 +107,7 @@ class ThreadController(object):
def _ensure_running(self):
if not self.running:
self.running = True
self.start_event.send()
self.start_event.set()
def waitpoint(self, name):
"""Called by the test thread. Wait at a waitpoint called name"""
+4 -5
View File
@@ -48,7 +48,6 @@ import uuid
from castellan import key_manager
from copy import deepcopy
import eventlet
from eventlet import tpool
from lxml import etree
from os_brick import encryptors
@@ -8252,7 +8251,7 @@ class LibvirtDriver(driver.ComputeDriver):
context, xml, instance,
pause=pause, power_on=power_on,
post_xml_callback=post_xml_callback)
except eventlet.timeout.Timeout:
except exception.InstanceEventTimeout:
# We did not receive all expected events from Neutron, a warning
# has already been logged by wait_for_instance_event, but we need
# to decide if the issue is fatal.
@@ -11138,7 +11137,7 @@ class LibvirtDriver(driver.ComputeDriver):
if info.type == libvirt.VIR_DOMAIN_JOB_NONE:
# Either still running, or failed or completed,
# lets untangle the mess
if not finish_event.ready():
if not finish_event.is_set():
LOG.debug("Operation thread is still running",
instance=instance)
else:
@@ -11325,14 +11324,14 @@ class LibvirtDriver(driver.ComputeDriver):
migrate_data, guest,
device_names)
finish_event = eventlet.event.Event()
finish_event = threading.Event()
self.active_migrations[instance.uuid] = deque()
def thread_finished(_):
LOG.debug("Migration operation thread notification",
instance=instance)
finish_event.send()
finish_event.set()
future.add_done_callback(thread_finished)
# Let eventlet schedule the new thread right away
+1 -2
View File
@@ -12,7 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
import os
import time
@@ -289,7 +288,7 @@ class ZVMDriver(driver.ComputeDriver):
instance, event, deadline=timeout,
error_callback=self._neutron_failed_callback):
self._setup_network(vm_name, os_distro, network_info, instance)
except eventlet.timeout.Timeout:
except exception.InstanceEventTimeout:
LOG.warning("Timeout waiting for vif plugging callback.",
instance=instance)
if CONF.vif_plugging_is_fatal: