Prevent leaking RPC poller thread between tests

When a unit test create a nova.service.Service() and calls start() on
it, nova starts an RPC server with the fake oslo_messaging
implementation. That implementation uses a thread / greenthread to poll
for messages. If the RPC server is not stopped directly or via
Service.stop() at the end of the test case then that poller thread is
remains running during any subsequent test. This can cause interference
between test case.

This patch adds a fixture that tracks the started poller threads and
fail the test case if the poller is still active at the end of the test
case. As a consequence a set of test cases needed fixes to remove the
leak.

Change-Id: I92dc4ad09c77fd0a9e0bb263d355d9b0204be790
Signed-off-by: Balazs Gibizer <gibi@redhat.com>
This commit is contained in:
Balazs Gibizer
2026-01-22 09:37:13 +01:00
parent 637f66f677
commit a9b8c4aee1
4 changed files with 37 additions and 0 deletions
+1
View File
@@ -229,6 +229,7 @@ class TestCase(base.BaseTestCase):
# happens only in the RPCFixture
CONF.set_default('driver', ['test'],
group='oslo_messaging_notifications')
self.useFixture(nova_fixtures.RPCPollerCleanupFixture())
# NOTE(danms): Make sure to reset us back to non-remote objects
# for each test to avoid interactions. Also, backup the object
+33
View File
@@ -26,6 +26,7 @@ import logging as std_logging
import os
import sys
import time
import traceback
from unittest import mock
import warnings
@@ -2181,3 +2182,35 @@ class UnifiedLimitsFixture(fixtures.Fixture):
pl.region_id = attrs.get('region_id')
pl.service_id = attrs.get('service_id')
self.limits_list.append(pl)
class RPCPollerCleanupFixture(fixtures.Fixture):
def setUp(self):
super().setUp()
orig_start = (
messaging._drivers.base.PollStyleListenerAdapter.start)
def wrapped_start(_self, *args, **kwargs):
stack = "".join(traceback.format_stack())
self.addCleanup(lambda: self._check_listener_stopped(_self, stack))
return orig_start(_self, *args, **kwargs)
self.useFixture(
fixtures.MonkeyPatch(
'oslo_messaging._drivers.base.'
'PollStyleListenerAdapter.start',
wrapped_start))
@staticmethod
def _check_listener_stopped(
listener: messaging._drivers.base.PollStyleListenerAdapter,
stack: str,
):
if listener._started:
raise RuntimeError(
'The test case leaked an active oslo_messaging poller thread. '
'This can lead to unexpected failures in later test case. '
'Please stop the RPC server or the nova.service.Service '
'instance in your test case e.g. by using '
'self.addCleanup(...). The test started the poller at the '
'following place:\n%s' % stack)
+2
View File
@@ -124,6 +124,7 @@ class ServiceTestCase(test.NoDBTestCase):
self.binary,
self.topic,
'nova.tests.unit.test_service.FakeManager')
self.addCleanup(serv.stop)
serv.manager = mock_manager
serv.manager.service_name = self.topic
serv.manager.additional_endpoints = []
@@ -165,6 +166,7 @@ class ServiceTestCase(test.NoDBTestCase):
serv = service.Service(self.host, self.binary, self.topic,
'nova.tests.unit.test_service.FakeManager')
self.addCleanup(serv.stop)
serv.start()
# test service version got updated and saved:
+1
View File
@@ -61,6 +61,7 @@ class IsolationTestCase(test.TestCase):
server = rpc.get_server(messaging.Target(topic='compute',
server=CONF.host),
endpoints=[NeverCalled()])
self.addCleanup(server.stop)
server.start()