From a9b8c4aee17c2ac468a137979599bc2024778925 Mon Sep 17 00:00:00 2001 From: Balazs Gibizer Date: Thu, 22 Jan 2026 09:37:13 +0100 Subject: [PATCH] Prevent leaking RPC poller thread between tests When a unit test create a nova.service.Service() and calls start() on it, nova starts an RPC server with the fake oslo_messaging implementation. That implementation uses a thread / greenthread to poll for messages. If the RPC server is not stopped directly or via Service.stop() at the end of the test case then that poller thread is remains running during any subsequent test. This can cause interference between test case. This patch adds a fixture that tracks the started poller threads and fail the test case if the poller is still active at the end of the test case. As a consequence a set of test cases needed fixes to remove the leak. Change-Id: I92dc4ad09c77fd0a9e0bb263d355d9b0204be790 Signed-off-by: Balazs Gibizer --- nova/test.py | 1 + nova/tests/fixtures/nova.py | 33 +++++++++++++++++++++++++++++++++ nova/tests/unit/test_service.py | 2 ++ nova/tests/unit/test_test.py | 1 + 4 files changed, 37 insertions(+) diff --git a/nova/test.py b/nova/test.py index 27ef979d9a..3e5aaec5df 100644 --- a/nova/test.py +++ b/nova/test.py @@ -229,6 +229,7 @@ class TestCase(base.BaseTestCase): # happens only in the RPCFixture CONF.set_default('driver', ['test'], group='oslo_messaging_notifications') + self.useFixture(nova_fixtures.RPCPollerCleanupFixture()) # NOTE(danms): Make sure to reset us back to non-remote objects # for each test to avoid interactions. Also, backup the object diff --git a/nova/tests/fixtures/nova.py b/nova/tests/fixtures/nova.py index 0d9d68229e..01a073db6d 100644 --- a/nova/tests/fixtures/nova.py +++ b/nova/tests/fixtures/nova.py @@ -26,6 +26,7 @@ import logging as std_logging import os import sys import time +import traceback from unittest import mock import warnings @@ -2181,3 +2182,35 @@ class UnifiedLimitsFixture(fixtures.Fixture): pl.region_id = attrs.get('region_id') pl.service_id = attrs.get('service_id') self.limits_list.append(pl) + + +class RPCPollerCleanupFixture(fixtures.Fixture): + def setUp(self): + super().setUp() + orig_start = ( + messaging._drivers.base.PollStyleListenerAdapter.start) + + def wrapped_start(_self, *args, **kwargs): + stack = "".join(traceback.format_stack()) + self.addCleanup(lambda: self._check_listener_stopped(_self, stack)) + return orig_start(_self, *args, **kwargs) + + self.useFixture( + fixtures.MonkeyPatch( + 'oslo_messaging._drivers.base.' + 'PollStyleListenerAdapter.start', + wrapped_start)) + + @staticmethod + def _check_listener_stopped( + listener: messaging._drivers.base.PollStyleListenerAdapter, + stack: str, + ): + if listener._started: + raise RuntimeError( + 'The test case leaked an active oslo_messaging poller thread. ' + 'This can lead to unexpected failures in later test case. ' + 'Please stop the RPC server or the nova.service.Service ' + 'instance in your test case e.g. by using ' + 'self.addCleanup(...). The test started the poller at the ' + 'following place:\n%s' % stack) diff --git a/nova/tests/unit/test_service.py b/nova/tests/unit/test_service.py index 0a7aed2458..151a8901cd 100644 --- a/nova/tests/unit/test_service.py +++ b/nova/tests/unit/test_service.py @@ -124,6 +124,7 @@ class ServiceTestCase(test.NoDBTestCase): self.binary, self.topic, 'nova.tests.unit.test_service.FakeManager') + self.addCleanup(serv.stop) serv.manager = mock_manager serv.manager.service_name = self.topic serv.manager.additional_endpoints = [] @@ -165,6 +166,7 @@ class ServiceTestCase(test.NoDBTestCase): serv = service.Service(self.host, self.binary, self.topic, 'nova.tests.unit.test_service.FakeManager') + self.addCleanup(serv.stop) serv.start() # test service version got updated and saved: diff --git a/nova/tests/unit/test_test.py b/nova/tests/unit/test_test.py index caf3f0b988..4757115aa6 100644 --- a/nova/tests/unit/test_test.py +++ b/nova/tests/unit/test_test.py @@ -61,6 +61,7 @@ class IsolationTestCase(test.TestCase): server = rpc.get_server(messaging.Target(topic='compute', server=CONF.host), endpoints=[NeverCalled()]) + self.addCleanup(server.stop) server.start()