Merge "[compute]Use single long task executor"
This commit is contained in:
@@ -96,6 +96,13 @@ tasks to be executed concurrently.
|
||||
more performant live migration is needed then enable
|
||||
:oslo.config:option:`libvirt.live_migration_parallel_connections` instead.
|
||||
|
||||
* :oslo.config:option:`max_concurrent_builds` and
|
||||
:oslo.config:option:`max_concurrent_snapshots`: In native threading mode
|
||||
both types of operations using a common shared executor to free up the RPC
|
||||
handler workers. Therefore both type of operations are counted against the
|
||||
same shared maximum limit. If the two options are set to different values
|
||||
then the shared limit will be the bigger of the two values.
|
||||
|
||||
Seeing the usage of the pools
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
|
||||
+66
-8
@@ -672,10 +672,49 @@ class ComputeManager(manager.Manager):
|
||||
self._syncs_in_progress_lock = threading.Lock()
|
||||
self.send_instance_updates = (
|
||||
CONF.filter_scheduler.track_instance_changes)
|
||||
self._build_semaphore = threading.Semaphore(
|
||||
self._get_max_concurrent_builds())
|
||||
self._snapshot_semaphore = threading.Semaphore(
|
||||
self._get_max_concurrent_snapshots())
|
||||
|
||||
max_builds = self._get_max_concurrent_builds()
|
||||
max_snapshots = self._get_max_concurrent_snapshots()
|
||||
|
||||
if utils.concurrency_mode_threading():
|
||||
max_tasks = max(max_builds, max_snapshots)
|
||||
|
||||
if max_builds != max_snapshots:
|
||||
LOG.warning(
|
||||
"In native threading mode the number of concurrent "
|
||||
"builds, and snapshots should be limited to the "
|
||||
"same number. The current configuration has differing "
|
||||
"limits: max_concurrent_builds: %d, "
|
||||
"max_concurrent_snapshots: %d. "
|
||||
"Nova will use a single, overall limit of %d for these "
|
||||
"tasks.",
|
||||
max_builds, max_snapshots, max_tasks)
|
||||
|
||||
self._long_task_executor = utils.get_long_task_executor(max_tasks)
|
||||
|
||||
# In threading mode we want to use the size of the executor to
|
||||
# act as the limit of concurrent execution. So neuter the
|
||||
# semaphores here.
|
||||
# TODO(gibi): remove the semaphores once eventlet mode is removed
|
||||
self._build_semaphore = compute_utils.UnlimitedSemaphore()
|
||||
self._snapshot_semaphore = compute_utils.UnlimitedSemaphore()
|
||||
|
||||
else:
|
||||
# In eventlet mode we use the individual semaphores to limit
|
||||
# the concurrent tasks, so just create a big Executor to
|
||||
# potentially host all of them
|
||||
self._long_task_executor = utils.get_long_task_executor(
|
||||
max_builds + max_snapshots)
|
||||
|
||||
self._build_semaphore = threading.Semaphore(max_builds)
|
||||
self._snapshot_semaphore = threading.Semaphore(max_snapshots)
|
||||
|
||||
# While live migration is a long-running task we cannot put it into
|
||||
# the same long_task_executor as build and snapshot as we need:
|
||||
# 1. a very small limit of concurrent live migrations compared to
|
||||
# builds and snapshots
|
||||
# 2. a way to cancel live migrations easily that are waiting due to the
|
||||
# limit
|
||||
self._live_migration_executor = nova.utils.create_executor(
|
||||
max_workers=self._get_max_concurrent_live_migrations())
|
||||
|
||||
@@ -1879,6 +1918,10 @@ class ComputeManager(manager.Manager):
|
||||
self.instance_events.cancel_all_events()
|
||||
self.driver.cleanup_host(host=self.host)
|
||||
self._cleanup_live_migrations_in_pool()
|
||||
# NOTE: graceful shutdown needs to take care of the executors
|
||||
# self._sync_power_executor.shutdown()
|
||||
# utils.destroy_long_task_executor()
|
||||
# utils.destroy_default_executor()
|
||||
|
||||
def _cleanup_live_migrations_in_pool(self):
|
||||
# Shutdown the pool so we don't get new requests.
|
||||
@@ -2514,7 +2557,8 @@ class ComputeManager(manager.Manager):
|
||||
# NOTE(danms): We spawn here to return the RPC worker thread back to
|
||||
# the pool. Since what follows could take a really long time, we don't
|
||||
# want to tie up RPC workers.
|
||||
utils.spawn(_locked_do_build_and_run_instance,
|
||||
utils.spawn_on(self._long_task_executor,
|
||||
_locked_do_build_and_run_instance,
|
||||
context, instance, image, request_spec,
|
||||
filter_properties, admin_password, injected_files,
|
||||
requested_networks, security_groups,
|
||||
@@ -4682,10 +4726,24 @@ class ComputeManager(manager.Manager):
|
||||
instance=instance)
|
||||
return
|
||||
|
||||
with self._snapshot_semaphore:
|
||||
self._snapshot_instance(context, image_id, instance,
|
||||
task_states.IMAGE_SNAPSHOT)
|
||||
def do_snapshot_instance(
|
||||
context, image_id, instance, expected_task_state
|
||||
):
|
||||
with self._snapshot_semaphore:
|
||||
self._snapshot_instance(context, image_id, instance,
|
||||
expected_task_state)
|
||||
|
||||
# NOTE(gibi): We spawn a separate task as this can be a long-running
|
||||
# operation, and we want to return the RPC worker to its executor to
|
||||
# avoid blocking RPC traffic.
|
||||
return utils.spawn_on(
|
||||
self._long_task_executor, do_snapshot_instance, context,
|
||||
image_id, instance, task_states.IMAGE_SNAPSHOT)
|
||||
|
||||
@wrap_exception()
|
||||
@reverts_task_state
|
||||
@wrap_instance_fault
|
||||
@delete_image_on_error
|
||||
def _snapshot_instance(self, context, image_id, instance,
|
||||
expected_task_state):
|
||||
context = context.elevated()
|
||||
|
||||
@@ -664,6 +664,11 @@ instances, if asked to do so. This limit is enforced to avoid building
|
||||
unlimited instance concurrently on a compute node. This value can be set
|
||||
per compute node.
|
||||
|
||||
In native threading mode concurrent builds and concurrent snapshot operations
|
||||
are sharing the same executor and therefore the max limit values are common.
|
||||
If max_concurrent_builds and max_concurrent_snapshots are set to different
|
||||
values the bigger value will be used as the combined limit for both.
|
||||
|
||||
Possible Values:
|
||||
|
||||
* ``0``: Deprecated since 33.0.0 (2026.1 Gazpacho). This value was previously
|
||||
@@ -682,6 +687,11 @@ This limit is enforced to prevent snapshots overwhelming the
|
||||
host/network/storage and causing failure. This value can be set per
|
||||
compute node.
|
||||
|
||||
In native threading mode concurrent builds and concurrent snapshot operations
|
||||
are sharing the same executor and therefore the max limit values are common.
|
||||
If max_concurrent_builds and max_concurrent_snapshots are set to different
|
||||
values the bigger value will be used as the combined limit for both.
|
||||
|
||||
Possible Values:
|
||||
|
||||
* ``0``: Deprecated since 33.0.0 (2026.1 Gazpacho). This value was previously
|
||||
|
||||
Vendored
+17
@@ -1199,14 +1199,17 @@ class IsolatedExecutorFixture(fixtures.Fixture):
|
||||
assert utils.SCATTER_GATHER_EXECUTOR is None
|
||||
assert utils.DEFAULT_EXECUTOR is None
|
||||
assert utils.CACHE_IMAGES_EXECUTOR is None
|
||||
assert utils.LONG_TASK_EXECUTOR is None
|
||||
|
||||
origi_get_scatter_gather = utils.get_scatter_gather_executor
|
||||
origi_default_executor = utils._get_default_executor
|
||||
origi_get_cache_images_executor = utils.get_cache_images_executor
|
||||
origi_get_long_task_executor = utils.get_long_task_executor
|
||||
|
||||
self.executor = None
|
||||
self.scatter_gather_executor = None
|
||||
self.cache_images_executor = None
|
||||
self.long_task_executor = None
|
||||
|
||||
def _get_default_executor():
|
||||
self.executor = origi_default_executor()
|
||||
@@ -1247,12 +1250,26 @@ class IsolatedExecutorFixture(fixtures.Fixture):
|
||||
self.addCleanup(
|
||||
lambda: self.do_cleanup_executor(self.cache_images_executor))
|
||||
|
||||
def _get_long_task_executor(max_workers):
|
||||
self.long_task_executor = origi_get_long_task_executor(max_workers)
|
||||
self.long_task_executor.name = (
|
||||
f"{self.test_case_id}.long_task")
|
||||
return self.long_task_executor
|
||||
|
||||
self.useFixture(fixtures.MonkeyPatch(
|
||||
'nova.utils.get_long_task_executor',
|
||||
_get_long_task_executor))
|
||||
|
||||
self.addCleanup(
|
||||
lambda: self.do_cleanup_executor(self.long_task_executor))
|
||||
|
||||
self.addCleanup(self.reset_globals)
|
||||
|
||||
def reset_globals(self):
|
||||
utils.SCATTER_GATHER_EXECUTOR = None
|
||||
utils.DEFAULT_EXECUTOR = None
|
||||
utils.CACHE_IMAGES_EXECUTOR = None
|
||||
utils.LONG_TASK_EXECUTOR = None
|
||||
|
||||
def do_cleanup_executor(self, executor):
|
||||
# NOTE(gibi): we cannot rely on utils.concurrency_mode_threading
|
||||
|
||||
@@ -4090,10 +4090,9 @@ class ComputeTestCase(BaseTestCase,
|
||||
with mock.patch.object(compute_utils,
|
||||
'EventReporter') as mock_event:
|
||||
if method == 'snapshot':
|
||||
self.assertRaises(test.TestingException,
|
||||
self.compute.snapshot_instance,
|
||||
self.context, image_id=uuids.snapshot,
|
||||
instance=inst_obj)
|
||||
future = self.compute.snapshot_instance(
|
||||
self.context, image_id=uuids.snapshot, instance=inst_obj)
|
||||
self.assertRaises(test.TestingException, future.result)
|
||||
mock_event.assert_called_once_with(self.context,
|
||||
'compute_snapshot_instance',
|
||||
CONF.host,
|
||||
|
||||
@@ -972,15 +972,24 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase,
|
||||
self._test_max_concurrent_builds()
|
||||
|
||||
def test_max_concurrent_builds_semaphore_limited(self):
|
||||
utils.destroy_long_task_executor()
|
||||
self.flags(max_concurrent_builds=123)
|
||||
self.assertEqual(123,
|
||||
manager.ComputeManager()._build_semaphore._value)
|
||||
compute = manager.ComputeManager()
|
||||
if utils.concurrency_mode_threading():
|
||||
self.assertIsInstance(
|
||||
compute._build_semaphore, compute_utils.UnlimitedSemaphore)
|
||||
self.assertEqual(123, compute._long_task_executor._max_workers)
|
||||
else:
|
||||
self.assertEqual(123, compute._build_semaphore._value)
|
||||
|
||||
def test_max_concurrent_builds_semaphore_unlimited(self):
|
||||
utils.destroy_long_task_executor()
|
||||
self.flags(max_concurrent_builds=0)
|
||||
compute = manager.ComputeManager()
|
||||
if utils.concurrency_mode_threading():
|
||||
self.assertEqual(10, compute._build_semaphore._value)
|
||||
self.assertIsInstance(
|
||||
compute._build_semaphore, compute_utils.UnlimitedSemaphore)
|
||||
self.assertEqual(10, compute._long_task_executor._max_workers)
|
||||
else:
|
||||
self.assertEqual(1000, compute._build_semaphore._value)
|
||||
|
||||
@@ -1007,18 +1016,47 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase,
|
||||
self._test_max_concurrent_snapshots()
|
||||
|
||||
def test_max_concurrent_snapshots_semaphore_limited(self):
|
||||
utils.destroy_long_task_executor()
|
||||
self.flags(max_concurrent_snapshots=123)
|
||||
self.assertEqual(123,
|
||||
manager.ComputeManager()._snapshot_semaphore._value)
|
||||
compute = manager.ComputeManager()
|
||||
if utils.concurrency_mode_threading():
|
||||
self.assertIsInstance(
|
||||
compute._snapshot_semaphore, compute_utils.UnlimitedSemaphore)
|
||||
self.assertEqual(123, compute._long_task_executor._max_workers)
|
||||
else:
|
||||
self.assertEqual(123, compute._snapshot_semaphore._value)
|
||||
|
||||
def test_max_concurrent_snapshots_semaphore_unlimited(self):
|
||||
utils.destroy_long_task_executor()
|
||||
self.flags(max_concurrent_snapshots=0)
|
||||
compute = manager.ComputeManager()
|
||||
if utils.concurrency_mode_threading():
|
||||
self.assertEqual(5, compute._snapshot_semaphore._value)
|
||||
self.assertIsInstance(
|
||||
compute._snapshot_semaphore, compute_utils.UnlimitedSemaphore)
|
||||
self.assertEqual(10, compute._long_task_executor._max_workers)
|
||||
else:
|
||||
self.assertEqual(1000, compute._snapshot_semaphore._value)
|
||||
|
||||
@mock.patch.object(manager.LOG, 'warning')
|
||||
def test_max_c_builds_and_snapshots_different_limits(self, mock_log):
|
||||
utils.destroy_long_task_executor()
|
||||
self.flags(max_concurrent_builds=124)
|
||||
self.flags(max_concurrent_snapshots=123)
|
||||
compute = manager.ComputeManager()
|
||||
if utils.concurrency_mode_threading():
|
||||
self.assertEqual(124, compute._long_task_executor._max_workers)
|
||||
mock_log.assert_called_once_with(
|
||||
'In native threading mode the number of concurrent builds, '
|
||||
'and snapshots should be limited to the same number. '
|
||||
'The current configuration has differing limits: '
|
||||
'max_concurrent_builds: %d, max_concurrent_snapshots: %d. '
|
||||
'Nova will use a single, overall limit of %d for these tasks.',
|
||||
124, 123, 124)
|
||||
else:
|
||||
self.assertEqual(123, compute._snapshot_semaphore._value)
|
||||
self.assertEqual(124, compute._build_semaphore._value)
|
||||
mock_log.assert_not_called()
|
||||
|
||||
def test_nil_out_inst_obj_host_and_node_sets_nil(self):
|
||||
instance = fake_instance.fake_instance_obj(self.context,
|
||||
uuid=uuids.instance,
|
||||
|
||||
@@ -1676,3 +1676,35 @@ class StaticallyDelayingCancellableTaskExecutorWrapper:
|
||||
@property
|
||||
def is_alive(self) -> bool:
|
||||
return self._thread.is_alive()
|
||||
|
||||
|
||||
LONG_TASK_EXECUTOR: Executor | None = None
|
||||
|
||||
|
||||
def get_long_task_executor(max_workers) -> Executor:
|
||||
"""Returns the executor used for long compute operations."""
|
||||
global LONG_TASK_EXECUTOR
|
||||
|
||||
if not LONG_TASK_EXECUTOR:
|
||||
LONG_TASK_EXECUTOR = create_executor(max_workers)
|
||||
|
||||
pname = multiprocessing.current_process().name
|
||||
executor_name = f"{pname}.long_task"
|
||||
LONG_TASK_EXECUTOR.name = executor_name
|
||||
|
||||
LOG.info("The long task thread pool %s is initialized",
|
||||
executor_name)
|
||||
|
||||
return LONG_TASK_EXECUTOR
|
||||
|
||||
|
||||
def destroy_long_task_executor():
|
||||
"""Closes the executor and resets the global to None"""
|
||||
global LONG_TASK_EXECUTOR
|
||||
if LONG_TASK_EXECUTOR:
|
||||
LOG.info(
|
||||
"The thread pool %s is shutting down", LONG_TASK_EXECUTOR.name)
|
||||
LONG_TASK_EXECUTOR.shutdown()
|
||||
LOG.info("The thread pool %s is closed", LONG_TASK_EXECUTOR.name)
|
||||
|
||||
LONG_TASK_EXECUTOR = None
|
||||
|
||||
+8
@@ -36,6 +36,14 @@ upgrade:
|
||||
now reduced to 5 native threads. Please also read the `concurrency
|
||||
<https://docs.openstack.org/nova/latest/admin/concurrency.html>`__
|
||||
guide for more details.
|
||||
- |
|
||||
In native threading mode the limit expressed by
|
||||
``[DEFAULT]max_concurrent_builds`` and
|
||||
``[DEFAULT]max_concurrent_snapshots`` configuration options are shared
|
||||
across the two operation types as they are executed by the same Executor.
|
||||
Therefore the two config options need to be set to the same value. If not
|
||||
then nova will use the bigger value as the shared limit. The shared limit
|
||||
also means that one operation type can consume the whole limit.
|
||||
deprecations:
|
||||
- |
|
||||
The possible 0 value of the configuration option
|
||||
|
||||
Reference in New Issue
Block a user