diff --git a/nova/tests/unit/test_utils.py b/nova/tests/unit/test_utils.py index a2d7f6d873..b086e2266a 100644 --- a/nova/tests/unit/test_utils.py +++ b/nova/tests/unit/test_utils.py @@ -13,6 +13,7 @@ # under the License. import datetime +import futurist import hashlib import os import os.path @@ -1989,3 +1990,505 @@ class TestFairLockGuard(test.NoDBTestCase): self.assertTrue(lock_guard.is_locked()) self.assertTrue(test_locks[0].is_writer()) self.assertTrue(test_locks[1].is_writer()) + + +class StaticallyDelayingCancellableTaskExecutorWrapperTest(test.NoDBTestCase): + def test_submit_one(self): + executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper( + 0.01, utils._get_default_executor()) + self.addCleanup(executor.shutdown) + + task_done = threading.Event() + + def task(num, foo): + self.assertEqual(12, num) + self.assertEqual("bar", foo) + task_done.set() + return foo + str(num) + + future = executor.submit_with_delay(task, 12, foo="bar") + + result = future.result() + self.assertTrue(task_done.is_set()) + self.assertEqual("bar12", result) + + def test_submit_one_exception_result(self): + executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper( + 0.01, utils._get_default_executor()) + self.addCleanup(executor.shutdown) + + task_done = threading.Event() + exc_to_raise = ValueError() + + def task(num, foo): + self.assertEqual(12, num) + self.assertEqual("bar", foo) + task_done.set() + raise exc_to_raise + + future = executor.submit_with_delay(task, 12, foo="bar") + exc = future.exception() + + self.assertTrue(task_done.is_set()) + self.assertEqual(exc_to_raise, exc) + + def test_submit_two_non_overlapping(self): + + def task1(): + return 42 + + def task2(): + return 13 + + executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper( + 0.01, utils._get_default_executor()) + + future1 = executor.submit_with_delay(task1) + self.assertEqual(42, future1.result()) + + future2 = executor.submit_with_delay(task2) + self.assertEqual(13, 
future2.result()) + + self.assertTrue(executor._queue.empty()) + + def test_submit_second_while_delaying_first(self): + task1_started = threading.Event() + + def task1(): + task1_started.set() + return 42 + + task2_started = threading.Event() + + def task2(): + task2_started.set() + return 13 + + # Create a "long" delay so the task will be actively managed by + # the wrapper while we submit the second task + executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper( + 2, utils._get_default_executor()) + + future1 = executor.submit_with_delay(task1) + task1_start = time.monotonic() + # wait a bit so the wrapper is picking it up and waiting for its + # deadline + time.sleep(1) + self.assertTrue(executor._queue.empty()) + self.assertFalse(task1_started.is_set()) + + # now submit the second task, it will be queued + future2 = executor.submit_with_delay(task2) + task2_start = time.monotonic() + self.assertFalse(executor._queue.empty()) + self.assertFalse(task1_started.is_set()) + + # eventually both tasks finishes + self.assertEqual(42, future1.result()) + task1_end = time.monotonic() + self.assertEqual(13, future2.result()) + task2_end = time.monotonic() + + # and both tasks took about delay seconds individually, but the two + # tasks together took less than 2x delay seconds as they were + # overlapped. 
+ task1_runtime = task1_end - task1_start + self.assertLess(task1_runtime, 2.5) + self.assertGreater(task1_runtime, 2.0) + task2_runtime = task2_end - task2_start + self.assertLess(task2_runtime, 2.5) + self.assertGreater(task2_runtime, 2.0) + total_runtime = task2_end - task1_start + self.assertLess(total_runtime, 4) + + def test_submit_multiple(self): + executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper( + 0.1, utils._get_default_executor()) + + def task(i): + return 2 * i + + futures = [] + for i in range(20): + futures.append(executor.submit_with_delay(task, i)) + + for i, f in enumerate(futures): + self.assertEqual(2 * i, f.result()) + + executor.shutdown(wait=True) + + def test_submit_multiple_executor_rejects_first_executes_second(self): + def task1(): + return 42 + + def task2(): + return 13 + + check_and_reject = mock.Mock( + side_effect=[futurist.RejectedSubmission(), None]) + + if utils.concurrency_mode_threading(): + ex = futurist.ThreadPoolExecutor( + max_workers=1, check_and_reject=check_and_reject) + else: + ex = futurist.GreenThreadPoolExecutor( + max_workers=1, check_and_reject=check_and_reject) + + executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper( + 0.1, ex) + self.addCleanup(executor.shutdown, wait=True) + + future1 = executor.submit_with_delay(task1) + future2 = executor.submit_with_delay(task2) + + self.assertEqual( + futurist.RejectedSubmission, type(future1.exception())) + self.assertEqual(13, future2.result()) + + def test_cancel_during_delay(self): + task1_started = threading.Event() + + def task1(): + task1_started.set() + return 42 + + def task2(): + return 13 + + # Create a "long" delay so the task will be actively delayed by + # the wrapper when we cancel it + executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper( + 2, utils._get_default_executor()) + + future1 = executor.submit_with_delay(task1) + # wait a bit to let the task being picked up + time.sleep(1) + # it is not in the queue, so it 
is picked up + self.assertTrue(executor._queue.empty()) + # but not executing yet so it is being delayed + self.assertFalse(task1_started.is_set()) + + # cancel the task + future1.cancel() + + # Submit and wait for the execution of the second task to prove + # that the executor had time to finish waiting for the deadline of + # the first task, detected the cancellation and skipped the task, + # then executed the second task. + future2 = executor.submit_with_delay(task2) + self.assertEqual(13, future2.result()) + + # task1 is still not executed + self.assertFalse(task1_started.is_set()) + self.assertTrue(future1.cancelled()) + # no tasks remaining in the executor queue + self.assertTrue(executor._queue.empty()) + + def test_cancel_while_in_queue(self): + task1_started = threading.Event() + + def task1(): + task1_started.set() + return 42 + + task2_started = threading.Event() + + def task2(): + task2_started.set() + return 13 + + # Create a "long" delay so one task will be actively delayed while + # we submit a second task then cancel the second task. + executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper( + 2, utils._get_default_executor()) + + future1 = executor.submit_with_delay(task1) + # Wait a bit to let the task being picked up. + time.sleep(1) + # It is not in the queue, so it is picked up, + self.assertTrue(executor._queue.empty()) + # but not executing yet, so it is being delayed + self.assertFalse(task1_started.is_set()) + + # Submit a second task that will be queued. + future2 = executor.submit_with_delay(task2) + self.assertFalse(executor._queue.empty()) + self.assertFalse(task2_started.is_set()) + + # Cancel the second task while it is in the queue. + future2.cancel() + + # The first task should finish normally + self.assertEqual(42, future1.result()) + + # But the second task should never be executed. + # To prove that we shutdown both the wrapper and the real executor + # then check the second task again. 
+ executor.shutdown(wait=True) + utils._get_default_executor().shutdown(wait=True) + self.assertFalse(task2_started.is_set()) + self.assertTrue(future2.cancelled()) + + def test_instantaneous_shutdown(self): + executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper( + 0.1, utils._get_default_executor()) + + executor.shutdown(wait=False) + self.assertTrue(executor._shutdown) + + def test_shutdown_wait(self): + executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper( + 0.1, utils._get_default_executor()) + + executor.shutdown(wait=True) + self.assertTrue(executor._shutdown) + self.assertTrue(executor._queue.empty()) + self.assertFalse(executor.is_alive) + # Shutting down the wrapper does not affect the real executor + self.assertTrue(executor._executor.alive) + + def test_submit_after_shutdown_rejected(self): + executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper( + 0.1, utils._get_default_executor()) + + executor.shutdown() + self.assertTrue(executor._shutdown) + self.assertTrue(executor._queue.empty()) + self.assertFalse(executor.is_alive) + + exc = self.assertRaises( + RuntimeError, executor.submit_with_delay, lambda: None) + self.assertEqual( + "Cannot schedule new tasks after being shutdown", str(exc)) + + def test_submit_while_shutting_down(self): + task_started = threading.Event() + + def task(): + task_started.set() + + # Create a "long" delay so the task will be actively managed by + # the wrapper while the test calls shutdown on it. 
+ executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper( + 2, utils._get_default_executor()) + self.addCleanup(executor.shutdown, wait=True) + + executor.submit_with_delay(task) + executor.shutdown(wait=False) + + # the task is actively managed, delayed, by our wrapper while we are + # shutting the executor down, we should not be able to add new tasks + # even if the shutdown is not finished yet + self.assertFalse(task_started.is_set()) + exc = self.assertRaises( + RuntimeError, executor.submit_with_delay, lambda: None) + self.assertEqual( + "Cannot schedule new tasks after being shutdown", str(exc)) + + def test_shutdown_after_task_finished(self): + executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper( + 0.01, utils._get_default_executor()) + self.addCleanup(executor.shutdown) + + def task(): + return + + future = executor.submit_with_delay(task) + future.result() + + executor.shutdown() + + self.assertTrue(executor._shutdown) + self.assertTrue(executor._queue.empty()) + self.assertFalse(executor.is_alive) + + def test_no_wait_shutdown_task_finishes_normally(self): + task_started = threading.Event() + + def task(): + task_started.set() + return 42 + + # Create a "long" delay so the task will be actively managed by + # the wrapper while the test calls shutdown on it. 
+ executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper( + 2, utils._get_default_executor()) + self.addCleanup(executor.shutdown, wait=True) + + future = executor.submit_with_delay(task) + # Task is not executing it is being delayed + self.assertFalse(task_started.is_set()) + + # We expect that shutdown returns even though there are tasks being + # actively managed, delayed, by the executor as we called it with + # wait=False + executor.shutdown(wait=False) + + self.assertTrue(executor._shutdown) + self.assertFalse(executor._queue.empty()) + self.assertTrue(executor.is_alive) + + # Task is still delayed + self.assertFalse(task_started.is_set()) + # and it is not cancelled + self.assertFalse(future.cancelled()) + # and eventually executed + self.assertEqual(42, future.result()) + # and eventually the executor is terminated. This also covers the case + # when multiple shutdown call is made to the same executor. + executor.shutdown(wait=True) + self.assertTrue(executor._queue.empty()) + self.assertFalse(executor.is_alive) + + def test_no_wait_shutdown_multiple_tasks_finishes_normally(self): + task1_started = threading.Event() + + def task1(): + task1_started.set() + return 42 + + task2_started = threading.Event() + + def task2(): + task2_started.set() + return 13 + + # Create a "long" delay so the task will be actively managed by + # the wrapper while the test calls shutdown on it. 
+ executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper( + 2, utils._get_default_executor()) + + future1 = executor.submit_with_delay(task1) + future2 = executor.submit_with_delay(task2) + # Tasks are not executing they are being delayed + self.assertFalse(task1_started.is_set()) + self.assertFalse(task2_started.is_set()) + + # wait a bit so the wrapper actually starts waiting for the deadline + # of task1 + time.sleep(1) + # Task are still not executing + self.assertFalse(task1_started.is_set()) + self.assertFalse(task2_started.is_set()) + # task1 is already popped from the queue and the code is waiting for + # its deadline, while task2 is in the queue waiting + self.assertEqual(1, executor._queue.qsize()) + + # Shut down the executor. We expect that the tasks are still executed + executor.shutdown(wait=True) + + self.assertEqual(42, future1.result()) + self.assertEqual(13, future2.result()) + # and our wrapper is in a shutdown state + self.assertTrue(executor._shutdown) + self.assertTrue(executor._queue.empty()) + self.assertFalse(executor.is_alive) + + def test_shutdown_wait_task_finishes_normally(self): + task_started = threading.Event() + + def task(): + task_started.set() + return 42 + + # Create a "long" delay so the task will be actively managed by + # the wrapper while the test calls shutdown on it. 
+ executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper( + 2, utils._get_default_executor()) + + future = executor.submit_with_delay(task) + # Task is not executing it is being delayed + self.assertFalse(task_started.is_set()) + + # We expect that shutdown waits for the task to be submitted to the + # real executor after the delay + executor.shutdown(wait=True) + # Task is now submitted to the real executor so it can actually run + # and produce a result + result = future.result() + self.assertTrue(task_started.is_set()) + self.assertEqual(42, result) + # but our wrapper is in a shutdown state + self.assertTrue(executor._shutdown) + self.assertTrue(executor._queue.empty()) + self.assertFalse(executor.is_alive) + + def test_shutdown_does_not_wait_for_cancelled_task(self): + task_started = threading.Event() + + def task(): + task_started.set() + return 42 + + # Create a very long delay so the task will be actively managed by + # the wrapper while the test calls shutdown on it and we can + # check that the wrapper detects cancelled tasks during shutdown and + # does not wait for the whole deadline. + executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper( + 2000, utils._get_default_executor()) + + future = executor.submit_with_delay(task) + # Task is not executing it is being delayed + self.assertFalse(task_started.is_set()) + + # Cancel the task, shutdown the executor. We expect that the cancelled + # task is not submitted for execution. 
+ future.cancel() + executor.shutdown(wait=True) + self.assertFalse(task_started.is_set()) + self.assertTrue(future.cancelled()) + # and our wrapper is in a shutdown state + self.assertTrue(executor._shutdown) + self.assertTrue(executor._queue.empty()) + self.assertFalse(executor.is_alive) + + def test_shutdown_does_not_wait_for_multiple_cancelled_tasks(self): + task1_started = threading.Event() + + def task1(): + task1_started.set() + return 42 + + task2_started = threading.Event() + + def task2(): + task2_started.set() + return 13 + + # Create a very long delay so the task will be actively managed by + # the wrapper while the test calls shutdown on it and we can + # check that the wrapper detects cancelled tasks during shutdown and + # does not wait for the whole deadline. + executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper( + 2000, utils._get_default_executor()) + + future1 = executor.submit_with_delay(task1) + future2 = executor.submit_with_delay(task2) + # Tasks are not executing they are being delayed + self.assertFalse(task1_started.is_set()) + self.assertFalse(task2_started.is_set()) + + # wait a bit so the wrapper actually starts waiting for the deadline + # of task1 + time.sleep(1) + # Task are still not executing + self.assertFalse(task1_started.is_set()) + self.assertFalse(task2_started.is_set()) + # task1 is already popped from the queue and the code is waiting for + # its deadline, while task2 is in the queue waiting + self.assertEqual(1, executor._queue.qsize()) + + # Cancel both tasks then shutdown the executor and expect that no + # task will be executed. 
+ future1.cancel() + future2.cancel() + executor.shutdown(wait=True) + + self.assertFalse(task1_started.is_set()) + self.assertTrue(future1.cancelled()) + self.assertFalse(task2_started.is_set()) + self.assertTrue(future2.cancelled()) + # and our wrapper is in a shutdown state + self.assertTrue(executor._shutdown) + self.assertTrue(executor._queue.empty()) + self.assertFalse(executor.is_alive) diff --git a/nova/tests/unit/virt/libvirt/test_host.py b/nova/tests/unit/virt/libvirt/test_host.py index e575d9a57f..62a2f4f4b4 100644 --- a/nova/tests/unit/virt/libvirt/test_host.py +++ b/nova/tests/unit/virt/libvirt/test_host.py @@ -118,8 +118,9 @@ class HostTestCase(test.NoDBTestCase): self.assertEqual(0, len(log_mock.method_calls), 'LOG should not be used in _connect_auth_cb.') - @mock.patch.object(utils, 'spawn_after') - def test_event_dispatch(self, mock_spawn_after): + @mock.patch.object( + utils, 'StaticallyDelayingCancellableTaskExecutorWrapper') + def test_event_dispatch(self, mock_wrapper): # Validate that the libvirt self-pipe for forwarding # events between threads is working sanely @@ -164,9 +165,11 @@ class HostTestCase(test.NoDBTestCase): want_events = [event1, event2, event3] self.assertEqual(want_events, got_events) + mock_wrapper.assert_called_once_with( + delay=15, executor=utils._get_default_executor()) # STOPPED is delayed so it's handled separately - mock_spawn_after.assert_called_once_with( - hostimpl._lifecycle_delay, hostimpl._event_emit, event4) + mock_wrapper.return_value.submit_with_delay.assert_called_once_with( + hostimpl._event_emit, event4) @mock.patch('nova.virt.libvirt.host.Host._event_emit_delayed') def test_event_lifecycle(self, mock_emit): @@ -275,19 +278,21 @@ class HostTestCase(test.NoDBTestCase): test.MatchType(libvirt_guest.Guest), instance=None, logging_ok=False) - @mock.patch.object(utils, 'spawn_after') - def test_event_emit_delayed_call_delayed(self, mock_spawn_after): + @mock.patch('nova.utils.StaticallyDelayingCancellableTaskExecutorWrapper') + def 
test_event_emit_delayed_call_delayed(self, mock_wrapper): ev = event.LifecycleEvent( "cef19ce0-0ca2-11df-855d-b19fbce37686", event.EVENT_LIFECYCLE_STOPPED) hostimpl = host.Host( 'qemu:///system', lifecycle_event_handler=lambda e: None) hostimpl._event_emit_delayed(ev) - mock_spawn_after.assert_called_once_with( - 15, hostimpl._event_emit, ev) + mock_wrapper.assert_called_once_with( + delay=15, executor=utils._get_default_executor()) + mock_wrapper.return_value.submit_with_delay.assert_called_once_with( + hostimpl._event_emit, ev) - @mock.patch.object(utils, 'spawn_after') - def test_event_emit_delayed_call_delayed_pending(self, spawn_after_mock): + @mock.patch('nova.utils.StaticallyDelayingCancellableTaskExecutorWrapper') + def test_event_emit_delayed_call_delayed_pending(self, mock_wrapper): hostimpl = host.Host( 'qemu:///system', lifecycle_event_handler=lambda e: None) uuid = "cef19ce0-0ca2-11df-855d-b19fbce37686" @@ -295,9 +300,9 @@ class HostTestCase(test.NoDBTestCase): ev = event.LifecycleEvent( uuid, event.EVENT_LIFECYCLE_STOPPED) hostimpl._event_emit_delayed(ev) - mock_future = spawn_after_mock.return_value + mock_future = mock_wrapper.return_value.submit_with_delay.return_value mock_future.add_done_callback.assert_called_once() - self.assertTrue(spawn_after_mock.called) + self.assertTrue(mock_wrapper.return_value.submit_with_delay.called) self.assertIs(mock_future, hostimpl._events_delayed[uuid]) def test_event_delayed_cleanup(self): diff --git a/nova/utils.py b/nova/utils.py index 574dd04db0..35ee8ad6ee 100644 --- a/nova/utils.py +++ b/nova/utils.py @@ -23,10 +23,12 @@ import hashlib import inspect import multiprocessing import os +import queue import random import re import shutil import tempfile +import threading import time import typing as ty @@ -1409,3 +1411,219 @@ def tpool_wrap(target, autowrap=()): return target else: return tpool.Proxy(target, autowrap=autowrap) + + +class StaticallyDelayingCancellableTaskExecutorWrapper: + """Executor wrapper 
that submit work to another executor but delays each + task's submission with a statically defined delay and supports cancelling + the task during such delay. + + Note that tasks that are actually started running in the real executor + might not be cancellable anymore depending on that executor and the + concurrency mode used. See + https://docs.python.org/3.12/library/concurrent.futures.html#concurrent.futures.Future.cancel + + Note that shutting down the wrapper only shuts down its own scheduler + thread but does not shut down the real executor that is passed in __init__. + + Note that this class does not support a different delay length for + different tasks. + """ + + class Task: + def __init__( + self, + delay: float, + fn: ty.Callable[..., ty.Any], + args: tuple, + kwargs: dict + ): + self.deadline = time.monotonic() + delay + self.future = futurist.Future() + self.fn = fn + self.args = args + self.kwargs = kwargs + + @property + def remaining_delay(self): + return self.deadline - time.monotonic() + + def __str__(self): + return ( + f"Task(fn={self.fn}, " + f"remaining_delay={self.remaining_delay} " + f"future={self.future})") + + def __init__(self, delay: float, executor: Executor): + """Initialize the wrapper + + :param delay: delay length in seconds + :param executor: executor object to run each task. It supports both + native threading and eventlet based executors. + """ + + self._queue: queue.Queue = queue.Queue() + self._executor = executor + self._delay = delay + self._shutdown = threading.Condition() + self._shutdown_requested = False + self._sentinel = self.Task(0, lambda: None, (), {}) + # We are intentionally not running our _run() in the executor + # as we cannot assume that the executor has more than one worker + # and our logic never finishes so it would consume one worker + # constantly. 
+ self._thread = threading.Thread(target=self._run) + self._thread.daemon = True + self._thread.start() + + @staticmethod + def _log(msg, *args): + LOG.debug(msg, *args) + + @staticmethod + def _task_wrapper(task) -> None: + """This wraps the original task so when it finishes in the real + executor the result of the task can be copied to the Future object + already returned to our caller from submit_with_delay(). So + the caller can get the result or exception from the task. + """ + try: + task.future.set_result(task.fn(*task.args, **task.kwargs)) + except BaseException as e: + task.future.set_exception(e) + + def _wait_for_deadline_then_set_running(self, task) -> bool: + """Wait for the task's deadline, then mark it running + + Wait can be interrupted by shutdown of the wrapper; in such a case + the task's state is checked. If the task is cancelled then return + immediately with False. If the task is not cancelled then wait for + its deadline and eventually return True if the task is still not + cancelled. + + If True is returned the future in the task is also atomically set to + running state and the future cannot be cancelled anymore. + """ + if task.remaining_delay <= 0: + return task.future.set_running_or_notify_cancel() + + self._log("Waiting for the deadline of %s", task) + with self._shutdown: + shutdown = self._shutdown.wait_for( + lambda: self._shutdown_requested, task.remaining_delay) + + if shutdown: + self._log( + "Shutdown is requested while waiting " + "for the deadline of %s", task) + + if task.future.cancelled(): + return False + + self._log( + "%s is not cancelled so still waiting for its " + "deadline", task) + if task.remaining_delay > 0: + # Blocking here is fine as we have the assumption that + # no new task can arrive that has a deadline that is sooner + # than the deadline of the oldest task in the queue due to + # our static delay (and we assume that time travel is not + # allowed). 
+ time.sleep(task.remaining_delay) + + return task.future.set_running_or_notify_cancel() + + def _run(self): + while True: + self._log("Waiting for the next task") + task: StaticallyDelayingCancellableTaskExecutorWrapper.Task = ( + self._queue.get()) + self._log("Received %s", task) + + if task is self._sentinel: + # We are asked to terminate so exit the loop. + self._queue.task_done() + self._log("Sentinel received, thread is exiting") + return + + if task.future.cancelled(): + # The task was cancelled while it was in the queue. + # We don't need to run it. Just move on to the next task + self._log("%s was cancelled while queued, skipping", task) + self._queue.task_done() + continue + + # The task is still valid, wait for its deadline + run_it = self._wait_for_deadline_then_set_running(task) + if not run_it: + # The task was cancelled during the delay period. + # We don't need to run it. Just move on to the next task. + self._log( + "%s was cancelled during its delay period, skipping", task) + self._queue.task_done() + continue + + # Push the task to the real executor. We don't need to wait + # for the result here as the client can do that + # via the task.future we already returned from submit_with_delay() + try: + self._executor.submit(self._task_wrapper, task) + except BaseException as e: + # If for any reason we cannot submit a task then we should not + # let the exception escape as that will prevent our thread + # to terminate cleanly. Instead, we log and propagate back. + LOG.exception( + "Failed to submit %s to executor %s", task, self._executor) + task.future.set_exception(e) + self._queue.task_done() + continue + + self._log("%s submitted to %s", task, self._executor) + # The task is not done from the executor perspective, but it is + # done from the _run() logic perspective. 
Signal it, so shutdown() + # can use Queue.join() + self._queue.task_done() + + def submit_with_delay( + self, fn: ty.Callable[..., ty.Any], *args: ty.Any, **kwargs: ty.Any + ) -> futurist.Future: + """Submit work with delay.""" + # We need this wide locking as we don't want to queue a task behind + # the sentinel as that task will never be processed. + with self._shutdown: + if self._shutdown_requested: + raise RuntimeError( + "Cannot schedule new tasks after being shutdown") + + task = self.Task(self._delay, fn, args, kwargs) + self._queue.put(task) + self._log("Queued %s", task) + return task.future + + def shutdown(self, wait: bool = True): + """Shutdown the executor""" + with self._shutdown: + if not self._shutdown_requested: + # Ensure that our thread wakes at least one more time to allow + # it to exit by queuing up a sentinel task after the shutdown + # condition is set. This task won't be executed. + self._queue.put(self._sentinel) + self._log("Sentinel is queued") + self._shutdown_requested = True + self._shutdown.notify_all() + self._log("Shutdown is set") + + # If wait is set we need to wait for our sentinel to be processed and + # therefore our thread to exit. + # NOTE(gibi): We are intentionally not shutting down the real executor + # as we are not the one created it or owning it so it might be shared + # between different callers. + if wait: + self._queue.join() + self._log("Queue joined") + self._thread.join() + self._log("Scheduler thread joined") + + @property + def is_alive(self) -> bool: + return self._thread.is_alive() diff --git a/nova/virt/libvirt/host.py b/nova/virt/libvirt/host.py index 1b24f8bbbd..5266481a45 100644 --- a/nova/virt/libvirt/host.py +++ b/nova/virt/libvirt/host.py @@ -346,7 +346,9 @@ class Host(object): # STARTED events are sent. To prevent shutting # down the domain during a reboot, delay the # STOPPED lifecycle event some seconds. 
- self._lifecycle_delay = 15 + self._delayed_executor = ( + utils.StaticallyDelayingCancellableTaskExecutorWrapper( + delay=15, executor=utils._get_default_executor())) self._initialized = False self._libvirt_proxy_classes = self._get_libvirt_proxy_classes(libvirt) @@ -542,29 +544,29 @@ class Host(object): def _event_emit_delayed(self, event): """Emit events - possibly delayed.""" - def event_cleanup(event): - """Callback function for greenthread. Called - to cleanup the _events_delayed dictionary when an event - was called. - """ - self._events_delayed.pop(event.uuid, None) - # Cleanup possible delayed stop events. - if event.uuid in self._events_delayed.keys(): + # Cancel possible delayed stop events when we received any other + # event for the same domain. + if (isinstance(event, virtevent.LifecycleEvent) and + event.uuid in self._events_delayed.keys() + ): self._events_delayed[event.uuid].cancel() self._events_delayed.pop(event.uuid, None) - LOG.debug("Removed pending event for %s due to event", event.uuid) + LOG.debug( + "Removed pending STOPPED event for %s due to new event %s", + event.uuid, event) if (isinstance(event, virtevent.LifecycleEvent) and event.transition == virtevent.EVENT_LIFECYCLE_STOPPED): # Delay STOPPED event, as they may be followed by a STARTED # event in case the instance is rebooting - id_ = utils.spawn_after( - self._lifecycle_delay, self._event_emit, event) - self._events_delayed[event.uuid] = id_ + future = self._delayed_executor.submit_with_delay( + self._event_emit, event) + self._events_delayed[event.uuid] = future # add callback to cleanup self._events_delayed dict after # event was called - id_.add_done_callback(lambda _: event_cleanup(event)) + future.add_done_callback( + lambda _: self._events_delayed.pop(event.uuid, None)) else: self._event_emit(event)