diff --git a/doc/source/admin/concurrency.rst b/doc/source/admin/concurrency.rst
index 953d47ab73..21d4e52e83 100644
--- a/doc/source/admin/concurrency.rst
+++ b/doc/source/admin/concurrency.rst
@@ -96,6 +96,13 @@ tasks to be executed concurrently.
   more performant live migration is needed then enable
   :oslo.config:option:`libvirt.live_migration_parallel_connections` instead.
 
+* :oslo.config:option:`max_concurrent_builds` and
+  :oslo.config:option:`max_concurrent_snapshots`: In native threading mode
+  both types of operations use a common shared executor to free up the RPC
+  handler workers. Therefore both types of operations are counted against the
+  same shared maximum limit. If the two options are set to different values
+  then the shared limit will be the larger of the two values.
+
 Seeing the usage of the pools
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
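The shared-limit rule documented above boils down to taking the larger of the two configured values when Nova runs in native threading mode, while eventlet mode keeps one limit per operation type. A short, self-contained sketch of that selection (the helper name effective_long_task_limit is invented for illustration and is not part of the patch; it only mirrors the logic the compute manager changes below apply):

    # Illustrative only: mirrors the limit selection added to
    # ComputeManager.__init__ in this patch.
    def effective_long_task_limit(max_builds, max_snapshots, threading_mode):
        """Return the concurrency limit(s) that end up being enforced."""
        if threading_mode:
            # One shared executor caps builds and snapshots together.
            return {'shared': max(max_builds, max_snapshots)}
        # Eventlet mode keeps one semaphore per operation type.
        return {'builds': max_builds, 'snapshots': max_snapshots}

    assert effective_long_task_limit(10, 5, threading_mode=True) == {
        'shared': 10}
    assert effective_long_task_limit(10, 5, threading_mode=False) == {
        'builds': 10, 'snapshots': 5}
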
diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index ab8d456c16..0daefe4077 100644
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -672,10 +672,49 @@ class ComputeManager(manager.Manager):
         self._syncs_in_progress_lock = threading.Lock()
         self.send_instance_updates = (
             CONF.filter_scheduler.track_instance_changes)
-        self._build_semaphore = threading.Semaphore(
-            self._get_max_concurrent_builds())
-        self._snapshot_semaphore = threading.Semaphore(
-            self._get_max_concurrent_snapshots())
+
+        max_builds = self._get_max_concurrent_builds()
+        max_snapshots = self._get_max_concurrent_snapshots()
+
+        if utils.concurrency_mode_threading():
+            max_tasks = max(max_builds, max_snapshots)
+
+            if max_builds != max_snapshots:
+                LOG.warning(
+                    "In native threading mode the number of concurrent "
+                    "builds and snapshots should be limited to the "
+                    "same number. The current configuration has differing "
+                    "limits: max_concurrent_builds: %d, "
+                    "max_concurrent_snapshots: %d. "
+                    "Nova will use a single, overall limit of %d for these "
+                    "tasks.",
+                    max_builds, max_snapshots, max_tasks)
+
+            self._long_task_executor = utils.get_long_task_executor(max_tasks)
+
+            # In threading mode we want to use the size of the executor to
+            # act as the limit of concurrent execution. So neuter the
+            # semaphores here.
+            # TODO(gibi): remove the semaphores once eventlet mode is removed
+            self._build_semaphore = compute_utils.UnlimitedSemaphore()
+            self._snapshot_semaphore = compute_utils.UnlimitedSemaphore()
+
+        else:
+            # In eventlet mode we use the individual semaphores to limit
+            # the concurrent tasks, so just create a big Executor to
+            # potentially host all of them
+            self._long_task_executor = utils.get_long_task_executor(
+                max_builds + max_snapshots)
+
+            self._build_semaphore = threading.Semaphore(max_builds)
+            self._snapshot_semaphore = threading.Semaphore(max_snapshots)
+
+        # While live migration is a long-running task we cannot put it into
+        # the same long_task_executor as build and snapshot as we need:
+        # 1. a very small limit of concurrent live migrations compared to
+        #    builds and snapshots
+        # 2. a way to cancel live migrations easily that are waiting due to
+        #    the limit
         self._live_migration_executor = nova.utils.create_executor(
             max_workers=self._get_max_concurrent_live_migrations())
@@ -1879,6 +1918,10 @@ class ComputeManager(manager.Manager):
         self.instance_events.cancel_all_events()
         self.driver.cleanup_host(host=self.host)
         self._cleanup_live_migrations_in_pool()
+        # NOTE: graceful shutdown needs to take care of the executors
+        # self._sync_power_executor.shutdown()
+        # utils.destroy_long_task_executor()
+        # utils.destroy_default_executor()
 
     def _cleanup_live_migrations_in_pool(self):
         # Shutdown the pool so we don't get new requests.
@@ -2514,7 +2557,8 @@ class ComputeManager(manager.Manager):
         # NOTE(danms): We spawn here to return the RPC worker thread back to
         # the pool. Since what follows could take a really long time, we don't
         # want to tie up RPC workers.
-        utils.spawn(_locked_do_build_and_run_instance,
+        utils.spawn_on(self._long_task_executor,
+                       _locked_do_build_and_run_instance,
                     context, instance, image, request_spec,
                     filter_properties, admin_password, injected_files,
                     requested_networks, security_groups,
@@ -4682,10 +4726,24 @@ class ComputeManager(manager.Manager):
                           instance=instance)
                 return
 
-        with self._snapshot_semaphore:
-            self._snapshot_instance(context, image_id, instance,
-                                    task_states.IMAGE_SNAPSHOT)
+        def do_snapshot_instance(
+            context, image_id, instance, expected_task_state
+        ):
+            with self._snapshot_semaphore:
+                self._snapshot_instance(context, image_id, instance,
+                                        expected_task_state)
+        # NOTE(gibi): We spawn a separate task as this can be a long-running
+        # operation, and we want to return the RPC worker to its executor to
+        # avoid blocking RPC traffic.
+        return utils.spawn_on(
+            self._long_task_executor, do_snapshot_instance, context,
+            image_id, instance, task_states.IMAGE_SNAPSHOT)
+
+    @wrap_exception()
+    @reverts_task_state
+    @wrap_instance_fault
+    @delete_image_on_error
     def _snapshot_instance(self, context, image_id, instance,
                            expected_task_state):
         context = context.elevated()
diff --git a/nova/conf/compute.py b/nova/conf/compute.py
index 4f58e89cd0..1da2fd7534 100644
--- a/nova/conf/compute.py
+++ b/nova/conf/compute.py
@@ -664,6 +664,11 @@ instances, if asked to do so. This limit is enforced to avoid building
 unlimited instance concurrently on a compute node. This value can be set
 per compute node.
 
+In native threading mode concurrent builds and concurrent snapshot operations
+share the same executor and therefore the maximum limit is common to both.
+If max_concurrent_builds and max_concurrent_snapshots are set to different
+values, the larger value will be used as the combined limit for both.
+
 Possible Values:
 
 * ``0``: Deprecated since 33.0.0 (2026.1 Gazpacho). This value was previously
@@ -682,6 +687,11 @@ This limit is enforced to prevent snapshots overwhelming the
 host/network/storage and causing failure. This value can be set per compute
 node.
 
+In native threading mode concurrent builds and concurrent snapshot operations
+share the same executor and therefore the maximum limit is common to both.
+If max_concurrent_builds and max_concurrent_snapshots are set to different
+values, the larger value will be used as the combined limit for both.
+
 Possible Values:
 
 * ``0``: Deprecated since 33.0.0 (2026.1 Gazpacho). This value was previously
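The spawn_on() calls in the manager changes above all follow the same hand-off pattern: the RPC-facing method submits the long-running work to a dedicated pool and immediately returns a future, so the RPC worker is freed. A standalone sketch of that pattern using only the standard library (long_task_pool, snapshot and rpc_handler are invented names, not Nova code):

    # Standalone illustration of the hand-off pattern: the RPC handler
    # submits the long-running work to a dedicated pool and returns a
    # future instead of blocking an RPC worker.
    import concurrent.futures
    import time

    long_task_pool = concurrent.futures.ThreadPoolExecutor(max_workers=2)

    def snapshot(instance_id):
        time.sleep(0.1)  # stand-in for the long-running operation
        return f"snapshot-of-{instance_id}"

    def rpc_handler(instance_id):
        # Returns right away; the work continues on the shared pool.
        return long_task_pool.submit(snapshot, instance_id)

    future = rpc_handler("instance-1")
    print(future.result())  # blocks only the caller that asks for the result
    long_task_pool.shutdown()
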
diff --git a/nova/tests/fixtures/nova.py b/nova/tests/fixtures/nova.py
index 2653e0e5ff..c2ea4b50ac 100644
--- a/nova/tests/fixtures/nova.py
+++ b/nova/tests/fixtures/nova.py
@@ -1199,14 +1199,17 @@ class IsolatedExecutorFixture(fixtures.Fixture):
         assert utils.SCATTER_GATHER_EXECUTOR is None
         assert utils.DEFAULT_EXECUTOR is None
         assert utils.CACHE_IMAGES_EXECUTOR is None
+        assert utils.LONG_TASK_EXECUTOR is None
 
         origi_get_scatter_gather = utils.get_scatter_gather_executor
         origi_default_executor = utils._get_default_executor
         origi_get_cache_images_executor = utils.get_cache_images_executor
+        origi_get_long_task_executor = utils.get_long_task_executor
 
         self.executor = None
         self.scatter_gather_executor = None
         self.cache_images_executor = None
+        self.long_task_executor = None
 
         def _get_default_executor():
             self.executor = origi_default_executor()
@@ -1247,12 +1250,26 @@ class IsolatedExecutorFixture(fixtures.Fixture):
             self.addCleanup(
                 lambda: self.do_cleanup_executor(self.cache_images_executor))
 
+        def _get_long_task_executor(max_workers):
+            self.long_task_executor = origi_get_long_task_executor(
+                max_workers)
+            self.long_task_executor.name = (
+                f"{self.test_case_id}.long_task")
+            return self.long_task_executor
+
+        self.useFixture(fixtures.MonkeyPatch(
+            'nova.utils.get_long_task_executor',
+            _get_long_task_executor))
+
+        self.addCleanup(
+            lambda: self.do_cleanup_executor(self.long_task_executor))
+
         self.addCleanup(self.reset_globals)
 
     def reset_globals(self):
         utils.SCATTER_GATHER_EXECUTOR = None
         utils.DEFAULT_EXECUTOR = None
         utils.CACHE_IMAGES_EXECUTOR = None
+        utils.LONG_TASK_EXECUTOR = None
 
     def do_cleanup_executor(self, executor):
         # NOTE(gibi): we cannot rely on utils.concurrency_mode_threading
diff --git a/nova/tests/unit/compute/test_compute.py b/nova/tests/unit/compute/test_compute.py
index 9e49034455..9afb81ec77 100644
--- a/nova/tests/unit/compute/test_compute.py
+++ b/nova/tests/unit/compute/test_compute.py
@@ -4090,10 +4090,9 @@ class ComputeTestCase(BaseTestCase,
 
         with mock.patch.object(compute_utils, 'EventReporter') as mock_event:
             if method == 'snapshot':
-                self.assertRaises(test.TestingException,
-                                  self.compute.snapshot_instance,
-                                  self.context, image_id=uuids.snapshot,
-                                  instance=inst_obj)
+                future = self.compute.snapshot_instance(
+                    self.context, image_id=uuids.snapshot, instance=inst_obj)
+                self.assertRaises(test.TestingException, future.result)
 
                 mock_event.assert_called_once_with(self.context,
                                                    'compute_snapshot_instance',
                                                    CONF.host,
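The updated snapshot test above asserts on future.result() because an exception raised inside a submitted task is stored on the future and only re-raised when the result is fetched, not at submit time. A minimal standard-library example of that behaviour (failing_snapshot is an invented name):

    # Exceptions from the worker surface when the result is fetched.
    import concurrent.futures

    def failing_snapshot():
        raise RuntimeError("snapshot failed")

    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(failing_snapshot)  # does not raise here
        try:
            future.result()                     # re-raises RuntimeError here
        except RuntimeError as exc:
            print(f"caught: {exc}")
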
diff --git a/nova/tests/unit/compute/test_compute_mgr.py b/nova/tests/unit/compute/test_compute_mgr.py
index 0d699751e5..fe446bfd66 100644
--- a/nova/tests/unit/compute/test_compute_mgr.py
+++ b/nova/tests/unit/compute/test_compute_mgr.py
@@ -972,15 +972,24 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase,
         self._test_max_concurrent_builds()
 
     def test_max_concurrent_builds_semaphore_limited(self):
+        utils.destroy_long_task_executor()
         self.flags(max_concurrent_builds=123)
-        self.assertEqual(123,
-                         manager.ComputeManager()._build_semaphore._value)
+        compute = manager.ComputeManager()
+        if utils.concurrency_mode_threading():
+            self.assertIsInstance(
+                compute._build_semaphore, compute_utils.UnlimitedSemaphore)
+            self.assertEqual(123, compute._long_task_executor._max_workers)
+        else:
+            self.assertEqual(123, compute._build_semaphore._value)
 
     def test_max_concurrent_builds_semaphore_unlimited(self):
+        utils.destroy_long_task_executor()
         self.flags(max_concurrent_builds=0)
         compute = manager.ComputeManager()
         if utils.concurrency_mode_threading():
-            self.assertEqual(10, compute._build_semaphore._value)
+            self.assertIsInstance(
+                compute._build_semaphore, compute_utils.UnlimitedSemaphore)
+            self.assertEqual(10, compute._long_task_executor._max_workers)
         else:
             self.assertEqual(1000, compute._build_semaphore._value)
 
@@ -1007,18 +1016,47 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase,
         self._test_max_concurrent_snapshots()
 
     def test_max_concurrent_snapshots_semaphore_limited(self):
+        utils.destroy_long_task_executor()
         self.flags(max_concurrent_snapshots=123)
-        self.assertEqual(123,
-                         manager.ComputeManager()._snapshot_semaphore._value)
+        compute = manager.ComputeManager()
+        if utils.concurrency_mode_threading():
+            self.assertIsInstance(
+                compute._snapshot_semaphore, compute_utils.UnlimitedSemaphore)
+            self.assertEqual(123, compute._long_task_executor._max_workers)
+        else:
+            self.assertEqual(123, compute._snapshot_semaphore._value)
 
     def test_max_concurrent_snapshots_semaphore_unlimited(self):
+        utils.destroy_long_task_executor()
         self.flags(max_concurrent_snapshots=0)
         compute = manager.ComputeManager()
         if utils.concurrency_mode_threading():
-            self.assertEqual(5, compute._snapshot_semaphore._value)
+            self.assertIsInstance(
+                compute._snapshot_semaphore, compute_utils.UnlimitedSemaphore)
+            self.assertEqual(10, compute._long_task_executor._max_workers)
         else:
             self.assertEqual(1000, compute._snapshot_semaphore._value)
 
+    @mock.patch.object(manager.LOG, 'warning')
+    def test_max_c_builds_and_snapshots_different_limits(self, mock_log):
+        utils.destroy_long_task_executor()
+        self.flags(max_concurrent_builds=124)
+        self.flags(max_concurrent_snapshots=123)
+        compute = manager.ComputeManager()
+        if utils.concurrency_mode_threading():
+            self.assertEqual(124, compute._long_task_executor._max_workers)
+            mock_log.assert_called_once_with(
+                'In native threading mode the number of concurrent builds '
+                'and snapshots should be limited to the same number. '
+                'The current configuration has differing limits: '
+                'max_concurrent_builds: %d, max_concurrent_snapshots: %d. '
+                'Nova will use a single, overall limit of %d for these tasks.',
+                124, 123, 124)
+        else:
+            self.assertEqual(123, compute._snapshot_semaphore._value)
+            self.assertEqual(124, compute._build_semaphore._value)
+            mock_log.assert_not_called()
+
     def test_nil_out_inst_obj_host_and_node_sets_nil(self):
         instance = fake_instance.fake_instance_obj(self.context,
                                                    uuid=uuids.instance,
diff --git a/nova/utils.py b/nova/utils.py
index 0fd4d99646..70f49dbc06 100644
--- a/nova/utils.py
+++ b/nova/utils.py
@@ -1676,3 +1676,35 @@ class StaticallyDelayingCancellableTaskExecutorWrapper:
     @property
     def is_alive(self) -> bool:
         return self._thread.is_alive()
+
+
+LONG_TASK_EXECUTOR: Executor | None = None
+
+
+def get_long_task_executor(max_workers) -> Executor:
+    """Returns the executor used for long compute operations."""
+    global LONG_TASK_EXECUTOR
+
+    if not LONG_TASK_EXECUTOR:
+        LONG_TASK_EXECUTOR = create_executor(max_workers)
+
+        pname = multiprocessing.current_process().name
+        executor_name = f"{pname}.long_task"
+        LONG_TASK_EXECUTOR.name = executor_name
+
+        LOG.info("The long task thread pool %s is initialized",
+                 executor_name)
+
+    return LONG_TASK_EXECUTOR
+
+
+def destroy_long_task_executor():
+    """Closes the executor and resets the global to None."""
+    global LONG_TASK_EXECUTOR
+    if LONG_TASK_EXECUTOR:
+        LOG.info(
+            "The thread pool %s is shutting down", LONG_TASK_EXECUTOR.name)
+        LONG_TASK_EXECUTOR.shutdown()
+        LOG.info("The thread pool %s is closed", LONG_TASK_EXECUTOR.name)
+
+    LONG_TASK_EXECUTOR = None
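get_long_task_executor() and destroy_long_task_executor() above implement a lazily created, module-level singleton: the first call fixes the pool size and later calls return the same pool, which is why the unit tests above reset the executor before constructing a ComputeManager with different limits. A minimal sketch of that behaviour (get_pool and destroy_pool are invented names, not Nova code):

    # Lazy module-level singleton: the first get() fixes the pool size,
    # later calls ignore max_workers until destroy() resets the global.
    import concurrent.futures

    _POOL = None

    def get_pool(max_workers):
        global _POOL
        if _POOL is None:
            _POOL = concurrent.futures.ThreadPoolExecutor(max_workers)
        return _POOL

    def destroy_pool():
        global _POOL
        if _POOL is not None:
            _POOL.shutdown()
        _POOL = None

    assert get_pool(3) is get_pool(99)    # second size request is ignored
    destroy_pool()
    assert get_pool(5)._max_workers == 5  # fresh pool picks up the new size
    destroy_pool()
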
diff --git a/releasenotes/notes/deprecate-unlimited-max_concurrent_live_migrations-29c54c7eeb77041c.yaml b/releasenotes/notes/deprecate-unlimited-max_concurrent_live_migrations-29c54c7eeb77041c.yaml
index e7276853fb..649a0a6ca3 100644
--- a/releasenotes/notes/deprecate-unlimited-max_concurrent_live_migrations-29c54c7eeb77041c.yaml
+++ b/releasenotes/notes/deprecate-unlimited-max_concurrent_live_migrations-29c54c7eeb77041c.yaml
@@ -36,6 +36,14 @@ upgrade:
     now reduced to 5 native threads. Please also read the
     `concurrency `__
     guide for more details.
+  - |
+    In native threading mode the limits expressed by the
+    ``[DEFAULT]max_concurrent_builds`` and
+    ``[DEFAULT]max_concurrent_snapshots`` configuration options are shared
+    across the two operation types as they are executed by the same executor.
+    Therefore the two config options need to be set to the same value. If they
+    differ, nova will use the larger value as the shared limit. The shared
+    limit also means that one operation type can consume the whole limit.
 deprecations:
   - |
     The possible 0 value of the configuration option