From 82fd8ffdced6e976083c5066c5a88a5d7cec474d Mon Sep 17 00:00:00 2001 From: Ghanshyam Maan Date: Fri, 30 Jan 2026 11:38:46 -0800 Subject: [PATCH] Add 2nd RPC server for compute service For the compute service graceful shutdown, we need two RPC servers. The 1st RPC server will be used for new requests and the 2nd for completing the in-progress tasks. The 2nd RPC server will use the same transport bus and the same endpoint (compute manager instance) but will listen on a different topic than the 1st RPC server. By having two different topics, other services (API, conductor, or compute) can choose which topic they want to send an RPC request to the compute service on. That will be done via the RPC client sending the request to a specific topic. This change stops both RPC servers, but later in this series we will keep the 2nd RPC server active so that the compute service can keep listening for the communication from other services that the in-progress tasks require. The next change in this series will use this 2nd RPC server. The tasks (compute RPC client methods) that need to use this 2nd RPC server will be modified in the next change. 
Partial implement blueprint nova-services-graceful-shutdown-part1 Change-Id: I26656869f00efe6d89d993000dcf2e91683a217e Signed-off-by: Ghanshyam Maan --- nova/cmd/compute.py | 6 ++-- nova/compute/rpcapi.py | 8 +++++ nova/service.py | 55 +++++++++++++++++++++++++++++---- nova/tests/unit/test_service.py | 48 ++++++++++++++++++++++++++++ 4 files changed, 109 insertions(+), 8 deletions(-) diff --git a/nova/cmd/compute.py b/nova/cmd/compute.py index 9007d4b79b..04636ba7c4 100644 --- a/nova/cmd/compute.py +++ b/nova/cmd/compute.py @@ -59,8 +59,10 @@ def main(): nova.db.main.api.DISABLE_DB_ACCESS = True objects_base.NovaObject.indirection_api = conductor_rpcapi.ConductorAPI() objects.Service.enable_min_version_cache() - server = service.Service.create(binary='nova-compute', - topic=compute_rpcapi.RPC_TOPIC) + server = service.Service.create( + binary='nova-compute', + topic=compute_rpcapi.RPC_TOPIC, + topic_alt=compute_rpcapi.RPC_TOPIC_ALT) # Compute service should never fork worker processes service.serve(server, workers=1, no_fork=True) service.wait() diff --git a/nova/compute/rpcapi.py b/nova/compute/rpcapi.py index 79cbc2ab61..c7c2797aef 100644 --- a/nova/compute/rpcapi.py +++ b/nova/compute/rpcapi.py @@ -32,6 +32,14 @@ from nova import rpc CONF = nova.conf.CONF RPC_TOPIC = "compute" +# NOTE(gmaan): The compute service creates two rpc servers which means each +# compute service worker will be listening on two topic queues (1. 'compute' +# 2. 'compute-alt'). The 'compute-alt' rpc server is used to handle the +# graceful shutdown of compute service. During graceful shutdown, 'compute' +# rpc server will be stopped but 'compute-alt' rpc server will be active for +# finishing the ongoing operations. The 'compute-alt' topic is supposed to be +# used in the rpc call/cast which are used to finish the ongoing operations. 
+RPC_TOPIC_ALT = "compute-alt" LOG = logging.getLogger(__name__) LAST_VERSION = None diff --git a/nova/service.py b/nova/service.py index b307fb18d9..946be7c993 100644 --- a/nova/service.py +++ b/nova/service.py @@ -97,11 +97,19 @@ class Service(service.Service): def __init__(self, host, binary, topic, manager, report_interval=None, periodic_enable=None, periodic_fuzzy_delay=None, - periodic_interval_max=None, *args, **kwargs): + periodic_interval_max=None, topic_alt=None, + *args, **kwargs): super(Service, self).__init__() self.host = host self.binary = binary self.topic = topic + # NOTE(gmaan): If any service would like to create a 2nd rpc server, + # then it needs to be created with different topic (topic_alt) so that + # oslo.messaging creates the different RPC objects (for example, + # dispatcher, consumers, rabbitmq queue, amqp listener, kombu + # connection etc). The endpoint (manager) stay same so that same + # manager will be serving the both rpc servers. + self.topic_alt = topic_alt self.manager_class_name = manager self.servicegroup_api = servicegroup.API() manager_class = importutils.import_class(self.manager_class_name) @@ -174,8 +182,6 @@ class Service(service.Service): if self.backdoor_port is not None: self.manager.backdoor_port = self.backdoor_port - LOG.debug("Creating RPC server for service %s", self.topic) - target = messaging.Target(topic=self.topic, server=self.host) endpoints = [ @@ -186,9 +192,30 @@ class Service(service.Service): serializer = objects_base.NovaObjectSerializer() + LOG.debug("Creating RPC server for service: %s on topic: %s", + self.binary, self.topic) self.rpcserver = rpc.get_server(target, endpoints, serializer) self.rpcserver.start() + self.rpcserver_alt = None + # NOTE(gmaan): Only compute service creates the two rpcservers which + # means each compute service will create two rabiitmq queues bound with + # same exchange but on two different topics (1. 'compute' + # 2. 'compute-alt'). 
+ # The main use case for 2nd rpcserver is graceful shutdown of compute + # service. During graceful shutdown, the compute service will stop + # listening to the new request (stop listening on 'compute' rpcserver) + # but continue listening to the 'compute-alt' rpcserver so that it can + # finish all their ongoing operations. + if self.topic_alt is not None: + LOG.debug("Creating 2nd RPC server for service: %s on topic: %s", + self.binary, self.topic_alt) + target_alt = messaging.Target( + topic=self.topic_alt, server=self.host) + self.rpcserver_alt = rpc.get_server( + target_alt, endpoints, serializer) + self.rpcserver_alt.start() + self.manager.post_start_hook() LOG.debug("Join ServiceGroup membership for this service %s", @@ -214,7 +241,8 @@ class Service(service.Service): @classmethod def create(cls, host=None, binary=None, topic=None, manager=None, report_interval=None, periodic_enable=None, - periodic_fuzzy_delay=None, periodic_interval_max=None): + periodic_fuzzy_delay=None, periodic_interval_max=None, + topic_alt=None): """Instantiates class and passes back application object. :param host: defaults to CONF.host @@ -225,6 +253,7 @@ class Service(service.Service): :param periodic_enable: defaults to CONF.periodic_enable :param periodic_fuzzy_delay: defaults to CONF.periodic_fuzzy_delay :param periodic_interval_max: if set, the max time to wait between runs + :param topic_alt: defaults to None """ if not host: @@ -246,7 +275,8 @@ class Service(service.Service): report_interval=report_interval, periodic_enable=periodic_enable, periodic_fuzzy_delay=periodic_fuzzy_delay, - periodic_interval_max=periodic_interval_max) + periodic_interval_max=periodic_interval_max, + topic_alt=topic_alt) # NOTE(gibi): This have to be after the service object creation as # that is the point where we can safely use the RPC to the conductor. 
@@ -281,9 +311,22 @@ class Service(service.Service): def stop(self): """stop the service and clean up.""" try: + LOG.debug('%s service stopping RPC server on topic: %s', + self.binary, self.topic) self.rpcserver.stop() self.rpcserver.wait() - except Exception: + LOG.debug('%s service stopped RPC server on topic: %s', + self.binary, self.topic) + if self.rpcserver_alt is not None: + LOG.debug('%s service stopping the 2nd RPC server on ' + 'topic: %s', self.binary, self.topic_alt) + self.rpcserver_alt.stop() + self.rpcserver_alt.wait() + LOG.debug('%s service stopped the 2nd RPC server on ' + 'topic: %s', self.binary, self.topic_alt) + except Exception as exc: + LOG.exception('Service error occurred during RPC server ' + 'stop & wait, Error: %s', str(exc)) pass try: diff --git a/nova/tests/unit/test_service.py b/nova/tests/unit/test_service.py index 151a8901cd..91fabb4a3d 100644 --- a/nova/tests/unit/test_service.py +++ b/nova/tests/unit/test_service.py @@ -22,6 +22,7 @@ import os.path from unittest import mock from oslo_config import cfg +import oslo_messaging as messaging from oslo_service import service as _service from nova import exception @@ -265,6 +266,53 @@ class ServiceTestCase(test.NoDBTestCase): serv.rpcserver.start.assert_called_once_with() serv.rpcserver.stop.assert_called_once_with() serv.rpcserver.wait.assert_called_once_with() + self.assertIsNone(serv.rpcserver_alt) + self.assertEqual(mock_rpc.call_count, 1) + + @mock.patch('nova.objects.service.Service.get_by_host_and_binary') + @mock.patch.object(rpc, 'TRANSPORT') + @mock.patch.object(messaging, 'get_rpc_server') + def test_service_with_two_rpcservers( + self, mock_get, mock_TRANSPORT, mock_svc_get): + topic_alt = 'fake_alt' + fake_manager = 'nova.tests.unit.test_service.FakeManager' + serv = service.Service(self.host, + self.binary, + self.topic, + fake_manager, + topic_alt=topic_alt) + serv.start() + serv.stop() + target = messaging.Target(topic=self.topic, server=self.host) + target_alt = 
messaging.Target(topic=topic_alt, server=self.host) + + # Check the two calls to oslo.messasing get_rpc_server() + # with different target + self.assertEqual(mock_get.call_count, 2) + mock_get.assert_any_call(mock_TRANSPORT, target, mock.ANY, + executor=mock.ANY, serializer=mock.ANY, + access_policy=mock.ANY) + mock_get.assert_any_call(mock_TRANSPORT, target_alt, mock.ANY, + executor=mock.ANY, serializer=mock.ANY, + access_policy=mock.ANY) + self.assertIsNotNone(serv.rpcserver) + self.assertIsNotNone(serv.rpcserver_alt) + + @mock.patch('nova.objects.service.Service.get_by_host_and_binary') + @mock.patch.object(messaging.rpc.server.RPCServer, '_create_listener') + def test_service_with_two_rpc_topics_get_two_different_rpcservers( + self, mock_listner, mock_svc_get): + topic_alt = 'fake_alt' + fake_manager = 'nova.tests.unit.test_service.FakeManager' + serv = service.Service(self.host, + self.binary, + self.topic, + fake_manager, + topic_alt=topic_alt) + serv.start() + self.assertIsNotNone(serv.rpcserver) + self.assertIsNotNone(serv.rpcserver_alt) + self.assertNotEqual(serv.rpcserver, serv.rpcserver_alt) def test_reset(self): serv = service.Service(self.host,