Merge "Replace eventlet.event.Event with threading.Event"
This commit is contained in:
+57
-33
@@ -40,8 +40,6 @@ import typing as ty
|
||||
|
||||
from cinderclient import exceptions as cinder_exception
|
||||
from cursive import exception as cursive_exception
|
||||
import eventlet.event
|
||||
import eventlet.timeout
|
||||
import futurist
|
||||
from keystoneauth1 import exceptions as keystone_exception
|
||||
from openstack import exceptions as sdk_exc
|
||||
@@ -239,9 +237,34 @@ def delete_image_on_error(function):
|
||||
return decorated_function
|
||||
|
||||
|
||||
class ThreadingEventWithResult(threading.Event):
|
||||
|
||||
UNSET_SENTINEL = object()
|
||||
FAILED_SENTINEL = object()
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._result = self.UNSET_SENTINEL
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def set(self, result=None):
|
||||
with self._lock:
|
||||
if super().is_set() and result != self._result:
|
||||
raise ValueError('Cannot change the result once it is set')
|
||||
self._result = result
|
||||
super().set()
|
||||
|
||||
def wait(self, timeout=None):
|
||||
succeeded = super().wait(timeout)
|
||||
if succeeded:
|
||||
return self._result
|
||||
else:
|
||||
return self.FAILED_SENTINEL
|
||||
|
||||
|
||||
# Each collection of events is a dict of eventlet Events keyed by a tuple of
|
||||
# event name and associated tag
|
||||
_InstanceEvents = ty.Dict[ty.Tuple[str, str], eventlet.event.Event]
|
||||
_InstanceEvents = ty.Dict[ty.Tuple[str, str], ThreadingEventWithResult]
|
||||
|
||||
|
||||
class InstanceEvents(object):
|
||||
@@ -257,12 +280,12 @@ class InstanceEvents(object):
|
||||
instance: 'objects.Instance',
|
||||
name: str,
|
||||
tag: str,
|
||||
) -> eventlet.event.Event:
|
||||
) -> ThreadingEventWithResult:
|
||||
"""Prepare to receive an event for an instance.
|
||||
|
||||
This will register an event for the given instance that we will
|
||||
wait on later. This should be called before initiating whatever
|
||||
action will trigger the event. The resulting eventlet.event.Event
|
||||
action will trigger the event. The resulting ThreadingEventWithResult
|
||||
object should be wait()'d on to ensure completion.
|
||||
|
||||
:param instance: the instance for which the event will be generated
|
||||
@@ -280,7 +303,7 @@ class InstanceEvents(object):
|
||||
|
||||
instance_events = self._events.setdefault(instance.uuid, {})
|
||||
return instance_events.setdefault((name, tag),
|
||||
eventlet.event.Event())
|
||||
ThreadingEventWithResult())
|
||||
LOG.debug('Preparing to wait for external event %(name)s-%(tag)s',
|
||||
{'name': name, 'tag': tag}, instance=instance)
|
||||
return _create_or_get_event()
|
||||
@@ -294,7 +317,7 @@ class InstanceEvents(object):
|
||||
:param instance: the instance for which the event was generated
|
||||
:param event: the nova.objects.external_event.InstanceExternalEvent
|
||||
that describes the event
|
||||
:returns: the eventlet.event.Event object on which the waiters
|
||||
:returns: the ThreadingEventWithResult object on which the waiters
|
||||
are blocked
|
||||
"""
|
||||
no_events_sentinel = object()
|
||||
@@ -344,7 +367,7 @@ class InstanceEvents(object):
|
||||
and return them (indexed by event name).
|
||||
|
||||
:param instance: the instance for which events should be purged
|
||||
:returns: a dictionary of {event_name: eventlet.event.Event}
|
||||
:returns: a dictionary of {event_name: ThreadingEventWithResult}
|
||||
"""
|
||||
@utils.synchronized(self._lock_name(instance))
|
||||
def _clear_events():
|
||||
@@ -378,7 +401,7 @@ class InstanceEvents(object):
|
||||
instance_uuid=instance_uuid,
|
||||
name=name, status='failed',
|
||||
tag=tag, data={})
|
||||
eventlet_event.send(event)
|
||||
eventlet_event.set(event)
|
||||
|
||||
|
||||
class ComputeVirtAPI(virtapi.VirtAPI):
|
||||
@@ -414,7 +437,7 @@ class ComputeVirtAPI(virtapi.VirtAPI):
|
||||
TIMED_OUT = "timed out"
|
||||
RECEIVED_NOT_PROCESSED = "received but not processed"
|
||||
|
||||
def __init__(self, name: str, event: eventlet.event.Event) -> None:
|
||||
def __init__(self, name: str, event: ThreadingEventWithResult) -> None:
|
||||
self.name = name
|
||||
self.event = event
|
||||
self.status = self.EXPECTED
|
||||
@@ -427,19 +450,17 @@ class ComputeVirtAPI(virtapi.VirtAPI):
|
||||
return self.status == self.RECEIVED_EARLY
|
||||
|
||||
def _update_status_no_wait(self):
|
||||
if self.status == self.EXPECTED and self.event.ready():
|
||||
if self.status == self.EXPECTED and self.event.is_set():
|
||||
self.status = self.RECEIVED_NOT_PROCESSED
|
||||
|
||||
def wait(self) -> 'objects.InstanceExternalEvent':
|
||||
def wait(self, timeout) -> 'objects.InstanceExternalEvent':
|
||||
self.status = self.WAITING
|
||||
try:
|
||||
with timeutils.StopWatch() as sw:
|
||||
instance_event = self.event.wait()
|
||||
except eventlet.timeout.Timeout:
|
||||
with timeutils.StopWatch() as sw:
|
||||
instance_event = self.event.wait(timeout)
|
||||
if instance_event is ThreadingEventWithResult.FAILED_SENTINEL:
|
||||
self.status = self.TIMED_OUT
|
||||
self.wait_time = sw.elapsed()
|
||||
|
||||
raise
|
||||
raise exception.InstanceEventTimeout()
|
||||
|
||||
self.status = self.RECEIVED
|
||||
self.wait_time = sw.elapsed()
|
||||
@@ -464,14 +485,18 @@ class ComputeVirtAPI(virtapi.VirtAPI):
|
||||
instance: 'objects.Instance',
|
||||
events: dict,
|
||||
error_callback: ty.Callable,
|
||||
timeout: int,
|
||||
) -> None:
|
||||
deadline = time.monotonic() + timeout
|
||||
for event_name, event in events.items():
|
||||
if event.is_received_early():
|
||||
continue
|
||||
else:
|
||||
actual_event = event.wait()
|
||||
if actual_event.status == 'completed':
|
||||
continue
|
||||
remaining_time = deadline - time.monotonic()
|
||||
if remaining_time <= 0:
|
||||
raise exception.InstanceEventTimeout()
|
||||
actual_event = event.wait(timeout=remaining_time)
|
||||
if actual_event.status == 'completed':
|
||||
continue
|
||||
# If we get here, we have an event that was not completed,
|
||||
# nor skipped via exit_wait_early(). Decide whether to
|
||||
# keep waiting by calling the error_callback() hook.
|
||||
@@ -488,12 +513,12 @@ class ComputeVirtAPI(virtapi.VirtAPI):
|
||||
provided event_names, yield, and then wait for all the scheduled
|
||||
events to complete.
|
||||
|
||||
Note that this uses an eventlet.timeout.Timeout to bound the
|
||||
Note that this uses an InstanceEventTimeout to bound the
|
||||
operation, so callers should be prepared to catch that
|
||||
failure and handle that situation appropriately.
|
||||
|
||||
If the event is not received by the specified timeout deadline,
|
||||
eventlet.timeout.Timeout is raised.
|
||||
InstanceEventTimeout is raised.
|
||||
|
||||
If the event is received but did not have a 'completed'
|
||||
status, a NovaException is raised. If an error_callback is
|
||||
@@ -556,10 +581,9 @@ class ComputeVirtAPI(virtapi.VirtAPI):
|
||||
sw = timeutils.StopWatch()
|
||||
sw.start()
|
||||
try:
|
||||
with eventlet.timeout.Timeout(deadline):
|
||||
self._wait_for_instance_events(
|
||||
instance, events, error_callback)
|
||||
except eventlet.timeout.Timeout:
|
||||
self._wait_for_instance_events(
|
||||
instance, events, error_callback, timeout=deadline)
|
||||
except exception.InstanceEventTimeout:
|
||||
LOG.warning(
|
||||
'Timeout waiting for %(events)s for instance with '
|
||||
'vm_state %(vm_state)s and task_state %(task_state)s. '
|
||||
@@ -2830,7 +2854,7 @@ class ComputeManager(manager.Manager):
|
||||
arqs, requested_networks)
|
||||
LOG.debug("ARQs for spec:%s, ARQs for network:%s",
|
||||
spec_arqs, network_arqs)
|
||||
except (Exception, eventlet.timeout.Timeout) as exc:
|
||||
except (Exception, exception.InstanceEventTimeout) as exc:
|
||||
LOG.exception(exc)
|
||||
# ARQs created for instance or ports.
|
||||
# The port binding isn't done yet.
|
||||
@@ -3724,7 +3748,7 @@ class ComputeManager(manager.Manager):
|
||||
try:
|
||||
accel_info = self._get_bound_arq_resources(
|
||||
context, instance, accel_uuids or [])
|
||||
except (Exception, eventlet.timeout.Timeout) as exc:
|
||||
except (Exception, exception.InstanceEventTimeout) as exc:
|
||||
LOG.exception(exc)
|
||||
self._build_resources_cleanup(instance, network_info)
|
||||
msg = _('Failure getting accelerator resources.')
|
||||
@@ -7681,7 +7705,7 @@ class ComputeManager(manager.Manager):
|
||||
try:
|
||||
accel_info = self._get_bound_arq_resources(
|
||||
context, instance, accel_uuids)
|
||||
except (Exception, eventlet.timeout.Timeout) as exc:
|
||||
except (Exception, exception.InstanceEventTimeout) as exc:
|
||||
LOG.exception('Failure getting accelerator requests '
|
||||
'with the exception: %s', exc,
|
||||
instance=instance)
|
||||
@@ -9542,7 +9566,7 @@ class ComputeManager(manager.Manager):
|
||||
self._cleanup_pre_live_migration(
|
||||
context, dest, instance, migration, migrate_data,
|
||||
source_bdms)
|
||||
except eventlet.timeout.Timeout:
|
||||
except exception.InstanceEventTimeout:
|
||||
# We only get here if wait_for_vif_plugged is True which means
|
||||
# live_migration_wait_for_vif_plug=True on the destination host.
|
||||
msg = (
|
||||
@@ -11535,7 +11559,7 @@ class ComputeManager(manager.Manager):
|
||||
if _event:
|
||||
LOG.debug('Processing event %(event)s',
|
||||
{'event': event.key}, instance=instance)
|
||||
_event.send(event)
|
||||
_event.set(event)
|
||||
else:
|
||||
# If it's a network-vif-unplugged event and the instance is being
|
||||
# deleted or live migrated then we don't need to make this a
|
||||
|
||||
@@ -2661,3 +2661,8 @@ class EphemeralEncryptionCleanupFailed(NovaException):
|
||||
|
||||
class HostConflict(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class InstanceEventTimeout(Exception):
|
||||
"""A custom timeout exception to replace eventlet.timeout.Timeout."""
|
||||
pass
|
||||
|
||||
@@ -16,14 +16,13 @@ import contextlib
|
||||
import copy
|
||||
import datetime
|
||||
import fixtures as std_fixtures
|
||||
import threading
|
||||
import time
|
||||
from unittest import mock
|
||||
|
||||
from cinderclient import exceptions as cinder_exception
|
||||
from cursive import exception as cursive_exception
|
||||
import ddt
|
||||
from eventlet import event as eventlet_event
|
||||
from eventlet import timeout as eventlet_timeout
|
||||
from keystoneauth1 import exceptions as keystone_exception
|
||||
import netaddr
|
||||
from openstack import exceptions as sdk_exc
|
||||
@@ -5491,12 +5490,12 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase,
|
||||
result,
|
||||
self.compute.instance_events._events[uuids.instance]
|
||||
[('test-event', None)])
|
||||
self.assertTrue(hasattr(result, 'send'))
|
||||
self.assertTrue(hasattr(result, 'set'))
|
||||
lock_name_mock.assert_called_once_with(inst_obj)
|
||||
|
||||
@mock.patch('nova.compute.manager.InstanceEvents._lock_name')
|
||||
def test_pop_instance_event(self, lock_name_mock):
|
||||
event = eventlet_event.Event()
|
||||
event = manager.ThreadingEventWithResult()
|
||||
self.compute.instance_events._events = {
|
||||
uuids.instance: {
|
||||
('network-vif-plugged', None): event,
|
||||
@@ -5512,7 +5511,7 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase,
|
||||
|
||||
@mock.patch('nova.compute.manager.InstanceEvents._lock_name')
|
||||
def test_clear_events_for_instance(self, lock_name_mock):
|
||||
event = eventlet_event.Event()
|
||||
event = manager.ThreadingEventWithResult()
|
||||
self.compute.instance_events._events = {
|
||||
uuids.instance: {
|
||||
('test-event', None): event,
|
||||
@@ -5544,10 +5543,10 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase,
|
||||
result,
|
||||
self.compute.instance_events._events[uuids.instance]
|
||||
[('test-event', None)])
|
||||
self.assertTrue(hasattr(result, 'send'))
|
||||
self.assertTrue(hasattr(result, 'set'))
|
||||
|
||||
def test_process_instance_event(self):
|
||||
event = eventlet_event.Event()
|
||||
event = manager.ThreadingEventWithResult()
|
||||
self.compute.instance_events._events = {
|
||||
uuids.instance: {
|
||||
('network-vif-plugged', None): event,
|
||||
@@ -5557,7 +5556,7 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase,
|
||||
event_obj = objects.InstanceExternalEvent(name='network-vif-plugged',
|
||||
tag=None)
|
||||
self.compute._process_instance_event(inst_obj, event_obj)
|
||||
self.assertTrue(event.ready())
|
||||
self.assertTrue(event.is_set())
|
||||
self.assertEqual(event_obj, event.wait())
|
||||
self.assertEqual({}, self.compute.instance_events._events)
|
||||
|
||||
@@ -6033,8 +6032,8 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase,
|
||||
self.compute.instance_events.cancel_all_events()
|
||||
# call it again to make sure we handle that gracefully
|
||||
self.compute.instance_events.cancel_all_events()
|
||||
self.assertTrue(fake_eventlet_event.send.called)
|
||||
event = fake_eventlet_event.send.call_args_list[0][0][0]
|
||||
self.assertTrue(fake_eventlet_event.set.called)
|
||||
event = fake_eventlet_event.set.call_args_list[0][0][0]
|
||||
self.assertEqual('network-vif-plugged', event.name)
|
||||
self.assertEqual(uuids.portid, event.tag)
|
||||
self.assertEqual('failed', event.status)
|
||||
@@ -8607,9 +8606,9 @@ class ComputeManagerBuildInstanceTestCase(test.NoDBTestCase):
|
||||
arq_uuids = [arq['uuid'] for arq in arq_list]
|
||||
|
||||
mock_get_arqs.return_value = arq_list
|
||||
mock_wait_inst_ev.side_effect = eventlet_timeout.Timeout
|
||||
mock_wait_inst_ev.side_effect = exception.InstanceEventTimeout
|
||||
|
||||
self.assertRaises(eventlet_timeout.Timeout,
|
||||
self.assertRaises(exception.InstanceEventTimeout,
|
||||
self.compute._get_bound_arq_resources,
|
||||
self.context, self.instance, arq_uuids)
|
||||
|
||||
@@ -12032,7 +12031,7 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase,
|
||||
self.compute.virtapi,
|
||||
'wait_for_instance_event') as wait_for_event:
|
||||
wait_for_event.return_value.__enter__.side_effect = (
|
||||
eventlet_timeout.Timeout())
|
||||
exception.InstanceEventTimeout())
|
||||
ex = self.assertRaises(
|
||||
exception.MigrationError, self.compute._do_live_migration,
|
||||
self.context, 'dest-host', self.instance, None,
|
||||
@@ -12070,7 +12069,7 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase,
|
||||
self.compute.virtapi,
|
||||
'wait_for_instance_event') as wait_for_event:
|
||||
wait_for_event.return_value.__enter__.side_effect = (
|
||||
eventlet_timeout.Timeout())
|
||||
exception.InstanceEventTimeout())
|
||||
self.compute._do_live_migration(
|
||||
self.context, 'dest-host', self.instance, None,
|
||||
self.migration, migrate_data)
|
||||
@@ -15112,3 +15111,225 @@ class ComputeManagerBDMUpdateTestCase(test.TestCase):
|
||||
|
||||
mock_bdm_destroy.assert_called_once()
|
||||
self.instance.get_bdms.assert_called_once()
|
||||
|
||||
|
||||
class ThreadingEventWithResultTestCase(test.NoDBTestCase):
|
||||
"""Test case for ThreadingEventWithResult class."""
|
||||
|
||||
def test_init(self):
|
||||
event = manager.ThreadingEventWithResult()
|
||||
self.assertEqual(
|
||||
event._result,
|
||||
manager.ThreadingEventWithResult.UNSET_SENTINEL)
|
||||
self.assertFalse(event.is_set())
|
||||
|
||||
def test_set_with_result(self):
|
||||
event = manager.ThreadingEventWithResult()
|
||||
result = "test_result"
|
||||
event.set(result)
|
||||
self.assertEqual(result, event._result)
|
||||
self.assertTrue(event.is_set())
|
||||
|
||||
def test_set_without_result(self):
|
||||
event = manager.ThreadingEventWithResult()
|
||||
event.set()
|
||||
self.assertIsNone(event._result)
|
||||
self.assertTrue(event.is_set())
|
||||
|
||||
def test_wait_with_result(self):
|
||||
event = manager.ThreadingEventWithResult()
|
||||
result = "test_result"
|
||||
event.set(result)
|
||||
self.assertEqual(result, event.wait())
|
||||
|
||||
def test_wait_timeout(self):
|
||||
event = manager.ThreadingEventWithResult()
|
||||
self.assertEqual(event.wait(timeout=0.01),
|
||||
manager.ThreadingEventWithResult.FAILED_SENTINEL)
|
||||
|
||||
def test_change_result_raises_value_error(self):
|
||||
event = manager.ThreadingEventWithResult()
|
||||
event.set("original_result")
|
||||
with self.assertRaisesRegex(ValueError,
|
||||
'Cannot change the result once it is set'):
|
||||
event.set("new_result")
|
||||
with self.assertRaisesRegex(ValueError,
|
||||
'Cannot change the result once it is set'):
|
||||
event.set(None)
|
||||
|
||||
def test_set_same_result_again(self):
|
||||
event = manager.ThreadingEventWithResult()
|
||||
result = "test_result"
|
||||
event.set(result)
|
||||
# Setting the same result again should not raise an error
|
||||
event.set(result)
|
||||
self.assertEqual(result, event._result)
|
||||
self.assertTrue(event.is_set())
|
||||
|
||||
def test_multiple_wait_calls(self):
|
||||
event = manager.ThreadingEventWithResult()
|
||||
result = "test_result"
|
||||
event.set(result)
|
||||
self.assertEqual(result, event.wait())
|
||||
self.assertEqual(result, event.wait())
|
||||
self.assertEqual(result, event.wait(timeout=0.01))
|
||||
|
||||
def test_wait_in_another_thread(self):
|
||||
event = manager.ThreadingEventWithResult()
|
||||
result_from_thread = []
|
||||
expected_result = "data from main thread"
|
||||
|
||||
def _wait_for_event():
|
||||
res = event.wait()
|
||||
result_from_thread.append(res)
|
||||
|
||||
waiter_thread = threading.Thread(target=_wait_for_event)
|
||||
waiter_thread.start()
|
||||
|
||||
# Give the waiter thread a moment to start and block on wait()
|
||||
time.sleep(0.05)
|
||||
self.assertTrue(waiter_thread.is_alive())
|
||||
self.assertFalse(event.is_set())
|
||||
|
||||
# Set the event from the main thread, which should unblock the waiter
|
||||
event.set(expected_result)
|
||||
|
||||
# The waiter thread should finish promptly
|
||||
waiter_thread.join(timeout=1)
|
||||
self.assertFalse(
|
||||
waiter_thread.is_alive(),
|
||||
"Waiter thread should have finished.")
|
||||
|
||||
# Verify the result was received correctly
|
||||
self.assertEqual(len(result_from_thread), 1)
|
||||
self.assertEqual(result_from_thread[0], expected_result)
|
||||
self.assertEqual(event._result, expected_result)
|
||||
|
||||
def test_set_race_condition(self):
|
||||
event = manager.ThreadingEventWithResult()
|
||||
outcomes = []
|
||||
num_threads = 20
|
||||
# A barrier synchronizes threads, making a race condition more likely
|
||||
barrier = threading.Barrier(num_threads)
|
||||
|
||||
def _set_in_thread(result_to_set):
|
||||
try:
|
||||
# All threads wait here until all are ready
|
||||
barrier.wait()
|
||||
event.set(result_to_set)
|
||||
outcomes.append("success")
|
||||
except ValueError:
|
||||
outcomes.append("failure")
|
||||
except Exception as e:
|
||||
outcomes.append(e)
|
||||
|
||||
threads = []
|
||||
possible_results = [
|
||||
f"result_from_thread_{i}" for i in range(num_threads)]
|
||||
|
||||
for i in range(num_threads):
|
||||
thread = threading.Thread(
|
||||
target=_set_in_thread,
|
||||
args=(possible_results[i],))
|
||||
threads.append(thread)
|
||||
thread.start()
|
||||
|
||||
# Wait for all threads to complete
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
# 1. Verify that the event was ultimately set
|
||||
self.assertTrue(event.is_set())
|
||||
|
||||
# 2. Verify its result is one of the candidates
|
||||
self.assertIn(event._result, possible_results)
|
||||
|
||||
# 3. Verify that exactly ONE thread succeeded and the rest failed
|
||||
success_count = outcomes.count("success")
|
||||
failure_count = outcomes.count("failure")
|
||||
|
||||
self.assertEqual(
|
||||
success_count,
|
||||
1, f"Expected 1 success, but got {success_count}.")
|
||||
self.assertEqual(
|
||||
failure_count,
|
||||
num_threads - 1, f"Expected {num_threads - 1} failures.")
|
||||
|
||||
|
||||
class TestWaitForInstanceEvents(test.NoDBTestCase):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.instance = mock.Mock()
|
||||
self.event1 = mock.Mock()
|
||||
self.event2 = mock.Mock()
|
||||
self.error_callback = mock.Mock()
|
||||
self.timeout = 1
|
||||
|
||||
def test_all_events_completed(self):
|
||||
self.event1.is_received_early.return_value = False
|
||||
self.event1.wait.return_value = mock.Mock(status='completed')
|
||||
self.event2.is_received_early.return_value = False
|
||||
self.event2.wait.return_value = mock.Mock(status='completed')
|
||||
events = {'event1': self.event1, 'event2': self.event2}
|
||||
manager.ComputeVirtAPI._wait_for_instance_events(
|
||||
self.instance, events, self.error_callback, self.timeout)
|
||||
self.event1.wait.assert_called()
|
||||
self.event2.wait.assert_called()
|
||||
self.error_callback.assert_not_called()
|
||||
|
||||
def test_event_received_early(self):
|
||||
self.event1.is_received_early.return_value = True
|
||||
self.event2.is_received_early.return_value = False
|
||||
self.event2.wait.return_value = mock.Mock(status='completed')
|
||||
events = {'event1': self.event1, 'event2': self.event2}
|
||||
manager.ComputeVirtAPI._wait_for_instance_events(
|
||||
self.instance, events, self.error_callback, self.timeout)
|
||||
self.event1.wait.assert_not_called()
|
||||
self.event2.wait.assert_called()
|
||||
self.error_callback.assert_not_called()
|
||||
|
||||
def test_event_timeout(self):
|
||||
self.event1.is_received_early.return_value = False
|
||||
self.event1.wait.side_effect = exception.InstanceEventTimeout
|
||||
events = {'event1': self.event1}
|
||||
with self.assertRaisesRegex(exception.InstanceEventTimeout, ""):
|
||||
manager.ComputeVirtAPI._wait_for_instance_events(
|
||||
self.instance, events, self.error_callback, self.timeout)
|
||||
|
||||
def test_event_wait_returns_failed_sentinel(self):
|
||||
self.event1.is_received_early.return_value = False
|
||||
self.event1.wait.return_value = mock.Mock(status='completed')
|
||||
|
||||
events = {'event1': self.event1}
|
||||
x = 100
|
||||
with mock.patch('time.monotonic', side_effect=[x, x + 1]):
|
||||
with self.assertRaisesRegex(exception.InstanceEventTimeout, ""):
|
||||
manager.ComputeVirtAPI._wait_for_instance_events(
|
||||
self.instance, events, self.error_callback, self.timeout)
|
||||
|
||||
self.event1.wait.assert_not_called()
|
||||
|
||||
def test_multiple_events_some_early_some_completed(self):
|
||||
self.event1.is_received_early.return_value = True
|
||||
self.event2.is_received_early.return_value = False
|
||||
self.event2.wait.return_value = mock.Mock(status='completed')
|
||||
self.event3 = mock.Mock()
|
||||
self.event3.is_received_early.return_value = False
|
||||
self.event3.wait.return_value = mock.Mock(status='completed')
|
||||
events = {
|
||||
'event1': self.event1,
|
||||
'event2': self.event2,
|
||||
'event3': self.event3
|
||||
}
|
||||
manager.ComputeVirtAPI._wait_for_instance_events(
|
||||
self.instance, events, self.error_callback, self.timeout)
|
||||
self.event1.wait.assert_not_called()
|
||||
self.event2.wait.assert_called()
|
||||
self.event3.wait.assert_called()
|
||||
self.error_callback.assert_not_called()
|
||||
|
||||
def test_no_events(self):
|
||||
events = {}
|
||||
manager.ComputeVirtAPI._wait_for_instance_events(
|
||||
self.instance, events, self.error_callback, self.timeout)
|
||||
self.error_callback.assert_not_called()
|
||||
|
||||
@@ -12,7 +12,6 @@
|
||||
|
||||
from unittest import mock
|
||||
|
||||
import eventlet
|
||||
from oslo_utils import fixture as utils_fixture
|
||||
from oslo_utils.fixture import uuidsentinel as uuids
|
||||
from oslo_utils import timeutils
|
||||
@@ -448,9 +447,9 @@ class ShelveComputeManagerTestCase(test_compute.BaseTestCase):
|
||||
mock_bdms, mock_nil):
|
||||
instance = self._create_fake_instance_obj()
|
||||
|
||||
mock_get_arqs.side_effect = eventlet.timeout.Timeout()
|
||||
mock_get_arqs.side_effect = exception.InstanceEventTimeout()
|
||||
|
||||
self.assertRaises(eventlet.timeout.Timeout,
|
||||
self.assertRaises(exception.InstanceEventTimeout,
|
||||
self.test_unshelve,
|
||||
accel_uuids=[uuids.fake],
|
||||
instance=instance)
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
import collections
|
||||
from unittest import mock
|
||||
|
||||
import eventlet.timeout
|
||||
import os_traits
|
||||
from oslo_utils.fixture import uuidsentinel as uuids
|
||||
|
||||
@@ -102,7 +101,7 @@ class FakeCompute(object):
|
||||
self, context, rp_uuid, traits, generation=None):
|
||||
self.provider_traits[rp_uuid] = traits
|
||||
|
||||
def _event_waiter(self):
|
||||
def _event_waiter(self, *args, **kwargs):
|
||||
event = mock.MagicMock()
|
||||
event.status = 'completed'
|
||||
return event
|
||||
@@ -151,13 +150,13 @@ class ComputeVirtAPITest(VirtAPIBaseTest):
|
||||
for event in self.compute._events:
|
||||
self.assertEqual('instance', event.instance)
|
||||
self.assertIn((event.name, event.tag), events.keys())
|
||||
event.wait.assert_called_once_with()
|
||||
event.wait.assert_called_once()
|
||||
mock_log.debug.assert_called_once_with(
|
||||
'Instance event wait completed in %i seconds for %s',
|
||||
mock.ANY, 'event1,event2', instance=event.instance)
|
||||
|
||||
def test_wait_for_instance_event_failed(self):
|
||||
def _failer():
|
||||
def _failer(*args, **kwargs):
|
||||
event = mock.MagicMock()
|
||||
event.status = 'failed'
|
||||
return event
|
||||
@@ -171,7 +170,7 @@ class ComputeVirtAPITest(VirtAPIBaseTest):
|
||||
self.assertRaises(exception.NovaException, do_test)
|
||||
|
||||
def test_wait_for_instance_event_failed_callback(self):
|
||||
def _failer():
|
||||
def _failer(*args, **kwargs):
|
||||
event = mock.MagicMock()
|
||||
event.status = 'failed'
|
||||
return event
|
||||
@@ -198,14 +197,17 @@ class ComputeVirtAPITest(VirtAPIBaseTest):
|
||||
mock_log = mock.Mock()
|
||||
|
||||
@mock.patch.object(compute_manager, 'LOG', new=mock_log)
|
||||
@mock.patch.object(self.virtapi._compute, '_event_waiter',
|
||||
side_effect=eventlet.timeout.Timeout())
|
||||
@mock.patch.object(
|
||||
self.virtapi._compute,
|
||||
'_event_waiter',
|
||||
return_value=compute_manager.
|
||||
ThreadingEventWithResult.FAILED_SENTINEL)
|
||||
def do_test(mock_waiter):
|
||||
with self.virtapi.wait_for_instance_event(
|
||||
instance, [('foo', 'bar')]):
|
||||
pass
|
||||
|
||||
self.assertRaises(eventlet.timeout.Timeout, do_test)
|
||||
self.assertRaises(exception.InstanceEventTimeout, do_test)
|
||||
mock_log.warning.assert_called_once_with(
|
||||
'Timeout waiting for %(events)s for instance with vm_state '
|
||||
'%(vm_state)s and task_state %(task_state)s. '
|
||||
@@ -238,7 +240,7 @@ class ComputeVirtAPITest(VirtAPIBaseTest):
|
||||
event = mock.Mock(status="completed")
|
||||
return event
|
||||
else:
|
||||
raise eventlet.timeout.Timeout()
|
||||
return compute_manager.ThreadingEventWithResult.FAILED_SENTINEL
|
||||
|
||||
@mock.patch.object(compute_manager, 'LOG', new=mock_log)
|
||||
@mock.patch.object(self.virtapi._compute, '_event_waiter',
|
||||
@@ -248,7 +250,7 @@ class ComputeVirtAPITest(VirtAPIBaseTest):
|
||||
instance, [('foo', 'bar'), ('missing', 'event')]):
|
||||
pass
|
||||
|
||||
self.assertRaises(eventlet.timeout.Timeout, do_test)
|
||||
self.assertRaises(exception.InstanceEventTimeout, do_test)
|
||||
mock_log.warning.assert_called_once_with(
|
||||
'Timeout waiting for %(events)s for instance with vm_state '
|
||||
'%(vm_state)s and task_state %(task_state)s. '
|
||||
@@ -282,7 +284,7 @@ class ComputeVirtAPITest(VirtAPIBaseTest):
|
||||
event = mock.Mock(status="completed")
|
||||
return event
|
||||
else:
|
||||
raise eventlet.timeout.Timeout()
|
||||
return compute_manager.ThreadingEventWithResult.FAILED_SENTINEL
|
||||
|
||||
def fake_prepare_for_instance_event(instance, name, tag):
|
||||
m = mock.MagicMock()
|
||||
@@ -292,9 +294,9 @@ class ComputeVirtAPITest(VirtAPIBaseTest):
|
||||
m.event_name = '%s-%s' % (name, tag)
|
||||
m.wait.side_effect = fake_event_waiter
|
||||
if name == 'received-but-not-waited':
|
||||
m.ready.return_value = True
|
||||
m.is_set.return_value = True
|
||||
if name == 'missing-but-not-waited':
|
||||
m.ready.return_value = False
|
||||
m.is_set.return_value = False
|
||||
return m
|
||||
|
||||
self.virtapi._compute.instance_events.prepare_for_instance_event.\
|
||||
@@ -314,7 +316,7 @@ class ComputeVirtAPITest(VirtAPIBaseTest):
|
||||
):
|
||||
self.virtapi.exit_wait_early([('early', 'event')])
|
||||
|
||||
self.assertRaises(eventlet.timeout.Timeout, do_test)
|
||||
self.assertRaises(exception.InstanceEventTimeout, do_test)
|
||||
mock_log.warning.assert_called_once_with(
|
||||
'Timeout waiting for %(events)s for instance with vm_state '
|
||||
'%(vm_state)s and task_state %(task_state)s. '
|
||||
@@ -353,7 +355,7 @@ class ComputeVirtAPITest(VirtAPIBaseTest):
|
||||
self.assertEqual(2, len(self.compute._events))
|
||||
for event in self.compute._events:
|
||||
if event.tag == 'bar':
|
||||
event.wait.assert_called_once_with()
|
||||
event.wait.assert_called_once()
|
||||
else:
|
||||
event.wait.assert_not_called()
|
||||
|
||||
|
||||
@@ -423,9 +423,9 @@ def get_injection_info(network_info=None, admin_pass=None, files=None):
|
||||
|
||||
|
||||
def _concurrency(signal, wait, done, target, is_block_dev=False):
|
||||
signal.send()
|
||||
signal.set()
|
||||
wait.wait()
|
||||
done.send()
|
||||
done.set()
|
||||
|
||||
|
||||
class FakeVirtDomain(object):
|
||||
@@ -565,9 +565,9 @@ class CacheConcurrencyTestCase(test.NoDBTestCase):
|
||||
uuid = uuids.fake
|
||||
|
||||
backend = imagebackend.Backend(False)
|
||||
wait1 = eventlet.event.Event()
|
||||
done1 = eventlet.event.Event()
|
||||
sig1 = eventlet.event.Event()
|
||||
wait1 = threading.Event()
|
||||
done1 = threading.Event()
|
||||
sig1 = threading.Event()
|
||||
thr1 = eventlet.spawn(backend.by_name(self._fake_instance(uuid),
|
||||
'name').cache,
|
||||
_concurrency, 'fname', None,
|
||||
@@ -576,23 +576,23 @@ class CacheConcurrencyTestCase(test.NoDBTestCase):
|
||||
# Thread 1 should run before thread 2.
|
||||
sig1.wait()
|
||||
|
||||
wait2 = eventlet.event.Event()
|
||||
done2 = eventlet.event.Event()
|
||||
sig2 = eventlet.event.Event()
|
||||
wait2 = threading.Event()
|
||||
done2 = threading.Event()
|
||||
sig2 = threading.Event()
|
||||
thr2 = eventlet.spawn(backend.by_name(self._fake_instance(uuid),
|
||||
'name').cache,
|
||||
_concurrency, 'fname', None,
|
||||
signal=sig2, wait=wait2, done=done2)
|
||||
|
||||
wait2.send()
|
||||
wait2.set()
|
||||
utils.cooperative_yield()
|
||||
try:
|
||||
self.assertFalse(done2.ready())
|
||||
self.assertFalse(done2.is_set())
|
||||
finally:
|
||||
wait1.send()
|
||||
wait1.set()
|
||||
done1.wait()
|
||||
utils.cooperative_yield()
|
||||
self.assertTrue(done2.ready())
|
||||
self.assertTrue(done2.is_set())
|
||||
# Wait on greenthreads to assert they didn't raise exceptions
|
||||
# during execution
|
||||
thr1.wait()
|
||||
@@ -603,9 +603,9 @@ class CacheConcurrencyTestCase(test.NoDBTestCase):
|
||||
uuid = uuids.fake
|
||||
|
||||
backend = imagebackend.Backend(False)
|
||||
wait1 = eventlet.event.Event()
|
||||
done1 = eventlet.event.Event()
|
||||
sig1 = eventlet.event.Event()
|
||||
wait1 = threading.Event()
|
||||
done1 = threading.Event()
|
||||
sig1 = threading.Event()
|
||||
thr1 = eventlet.spawn(backend.by_name(self._fake_instance(uuid),
|
||||
'name').cache,
|
||||
_concurrency, 'fname2', None,
|
||||
@@ -614,9 +614,9 @@ class CacheConcurrencyTestCase(test.NoDBTestCase):
|
||||
# Thread 1 should run before thread 2.
|
||||
sig1.wait()
|
||||
|
||||
wait2 = eventlet.event.Event()
|
||||
done2 = eventlet.event.Event()
|
||||
sig2 = eventlet.event.Event()
|
||||
wait2 = threading.Event()
|
||||
done2 = threading.Event()
|
||||
sig2 = threading.Event()
|
||||
thr2 = eventlet.spawn(backend.by_name(self._fake_instance(uuid),
|
||||
'name').cache,
|
||||
_concurrency, 'fname1', None,
|
||||
@@ -625,15 +625,15 @@ class CacheConcurrencyTestCase(test.NoDBTestCase):
|
||||
# Wait for thread 2 to start.
|
||||
sig2.wait()
|
||||
|
||||
wait2.send()
|
||||
wait2.set()
|
||||
tries = 0
|
||||
while not done2.ready() and tries < 10:
|
||||
while not done2.is_set() and tries < 10:
|
||||
utils.cooperative_yield()
|
||||
tries += 1
|
||||
try:
|
||||
self.assertTrue(done2.ready())
|
||||
self.assertTrue(done2.is_set())
|
||||
finally:
|
||||
wait1.send()
|
||||
wait1.set()
|
||||
utils.cooperative_yield()
|
||||
# Wait on greenthreads to assert they didn't raise exceptions
|
||||
# during execution
|
||||
@@ -14274,7 +14274,7 @@ class LibvirtConnTestCase(test.NoDBTestCase,
|
||||
drvr.active_migrations[instance.uuid] = collections.deque()
|
||||
dom = fakelibvirt.Domain(drvr._get_connection(), "<domain/>", True)
|
||||
guest = libvirt_guest.Guest(dom)
|
||||
finish_event = eventlet.event.Event()
|
||||
finish_event = threading.Event()
|
||||
|
||||
def fake_job_info():
|
||||
while True:
|
||||
@@ -14283,7 +14283,7 @@ class LibvirtConnTestCase(test.NoDBTestCase,
|
||||
|
||||
if type(rec) is str:
|
||||
if rec == "thread-finish":
|
||||
finish_event.send()
|
||||
finish_event.set()
|
||||
elif rec == "domain-stop":
|
||||
dom.destroy()
|
||||
elif rec == "force_complete":
|
||||
@@ -14955,9 +14955,9 @@ class LibvirtConnTestCase(test.NoDBTestCase,
|
||||
mock_copy_disk_path.assert_called_once_with(self.context, instance,
|
||||
guest)
|
||||
|
||||
class AnyEventletEvent(object):
|
||||
class AnyEvent(object):
|
||||
def __eq__(self, other):
|
||||
return type(other) is eventlet.event.Event
|
||||
return type(other) is threading.Event
|
||||
|
||||
mock_thread.assert_called_once_with(
|
||||
drvr._live_migration_operation,
|
||||
@@ -14966,7 +14966,7 @@ class LibvirtConnTestCase(test.NoDBTestCase,
|
||||
mock_monitor.assert_called_once_with(
|
||||
self.context, instance, guest, "fakehost",
|
||||
fake_post, fake_recover, True,
|
||||
migrate_data, AnyEventletEvent(), disks_to_copy[0])
|
||||
migrate_data, AnyEvent(), disks_to_copy[0])
|
||||
|
||||
def test_live_migration_main(self):
|
||||
self._test_live_migration_main()
|
||||
@@ -21178,10 +21178,10 @@ class LibvirtConnTestCase(test.NoDBTestCase,
|
||||
):
|
||||
generated_events = []
|
||||
|
||||
def wait_timeout():
|
||||
def wait_timeout(*args, **kwargs):
|
||||
event = mock.MagicMock()
|
||||
if neutron_failure == 'timeout':
|
||||
raise eventlet.timeout.Timeout()
|
||||
raise exception.InstanceEventTimeout()
|
||||
elif neutron_failure == 'error':
|
||||
event.status = 'failed'
|
||||
else:
|
||||
@@ -21238,7 +21238,7 @@ class LibvirtConnTestCase(test.NoDBTestCase,
|
||||
self.assertEqual(0, event.call_count)
|
||||
elif (neutron_failure == 'error' and
|
||||
not CONF.vif_plugging_is_fatal):
|
||||
event.wait.assert_called_once_with()
|
||||
event.wait.assert_called_once()
|
||||
else:
|
||||
self.assertEqual(0, prepare.call_count)
|
||||
|
||||
|
||||
@@ -82,7 +82,7 @@ class ThreadController(object):
|
||||
# The last epoch we waited at
|
||||
self.last_epoch = 0
|
||||
|
||||
self.start_event = eventlet.event.Event()
|
||||
self.start_event = threading.Event()
|
||||
self.running = False
|
||||
self.complete = False
|
||||
|
||||
@@ -107,7 +107,7 @@ class ThreadController(object):
|
||||
def _ensure_running(self):
|
||||
if not self.running:
|
||||
self.running = True
|
||||
self.start_event.send()
|
||||
self.start_event.set()
|
||||
|
||||
def waitpoint(self, name):
|
||||
"""Called by the test thread. Wait at a waitpoint called name"""
|
||||
|
||||
@@ -48,7 +48,6 @@ import uuid
|
||||
|
||||
from castellan import key_manager
|
||||
from copy import deepcopy
|
||||
import eventlet
|
||||
from eventlet import tpool
|
||||
from lxml import etree
|
||||
from os_brick import encryptors
|
||||
@@ -8273,7 +8272,7 @@ class LibvirtDriver(driver.ComputeDriver):
|
||||
context, xml, instance,
|
||||
pause=pause, power_on=power_on,
|
||||
post_xml_callback=post_xml_callback)
|
||||
except eventlet.timeout.Timeout:
|
||||
except exception.InstanceEventTimeout:
|
||||
# We did not receive all expected events from Neutron, a warning
|
||||
# has already been logged by wait_for_instance_event, but we need
|
||||
# to decide if the issue is fatal.
|
||||
@@ -11160,7 +11159,7 @@ class LibvirtDriver(driver.ComputeDriver):
|
||||
if info.type == libvirt.VIR_DOMAIN_JOB_NONE:
|
||||
# Either still running, or failed or completed,
|
||||
# lets untangle the mess
|
||||
if not finish_event.ready():
|
||||
if not finish_event.is_set():
|
||||
LOG.debug("Operation thread is still running",
|
||||
instance=instance)
|
||||
else:
|
||||
@@ -11347,14 +11346,14 @@ class LibvirtDriver(driver.ComputeDriver):
|
||||
migrate_data, guest,
|
||||
device_names)
|
||||
|
||||
finish_event = eventlet.event.Event()
|
||||
finish_event = threading.Event()
|
||||
self.active_migrations[instance.uuid] = deque()
|
||||
|
||||
def thread_finished(_):
|
||||
LOG.debug("Migration operation thread notification",
|
||||
instance=instance)
|
||||
finish_event.send()
|
||||
|
||||
finish_event.set()
|
||||
future.add_done_callback(thread_finished)
|
||||
|
||||
# Let eventlet schedule the new thread right away
|
||||
|
||||
@@ -12,7 +12,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import eventlet
|
||||
import os
|
||||
import time
|
||||
|
||||
@@ -289,7 +288,7 @@ class ZVMDriver(driver.ComputeDriver):
|
||||
instance, event, deadline=timeout,
|
||||
error_callback=self._neutron_failed_callback):
|
||||
self._setup_network(vm_name, os_distro, network_info, instance)
|
||||
except eventlet.timeout.Timeout:
|
||||
except exception.InstanceEventTimeout:
|
||||
LOG.warning("Timeout waiting for vif plugging callback.",
|
||||
instance=instance)
|
||||
if CONF.vif_plugging_is_fatal:
|
||||
|
||||
Reference in New Issue
Block a user