Add 2nd RPC server for compute service

For the compute service graceful shutdown, we need two RPC servers.
The 1st RPC server will be used for new requests and the 2nd for
completing the in-progress tasks. The 2nd RPC server will use the same
transport bus and the same endpoint (compute manager instance) but
listen on a different topic than the 1st RPC server. By having two
different topics, other services (API, conductor, or compute) can
choose which topic they want to send an RPC request to the compute
service on. That will be done via the RPC client sending the request
to a specific topic.

This change stops both RPC servers, but later in this series we will
keep the 2nd RPC server active so that the compute service can keep
receiving the communication from other services that is required by
the in-progress tasks.

The next change in this series will use this 2nd RPC server. The tasks
(compute RPC client methods) that need to use this 2nd RPC server
will be modified in the next change.

Partial implement blueprint nova-services-graceful-shutdown-part1

Change-Id: I26656869f00efe6d89d993000dcf2e91683a217e
Signed-off-by: Ghanshyam Maan <gmaan@ghanshyammann.com>
This commit is contained in:
Ghanshyam Maan
2026-01-30 11:38:46 -08:00
parent 59a7093915
commit 82fd8ffdce
4 changed files with 109 additions and 8 deletions
+4 -2
View File
@@ -59,8 +59,10 @@ def main():
nova.db.main.api.DISABLE_DB_ACCESS = True nova.db.main.api.DISABLE_DB_ACCESS = True
objects_base.NovaObject.indirection_api = conductor_rpcapi.ConductorAPI() objects_base.NovaObject.indirection_api = conductor_rpcapi.ConductorAPI()
objects.Service.enable_min_version_cache() objects.Service.enable_min_version_cache()
server = service.Service.create(binary='nova-compute', server = service.Service.create(
topic=compute_rpcapi.RPC_TOPIC) binary='nova-compute',
topic=compute_rpcapi.RPC_TOPIC,
topic_alt=compute_rpcapi.RPC_TOPIC_ALT)
# Compute service should never fork worker processes # Compute service should never fork worker processes
service.serve(server, workers=1, no_fork=True) service.serve(server, workers=1, no_fork=True)
service.wait() service.wait()
+8
View File
@@ -32,6 +32,14 @@ from nova import rpc
CONF = nova.conf.CONF CONF = nova.conf.CONF
RPC_TOPIC = "compute" RPC_TOPIC = "compute"
# NOTE(gmaan): The compute service creates two rpc servers which means each
# compute service worker will be listening on two topic queues (1. 'compute'
# 2. 'compute-alt'). The 'compute-alt' rpc server is used to handle the
# graceful shutdown of compute service. During graceful shutdown, 'compute'
# rpc server will be stopped but 'compute-alt' rpc server will be active for
# finishing the ongoing operations. The 'compute-alt' topic is supposed to be
# used in the rpc call/cast which are used to finish the ongoing operations.
RPC_TOPIC_ALT = "compute-alt"
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
LAST_VERSION = None LAST_VERSION = None
+49 -6
View File
@@ -97,11 +97,19 @@ class Service(service.Service):
def __init__(self, host, binary, topic, manager, report_interval=None, def __init__(self, host, binary, topic, manager, report_interval=None,
periodic_enable=None, periodic_fuzzy_delay=None, periodic_enable=None, periodic_fuzzy_delay=None,
periodic_interval_max=None, *args, **kwargs): periodic_interval_max=None, topic_alt=None,
*args, **kwargs):
super(Service, self).__init__() super(Service, self).__init__()
self.host = host self.host = host
self.binary = binary self.binary = binary
self.topic = topic self.topic = topic
# NOTE(gmaan): If any service would like to create a 2nd rpc server,
# then it needs to be created with a different topic (topic_alt) so
# that oslo.messaging creates separate RPC objects (for example,
# dispatcher, consumers, rabbitmq queue, amqp listener, kombu
# connection etc). The endpoint (manager) stays the same so that
# the same manager serves both rpc servers.
self.topic_alt = topic_alt
self.manager_class_name = manager self.manager_class_name = manager
self.servicegroup_api = servicegroup.API() self.servicegroup_api = servicegroup.API()
manager_class = importutils.import_class(self.manager_class_name) manager_class = importutils.import_class(self.manager_class_name)
@@ -174,8 +182,6 @@ class Service(service.Service):
if self.backdoor_port is not None: if self.backdoor_port is not None:
self.manager.backdoor_port = self.backdoor_port self.manager.backdoor_port = self.backdoor_port
LOG.debug("Creating RPC server for service %s", self.topic)
target = messaging.Target(topic=self.topic, server=self.host) target = messaging.Target(topic=self.topic, server=self.host)
endpoints = [ endpoints = [
@@ -186,9 +192,30 @@ class Service(service.Service):
serializer = objects_base.NovaObjectSerializer() serializer = objects_base.NovaObjectSerializer()
LOG.debug("Creating RPC server for service: %s on topic: %s",
self.binary, self.topic)
self.rpcserver = rpc.get_server(target, endpoints, serializer) self.rpcserver = rpc.get_server(target, endpoints, serializer)
self.rpcserver.start() self.rpcserver.start()
self.rpcserver_alt = None
# NOTE(gmaan): Only compute service creates the two rpcservers which
# means each compute service will create two rabbitmq queues bound to
# the same exchange but on two different topics (1. 'compute'
# 2. 'compute-alt').
# The main use case for the 2nd rpcserver is graceful shutdown of the
# compute service. During graceful shutdown, the compute service will
# stop listening for new requests (stop the 'compute' rpcserver) but
# continue listening on the 'compute-alt' rpcserver so that it can
# finish all its ongoing operations.
if self.topic_alt is not None:
LOG.debug("Creating 2nd RPC server for service: %s on topic: %s",
self.binary, self.topic_alt)
target_alt = messaging.Target(
topic=self.topic_alt, server=self.host)
self.rpcserver_alt = rpc.get_server(
target_alt, endpoints, serializer)
self.rpcserver_alt.start()
self.manager.post_start_hook() self.manager.post_start_hook()
LOG.debug("Join ServiceGroup membership for this service %s", LOG.debug("Join ServiceGroup membership for this service %s",
@@ -214,7 +241,8 @@ class Service(service.Service):
@classmethod @classmethod
def create(cls, host=None, binary=None, topic=None, manager=None, def create(cls, host=None, binary=None, topic=None, manager=None,
report_interval=None, periodic_enable=None, report_interval=None, periodic_enable=None,
periodic_fuzzy_delay=None, periodic_interval_max=None): periodic_fuzzy_delay=None, periodic_interval_max=None,
topic_alt=None):
"""Instantiates class and passes back application object. """Instantiates class and passes back application object.
:param host: defaults to CONF.host :param host: defaults to CONF.host
@@ -225,6 +253,7 @@ class Service(service.Service):
:param periodic_enable: defaults to CONF.periodic_enable :param periodic_enable: defaults to CONF.periodic_enable
:param periodic_fuzzy_delay: defaults to CONF.periodic_fuzzy_delay :param periodic_fuzzy_delay: defaults to CONF.periodic_fuzzy_delay
:param periodic_interval_max: if set, the max time to wait between runs :param periodic_interval_max: if set, the max time to wait between runs
:param topic_alt: defaults to None
""" """
if not host: if not host:
@@ -246,7 +275,8 @@ class Service(service.Service):
report_interval=report_interval, report_interval=report_interval,
periodic_enable=periodic_enable, periodic_enable=periodic_enable,
periodic_fuzzy_delay=periodic_fuzzy_delay, periodic_fuzzy_delay=periodic_fuzzy_delay,
periodic_interval_max=periodic_interval_max) periodic_interval_max=periodic_interval_max,
topic_alt=topic_alt)
# NOTE(gibi): This have to be after the service object creation as # NOTE(gibi): This have to be after the service object creation as
# that is the point where we can safely use the RPC to the conductor. # that is the point where we can safely use the RPC to the conductor.
@@ -281,9 +311,22 @@ class Service(service.Service):
def stop(self): def stop(self):
"""stop the service and clean up.""" """stop the service and clean up."""
try: try:
LOG.debug('%s service stopping RPC server on topic: %s',
self.binary, self.topic)
self.rpcserver.stop() self.rpcserver.stop()
self.rpcserver.wait() self.rpcserver.wait()
except Exception: LOG.debug('%s service stopped RPC server on topic: %s',
self.binary, self.topic)
if self.rpcserver_alt is not None:
LOG.debug('%s service stopping the 2nd RPC server on '
'topic: %s', self.binary, self.topic_alt)
self.rpcserver_alt.stop()
self.rpcserver_alt.wait()
LOG.debug('%s service stopped the 2nd RPC server on '
'topic: %s', self.binary, self.topic_alt)
except Exception as exc:
LOG.exception('Service error occurred during RPC server '
'stop & wait, Error: %s', str(exc))
pass pass
try: try:
+48
View File
@@ -22,6 +22,7 @@ import os.path
from unittest import mock from unittest import mock
from oslo_config import cfg from oslo_config import cfg
import oslo_messaging as messaging
from oslo_service import service as _service from oslo_service import service as _service
from nova import exception from nova import exception
@@ -265,6 +266,53 @@ class ServiceTestCase(test.NoDBTestCase):
serv.rpcserver.start.assert_called_once_with() serv.rpcserver.start.assert_called_once_with()
serv.rpcserver.stop.assert_called_once_with() serv.rpcserver.stop.assert_called_once_with()
serv.rpcserver.wait.assert_called_once_with() serv.rpcserver.wait.assert_called_once_with()
self.assertIsNone(serv.rpcserver_alt)
self.assertEqual(mock_rpc.call_count, 1)
@mock.patch('nova.objects.service.Service.get_by_host_and_binary')
@mock.patch.object(rpc, 'TRANSPORT')
@mock.patch.object(messaging, 'get_rpc_server')
def test_service_with_two_rpcservers(
self, mock_get, mock_TRANSPORT, mock_svc_get):
# Verify that a service created with topic_alt requests two RPC
# servers from oslo.messaging: one for the main topic and one for
# the alternate topic, both targeting the same host.
topic_alt = 'fake_alt'
fake_manager = 'nova.tests.unit.test_service.FakeManager'
serv = service.Service(self.host,
self.binary,
self.topic,
fake_manager,
topic_alt=topic_alt)
# Run the full lifecycle so both the start and stop paths are
# exercised with two RPC servers present.
serv.start()
serv.stop()
# Expected targets: same server (host), different topics.
target = messaging.Target(topic=self.topic, server=self.host)
target_alt = messaging.Target(topic=topic_alt, server=self.host)
# Check the two calls to oslo.messaging get_rpc_server()
# with different target
self.assertEqual(mock_get.call_count, 2)
mock_get.assert_any_call(mock_TRANSPORT, target, mock.ANY,
executor=mock.ANY, serializer=mock.ANY,
access_policy=mock.ANY)
mock_get.assert_any_call(mock_TRANSPORT, target_alt, mock.ANY,
executor=mock.ANY, serializer=mock.ANY,
access_policy=mock.ANY)
# Both server attributes must be populated after start().
self.assertIsNotNone(serv.rpcserver)
self.assertIsNotNone(serv.rpcserver_alt)
@mock.patch('nova.objects.service.Service.get_by_host_and_binary')
@mock.patch.object(messaging.rpc.server.RPCServer, '_create_listener')
def test_service_with_two_rpc_topics_get_two_different_rpcservers(
self, mock_listner, mock_svc_get):
# Verify that the main and alternate topics yield two distinct RPC
# server objects rather than one shared instance.
# NOTE(review): _create_listener is patched presumably so that
# start() does not need a real transport listener - confirm.
topic_alt = 'fake_alt'
fake_manager = 'nova.tests.unit.test_service.FakeManager'
serv = service.Service(self.host,
self.binary,
self.topic,
fake_manager,
topic_alt=topic_alt)
serv.start()
self.assertIsNotNone(serv.rpcserver)
self.assertIsNotNone(serv.rpcserver_alt)
# The two servers must not compare equal to each other.
self.assertNotEqual(serv.rpcserver, serv.rpcserver_alt)
def test_reset(self): def test_reset(self):
serv = service.Service(self.host, serv = service.Service(self.host,