Merge "Add manager graceful shutdown, timeout, and wait"

This commit is contained in:
Zuul
2026-02-24 17:58:27 +00:00
committed by Gerrit Code Review
13 changed files with 374 additions and 25 deletions
+25
View File
@@ -1812,6 +1812,31 @@ class ComputeManager(manager.Manager):
self._set_instance_obj_error_state( self._set_instance_obj_error_state(
instance, clean_task_state=True) instance, clean_task_state=True)
def graceful_shutdown(self):
    """Gracefully shutdown the manager.

    This will be called during graceful shutdown (SIGTERM) and manager
    should transit the in-progress tasks to safe termination point. The
    safe termination point can be either complete or abort them.
    """
    # TODO(gmaan) Time based wait is temporary solution and it will be
    # replaced by the better solution to finish in-progress tasks.
    if CONF.manager_shutdown_timeout > CONF.graceful_shutdown_timeout:
        LOG.warning('manager_shutdown_timeout (%s) is higher than '
                    'graceful_shutdown_timeout (%s); the service may be '
                    'killed before the manager finishes waiting.',
                    CONF.manager_shutdown_timeout,
                    CONF.graceful_shutdown_timeout)
        # Leave a 10 second buffer for the remaining shutdown steps.
        # Clamp at 0: if graceful_shutdown_timeout were < 10 the
        # subtraction would go negative and time.sleep() would raise
        # ValueError, aborting the shutdown path.
        sleep_time = max(CONF.graceful_shutdown_timeout - 10, 0)
    else:
        sleep_time = CONF.manager_shutdown_timeout
    LOG.debug('Compute service manager is waiting for %s seconds to '
              'finish in-progress tasks', sleep_time)
    time.sleep(sleep_time)
    # Cleanup host will be the last step of manager graceful_shutdown
    self.cleanup_host()
def cleanup_host(self): def cleanup_host(self):
self.driver.register_event_listener(None) self.driver.register_event_listener(None)
self.instance_events.cancel_all_events() self.instance_events.cancel_all_events()
+23
View File
@@ -21,6 +21,7 @@ import copy
import functools import functools
import sys import sys
import threading import threading
import time
from keystoneauth1 import exceptions as ks_exc from keystoneauth1 import exceptions as ks_exc
from oslo_config import cfg from oslo_config import cfg
@@ -202,6 +203,28 @@ class ConductorManager(manager.Manager):
def reset(self): def reset(self):
objects.Service.clear_min_version_cache() objects.Service.clear_min_version_cache()
def graceful_shutdown(self):
    """Gracefully shutdown the manager.

    This will be called during graceful shutdown (SIGTERM) and manager
    should transit the in-progress tasks to safe termination point. The
    safe termination point can be either complete or abort them.
    """
    # TODO(gmaan) Time based wait is temporary solution and it will be
    # replaced by the better solution to finish in-progress tasks.
    if CONF.manager_shutdown_timeout > CONF.graceful_shutdown_timeout:
        LOG.warning('manager_shutdown_timeout (%s) is higher than '
                    'graceful_shutdown_timeout (%s); the service may be '
                    'killed before the manager finishes waiting.',
                    CONF.manager_shutdown_timeout,
                    CONF.graceful_shutdown_timeout)
        # Leave a 10 second buffer for the remaining shutdown steps.
        # Clamp at 0: graceful_shutdown_timeout < 10 would make the
        # subtraction negative and time.sleep() raise ValueError.
        sleep_time = max(CONF.graceful_shutdown_timeout - 10, 0)
    else:
        sleep_time = CONF.manager_shutdown_timeout
    LOG.debug('Conductor service manager is waiting for %s seconds to '
              'finish in-progress tasks', sleep_time)
    time.sleep(sleep_time)
@contextlib.contextmanager @contextlib.contextmanager
def try_target_cell(context, cell): def try_target_cell(context, cell):
+35
View File
@@ -16,6 +16,17 @@
# under the License. # under the License.
from oslo_config import cfg from oslo_config import cfg
from oslo_service import opts
# NOTE(gmaan): 'graceful_shutdown_timeout' is defined in oslo.service with
# a default value of 60, which is too low for Nova services. Override its
# default here; the override applies to all Nova services.
NOVA_DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = 180
opts.set_service_opts_defaults(
    cfg.CONF,
    graceful_shutdown_timeout=NOVA_DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT)
base_options = [ base_options = [
cfg.IntOpt( cfg.IntOpt(
@@ -107,6 +118,30 @@ from a pool maximum once every 60 seconds. The value 0 means that logging
happens every time work is submitted to the pool. The value -1 means the happens every time work is submitted to the pool. The value -1 means the
logging is disabled. logging is disabled.
'''), '''),
cfg.IntOpt(
'manager_shutdown_timeout',
default=160,
min=0,
help="""
Specifies the total time in seconds for the manager to complete the
in-progress tasks. During a graceful shutdown, the manager will
attempt to finish the in-progress tasks within this period. If tasks
take a longer time, then we need to timeout that and let the service
complete the remaining graceful shutdown steps.
This timeout must be less than the overall graceful shutdown timeout
``[DEFAULT]/graceful_shutdown_timeout``.
Possible values:
* 0: The compute manager does not wait to finish in-progress tasks.
* A positive integer: Number of seconds the manager waits before the service
stops (The default value is 160).
Related options:
* ``[DEFAULT]/graceful_shutdown_timeout``
"""),
] ]
+8
View File
@@ -115,6 +115,14 @@ class Manager(PeriodicTasks, metaclass=ManagerMeta):
""" """
pass pass
def graceful_shutdown(self):
    """Hook to gracefully shutdown the manager.

    Called during service stop (SIGTERM) before cleanup. This base
    implementation is a no-op; child classes should override this
    method to bring in-progress tasks to a safe termination point.
    """
    pass
def cleanup_host(self): def cleanup_host(self):
"""Hook to do cleanup work when the service shuts down. """Hook to do cleanup work when the service shuts down.
+23
View File
@@ -22,6 +22,7 @@ Scheduler Service
import collections import collections
import copy import copy
import random import random
import time
from keystoneauth1 import exceptions as ks_exc from keystoneauth1 import exceptions as ks_exc
from oslo_log import log as logging from oslo_log import log as logging
@@ -156,6 +157,28 @@ class SchedulerManager(manager.Manager):
# cell. # cell.
self.host_manager.refresh_cells_caches() self.host_manager.refresh_cells_caches()
def graceful_shutdown(self):
    """Gracefully shutdown the manager.

    This will be called during graceful shutdown (SIGTERM) and manager
    should transit the in-progress tasks to safe termination point. The
    safe termination point can be either complete or abort them.
    """
    # TODO(gmaan) Time based wait is temporary solution and it will be
    # replaced by the better solution to finish in-progress tasks.
    if CONF.manager_shutdown_timeout > CONF.graceful_shutdown_timeout:
        LOG.warning('manager_shutdown_timeout (%s) is higher than '
                    'graceful_shutdown_timeout (%s); the service may be '
                    'killed before the manager finishes waiting.',
                    CONF.manager_shutdown_timeout,
                    CONF.graceful_shutdown_timeout)
        # Leave a 10 second buffer for the remaining shutdown steps.
        # Clamp at 0: graceful_shutdown_timeout < 10 would make the
        # subtraction negative and time.sleep() raise ValueError.
        sleep_time = max(CONF.graceful_shutdown_timeout - 10, 0)
    else:
        sleep_time = CONF.manager_shutdown_timeout
    LOG.debug('Scheduler service manager is waiting for %s seconds to '
              'finish in-progress tasks', sleep_time)
    time.sleep(sleep_time)
@messaging.expected_exceptions(exception.NoValidHost) @messaging.expected_exceptions(exception.NoValidHost)
def select_destinations( def select_destinations(
self, context, request_spec=None, self, context, request_spec=None,
+33 -22
View File
@@ -308,33 +308,44 @@ class Service(service.Service):
except exception.NotFound: except exception.NotFound:
LOG.warning('Service killed that has no database entry') LOG.warning('Service killed that has no database entry')
def _shutdown_rpc_server(self, rpc_server, topic):
    """Stop the given RPC server and wait for it to finish.

    Any failure is logged and swallowed so the remaining shutdown
    steps can still run.
    """
    try:
        LOG.debug('%s service stopping RPC server on topic: %s',
                  self.binary, topic)
        rpc_server.stop()
        rpc_server.wait()
        LOG.debug('%s service stopped RPC server on topic: %s',
                  self.binary, topic)
    except Exception:
        # Best-effort shutdown: do not let a broken RPC server block
        # the rest of the graceful shutdown sequence.
        LOG.exception('Error occurred during RPC server stop & wait.')
def stop(self):
    """stop the service and clean up."""
    LOG.debug('%s service graceful shutdown started.', self.binary)
    # The main RPC server handles new requests during normal operation.
    # Stop it first so only the alternative RPC server (if any) keeps
    # serving requests for the ongoing operations during shutdown.
    self._shutdown_rpc_server(self.rpcserver, self.topic)
    try:
        LOG.debug('%s manager graceful shutdown started.',
                  self.binary)
        self.manager.graceful_shutdown()
        LOG.debug('%s manager graceful shutdown finished.',
                  self.binary)
    except Exception:
        # The service must finish stopping even if the manager's
        # graceful shutdown hook fails.
        LOG.exception('Error occurred during %s manager graceful '
                      'shutdown', self.binary)
    if self.rpcserver_alt is not None:
        # The manager relies on this RPC server to finish in-progress
        # tasks, so it is the last server to be stopped.
        self._shutdown_rpc_server(
            self.rpcserver_alt, self.topic_alt)
    LOG.debug('%s service graceful shutdown finished.', self.binary)
    super(Service, self).stop()
def periodic_tasks(self, raise_on_error=False): def periodic_tasks(self, raise_on_error=False):
+4
View File
@@ -60,6 +60,10 @@ class ConfFixture(config_fixture.Config):
self.conf.set_default( self.conf.set_default(
'notification_format', "both", group="notifications") 'notification_format', "both", group="notifications")
# Disable graceful shutdown wait otherwise the service stop() will
# take time and may end up with tests timeout.
self.conf.set_default('manager_shutdown_timeout', 0)
# oslo.limit requires endpoint_id since 2.3.0 # oslo.limit requires endpoint_id since 2.3.0
self.conf.set_default('endpoint_id', 'ENDPOINT_ID', group='oslo_limit') self.conf.set_default('endpoint_id', 'ENDPOINT_ID', group='oslo_limit')
@@ -7989,6 +7989,27 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase,
times = reportclient._association_refresh_time times = reportclient._association_refresh_time
self.assertEqual({}, times) self.assertEqual({}, times)
@mock.patch('time.sleep')
@mock.patch('nova.compute.manager.ComputeManager.cleanup_host')
def test_graceful_shutdown(self, mock_cleanup, mock_sleep):
    # Normal case: manager_shutdown_timeout <= graceful_shutdown_timeout,
    # so the manager sleeps for manager_shutdown_timeout and then runs
    # cleanup_host().
    self.flags(manager_shutdown_timeout=5)
    self.compute.graceful_shutdown()
    mock_sleep.assert_called_once_with(5)
    mock_cleanup.assert_called_once_with()
@mock.patch('nova.compute.manager.LOG')
@mock.patch('time.sleep')
@mock.patch('nova.compute.manager.ComputeManager.cleanup_host')
def test_graceful_shutdown_manager_timeout_higher(
        self, mock_cleanup, mock_sleep, mock_log):
    # manager_shutdown_timeout > graceful_shutdown_timeout:
    # warning logged, sleep = graceful_shutdown_timeout - 10 = 20
    self.flags(manager_shutdown_timeout=50, graceful_shutdown_timeout=30)
    self.compute.graceful_shutdown()
    mock_log.warning.assert_called_once()
    mock_sleep.assert_called_once_with(20)
    # cleanup_host() still runs after the capped wait.
    mock_cleanup.assert_called_once_with()
@mock.patch('nova.objects.BlockDeviceMappingList.get_by_instance_uuid') @mock.patch('nova.objects.BlockDeviceMappingList.get_by_instance_uuid')
@mock.patch('nova.compute.manager.ComputeManager._delete_instance') @mock.patch('nova.compute.manager.ComputeManager._delete_instance')
def test_terminate_instance_no_bdm_volume_id(self, mock_delete_instance, def test_terminate_instance_no_bdm_volume_id(self, mock_delete_instance,
@@ -303,6 +303,23 @@ class ConductorTestCase(_BaseTestCase, test.TestCase):
self.conductor.reset() self.conductor.reset()
mock_clear_cache.assert_called_once_with() mock_clear_cache.assert_called_once_with()
@mock.patch('time.sleep')
def test_graceful_shutdown(self, mock_sleep):
    # Normal case: the conductor waits manager_shutdown_timeout seconds.
    self.flags(manager_shutdown_timeout=10)
    self.conductor.graceful_shutdown()
    mock_sleep.assert_called_once_with(10)
@mock.patch('nova.conductor.manager.LOG')
@mock.patch('time.sleep')
def test_graceful_shutdown_manager_timeout_higher(
        self, mock_sleep, mock_log):
    # manager_shutdown_timeout > graceful_shutdown_timeout:
    # warning logged, sleep = graceful_shutdown_timeout - 10 = 20
    self.flags(manager_shutdown_timeout=50, graceful_shutdown_timeout=30)
    self.conductor.graceful_shutdown()
    mock_log.warning.assert_called_once()
    mock_sleep.assert_called_once_with(20)
def test_provider_fw_rule_get_all(self): def test_provider_fw_rule_get_all(self):
result = self.conductor.provider_fw_rule_get_all(self.context) result = self.conductor.provider_fw_rule_get_all(self.context)
self.assertEqual([], result) self.assertEqual([], result)
+28
View File
@@ -0,0 +1,28 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import nova.conf
from nova.conf import base
from nova import test
CONF = nova.conf.CONF
class BaseConfTestCase(test.NoDBTestCase):
    """Tests for Nova's base configuration defaults."""

    def test_graceful_shutdown_timeout_default(self):
        # Check that the CONF.graceful_shutdown_timeout default (set by
        # oslo.service) is overridden by Nova's own default.
        self.assertEqual(
            base.NOVA_DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT,
            CONF.graceful_shutdown_timeout)
+17
View File
@@ -1661,6 +1661,23 @@ class SchedulerManagerTestCase(test.NoDBTestCase):
self.manager.reset() self.manager.reset()
mock_refresh.assert_called_once_with() mock_refresh.assert_called_once_with()
@mock.patch('time.sleep')
def test_graceful_shutdown(self, mock_sleep):
    # Normal case: the scheduler waits manager_shutdown_timeout seconds.
    self.flags(manager_shutdown_timeout=10)
    self.manager.graceful_shutdown()
    mock_sleep.assert_called_once_with(10)
@mock.patch('nova.scheduler.manager.LOG')
@mock.patch('time.sleep')
def test_graceful_shutdown_manager_timeout_higher(
        self, mock_sleep, mock_log):
    # manager_shutdown_timeout > graceful_shutdown_timeout:
    # warning logged, sleep = graceful_shutdown_timeout - 10 = 20
    self.flags(manager_shutdown_timeout=50, graceful_shutdown_timeout=30)
    self.manager.graceful_shutdown()
    mock_log.warning.assert_called_once()
    mock_sleep.assert_called_once_with(20)
@mock.patch('nova.objects.service.ServiceList.get_by_binary') @mock.patch('nova.objects.service.ServiceList.get_by_binary')
@mock.patch('nova.objects.host_mapping.discover_hosts') @mock.patch('nova.objects.host_mapping.discover_hosts')
def test_discover_hosts(self, mock_discover, mock_get_by_binary): def test_discover_hosts(self, mock_discover, mock_get_by_binary):
+139 -2
View File
@@ -233,7 +233,7 @@ class ServiceTestCase(test.NoDBTestCase):
@mock.patch('nova.servicegroup.API') @mock.patch('nova.servicegroup.API')
@mock.patch('nova.objects.service.Service.get_by_host_and_binary') @mock.patch('nova.objects.service.Service.get_by_host_and_binary')
def test_parent_graceful_shutdown_with_cleanup_host( def test_service_stop_call_manager_graceful_shutdown(
self, mock_svc_get_by_host_and_binary, mock_API): self, mock_svc_get_by_host_and_binary, mock_API):
mock_manager = mock.Mock(target=None) mock_manager = mock.Mock(target=None)
@@ -250,7 +250,8 @@ class ServiceTestCase(test.NoDBTestCase):
mock_svc_get_by_host_and_binary.return_value) mock_svc_get_by_host_and_binary.return_value)
serv.stop() serv.stop()
serv.manager.cleanup_host.assert_called_with() # Check service with one RPC server calls manager graceful_shutdown
serv.manager.graceful_shutdown.assert_called_once_with()
@mock.patch('nova.servicegroup.API') @mock.patch('nova.servicegroup.API')
@mock.patch('nova.objects.service.Service.get_by_host_and_binary') @mock.patch('nova.objects.service.Service.get_by_host_and_binary')
@@ -314,6 +315,142 @@ class ServiceTestCase(test.NoDBTestCase):
self.assertIsNotNone(serv.rpcserver_alt) self.assertIsNotNone(serv.rpcserver_alt)
self.assertNotEqual(serv.rpcserver, serv.rpcserver_alt) self.assertNotEqual(serv.rpcserver, serv.rpcserver_alt)
@mock.patch('nova.servicegroup.API')
@mock.patch('nova.objects.service.Service.get_by_host_and_binary')
@mock.patch.object(rpc, 'get_server')
def test_service_stop_with_two_rpcservers(
        self, mock_rpc, mock_svc_get_by_host_and_binary, mock_API):
    # A service created with topic_alt gets a second RPC server; stop()
    # must stop/wait both servers and call manager.graceful_shutdown().
    serv = service.Service(self.host,
                           self.binary,
                           self.topic,
                           'nova.tests.unit.test_service.FakeManager',
                           topic_alt='fake_alt')
    serv.start()
    rpcserver = serv.rpcserver
    rpcserver_alt = serv.rpcserver_alt
    self.assertIsNotNone(rpcserver_alt)
    with mock.patch.object(serv.manager, 'graceful_shutdown') as mock_gs:
        serv.stop()
    # Both rpcservers stop and wait is called.
    rpcserver.stop.assert_called_with()
    rpcserver.wait.assert_called_with()
    rpcserver_alt.stop.assert_called_with()
    rpcserver_alt.wait.assert_called_with()
    # Check service with two RPC server calls manager graceful_shutdown
    mock_gs.assert_called_once_with()
@mock.patch('nova.servicegroup.API')
@mock.patch('nova.objects.service.Service.get_by_host_and_binary')
@mock.patch.object(rpc, 'get_server')
def test_service_stop_handle_manager_gs_exception(
        self, mock_rpc, mock_svc_get_by_host_and_binary, mock_API):
    # A failing manager.graceful_shutdown() must not abort service stop.
    serv = service.Service(self.host,
                           self.binary,
                           self.topic,
                           'nova.tests.unit.test_service.FakeManager')
    serv.start()
    serv.manager.graceful_shutdown = mock.Mock(side_effect=Exception())
    # service.stop() should proceed even manager graceful_shutdown raise
    # error
    serv.stop()
    serv.manager.graceful_shutdown.assert_called_once_with()
@mock.patch('nova.servicegroup.API')
@mock.patch('nova.objects.service.Service.get_by_host_and_binary')
@mock.patch.object(rpc, 'get_server')
def test_stop_with_two_rpcservers_handle_manager_gs_exception(
        self, mock_rpc, mock_svc_get_by_host_and_binary, mock_API):
    # With two RPC servers, a failing manager.graceful_shutdown() must
    # still let both servers be stopped and waited on.
    serv = service.Service(self.host,
                           self.binary,
                           self.topic,
                           'nova.tests.unit.test_service.FakeManager',
                           topic_alt='fake-alt')
    serv.start()
    rpcserver = serv.rpcserver
    rpcserver_alt = serv.rpcserver_alt
    serv.manager.graceful_shutdown = mock.Mock(side_effect=Exception())
    serv.stop()
    # Even manager graceful_shutdown() raise error, both rpcservers stop
    # and wait is called.
    rpcserver.stop.assert_called_with()
    rpcserver.wait.assert_called_with()
    rpcserver_alt.stop.assert_called_with()
    rpcserver_alt.wait.assert_called_with()
    serv.manager.graceful_shutdown.assert_called_once_with()
def test_shutdown_rpc_server(self):
    # _shutdown_rpc_server() stops the given server and then waits on it.
    serv = service.Service(
        self.host, self.binary, self.topic,
        'nova.tests.unit.test_service.FakeManager')
    fake_server = mock.Mock()
    serv._shutdown_rpc_server(fake_server, self.topic)
    fake_server.stop.assert_called_once_with()
    fake_server.wait.assert_called_once_with()
def test_shutdown_rpc_server_handle_exception(self):
    # When stop() raises, the error is swallowed and wait() is skipped.
    serv = service.Service(
        self.host, self.binary, self.topic,
        'nova.tests.unit.test_service.FakeManager')
    fake_server = mock.Mock()
    fake_server.stop.side_effect = Exception()
    serv._shutdown_rpc_server(fake_server, self.topic)
    fake_server.stop.assert_called_once_with()
    fake_server.wait.assert_not_called()
@mock.patch('nova.servicegroup.API')
@mock.patch('nova.objects.service.Service.get_by_host_and_binary')
@mock.patch.object(rpc, 'get_server')
def test_service_stop_handle_first_rpcserver_exception(
        self, mock_rpc, mock_svc_get_by_host_and_binary, mock_API):
    # A failure stopping the main RPC server must not prevent the
    # manager graceful shutdown or the alternative server shutdown.
    serv = service.Service(self.host,
                           self.binary,
                           self.topic,
                           'nova.tests.unit.test_service.FakeManager',
                           topic_alt='fake_alt')
    serv.start()
    serv.rpcserver = mock.Mock()
    rpcserver = serv.rpcserver
    serv.rpcserver.stop.side_effect = Exception()
    rpcserver_alt = serv.rpcserver_alt
    self.assertIsNotNone(rpcserver_alt)
    with mock.patch.object(serv.manager, 'graceful_shutdown') as mock_gs:
        serv.stop()
    rpcserver.stop.assert_called_once_with()
    rpcserver.wait.assert_not_called()
    # Check if first RPC server stop() raise exception, it still call
    # manager graceful_shutdown() and 2nd RPC server stop/wait.
    mock_gs.assert_called_once_with()
    rpcserver_alt.stop.assert_called_once_with()
    rpcserver_alt.wait.assert_called_once_with()
@mock.patch('nova.servicegroup.API')
@mock.patch('nova.objects.service.Service.get_by_host_and_binary')
@mock.patch.object(rpc, 'get_server')
def test_service_stop_handle_2nd_rpcserver_exception(
        self, mock_rpc, mock_svc_get_by_host_and_binary, mock_API):
    # A failure stopping the alternative RPC server must not abort
    # service stop; the main server is already stopped by then.
    serv = service.Service(self.host,
                           self.binary,
                           self.topic,
                           'nova.tests.unit.test_service.FakeManager',
                           topic_alt='fake_alt')
    serv.start()
    rpcserver = serv.rpcserver
    serv.rpcserver_alt = mock.Mock()
    serv.rpcserver_alt.stop.side_effect = Exception()
    rpcserver_alt = serv.rpcserver_alt
    self.assertIsNotNone(rpcserver_alt)
    with mock.patch.object(serv.manager, 'graceful_shutdown') as mock_gs:
        serv.stop()
    rpcserver.stop.assert_called_once_with()
    rpcserver.wait.assert_called_once_with()
    mock_gs.assert_called_once_with()
    # service.stop() should proceed even 2nd RPC server stop() raise
    # error.
    rpcserver_alt.stop.assert_called_once_with()
    rpcserver_alt.wait.assert_not_called()
def test_reset(self): def test_reset(self):
serv = service.Service(self.host, serv = service.Service(self.host,
self.binary, self.binary,
+1 -1
View File
@@ -43,7 +43,7 @@ oslo.messaging>=14.1.0 # Apache-2.0
oslo.policy>=4.5.0 # Apache-2.0 oslo.policy>=4.5.0 # Apache-2.0
oslo.privsep>=2.6.2 # Apache-2.0 oslo.privsep>=2.6.2 # Apache-2.0
oslo.i18n>=5.1.0 # Apache-2.0 oslo.i18n>=5.1.0 # Apache-2.0
oslo.service[threading]>=4.4.1 # Apache-2.0 oslo.service[threading]>=4.5.0 # Apache-2.0
rfc3986>=1.2.0 # Apache-2.0 rfc3986>=1.2.0 # Apache-2.0
oslo.middleware>=3.31.0 # Apache-2.0 oslo.middleware>=3.31.0 # Apache-2.0
psutil>=3.2.2 # BSD psutil>=3.2.2 # BSD