From 8017b721fda5b0a3e6383d730f7017a0f407b419 Mon Sep 17 00:00:00 2001 From: Balazs Gibizer Date: Thu, 29 Jan 2026 10:15:39 +0100 Subject: [PATCH] Cleanup libvirt driver at service stop As libvirt driver's Host object has a new headless thread we need to make sure that thread is exiting cleanly when nova-compute is being stopped. Also at the same time we make sure our unit tests are not leaking such thread across test cases with a new fixture and fixes in the test code. Change-Id: Ide274d6caa3314f9d25d51d1f72850cf77c9dee4 Signed-off-by: Balazs Gibizer --- nova/test.py | 1 + nova/tests/fixtures/nova.py | 33 +++++++++++ nova/tests/unit/test_utils.py | 4 ++ nova/tests/unit/virt/libvirt/test_driver.py | 6 ++ nova/tests/unit/virt/libvirt/test_host.py | 66 +++++++++++++++++---- nova/tests/unit/virt/test_virt_drivers.py | 4 ++ nova/virt/libvirt/driver.py | 5 ++ nova/virt/libvirt/host.py | 26 ++++++-- 8 files changed, 131 insertions(+), 14 deletions(-) diff --git a/nova/test.py b/nova/test.py index 98c9978a11..ddbbd33c73 100644 --- a/nova/test.py +++ b/nova/test.py @@ -197,6 +197,7 @@ class TestCase(base.BaseTestCase): self.useFixture(nova_fixtures.OpenStackSDKFixture()) self.useFixture(nova_fixtures.IsolatedExecutorFixture(self.id())) + self.useFixture(nova_fixtures.DelayingExecutorWrapperCleanupFixture()) self.useFixture(log_fixture.get_logging_handle_error_fixture()) diff --git a/nova/tests/fixtures/nova.py b/nova/tests/fixtures/nova.py index 01a073db6d..2653e0e5ff 100644 --- a/nova/tests/fixtures/nova.py +++ b/nova/tests/fixtures/nova.py @@ -2214,3 +2214,36 @@ class RPCPollerCleanupFixture(fixtures.Fixture): 'instance in your test case e.g. by using ' 'self.addCleanup(...). The test started the poller at the ' 'following place:\n%s' % stack) + + +class DelayingExecutorWrapperCleanupFixture(fixtures.Fixture): + def setUp(self): + super().setUp() + + orig = utils.StaticallyDelayingCancellableTaskExecutorWrapper.__init__ + + def wrapped_init(executor_wrapper, delay, executor): + stack = "".join(traceback.format_stack()) + self.addCleanup( + self._check_wrapper_stopped, executor_wrapper, stack) + orig(executor_wrapper, delay, executor) + + self.useFixture( + fixtures.MonkeyPatch( + 'nova.utils.StaticallyDelayingCancellableTaskExecutorWrapper.' + '__init__', wrapped_init)) + + @staticmethod + def _check_wrapper_stopped( + wrapper: utils.StaticallyDelayingCancellableTaskExecutorWrapper, + stack: str, + ): + if wrapper.is_alive: + raise RuntimeError( + 'The test case leaked an active ' + 'nova.utils.StaticallyDelayingCancellableTaskExecutorWrapper' + 'instance. This can lead to unexpected failures in later test ' + 'case. Please ensure that shutdown(wait=true) is called on ' + 'the wrapper before the end of the test case e.g. by using ' + 'self.addCleanup(...). The test instantiated the wrapper at ' + 'the following place:\n%s' % stack) diff --git a/nova/tests/unit/test_utils.py b/nova/tests/unit/test_utils.py index 1d71693cab..46c109922d 100644 --- a/nova/tests/unit/test_utils.py +++ b/nova/tests/unit/test_utils.py @@ -2030,6 +2030,7 @@ class StaticallyDelayingCancellableTaskExecutorWrapperTest(test.NoDBTestCase): executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper( 0.01, utils._get_default_executor()) + self.addCleanup(executor.shutdown, wait=True) future1 = executor.submit_with_delay(task1) self.assertEqual(42, future1.result()) @@ -2056,6 +2057,7 @@ class StaticallyDelayingCancellableTaskExecutorWrapperTest(test.NoDBTestCase): # the wrapper while we submit the second task executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper( 2, utils._get_default_executor()) + self.addCleanup(executor.shutdown, wait=True) future1 = executor.submit_with_delay(task1) task1_start = time.monotonic() @@ -2147,6 +2149,7 @@ class StaticallyDelayingCancellableTaskExecutorWrapperTest(test.NoDBTestCase): # the wrapper when we cancel it executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper( 2, utils._get_default_executor()) + self.addCleanup(executor.shutdown, wait=True) future1 = executor.submit_with_delay(task1) # wait a bit to let the task being picked up @@ -2220,6 +2223,7 @@ class StaticallyDelayingCancellableTaskExecutorWrapperTest(test.NoDBTestCase): def test_instantaneous_shutdown(self): executor = utils.StaticallyDelayingCancellableTaskExecutorWrapper( 0.1, utils._get_default_executor()) + self.addCleanup(executor.shutdown, wait=True) executor.shutdown(wait=False) self.assertTrue(executor._shutdown) diff --git a/nova/tests/unit/virt/libvirt/test_driver.py b/nova/tests/unit/virt/libvirt/test_driver.py index a3568d3d2d..9d7698282a 100644 --- a/nova/tests/unit/virt/libvirt/test_driver.py +++ b/nova/tests/unit/virt/libvirt/test_driver.py @@ -23296,6 +23296,12 @@ class LibvirtConnTestCase(test.NoDBTestCase, expected = '10' self.assertXmlEqual(expected, cfg.features[2].to_xml()) + @mock.patch("nova.virt.libvirt.host.Host.cleanup") + def test_cleanup_host(self, mock_cleanup): + drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), True) + drvr.cleanup_host("fake") + mock_cleanup.assert_called_once_with() + class TestGuestConfigSysinfoSerialOS(test.NoDBTestCase): def setUp(self): diff --git a/nova/tests/unit/virt/libvirt/test_host.py b/nova/tests/unit/virt/libvirt/test_host.py index 62a2f4f4b4..640034cd4d 100644 --- a/nova/tests/unit/virt/libvirt/test_host.py +++ b/nova/tests/unit/virt/libvirt/test_host.py @@ -73,7 +73,21 @@ class HostTestCase(test.NoDBTestCase): super(HostTestCase, self).setUp() self.useFixture(nova_fixtures.LibvirtFixture()) - self.host = host.Host("qemu:///system") + self.host = self._create_host("qemu:///system") + + def _create_host(self, *args, **kwargs): + h = host.Host(*args, **kwargs) + self.addCleanup(h.cleanup) + # We need this explicitly as the majority of the test cases using + # a Host object does not care about the event handling thread, and + # it is disabled and poisoned. But test cases in this class trying + # to cover code there. So we partially instantiate the event handling + # logic here and pumping the events in the test cases individually. + h._delayed_executor = ( + utils.StaticallyDelayingCancellableTaskExecutorWrapper( + delay=15, executor=utils._get_default_executor())) + + return h def test_repeat_initialization(self): for i in range(3): @@ -134,8 +148,10 @@ class HostTestCase(test.NoDBTestCase): def handler(event): got_events.append(event) - hostimpl = host.Host("qemu:///system", + hostimpl = self._create_host("qemu:///system", lifecycle_event_handler=handler) + hostimpl.initialize() + got_events = [] event1 = event.LifecycleEvent( @@ -173,7 +189,7 @@ class HostTestCase(test.NoDBTestCase): @mock.patch('nova.virt.libvirt.host.Host._event_emit_delayed') def test_event_lifecycle(self, mock_emit): - hostimpl = host.Host("qemu:///system", + hostimpl = self._create_host("qemu:///system", lifecycle_event_handler=lambda e: None) conn = hostimpl.get_connection() @@ -283,8 +299,10 @@ class HostTestCase(test.NoDBTestCase): ev = event.LifecycleEvent( "cef19ce0-0ca2-11df-855d-b19fbce37686", event.EVENT_LIFECYCLE_STOPPED) - hostimpl = host.Host( + hostimpl = self._create_host( 'qemu:///system', lifecycle_event_handler=lambda e: None) + hostimpl.initialize() + hostimpl._event_emit_delayed(ev) mock_wrapper.assert_called_once_with( delay=15, executor=utils._get_default_executor()) @@ -293,7 +311,7 @@ class HostTestCase(test.NoDBTestCase): @mock.patch('nova.utils.StaticallyDelayingCancellableTaskExecutorWrapper') def test_event_emit_delayed_call_delayed_pending(self, mock_wrapper): - hostimpl = host.Host( + hostimpl = self._create_host( 'qemu:///system', lifecycle_event_handler=lambda e: None) uuid = "cef19ce0-0ca2-11df-855d-b19fbce37686" @@ -306,7 +324,7 @@ class HostTestCase(test.NoDBTestCase): self.assertIs(mock_future, hostimpl._events_delayed[uuid]) def test_event_delayed_cleanup(self): - hostimpl = host.Host( + hostimpl = self._create_host( 'qemu:///system', lifecycle_event_handler=lambda e: None) uuid = "cef19ce0-0ca2-11df-855d-b19fbce37686" ev = event.LifecycleEvent( @@ -317,6 +335,25 @@ class HostTestCase(test.NoDBTestCase): gt_mock.cancel.assert_called_once_with() self.assertNotIn(uuid, hostimpl._events_delayed.keys()) + def test_host_cleanup_cancels_delayed_events(self): + hostimpl = self._create_host( + 'qemu:///system', lifecycle_event_handler=lambda e: None) + uuid = "cef19ce0-0ca2-11df-855d-b19fbce37686" + ev = event.LifecycleEvent( + uuid, event.EVENT_LIFECYCLE_STOPPED) + + hostimpl._event_emit_delayed(ev) + + self.assertIn(uuid, hostimpl._events_delayed) + future = hostimpl._events_delayed[uuid] + self.assertFalse(future.cancelled()) + self.assertTrue(hostimpl._delayed_executor.is_alive) + + hostimpl.cleanup() + + self.assertTrue(future.cancelled()) + self.assertFalse(hostimpl._delayed_executor.is_alive) + def test_device_removed_event(self): hostimpl = mock.MagicMock() conn = mock.MagicMock() @@ -418,7 +455,7 @@ class HostTestCase(test.NoDBTestCase): @mock.patch.object(host.Host, "_connect") def test_conn_event(self, mock_conn): handler = mock.MagicMock() - h = host.Host("qemu:///system", conn_event_handler=handler) + h = self._create_host("qemu:///system", conn_event_handler=handler) h.get_connection() h._dispatch_conn_event() @@ -428,7 +465,7 @@ class HostTestCase(test.NoDBTestCase): @mock.patch.object(host.Host, "_connect") def test_conn_event_fail(self, mock_conn): handler = mock.MagicMock() - h = host.Host("qemu:///system", conn_event_handler=handler) + h = self._create_host("qemu:///system", conn_event_handler=handler) mock_conn.side_effect = fakelibvirt.libvirtError('test') self.assertRaises(exception.HypervisorUnavailable, h.get_connection) @@ -451,7 +488,7 @@ class HostTestCase(test.NoDBTestCase): @mock.patch.object(host.Host, "_connect") def test_conn_event_up_down(self, mock_conn, mock_test_conn): handler = mock.MagicMock() - h = host.Host("qemu:///system", conn_event_handler=handler) + h = self._create_host("qemu:///system", conn_event_handler=handler) mock_conn.side_effect = (mock.MagicMock(), fakelibvirt.libvirtError('test')) mock_test_conn.return_value = False @@ -473,7 +510,8 @@ class HostTestCase(test.NoDBTestCase): # This emulates LibvirtDriver._handle_conn_event def conn_event_handler(*args, **kwargs): event.set() - h = host.Host("qemu:///system", conn_event_handler=conn_event_handler) + h = self._create_host( + "qemu:///system", conn_event_handler=conn_event_handler) h.initialize() h.get_connection() @@ -2193,6 +2231,10 @@ class TestLibvirtSEV(test.NoDBTestCase): self.useFixture(nova_fixtures.LibvirtFixture()) self.host = host.Host("qemu:///system") + self.addCleanup(self.host.cleanup) + self.host._delayed_executor = ( + utils.StaticallyDelayingCancellableTaskExecutorWrapper( + delay=0.1, executor=utils._get_default_executor())) @ddt.ddt @@ -2360,6 +2402,10 @@ class LibvirtTpoolProxyTestCase(test.NoDBTestCase): self.useFixture(nova_fixtures.LibvirtFixture()) self.host = host.Host("qemu:///system") + self.host._delayed_executor = ( + utils.StaticallyDelayingCancellableTaskExecutorWrapper( + delay=0.1, executor=utils._get_default_executor())) + self.addCleanup(self.host.cleanup) def _stub_xml(uuid): return ("" diff --git a/nova/tests/unit/virt/test_virt_drivers.py b/nova/tests/unit/virt/test_virt_drivers.py index da0091470c..531b557d4b 100644 --- a/nova/tests/unit/virt/test_virt_drivers.py +++ b/nova/tests/unit/virt/test_virt_drivers.py @@ -92,6 +92,10 @@ class _FakeDriverBackendTestCase(object): 'nova.virt.libvirt.host.Host._conn_event_thread', lambda *args: None)) + self.useFixture(fixtures.MonkeyPatch( + 'nova.virt.libvirt.host.Host._event_emit_delayed', + lambda *args: None)) + self.flags(rescue_image_id="2", rescue_kernel_id="3", rescue_ramdisk_id=None, diff --git a/nova/virt/libvirt/driver.py b/nova/virt/libvirt/driver.py index 3c5de7aecd..59604b4b6b 100644 --- a/nova/virt/libvirt/driver.py +++ b/nova/virt/libvirt/driver.py @@ -1489,6 +1489,11 @@ class LibvirtDriver(driver.ComputeDriver): # conversion which will return value of type unicode. return uri and str(uri) + def cleanup_host(self, host): + """Clean up anything that is necessary for the driver gracefully stop. + """ + self._host.cleanup() + def instance_exists(self, instance): """Efficient override of base instance_exists method.""" try: diff --git a/nova/virt/libvirt/host.py b/nova/virt/libvirt/host.py index 5266481a45..43961a64c8 100644 --- a/nova/virt/libvirt/host.py +++ b/nova/virt/libvirt/host.py @@ -346,9 +346,7 @@ class Host(object): # STARTED events are sent. To prevent shutting # down the domain during a reboot, delay the # STOPPED lifecycle event some seconds. - self._delayed_executor = ( - utils.StaticallyDelayingCancellableTaskExecutorWrapper( - delay=15, executor=utils._get_default_executor())) + self._delayed_executor = None self._initialized = False self._libvirt_proxy_classes = self._get_libvirt_proxy_classes(libvirt) @@ -560,7 +558,7 @@ class Host(object): event.transition == virtevent.EVENT_LIFECYCLE_STOPPED): # Delay STOPPED event, as they may be followed by a STARTED # event in case the instance is rebooting - future = self._delayed_executor.submit_with_delay( + future = self._delayed_executor.submit_with_delay( # type: ignore self._event_emit, event) self._events_delayed[event.uuid] = future # add callback to cleanup self._events_delayed dict after @@ -577,6 +575,9 @@ class Host(object): def _init_events(self): """Initializes the libvirt events subsystem. """ + self._delayed_executor = ( + utils.StaticallyDelayingCancellableTaskExecutorWrapper( + delay=15, executor=utils._get_default_executor())) self._event_handler.start() # This thread is just for async connection closed event handling. @@ -705,6 +706,23 @@ class Host(object): self._initialized = True + def cleanup(self): + # TODO(gibi): We might want to stop the _event_handler here as well + # as the _conn_event_thread loop to have a fully graceful and clean + # shutdown + + # If domain STOPPED events are delayed then the executor shutdown can + # take up to 15 seconds. Also as nova-compute is shutting down not + # having a power off synced to the DB does not matter much as during + # the next nova-compute startup the power states are synced anyhow. So + # lets cancel all the outstanding delayed events instead of waiting a + # lot. + for future in list(self._events_delayed.values()): + future.cancel() + + if self._delayed_executor: + self._delayed_executor.shutdown(wait=True) + def _version_check(self, lv_ver=None, hv_ver=None, hv_type=None, op=operator.lt): """Check libvirt version, hypervisor version, and hypervisor type