Merge "Use an executor to delay STOPPED events"
This commit is contained in:
@@ -13,6 +13,7 @@
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
import futurist
|
||||
import hashlib
|
||||
import os
|
||||
import os.path
|
||||
@@ -1989,3 +1990,505 @@ class TestFairLockGuard(test.NoDBTestCase):
|
||||
self.assertTrue(lock_guard.is_locked())
|
||||
self.assertTrue(test_locks[0].is_writer())
|
||||
self.assertTrue(test_locks[1].is_writer())
|
||||
|
||||
|
||||
class StaticallyDelayingCancellableTaskExecutorWrapperTest(test.NoDBTestCase):
|
||||
def test_submit_one(self):
|
||||
executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper(
|
||||
0.01, utils._get_default_executor())
|
||||
self.addCleanup(executor.shutdown)
|
||||
|
||||
task_done = threading.Event()
|
||||
|
||||
def task(num, foo):
|
||||
self.assertEqual(12, num)
|
||||
self.assertEqual("bar", foo)
|
||||
task_done.set()
|
||||
return foo + str(num)
|
||||
|
||||
future = executor.submit_with_delay(task, 12, foo="bar")
|
||||
|
||||
result = future.result()
|
||||
self.assertTrue(task_done.is_set())
|
||||
self.assertEqual("bar12", result)
|
||||
|
||||
def test_submit_one_exception_result(self):
|
||||
executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper(
|
||||
0.01, utils._get_default_executor())
|
||||
self.addCleanup(executor.shutdown)
|
||||
|
||||
task_done = threading.Event()
|
||||
exc_to_raise = ValueError()
|
||||
|
||||
def task(num, foo):
|
||||
self.assertEqual(12, num)
|
||||
self.assertEqual("bar", foo)
|
||||
task_done.set()
|
||||
raise exc_to_raise
|
||||
|
||||
future = executor.submit_with_delay(task, 12, foo="bar")
|
||||
exc = future.exception()
|
||||
|
||||
self.assertTrue(task_done.is_set())
|
||||
self.assertEqual(exc_to_raise, exc)
|
||||
|
||||
def test_submit_two_non_overlapping(self):
|
||||
|
||||
def task1():
|
||||
return 42
|
||||
|
||||
def task2():
|
||||
return 13
|
||||
|
||||
executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper(
|
||||
0.01, utils._get_default_executor())
|
||||
|
||||
future1 = executor.submit_with_delay(task1)
|
||||
self.assertEqual(42, future1.result())
|
||||
|
||||
future2 = executor.submit_with_delay(task2)
|
||||
self.assertEqual(13, future2.result())
|
||||
|
||||
self.assertTrue(executor._queue.empty())
|
||||
|
||||
def test_submit_second_while_delaying_first(self):
|
||||
task1_started = threading.Event()
|
||||
|
||||
def task1():
|
||||
task1_started.set()
|
||||
return 42
|
||||
|
||||
task2_started = threading.Event()
|
||||
|
||||
def task2():
|
||||
task2_started.set()
|
||||
return 13
|
||||
|
||||
# Create a "long" delay so the task will be actively managed by
|
||||
# the wrapper while we submit the second task
|
||||
executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper(
|
||||
2, utils._get_default_executor())
|
||||
|
||||
future1 = executor.submit_with_delay(task1)
|
||||
task1_start = time.monotonic()
|
||||
# wait a bit so the wrapper is picking it up and waiting for its
|
||||
# deadline
|
||||
time.sleep(1)
|
||||
self.assertTrue(executor._queue.empty())
|
||||
self.assertFalse(task1_started.is_set())
|
||||
|
||||
# now submit the second task, it will be queued
|
||||
future2 = executor.submit_with_delay(task2)
|
||||
task2_start = time.monotonic()
|
||||
self.assertFalse(executor._queue.empty())
|
||||
self.assertFalse(task1_started.is_set())
|
||||
|
||||
# eventually both tasks finishes
|
||||
self.assertEqual(42, future1.result())
|
||||
task1_end = time.monotonic()
|
||||
self.assertEqual(13, future2.result())
|
||||
task2_end = time.monotonic()
|
||||
|
||||
# and both tasks took about delay seconds individually, but the two
|
||||
# tasks together took less than 2x delay seconds as they were
|
||||
# overlapped.
|
||||
task1_runtime = task1_end - task1_start
|
||||
self.assertLess(task1_runtime, 2.5)
|
||||
self.assertGreater(task1_runtime, 2.0)
|
||||
task2_runtime = task2_end - task2_start
|
||||
self.assertLess(task2_runtime, 2.5)
|
||||
self.assertGreater(task2_runtime, 2.0)
|
||||
total_runtime = task2_end - task1_start
|
||||
self.assertLess(total_runtime, 4)
|
||||
|
||||
def test_submit_multiple(self):
|
||||
executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper(
|
||||
0.1, utils._get_default_executor())
|
||||
|
||||
def task(i):
|
||||
return 2 * i
|
||||
|
||||
futures = []
|
||||
for i in range(20):
|
||||
futures.append(executor.submit_with_delay(task, i))
|
||||
|
||||
for i, f in enumerate(futures):
|
||||
self.assertEqual(2 * i, f.result())
|
||||
|
||||
executor.shutdown(wait=True)
|
||||
|
||||
def test_submit_multiple_executor_rejects_first_executes_second(self):
|
||||
def task1():
|
||||
return 42
|
||||
|
||||
def task2():
|
||||
return 13
|
||||
|
||||
check_and_reject = mock.Mock(
|
||||
side_effect=[futurist.RejectedSubmission(), None])
|
||||
|
||||
if utils.concurrency_mode_threading():
|
||||
ex = futurist.ThreadPoolExecutor(
|
||||
max_workers=1, check_and_reject=check_and_reject)
|
||||
else:
|
||||
ex = futurist.GreenThreadPoolExecutor(
|
||||
max_workers=1, check_and_reject=check_and_reject)
|
||||
|
||||
executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper(
|
||||
0.1, ex)
|
||||
self.addCleanup(executor.shutdown, wait=True)
|
||||
|
||||
future1 = executor.submit_with_delay(task1)
|
||||
future2 = executor.submit_with_delay(task2)
|
||||
|
||||
self.assertEqual(
|
||||
futurist.RejectedSubmission, type(future1.exception()))
|
||||
self.assertEqual(13, future2.result())
|
||||
|
||||
def test_cancel_during_delay(self):
|
||||
task1_started = threading.Event()
|
||||
|
||||
def task1():
|
||||
task1_started.set()
|
||||
return 42
|
||||
|
||||
def task2():
|
||||
return 13
|
||||
|
||||
# Create a "long" delay so the task will be actively delayed by
|
||||
# the wrapper when we cancel it
|
||||
executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper(
|
||||
2, utils._get_default_executor())
|
||||
|
||||
future1 = executor.submit_with_delay(task1)
|
||||
# wait a bit to let the task being picked up
|
||||
time.sleep(1)
|
||||
# it is not in the queue, so it is picked up
|
||||
self.assertTrue(executor._queue.empty())
|
||||
# but not executing yet so it is being delayed
|
||||
self.assertFalse(task1_started.is_set())
|
||||
|
||||
# cancel the task
|
||||
future1.cancel()
|
||||
|
||||
# Submit and wait for the execution of the second task to prove
|
||||
# that the executor had time to finish waiting for the deadline of
|
||||
# the first task, detected the cancellation and skipped the task,
|
||||
# then executed the second task.
|
||||
future2 = executor.submit_with_delay(task2)
|
||||
self.assertEqual(13, future2.result())
|
||||
|
||||
# task1 is still not executed
|
||||
self.assertFalse(task1_started.is_set())
|
||||
self.assertTrue(future1.cancelled())
|
||||
# no tasks remaining in the executor queue
|
||||
self.assertTrue(executor._queue.empty())
|
||||
|
||||
def test_cancel_while_in_queue(self):
|
||||
task1_started = threading.Event()
|
||||
|
||||
def task1():
|
||||
task1_started.set()
|
||||
return 42
|
||||
|
||||
task2_started = threading.Event()
|
||||
|
||||
def task2():
|
||||
task2_started.set()
|
||||
return 13
|
||||
|
||||
# Create a "long" delay so one task will be actively delayed while
|
||||
# we submit a second task then cancel the second task.
|
||||
executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper(
|
||||
2, utils._get_default_executor())
|
||||
|
||||
future1 = executor.submit_with_delay(task1)
|
||||
# Wait a bit to let the task being picked up.
|
||||
time.sleep(1)
|
||||
# It is not in the queue, so it is picked up,
|
||||
self.assertTrue(executor._queue.empty())
|
||||
# but not executing yet, so it is being delayed
|
||||
self.assertFalse(task1_started.is_set())
|
||||
|
||||
# Submit a second task that will be queued.
|
||||
future2 = executor.submit_with_delay(task2)
|
||||
self.assertFalse(executor._queue.empty())
|
||||
self.assertFalse(task2_started.is_set())
|
||||
|
||||
# Cancel the second task while it is in the queue.
|
||||
future2.cancel()
|
||||
|
||||
# The first task should finish normally
|
||||
self.assertEqual(42, future1.result())
|
||||
|
||||
# But the second task should never be executed.
|
||||
# To prove that we shutdown both the wrapper and the real executor
|
||||
# then check the second task again.
|
||||
executor.shutdown(wait=True)
|
||||
utils._get_default_executor().shutdown(wait=True)
|
||||
self.assertFalse(task2_started.is_set())
|
||||
self.assertTrue(future2.cancelled())
|
||||
|
||||
def test_instantaneous_shutdown(self):
|
||||
executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper(
|
||||
0.1, utils._get_default_executor())
|
||||
|
||||
executor.shutdown(wait=False)
|
||||
self.assertTrue(executor._shutdown)
|
||||
|
||||
def test_shutdown_wait(self):
|
||||
executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper(
|
||||
0.1, utils._get_default_executor())
|
||||
|
||||
executor.shutdown(wait=True)
|
||||
self.assertTrue(executor._shutdown)
|
||||
self.assertTrue(executor._queue.empty())
|
||||
self.assertFalse(executor.is_alive)
|
||||
# Shutting down the wrapper does not affect the real executor
|
||||
self.assertTrue(executor._executor.alive)
|
||||
|
||||
def test_submit_after_shutdown_rejected(self):
|
||||
executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper(
|
||||
0.1, utils._get_default_executor())
|
||||
|
||||
executor.shutdown()
|
||||
self.assertTrue(executor._shutdown)
|
||||
self.assertTrue(executor._queue.empty())
|
||||
self.assertFalse(executor.is_alive)
|
||||
|
||||
exc = self.assertRaises(
|
||||
RuntimeError, executor.submit_with_delay, lambda: None)
|
||||
self.assertEqual(
|
||||
"Cannot schedule new tasks after being shutdown", str(exc))
|
||||
|
||||
def test_submit_while_shutting_down(self):
|
||||
task_started = threading.Event()
|
||||
|
||||
def task():
|
||||
task_started.set()
|
||||
|
||||
# Create a "long" delay so the task will be actively managed by
|
||||
# the wrapper while the test calls shutdown on it.
|
||||
executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper(
|
||||
2, utils._get_default_executor())
|
||||
self.addCleanup(executor.shutdown, wait=True)
|
||||
|
||||
executor.submit_with_delay(task)
|
||||
executor.shutdown(wait=False)
|
||||
|
||||
# the task is actively managed, delayed, by our wrapper while we are
|
||||
# shutting the executor down, we should not be able to add new tasks
|
||||
# even if the shutdown is not finished yet
|
||||
self.assertFalse(task_started.is_set())
|
||||
exc = self.assertRaises(
|
||||
RuntimeError, executor.submit_with_delay, lambda: None)
|
||||
self.assertEqual(
|
||||
"Cannot schedule new tasks after being shutdown", str(exc))
|
||||
|
||||
def test_shutdown_after_task_finished(self):
|
||||
executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper(
|
||||
0.01, utils._get_default_executor())
|
||||
self.addCleanup(executor.shutdown)
|
||||
|
||||
def task():
|
||||
return
|
||||
|
||||
future = executor.submit_with_delay(task)
|
||||
future.result()
|
||||
|
||||
executor.shutdown()
|
||||
|
||||
self.assertTrue(executor._shutdown)
|
||||
self.assertTrue(executor._queue.empty())
|
||||
self.assertFalse(executor.is_alive)
|
||||
|
||||
def test_no_wait_shutdown_task_finishes_normally(self):
|
||||
task_started = threading.Event()
|
||||
|
||||
def task():
|
||||
task_started.set()
|
||||
return 42
|
||||
|
||||
# Create a "long" delay so the task will be actively managed by
|
||||
# the wrapper while the test calls shutdown on it.
|
||||
executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper(
|
||||
2, utils._get_default_executor())
|
||||
self.addCleanup(executor.shutdown, wait=True)
|
||||
|
||||
future = executor.submit_with_delay(task)
|
||||
# Task is not executing it is being delayed
|
||||
self.assertFalse(task_started.is_set())
|
||||
|
||||
# We expect that shutdown returns even though there are tasks being
|
||||
# actively managed, delayed, by the executor as we called it with
|
||||
# wait=False
|
||||
executor.shutdown(wait=False)
|
||||
|
||||
self.assertTrue(executor._shutdown)
|
||||
self.assertFalse(executor._queue.empty())
|
||||
self.assertTrue(executor.is_alive)
|
||||
|
||||
# Task is still delayed
|
||||
self.assertFalse(task_started.is_set())
|
||||
# and it is not cancelled
|
||||
self.assertFalse(future.cancelled())
|
||||
# and eventually executed
|
||||
self.assertEqual(42, future.result())
|
||||
# and eventually the executor is terminated. This also covers the case
|
||||
# when multiple shutdown call is made to the same executor.
|
||||
executor.shutdown(wait=True)
|
||||
self.assertTrue(executor._queue.empty())
|
||||
self.assertFalse(executor.is_alive)
|
||||
|
||||
def test_no_wait_shutdown_multiple_tasks_finishes_normally(self):
|
||||
task1_started = threading.Event()
|
||||
|
||||
def task1():
|
||||
task1_started.set()
|
||||
return 42
|
||||
|
||||
task2_started = threading.Event()
|
||||
|
||||
def task2():
|
||||
task2_started.set()
|
||||
return 13
|
||||
|
||||
# Create a "long" delay so the task will be actively managed by
|
||||
# the wrapper while the test calls shutdown on it.
|
||||
executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper(
|
||||
2, utils._get_default_executor())
|
||||
|
||||
future1 = executor.submit_with_delay(task1)
|
||||
future2 = executor.submit_with_delay(task2)
|
||||
# Tasks are not executing they are being delayed
|
||||
self.assertFalse(task1_started.is_set())
|
||||
self.assertFalse(task2_started.is_set())
|
||||
|
||||
# wait a bit so the wrapper actually starts waiting for the deadline
|
||||
# of task1
|
||||
time.sleep(1)
|
||||
# Task are still not executing
|
||||
self.assertFalse(task1_started.is_set())
|
||||
self.assertFalse(task2_started.is_set())
|
||||
# task1 is already popped from the queue and the code is waiting for
|
||||
# its deadline, while task2 is in the queue waiting
|
||||
self.assertEqual(1, executor._queue.qsize())
|
||||
|
||||
# Shut down the executor. We expect that the tasks are still executed
|
||||
executor.shutdown(wait=True)
|
||||
|
||||
self.assertEqual(42, future1.result())
|
||||
self.assertEqual(13, future2.result())
|
||||
# and our wrapper is in a shutdown state
|
||||
self.assertTrue(executor._shutdown)
|
||||
self.assertTrue(executor._queue.empty())
|
||||
self.assertFalse(executor.is_alive)
|
||||
|
||||
def test_shutdown_wait_task_finishes_normally(self):
|
||||
task_started = threading.Event()
|
||||
|
||||
def task():
|
||||
task_started.set()
|
||||
return 42
|
||||
|
||||
# Create a "long" delay so the task will be actively managed by
|
||||
# the wrapper while the test calls shutdown on it.
|
||||
executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper(
|
||||
2, utils._get_default_executor())
|
||||
|
||||
future = executor.submit_with_delay(task)
|
||||
# Task is not executing it is being delayed
|
||||
self.assertFalse(task_started.is_set())
|
||||
|
||||
# We expect that shutdown waits for the task to be submitted to the
|
||||
# real executor after the delay
|
||||
executor.shutdown(wait=True)
|
||||
# Task is now submitted to the real executor so it can actually run
|
||||
# and produce a result
|
||||
result = future.result()
|
||||
self.assertTrue(task_started.is_set())
|
||||
self.assertEqual(42, result)
|
||||
# but our wrapper is in a shutdown state
|
||||
self.assertTrue(executor._shutdown)
|
||||
self.assertTrue(executor._queue.empty())
|
||||
self.assertFalse(executor.is_alive)
|
||||
|
||||
def test_shutdown_does_not_wait_for_cancelled_task(self):
|
||||
task_started = threading.Event()
|
||||
|
||||
def task():
|
||||
task_started.set()
|
||||
return 42
|
||||
|
||||
# Create a very long delay so the task will be actively managed by
|
||||
# the wrapper while the test calls shutdown on it and we can
|
||||
# check that the wrapper detects cancelled tasks during shutdown and
|
||||
# does not wait for the whole deadline.
|
||||
executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper(
|
||||
2000, utils._get_default_executor())
|
||||
|
||||
future = executor.submit_with_delay(task)
|
||||
# Task is not executing it is being delayed
|
||||
self.assertFalse(task_started.is_set())
|
||||
|
||||
# Cancel the task, shutdown the executor. We expect that the cancelled
|
||||
# task is not submitted for execution.
|
||||
future.cancel()
|
||||
executor.shutdown(wait=True)
|
||||
self.assertFalse(task_started.is_set())
|
||||
self.assertTrue(future.cancelled())
|
||||
# and our wrapper is in a shutdown state
|
||||
self.assertTrue(executor._shutdown)
|
||||
self.assertTrue(executor._queue.empty())
|
||||
self.assertFalse(executor.is_alive)
|
||||
|
||||
def test_shutdown_does_not_wait_for_multiple_cancelled_tasks(self):
|
||||
task1_started = threading.Event()
|
||||
|
||||
def task1():
|
||||
task1_started.set()
|
||||
return 42
|
||||
|
||||
task2_started = threading.Event()
|
||||
|
||||
def task2():
|
||||
task2_started.set()
|
||||
return 13
|
||||
|
||||
# Create a very long delay so the task will be actively managed by
|
||||
# the wrapper while the test calls shutdown on it and we can
|
||||
# check that the wrapper detects cancelled tasks during shutdown and
|
||||
# does not wait for the whole deadline.
|
||||
executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper(
|
||||
2000, utils._get_default_executor())
|
||||
|
||||
future1 = executor.submit_with_delay(task1)
|
||||
future2 = executor.submit_with_delay(task2)
|
||||
# Tasks are not executing they are being delayed
|
||||
self.assertFalse(task1_started.is_set())
|
||||
self.assertFalse(task2_started.is_set())
|
||||
|
||||
# wait a bit so the wrapper actually starts waiting for the deadline
|
||||
# of task1
|
||||
time.sleep(1)
|
||||
# Task are still not executing
|
||||
self.assertFalse(task1_started.is_set())
|
||||
self.assertFalse(task2_started.is_set())
|
||||
# task1 is already popped from the queue and the code is waiting for
|
||||
# its deadline, while task2 is in the queue waiting
|
||||
self.assertEqual(1, executor._queue.qsize())
|
||||
|
||||
# Cancel both tasks then shutdown the executor and expect that no
|
||||
# task will be executed.
|
||||
future1.cancel()
|
||||
future2.cancel()
|
||||
executor.shutdown(wait=True)
|
||||
|
||||
self.assertFalse(task1_started.is_set())
|
||||
self.assertTrue(future1.cancelled())
|
||||
self.assertFalse(task2_started.is_set())
|
||||
self.assertTrue(future2.cancelled())
|
||||
# and our wrapper is in a shutdown state
|
||||
self.assertTrue(executor._shutdown)
|
||||
self.assertTrue(executor._queue.empty())
|
||||
self.assertFalse(executor.is_alive)
|
||||
|
||||
@@ -118,8 +118,9 @@ class HostTestCase(test.NoDBTestCase):
|
||||
self.assertEqual(0, len(log_mock.method_calls),
|
||||
'LOG should not be used in _connect_auth_cb.')
|
||||
|
||||
@mock.patch.object(utils, 'spawn_after')
|
||||
def test_event_dispatch(self, mock_spawn_after):
|
||||
@mock.patch.object(
|
||||
utils, 'StaticallyDelayingCancellableTaskExecutorWrapper')
|
||||
def test_event_dispatch(self, mock_wrapper):
|
||||
# Validate that the libvirt self-pipe for forwarding
|
||||
# events between threads is working sanely
|
||||
|
||||
@@ -164,9 +165,11 @@ class HostTestCase(test.NoDBTestCase):
|
||||
want_events = [event1, event2, event3]
|
||||
self.assertEqual(want_events, got_events)
|
||||
|
||||
mock_wrapper.assert_called_once_with(
|
||||
delay=15, executor=utils._get_default_executor())
|
||||
# STOPPED is delayed so it's handled separately
|
||||
mock_spawn_after.assert_called_once_with(
|
||||
hostimpl._lifecycle_delay, hostimpl._event_emit, event4)
|
||||
mock_wrapper.return_value.submit_with_delay(
|
||||
hostimpl._event_emit, event4)
|
||||
|
||||
@mock.patch('nova.virt.libvirt.host.Host._event_emit_delayed')
|
||||
def test_event_lifecycle(self, mock_emit):
|
||||
@@ -275,19 +278,21 @@ class HostTestCase(test.NoDBTestCase):
|
||||
test.MatchType(libvirt_guest.Guest), instance=None,
|
||||
logging_ok=False)
|
||||
|
||||
@mock.patch.object(utils, 'spawn_after')
|
||||
def test_event_emit_delayed_call_delayed(self, mock_spawn_after):
|
||||
@mock.patch('nova.utils.StaticallyDelayingCancellableTaskExecutorWrapper')
|
||||
def test_event_emit_delayed_call_delayed(self, mock_wrapper):
|
||||
ev = event.LifecycleEvent(
|
||||
"cef19ce0-0ca2-11df-855d-b19fbce37686",
|
||||
event.EVENT_LIFECYCLE_STOPPED)
|
||||
hostimpl = host.Host(
|
||||
'qemu:///system', lifecycle_event_handler=lambda e: None)
|
||||
hostimpl._event_emit_delayed(ev)
|
||||
mock_spawn_after.assert_called_once_with(
|
||||
15, hostimpl._event_emit, ev)
|
||||
mock_wrapper.assert_called_once_with(
|
||||
delay=15, executor=utils._get_default_executor())
|
||||
mock_wrapper.return_value.submit_with_delay.assert_called_once_with(
|
||||
hostimpl._event_emit, ev)
|
||||
|
||||
@mock.patch.object(utils, 'spawn_after')
|
||||
def test_event_emit_delayed_call_delayed_pending(self, spawn_after_mock):
|
||||
@mock.patch('nova.utils.StaticallyDelayingCancellableTaskExecutorWrapper')
|
||||
def test_event_emit_delayed_call_delayed_pending(self, mock_wrapper):
|
||||
hostimpl = host.Host(
|
||||
'qemu:///system', lifecycle_event_handler=lambda e: None)
|
||||
uuid = "cef19ce0-0ca2-11df-855d-b19fbce37686"
|
||||
@@ -295,9 +300,9 @@ class HostTestCase(test.NoDBTestCase):
|
||||
ev = event.LifecycleEvent(
|
||||
uuid, event.EVENT_LIFECYCLE_STOPPED)
|
||||
hostimpl._event_emit_delayed(ev)
|
||||
mock_future = spawn_after_mock.return_value
|
||||
mock_future = mock_wrapper.return_value.submit_with_delay.return_value
|
||||
mock_future.add_done_callback.assert_called_once()
|
||||
self.assertTrue(spawn_after_mock.called)
|
||||
self.assertTrue(mock_wrapper.return_value.submit_with_delay.called)
|
||||
self.assertIs(mock_future, hostimpl._events_delayed[uuid])
|
||||
|
||||
def test_event_delayed_cleanup(self):
|
||||
|
||||
+218
@@ -23,10 +23,12 @@ import hashlib
|
||||
import inspect
|
||||
import multiprocessing
|
||||
import os
|
||||
import queue
|
||||
import random
|
||||
import re
|
||||
import shutil
|
||||
import tempfile
|
||||
import threading
|
||||
import time
|
||||
import typing as ty
|
||||
|
||||
@@ -1409,3 +1411,219 @@ def tpool_wrap(target, autowrap=()):
|
||||
return target
|
||||
else:
|
||||
return tpool.Proxy(target, autowrap=autowrap)
|
||||
|
||||
|
||||
class StaticallyDelayingCancellableTaskExecutorWrapper:
|
||||
"""Executor wrapper that submit work to another executor but delays each
|
||||
task's submission with a statically defined delay and supports cancelling
|
||||
the task during such delay.
|
||||
|
||||
Note that tasks that are actually started running in the real executor
|
||||
might not be cancellable anymore depending on that executor and the
|
||||
concurrency mode used. See
|
||||
https://docs.python.org/3.12/library/concurrent.futures.html#concurrent.futures.Future.cancel
|
||||
|
||||
Note that shutting down the wrapper only shuts down its own scheduler
|
||||
thread but does not shut down the real executor that is passed in __init__.
|
||||
|
||||
Note that this class does not support a different delay length for
|
||||
different tasks.
|
||||
"""
|
||||
|
||||
class Task:
|
||||
def __init__(
|
||||
self,
|
||||
delay: float,
|
||||
fn: ty.Callable[..., ty.Any],
|
||||
args: tuple,
|
||||
kwargs: dict
|
||||
):
|
||||
self.deadline = time.monotonic() + delay
|
||||
self.future = futurist.Future()
|
||||
self.fn = fn
|
||||
self.args = args
|
||||
self.kwargs = kwargs
|
||||
|
||||
@property
|
||||
def remaining_delay(self):
|
||||
return self.deadline - time.monotonic()
|
||||
|
||||
def __str__(self):
|
||||
return (
|
||||
f"Task(fn={self.fn}, "
|
||||
f"remaining_delay={self.remaining_delay} "
|
||||
f"future={self.future})")
|
||||
|
||||
def __init__(self, delay: float, executor: Executor):
|
||||
"""Initialize the wrapper
|
||||
|
||||
:param delay: delay length in seconds
|
||||
:param executor: executor object to run each task. It supports both
|
||||
native threading and eventlet based executors.
|
||||
"""
|
||||
|
||||
self._queue: queue.Queue = queue.Queue()
|
||||
self._executor = executor
|
||||
self._delay = delay
|
||||
self._shutdown = threading.Condition()
|
||||
self._shutdown_requested = False
|
||||
self._sentinel = self.Task(0, lambda: None, (), {})
|
||||
# We are intentionally not running our _run() in the executor
|
||||
# as we cannot assume that the executor has more than one worker
|
||||
# and our logic never finishes so it would consume one worker
|
||||
# constantly.
|
||||
self._thread = threading.Thread(target=self._run)
|
||||
self._thread.daemon = True
|
||||
self._thread.start()
|
||||
|
||||
@staticmethod
|
||||
def _log(msg, *args):
|
||||
LOG.debug(msg, *args)
|
||||
|
||||
@staticmethod
|
||||
def _task_wrapper(task) -> None:
|
||||
"""This wraps the original task so when it finishes in the real
|
||||
executor the result of the task can be copied to the Future object
|
||||
already returned to our caller from submit_with_delay(). So
|
||||
the caller can get the result or exception from the task.
|
||||
"""
|
||||
try:
|
||||
task.future.set_result(task.fn(*task.args, **task.kwargs))
|
||||
except BaseException as e:
|
||||
task.future.set_exception(e)
|
||||
|
||||
def _wait_for_deadline_then_set_running(self, task) -> bool:
|
||||
"""Waiting for the task's deadline then mark it running
|
||||
|
||||
Wait can be interrupted by shutdown of the wrapper in such a case
|
||||
the task's state is checked. If the task is cancelled then return
|
||||
immediately with False. If the task is not cancelled then wait for
|
||||
its deadline and eventually return True if the task is still not
|
||||
cancelled.
|
||||
|
||||
If True is returned the future in the task is also atomically set to
|
||||
running state and the future cannot be cancelled anymore.
|
||||
"""
|
||||
if task.remaining_delay <= 0:
|
||||
return task.future.set_running_or_notify_cancel()
|
||||
|
||||
self._log("Waitig for the deadline of %s", task)
|
||||
with self._shutdown:
|
||||
shutdown = self._shutdown.wait_for(
|
||||
lambda: self._shutdown_requested, task.remaining_delay)
|
||||
|
||||
if shutdown:
|
||||
self._log(
|
||||
"Shutdown is requested while waiting "
|
||||
"for the deadline of %s", task)
|
||||
|
||||
if task.future.cancelled():
|
||||
return False
|
||||
|
||||
self._log(
|
||||
"%s is not cancelled so still waiting for its "
|
||||
"deadline", task)
|
||||
if task.remaining_delay > 0:
|
||||
# Blocking here is fine as we have the assumption that
|
||||
# no new task can arrive that has a deadline that is sooner
|
||||
# than the deadline of the oldest task in the queue due to
|
||||
# our static delay (and we assume that time travel is not
|
||||
# allowed).
|
||||
time.sleep(task.remaining_delay)
|
||||
|
||||
return task.future.set_running_or_notify_cancel()
|
||||
|
||||
def _run(self):
|
||||
while True:
|
||||
self._log("Waiting for the next task")
|
||||
task: StaticallyDelayingCancellableTaskExecutorWrapper.Task = (
|
||||
self._queue.get())
|
||||
self._log("Received %s", task)
|
||||
|
||||
if task is self._sentinel:
|
||||
# We are asked to terminate so exit the loop.
|
||||
self._queue.task_done()
|
||||
self._log("Sentinel received, thread is exiting")
|
||||
return
|
||||
|
||||
if task.future.cancelled():
|
||||
# The task was cancelled while it was in the queue.
|
||||
# We don't need to run it. Just move on to the next task
|
||||
self._log("%s was cancelled while queued, skipping", task)
|
||||
self._queue.task_done()
|
||||
continue
|
||||
|
||||
# The task is still valid, wait for its deadline
|
||||
run_it = self._wait_for_deadline_then_set_running(task)
|
||||
if not run_it:
|
||||
# The task was cancelled during the delay period.
|
||||
# We don't need to run it. Just move on to the next task.
|
||||
self._log(
|
||||
"%s was cancelled during its delay period, skipping", task)
|
||||
self._queue.task_done()
|
||||
continue
|
||||
|
||||
# Push the task to the real executor. We don't need to wait
|
||||
# for the result here as the client can do that
|
||||
# via the task.future we already returned from submit_with_delay()
|
||||
try:
|
||||
self._executor.submit(self._task_wrapper, task)
|
||||
except BaseException as e:
|
||||
# If for any reason we cannot submit a task then we should not
|
||||
# let the exception escape as that will prevent our thread
|
||||
# to terminate cleanly. Instead, we log and propagate back.
|
||||
LOG.exception(
|
||||
"Failed to submit %s to executor %s", task, self._executor)
|
||||
task.future.set_exception(e)
|
||||
self._queue.task_done()
|
||||
continue
|
||||
|
||||
self._log("%s submitted to %s", task, self._executor)
|
||||
# The task is not done from the executor perspective, but it is
|
||||
# done from the _run() logic perspective. Signal it, so shutdown()
|
||||
# can use Queue.join()
|
||||
self._queue.task_done()
|
||||
|
||||
def submit_with_delay(
|
||||
self, fn: ty.Callable[..., ty.Any], *args: ty.Any, **kwargs: ty.Any
|
||||
) -> futurist.Future:
|
||||
"""Submit work with delay."""
|
||||
# We need this wide locking as we don't want to queue a task behind
|
||||
# the sentinel as that task will never be processed.
|
||||
with self._shutdown:
|
||||
if self._shutdown_requested:
|
||||
raise RuntimeError(
|
||||
"Cannot schedule new tasks after being shutdown")
|
||||
|
||||
task = self.Task(self._delay, fn, args, kwargs)
|
||||
self._queue.put(task)
|
||||
self._log("Queued %s", task)
|
||||
return task.future
|
||||
|
||||
def shutdown(self, wait: bool = True):
|
||||
"""Shutdown the executor"""
|
||||
with self._shutdown:
|
||||
if not self._shutdown_requested:
|
||||
# Ensure that our thread wakes at least one more time to allow
|
||||
# it to exit by queuing up a sentinel task after the shutdown
|
||||
# condition is set. This task won't be executed.
|
||||
self._queue.put(self._sentinel)
|
||||
self._log("Sentinel is queued")
|
||||
self._shutdown_requested = True
|
||||
self._shutdown.notify_all()
|
||||
self._log("Shutdown is set")
|
||||
|
||||
# If wait is set we need to wait for our sentinel to be processed and
|
||||
# therefore our thread to exit.
|
||||
# NOTE(gibi): We are intentionally not shutting down the real executor
|
||||
# as we are not the one created it or owning it so it might be shared
|
||||
# between different callers.
|
||||
if wait:
|
||||
self._queue.join()
|
||||
self._log("Queue joined")
|
||||
self._thread.join()
|
||||
self._log("Scheduler thread joined")
|
||||
|
||||
@property
|
||||
def is_alive(self) -> bool:
|
||||
return self._thread.is_alive()
|
||||
|
||||
+16
-14
@@ -346,7 +346,9 @@ class Host(object):
|
||||
# STARTED events are sent. To prevent shutting
|
||||
# down the domain during a reboot, delay the
|
||||
# STOPPED lifecycle event some seconds.
|
||||
self._lifecycle_delay = 15
|
||||
self._delayed_executor = (
|
||||
utils.StaticallyDelayingCancellableTaskExecutorWrapper(
|
||||
delay=15, executor=utils._get_default_executor()))
|
||||
|
||||
self._initialized = False
|
||||
self._libvirt_proxy_classes = self._get_libvirt_proxy_classes(libvirt)
|
||||
@@ -542,29 +544,29 @@ class Host(object):
|
||||
|
||||
def _event_emit_delayed(self, event):
|
||||
"""Emit events - possibly delayed."""
|
||||
def event_cleanup(event):
|
||||
"""Callback function for greenthread. Called
|
||||
to cleanup the _events_delayed dictionary when an event
|
||||
was called.
|
||||
"""
|
||||
self._events_delayed.pop(event.uuid, None)
|
||||
|
||||
# Cleanup possible delayed stop events.
|
||||
if event.uuid in self._events_delayed.keys():
|
||||
# Cancel possible delayed stop events when we received any other
|
||||
# event for the same domain.
|
||||
if (isinstance(event, virtevent.LifecycleEvent) and
|
||||
event.uuid in self._events_delayed.keys()
|
||||
):
|
||||
self._events_delayed[event.uuid].cancel()
|
||||
self._events_delayed.pop(event.uuid, None)
|
||||
LOG.debug("Removed pending event for %s due to event", event.uuid)
|
||||
LOG.debug(
|
||||
"Removed pending STOPPED event for %s due to new event %s",
|
||||
event.uuid, event)
|
||||
|
||||
if (isinstance(event, virtevent.LifecycleEvent) and
|
||||
event.transition == virtevent.EVENT_LIFECYCLE_STOPPED):
|
||||
# Delay STOPPED event, as they may be followed by a STARTED
|
||||
# event in case the instance is rebooting
|
||||
id_ = utils.spawn_after(
|
||||
self._lifecycle_delay, self._event_emit, event)
|
||||
self._events_delayed[event.uuid] = id_
|
||||
future = self._delayed_executor.submit_with_delay(
|
||||
self._event_emit, event)
|
||||
self._events_delayed[event.uuid] = future
|
||||
# add callback to cleanup self._events_delayed dict after
|
||||
# event was called
|
||||
id_.add_done_callback(lambda _: event_cleanup(event))
|
||||
future.add_done_callback(
|
||||
lambda _: self._events_delayed.pop(event.uuid, None))
|
||||
else:
|
||||
self._event_emit(event)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user