Merge "Add 2nd RPC server for compute service"

This commit is contained in:
Zuul
2026-02-19 17:36:34 +00:00
committed by Gerrit Code Review
4 changed files with 109 additions and 8 deletions
+4 -2
View File
@@ -59,8 +59,10 @@ def main():
nova.db.main.api.DISABLE_DB_ACCESS = True
objects_base.NovaObject.indirection_api = conductor_rpcapi.ConductorAPI()
objects.Service.enable_min_version_cache()
server = service.Service.create(binary='nova-compute',
topic=compute_rpcapi.RPC_TOPIC)
server = service.Service.create(
binary='nova-compute',
topic=compute_rpcapi.RPC_TOPIC,
topic_alt=compute_rpcapi.RPC_TOPIC_ALT)
# Compute service should never fork worker processes
service.serve(server, workers=1, no_fork=True)
service.wait()
+8
View File
@@ -32,6 +32,14 @@ from nova import rpc
CONF = nova.conf.CONF
RPC_TOPIC = "compute"
# NOTE(gmaan): The compute service creates two rpc servers which means each
# compute service worker will be listening on two topic queues (1. 'compute'
# 2. 'compute-alt'). The 'compute-alt' rpc server is used to handle the
# graceful shutdown of compute service. During graceful shutdown, 'compute'
# rpc server will be stopped but 'compute-alt' rpc server will be active for
# finishing the ongoing operations. The 'compute-alt' topic is supposed to be
# used in the rpc call/cast which are used to finish the ongoing operations.
RPC_TOPIC_ALT = "compute-alt"
LOG = logging.getLogger(__name__)
LAST_VERSION = None
+49 -6
View File
@@ -97,11 +97,19 @@ class Service(service.Service):
def __init__(self, host, binary, topic, manager, report_interval=None,
periodic_enable=None, periodic_fuzzy_delay=None,
periodic_interval_max=None, *args, **kwargs):
periodic_interval_max=None, topic_alt=None,
*args, **kwargs):
super(Service, self).__init__()
self.host = host
self.binary = binary
self.topic = topic
# NOTE(gmaan): If any service would like to create a 2nd rpc server,
# then it needs to be created with different topic (topic_alt) so that
# oslo.messaging creates the different RPC objects (for example,
# dispatcher, consumers, rabbitmq queue, amqp listener, kombu
# connection etc). The endpoint (manager) stay same so that same
# manager will be serving the both rpc servers.
self.topic_alt = topic_alt
self.manager_class_name = manager
self.servicegroup_api = servicegroup.API()
manager_class = importutils.import_class(self.manager_class_name)
@@ -174,8 +182,6 @@ class Service(service.Service):
if self.backdoor_port is not None:
self.manager.backdoor_port = self.backdoor_port
LOG.debug("Creating RPC server for service %s", self.topic)
target = messaging.Target(topic=self.topic, server=self.host)
endpoints = [
@@ -186,9 +192,30 @@ class Service(service.Service):
serializer = objects_base.NovaObjectSerializer()
LOG.debug("Creating RPC server for service: %s on topic: %s",
self.binary, self.topic)
self.rpcserver = rpc.get_server(target, endpoints, serializer)
self.rpcserver.start()
self.rpcserver_alt = None
# NOTE(gmaan): Only compute service creates the two rpcservers which
# means each compute service will create two rabiitmq queues bound with
# same exchange but on two different topics (1. 'compute'
# 2. 'compute-alt').
# The main use case for 2nd rpcserver is graceful shutdown of compute
# service. During graceful shutdown, the compute service will stop
# listening to the new request (stop listening on 'compute' rpcserver)
# but continue listening to the 'compute-alt' rpcserver so that it can
# finish all their ongoing operations.
if self.topic_alt is not None:
LOG.debug("Creating 2nd RPC server for service: %s on topic: %s",
self.binary, self.topic_alt)
target_alt = messaging.Target(
topic=self.topic_alt, server=self.host)
self.rpcserver_alt = rpc.get_server(
target_alt, endpoints, serializer)
self.rpcserver_alt.start()
self.manager.post_start_hook()
LOG.debug("Join ServiceGroup membership for this service %s",
@@ -214,7 +241,8 @@ class Service(service.Service):
@classmethod
def create(cls, host=None, binary=None, topic=None, manager=None,
report_interval=None, periodic_enable=None,
periodic_fuzzy_delay=None, periodic_interval_max=None):
periodic_fuzzy_delay=None, periodic_interval_max=None,
topic_alt=None):
"""Instantiates class and passes back application object.
:param host: defaults to CONF.host
@@ -225,6 +253,7 @@ class Service(service.Service):
:param periodic_enable: defaults to CONF.periodic_enable
:param periodic_fuzzy_delay: defaults to CONF.periodic_fuzzy_delay
:param periodic_interval_max: if set, the max time to wait between runs
:param topic_alt: defaults to None
"""
if not host:
@@ -246,7 +275,8 @@ class Service(service.Service):
report_interval=report_interval,
periodic_enable=periodic_enable,
periodic_fuzzy_delay=periodic_fuzzy_delay,
periodic_interval_max=periodic_interval_max)
periodic_interval_max=periodic_interval_max,
topic_alt=topic_alt)
# NOTE(gibi): This have to be after the service object creation as
# that is the point where we can safely use the RPC to the conductor.
@@ -281,9 +311,22 @@ class Service(service.Service):
def stop(self):
"""stop the service and clean up."""
try:
LOG.debug('%s service stopping RPC server on topic: %s',
self.binary, self.topic)
self.rpcserver.stop()
self.rpcserver.wait()
except Exception:
LOG.debug('%s service stopped RPC server on topic: %s',
self.binary, self.topic)
if self.rpcserver_alt is not None:
LOG.debug('%s service stopping the 2nd RPC server on '
'topic: %s', self.binary, self.topic_alt)
self.rpcserver_alt.stop()
self.rpcserver_alt.wait()
LOG.debug('%s service stopped the 2nd RPC server on '
'topic: %s', self.binary, self.topic_alt)
except Exception as exc:
LOG.exception('Service error occurred during RPC server '
'stop & wait, Error: %s', str(exc))
pass
try:
+48
View File
@@ -22,6 +22,7 @@ import os.path
from unittest import mock
from oslo_config import cfg
import oslo_messaging as messaging
from oslo_service import service as _service
from nova import exception
@@ -265,6 +266,53 @@ class ServiceTestCase(test.NoDBTestCase):
serv.rpcserver.start.assert_called_once_with()
serv.rpcserver.stop.assert_called_once_with()
serv.rpcserver.wait.assert_called_once_with()
self.assertIsNone(serv.rpcserver_alt)
self.assertEqual(mock_rpc.call_count, 1)
@mock.patch('nova.objects.service.Service.get_by_host_and_binary')
@mock.patch.object(rpc, 'TRANSPORT')
@mock.patch.object(messaging, 'get_rpc_server')
def test_service_with_two_rpcservers(
self, mock_get, mock_TRANSPORT, mock_svc_get):
topic_alt = 'fake_alt'
fake_manager = 'nova.tests.unit.test_service.FakeManager'
serv = service.Service(self.host,
self.binary,
self.topic,
fake_manager,
topic_alt=topic_alt)
serv.start()
serv.stop()
target = messaging.Target(topic=self.topic, server=self.host)
target_alt = messaging.Target(topic=topic_alt, server=self.host)
# Check the two calls to oslo.messasing get_rpc_server()
# with different target
self.assertEqual(mock_get.call_count, 2)
mock_get.assert_any_call(mock_TRANSPORT, target, mock.ANY,
executor=mock.ANY, serializer=mock.ANY,
access_policy=mock.ANY)
mock_get.assert_any_call(mock_TRANSPORT, target_alt, mock.ANY,
executor=mock.ANY, serializer=mock.ANY,
access_policy=mock.ANY)
self.assertIsNotNone(serv.rpcserver)
self.assertIsNotNone(serv.rpcserver_alt)
@mock.patch('nova.objects.service.Service.get_by_host_and_binary')
@mock.patch.object(messaging.rpc.server.RPCServer, '_create_listener')
def test_service_with_two_rpc_topics_get_two_different_rpcservers(
self, mock_listner, mock_svc_get):
topic_alt = 'fake_alt'
fake_manager = 'nova.tests.unit.test_service.FakeManager'
serv = service.Service(self.host,
self.binary,
self.topic,
fake_manager,
topic_alt=topic_alt)
serv.start()
self.assertIsNotNone(serv.rpcserver)
self.assertIsNotNone(serv.rpcserver_alt)
self.assertNotEqual(serv.rpcserver, serv.rpcserver_alt)
def test_reset(self):
serv = service.Service(self.host,