diff --git a/nova/cmd/compute.py b/nova/cmd/compute.py index 9007d4b79b..04636ba7c4 100644 --- a/nova/cmd/compute.py +++ b/nova/cmd/compute.py @@ -59,8 +59,10 @@ def main(): nova.db.main.api.DISABLE_DB_ACCESS = True objects_base.NovaObject.indirection_api = conductor_rpcapi.ConductorAPI() objects.Service.enable_min_version_cache() - server = service.Service.create(binary='nova-compute', - topic=compute_rpcapi.RPC_TOPIC) + server = service.Service.create( + binary='nova-compute', + topic=compute_rpcapi.RPC_TOPIC, + topic_alt=compute_rpcapi.RPC_TOPIC_ALT) # Compute service should never fork worker processes service.serve(server, workers=1, no_fork=True) service.wait() diff --git a/nova/compute/rpcapi.py b/nova/compute/rpcapi.py index 79cbc2ab61..c7c2797aef 100644 --- a/nova/compute/rpcapi.py +++ b/nova/compute/rpcapi.py @@ -32,6 +32,14 @@ from nova import rpc CONF = nova.conf.CONF RPC_TOPIC = "compute" +# NOTE(gmaan): The compute service creates two rpc servers which means each +# compute service worker will be listening on two topic queues (1. 'compute' +# 2. 'compute-alt'). The 'compute-alt' rpc server is used to handle the +# graceful shutdown of compute service. During graceful shutdown, 'compute' +# rpc server will be stopped but 'compute-alt' rpc server will be active for +# finishing the ongoing operations. The 'compute-alt' topic is supposed to be +# used in the rpc call/cast which are used to finish the ongoing operations. +RPC_TOPIC_ALT = "compute-alt" LOG = logging.getLogger(__name__) LAST_VERSION = None diff --git a/nova/service.py b/nova/service.py index b307fb18d9..946be7c993 100644 --- a/nova/service.py +++ b/nova/service.py @@ -97,11 +97,19 @@ class Service(service.Service): def __init__(self, host, binary, topic, manager, report_interval=None, periodic_enable=None, periodic_fuzzy_delay=None, - periodic_interval_max=None, *args, **kwargs): + periodic_interval_max=None, topic_alt=None, + *args, **kwargs): super(Service, self).__init__() self.host = host self.binary = binary self.topic = topic + # NOTE(gmaan): If any service would like to create a 2nd rpc server, + # then it needs to be created with different topic (topic_alt) so that + # oslo.messaging creates the different RPC objects (for example, + # dispatcher, consumers, rabbitmq queue, amqp listener, kombu + # connection etc). The endpoint (manager) stay same so that same + # manager will be serving the both rpc servers. + self.topic_alt = topic_alt self.manager_class_name = manager self.servicegroup_api = servicegroup.API() manager_class = importutils.import_class(self.manager_class_name) @@ -174,8 +182,6 @@ class Service(service.Service): if self.backdoor_port is not None: self.manager.backdoor_port = self.backdoor_port - LOG.debug("Creating RPC server for service %s", self.topic) - target = messaging.Target(topic=self.topic, server=self.host) endpoints = [ @@ -186,9 +192,30 @@ class Service(service.Service): serializer = objects_base.NovaObjectSerializer() + LOG.debug("Creating RPC server for service: %s on topic: %s", + self.binary, self.topic) self.rpcserver = rpc.get_server(target, endpoints, serializer) self.rpcserver.start() + self.rpcserver_alt = None + # NOTE(gmaan): Only compute service creates the two rpcservers which + # means each compute service will create two rabiitmq queues bound with + # same exchange but on two different topics (1. 'compute' + # 2. 'compute-alt'). + # The main use case for 2nd rpcserver is graceful shutdown of compute + # service. During graceful shutdown, the compute service will stop + # listening to the new request (stop listening on 'compute' rpcserver) + # but continue listening to the 'compute-alt' rpcserver so that it can + # finish all their ongoing operations. + if self.topic_alt is not None: + LOG.debug("Creating 2nd RPC server for service: %s on topic: %s", + self.binary, self.topic_alt) + target_alt = messaging.Target( + topic=self.topic_alt, server=self.host) + self.rpcserver_alt = rpc.get_server( + target_alt, endpoints, serializer) + self.rpcserver_alt.start() + self.manager.post_start_hook() LOG.debug("Join ServiceGroup membership for this service %s", @@ -214,7 +241,8 @@ class Service(service.Service): @classmethod def create(cls, host=None, binary=None, topic=None, manager=None, report_interval=None, periodic_enable=None, - periodic_fuzzy_delay=None, periodic_interval_max=None): + periodic_fuzzy_delay=None, periodic_interval_max=None, + topic_alt=None): """Instantiates class and passes back application object. :param host: defaults to CONF.host @@ -225,6 +253,7 @@ class Service(service.Service): :param periodic_enable: defaults to CONF.periodic_enable :param periodic_fuzzy_delay: defaults to CONF.periodic_fuzzy_delay :param periodic_interval_max: if set, the max time to wait between runs + :param topic_alt: defaults to None """ if not host: @@ -246,7 +275,8 @@ class Service(service.Service): report_interval=report_interval, periodic_enable=periodic_enable, periodic_fuzzy_delay=periodic_fuzzy_delay, - periodic_interval_max=periodic_interval_max) + periodic_interval_max=periodic_interval_max, + topic_alt=topic_alt) # NOTE(gibi): This have to be after the service object creation as # that is the point where we can safely use the RPC to the conductor. @@ -281,9 +311,22 @@ class Service(service.Service): def stop(self): """stop the service and clean up.""" try: + LOG.debug('%s service stopping RPC server on topic: %s', + self.binary, self.topic) self.rpcserver.stop() self.rpcserver.wait() - except Exception: + LOG.debug('%s service stopped RPC server on topic: %s', + self.binary, self.topic) + if self.rpcserver_alt is not None: + LOG.debug('%s service stopping the 2nd RPC server on ' + 'topic: %s', self.binary, self.topic_alt) + self.rpcserver_alt.stop() + self.rpcserver_alt.wait() + LOG.debug('%s service stopped the 2nd RPC server on ' + 'topic: %s', self.binary, self.topic_alt) + except Exception as exc: + LOG.exception('Service error occurred during RPC server ' + 'stop & wait, Error: %s', str(exc)) pass try: diff --git a/nova/tests/unit/test_service.py b/nova/tests/unit/test_service.py index 151a8901cd..91fabb4a3d 100644 --- a/nova/tests/unit/test_service.py +++ b/nova/tests/unit/test_service.py @@ -22,6 +22,7 @@ import os.path from unittest import mock from oslo_config import cfg +import oslo_messaging as messaging from oslo_service import service as _service from nova import exception @@ -265,6 +266,53 @@ class ServiceTestCase(test.NoDBTestCase): serv.rpcserver.start.assert_called_once_with() serv.rpcserver.stop.assert_called_once_with() serv.rpcserver.wait.assert_called_once_with() + self.assertIsNone(serv.rpcserver_alt) + self.assertEqual(mock_rpc.call_count, 1) + + @mock.patch('nova.objects.service.Service.get_by_host_and_binary') + @mock.patch.object(rpc, 'TRANSPORT') + @mock.patch.object(messaging, 'get_rpc_server') + def test_service_with_two_rpcservers( + self, mock_get, mock_TRANSPORT, mock_svc_get): + topic_alt = 'fake_alt' + fake_manager = 'nova.tests.unit.test_service.FakeManager' + serv = service.Service(self.host, + self.binary, + self.topic, + fake_manager, + topic_alt=topic_alt) + serv.start() + serv.stop() + target = messaging.Target(topic=self.topic, server=self.host) + target_alt = messaging.Target(topic=topic_alt, server=self.host) + + # Check the two calls to oslo.messasing get_rpc_server() + # with different target + self.assertEqual(mock_get.call_count, 2) + mock_get.assert_any_call(mock_TRANSPORT, target, mock.ANY, + executor=mock.ANY, serializer=mock.ANY, + access_policy=mock.ANY) + mock_get.assert_any_call(mock_TRANSPORT, target_alt, mock.ANY, + executor=mock.ANY, serializer=mock.ANY, + access_policy=mock.ANY) + self.assertIsNotNone(serv.rpcserver) + self.assertIsNotNone(serv.rpcserver_alt) + + @mock.patch('nova.objects.service.Service.get_by_host_and_binary') + @mock.patch.object(messaging.rpc.server.RPCServer, '_create_listener') + def test_service_with_two_rpc_topics_get_two_different_rpcservers( + self, mock_listner, mock_svc_get): + topic_alt = 'fake_alt' + fake_manager = 'nova.tests.unit.test_service.FakeManager' + serv = service.Service(self.host, + self.binary, + self.topic, + fake_manager, + topic_alt=topic_alt) + serv.start() + self.assertIsNotNone(serv.rpcserver) + self.assertIsNotNone(serv.rpcserver_alt) + self.assertNotEqual(serv.rpcserver, serv.rpcserver_alt) def test_reset(self): serv = service.Service(self.host,