[compute]Use single long task executor
Move the execution of build_and_run_instance and snapshot_instance to one common long task executor. Originally snapshot ran on the RPC pool, build_and_run_instance ran on the default pool. Also each of these tasks had a separate concurrency limit enforced by a semaphore. After this patch each of these tasks use a common Executor. The size of that executor and the way how we limit the concurrency differs in eventlet and in native threading mode. In eventlet mode we have one big Executor with "unlimit" size and individual semaphores are used for each task type to enforce the configured limits. In threading mode we requests the admin to configure the 2 limits to the same number, and we warn if not. We use that limit (or the max of the 2 limits) as the size of the long task Executor. As the limits are the same we don't enforce individual limit any more. The executor size will ensure the shared limit is kept. As the limit is shared a single operation type can consume the whole limit. Note that while live migration is a long-running task we cannot put it into the same long_task_executor as build and snapshot as we need: 1. a very small limit of concurrent live migrations compared to builds and snapshots 2. a way to cancel live migrations easily that are waiting due to the limit Change-Id: I88a6a593af8a5b518715e1245a76ee54752afe83 Signed-off-by: Balazs Gibizer <gibi@redhat.com>
This commit is contained in:
@@ -96,6 +96,13 @@ tasks to be executed concurrently.
|
|||||||
more performant live migration is needed then enable
|
more performant live migration is needed then enable
|
||||||
:oslo.config:option:`libvirt.live_migration_parallel_connections` instead.
|
:oslo.config:option:`libvirt.live_migration_parallel_connections` instead.
|
||||||
|
|
||||||
|
* :oslo.config:option:`max_concurrent_builds` and
|
||||||
|
:oslo.config:option:`max_concurrent_snapshots`: In native threading mode
|
||||||
|
both types of operations using a common shared executor to free up the RPC
|
||||||
|
handler workers. Therefore both type of operations are counted against the
|
||||||
|
same shared maximum limit. If the two options are set to different values
|
||||||
|
then the shared limit will be the bigger of the two values.
|
||||||
|
|
||||||
Seeing the usage of the pools
|
Seeing the usage of the pools
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
|
|||||||
+66
-8
@@ -672,10 +672,49 @@ class ComputeManager(manager.Manager):
|
|||||||
self._syncs_in_progress_lock = threading.Lock()
|
self._syncs_in_progress_lock = threading.Lock()
|
||||||
self.send_instance_updates = (
|
self.send_instance_updates = (
|
||||||
CONF.filter_scheduler.track_instance_changes)
|
CONF.filter_scheduler.track_instance_changes)
|
||||||
self._build_semaphore = threading.Semaphore(
|
|
||||||
self._get_max_concurrent_builds())
|
max_builds = self._get_max_concurrent_builds()
|
||||||
self._snapshot_semaphore = threading.Semaphore(
|
max_snapshots = self._get_max_concurrent_snapshots()
|
||||||
self._get_max_concurrent_snapshots())
|
|
||||||
|
if utils.concurrency_mode_threading():
|
||||||
|
max_tasks = max(max_builds, max_snapshots)
|
||||||
|
|
||||||
|
if max_builds != max_snapshots:
|
||||||
|
LOG.warning(
|
||||||
|
"In native threading mode the number of concurrent "
|
||||||
|
"builds, and snapshots should be limited to the "
|
||||||
|
"same number. The current configuration has differing "
|
||||||
|
"limits: max_concurrent_builds: %d, "
|
||||||
|
"max_concurrent_snapshots: %d. "
|
||||||
|
"Nova will use a single, overall limit of %d for these "
|
||||||
|
"tasks.",
|
||||||
|
max_builds, max_snapshots, max_tasks)
|
||||||
|
|
||||||
|
self._long_task_executor = utils.get_long_task_executor(max_tasks)
|
||||||
|
|
||||||
|
# In threading mode we want to use the size of the executor to
|
||||||
|
# act as the limit of concurrent execution. So neuter the
|
||||||
|
# semaphores here.
|
||||||
|
# TODO(gibi): remove the semaphores once eventlet mode is removed
|
||||||
|
self._build_semaphore = compute_utils.UnlimitedSemaphore()
|
||||||
|
self._snapshot_semaphore = compute_utils.UnlimitedSemaphore()
|
||||||
|
|
||||||
|
else:
|
||||||
|
# In eventlet mode we use the individual semaphores to limit
|
||||||
|
# the concurrent tasks, so just create a big Executor to
|
||||||
|
# potentially host all of them
|
||||||
|
self._long_task_executor = utils.get_long_task_executor(
|
||||||
|
max_builds + max_snapshots)
|
||||||
|
|
||||||
|
self._build_semaphore = threading.Semaphore(max_builds)
|
||||||
|
self._snapshot_semaphore = threading.Semaphore(max_snapshots)
|
||||||
|
|
||||||
|
# While live migration is a long-running task we cannot put it into
|
||||||
|
# the same long_task_executor as build and snapshot as we need:
|
||||||
|
# 1. a very small limit of concurrent live migrations compared to
|
||||||
|
# builds and snapshots
|
||||||
|
# 2. a way to cancel live migrations easily that are waiting due to the
|
||||||
|
# limit
|
||||||
self._live_migration_executor = nova.utils.create_executor(
|
self._live_migration_executor = nova.utils.create_executor(
|
||||||
max_workers=self._get_max_concurrent_live_migrations())
|
max_workers=self._get_max_concurrent_live_migrations())
|
||||||
|
|
||||||
@@ -1854,6 +1893,10 @@ class ComputeManager(manager.Manager):
|
|||||||
self.instance_events.cancel_all_events()
|
self.instance_events.cancel_all_events()
|
||||||
self.driver.cleanup_host(host=self.host)
|
self.driver.cleanup_host(host=self.host)
|
||||||
self._cleanup_live_migrations_in_pool()
|
self._cleanup_live_migrations_in_pool()
|
||||||
|
# NOTE: graceful shutdown needs to take care of the executors
|
||||||
|
# self._sync_power_executor.shutdown()
|
||||||
|
# utils.destroy_long_task_executor()
|
||||||
|
# utils.destroy_default_executor()
|
||||||
|
|
||||||
def _cleanup_live_migrations_in_pool(self):
|
def _cleanup_live_migrations_in_pool(self):
|
||||||
# Shutdown the pool so we don't get new requests.
|
# Shutdown the pool so we don't get new requests.
|
||||||
@@ -2489,7 +2532,8 @@ class ComputeManager(manager.Manager):
|
|||||||
# NOTE(danms): We spawn here to return the RPC worker thread back to
|
# NOTE(danms): We spawn here to return the RPC worker thread back to
|
||||||
# the pool. Since what follows could take a really long time, we don't
|
# the pool. Since what follows could take a really long time, we don't
|
||||||
# want to tie up RPC workers.
|
# want to tie up RPC workers.
|
||||||
utils.spawn(_locked_do_build_and_run_instance,
|
utils.spawn_on(self._long_task_executor,
|
||||||
|
_locked_do_build_and_run_instance,
|
||||||
context, instance, image, request_spec,
|
context, instance, image, request_spec,
|
||||||
filter_properties, admin_password, injected_files,
|
filter_properties, admin_password, injected_files,
|
||||||
requested_networks, security_groups,
|
requested_networks, security_groups,
|
||||||
@@ -4657,10 +4701,24 @@ class ComputeManager(manager.Manager):
|
|||||||
instance=instance)
|
instance=instance)
|
||||||
return
|
return
|
||||||
|
|
||||||
with self._snapshot_semaphore:
|
def do_snapshot_instance(
|
||||||
self._snapshot_instance(context, image_id, instance,
|
context, image_id, instance, expected_task_state
|
||||||
task_states.IMAGE_SNAPSHOT)
|
):
|
||||||
|
with self._snapshot_semaphore:
|
||||||
|
self._snapshot_instance(context, image_id, instance,
|
||||||
|
expected_task_state)
|
||||||
|
|
||||||
|
# NOTE(gibi): We spawn a separate task as this can be a long-running
|
||||||
|
# operation, and we want to return the RPC worker to its executor to
|
||||||
|
# avoid blocking RPC traffic.
|
||||||
|
return utils.spawn_on(
|
||||||
|
self._long_task_executor, do_snapshot_instance, context,
|
||||||
|
image_id, instance, task_states.IMAGE_SNAPSHOT)
|
||||||
|
|
||||||
|
@wrap_exception()
|
||||||
|
@reverts_task_state
|
||||||
|
@wrap_instance_fault
|
||||||
|
@delete_image_on_error
|
||||||
def _snapshot_instance(self, context, image_id, instance,
|
def _snapshot_instance(self, context, image_id, instance,
|
||||||
expected_task_state):
|
expected_task_state):
|
||||||
context = context.elevated()
|
context = context.elevated()
|
||||||
|
|||||||
@@ -664,6 +664,11 @@ instances, if asked to do so. This limit is enforced to avoid building
|
|||||||
unlimited instance concurrently on a compute node. This value can be set
|
unlimited instance concurrently on a compute node. This value can be set
|
||||||
per compute node.
|
per compute node.
|
||||||
|
|
||||||
|
In native threading mode concurrent builds and concurrent snapshot operations
|
||||||
|
are sharing the same executor and therefore the max limit values are common.
|
||||||
|
If max_concurrent_builds and max_concurrent_snapshots are set to different
|
||||||
|
values the bigger value will be used as the combined limit for both.
|
||||||
|
|
||||||
Possible Values:
|
Possible Values:
|
||||||
|
|
||||||
* ``0``: Deprecated since 33.0.0 (2026.1 Gazpacho). This value was previously
|
* ``0``: Deprecated since 33.0.0 (2026.1 Gazpacho). This value was previously
|
||||||
@@ -682,6 +687,11 @@ This limit is enforced to prevent snapshots overwhelming the
|
|||||||
host/network/storage and causing failure. This value can be set per
|
host/network/storage and causing failure. This value can be set per
|
||||||
compute node.
|
compute node.
|
||||||
|
|
||||||
|
In native threading mode concurrent builds and concurrent snapshot operations
|
||||||
|
are sharing the same executor and therefore the max limit values are common.
|
||||||
|
If max_concurrent_builds and max_concurrent_snapshots are set to different
|
||||||
|
values the bigger value will be used as the combined limit for both.
|
||||||
|
|
||||||
Possible Values:
|
Possible Values:
|
||||||
|
|
||||||
* ``0``: Deprecated since 33.0.0 (2026.1 Gazpacho). This value was previously
|
* ``0``: Deprecated since 33.0.0 (2026.1 Gazpacho). This value was previously
|
||||||
|
|||||||
Vendored
+17
@@ -1199,14 +1199,17 @@ class IsolatedExecutorFixture(fixtures.Fixture):
|
|||||||
assert utils.SCATTER_GATHER_EXECUTOR is None
|
assert utils.SCATTER_GATHER_EXECUTOR is None
|
||||||
assert utils.DEFAULT_EXECUTOR is None
|
assert utils.DEFAULT_EXECUTOR is None
|
||||||
assert utils.CACHE_IMAGES_EXECUTOR is None
|
assert utils.CACHE_IMAGES_EXECUTOR is None
|
||||||
|
assert utils.LONG_TASK_EXECUTOR is None
|
||||||
|
|
||||||
origi_get_scatter_gather = utils.get_scatter_gather_executor
|
origi_get_scatter_gather = utils.get_scatter_gather_executor
|
||||||
origi_default_executor = utils._get_default_executor
|
origi_default_executor = utils._get_default_executor
|
||||||
origi_get_cache_images_executor = utils.get_cache_images_executor
|
origi_get_cache_images_executor = utils.get_cache_images_executor
|
||||||
|
origi_get_long_task_executor = utils.get_long_task_executor
|
||||||
|
|
||||||
self.executor = None
|
self.executor = None
|
||||||
self.scatter_gather_executor = None
|
self.scatter_gather_executor = None
|
||||||
self.cache_images_executor = None
|
self.cache_images_executor = None
|
||||||
|
self.long_task_executor = None
|
||||||
|
|
||||||
def _get_default_executor():
|
def _get_default_executor():
|
||||||
self.executor = origi_default_executor()
|
self.executor = origi_default_executor()
|
||||||
@@ -1247,12 +1250,26 @@ class IsolatedExecutorFixture(fixtures.Fixture):
|
|||||||
self.addCleanup(
|
self.addCleanup(
|
||||||
lambda: self.do_cleanup_executor(self.cache_images_executor))
|
lambda: self.do_cleanup_executor(self.cache_images_executor))
|
||||||
|
|
||||||
|
def _get_long_task_executor(max_workers):
|
||||||
|
self.long_task_executor = origi_get_long_task_executor(max_workers)
|
||||||
|
self.long_task_executor.name = (
|
||||||
|
f"{self.test_case_id}.long_task")
|
||||||
|
return self.long_task_executor
|
||||||
|
|
||||||
|
self.useFixture(fixtures.MonkeyPatch(
|
||||||
|
'nova.utils.get_long_task_executor',
|
||||||
|
_get_long_task_executor))
|
||||||
|
|
||||||
|
self.addCleanup(
|
||||||
|
lambda: self.do_cleanup_executor(self.long_task_executor))
|
||||||
|
|
||||||
self.addCleanup(self.reset_globals)
|
self.addCleanup(self.reset_globals)
|
||||||
|
|
||||||
def reset_globals(self):
|
def reset_globals(self):
|
||||||
utils.SCATTER_GATHER_EXECUTOR = None
|
utils.SCATTER_GATHER_EXECUTOR = None
|
||||||
utils.DEFAULT_EXECUTOR = None
|
utils.DEFAULT_EXECUTOR = None
|
||||||
utils.CACHE_IMAGES_EXECUTOR = None
|
utils.CACHE_IMAGES_EXECUTOR = None
|
||||||
|
utils.LONG_TASK_EXECUTOR = None
|
||||||
|
|
||||||
def do_cleanup_executor(self, executor):
|
def do_cleanup_executor(self, executor):
|
||||||
# NOTE(gibi): we cannot rely on utils.concurrency_mode_threading
|
# NOTE(gibi): we cannot rely on utils.concurrency_mode_threading
|
||||||
|
|||||||
@@ -4090,10 +4090,9 @@ class ComputeTestCase(BaseTestCase,
|
|||||||
with mock.patch.object(compute_utils,
|
with mock.patch.object(compute_utils,
|
||||||
'EventReporter') as mock_event:
|
'EventReporter') as mock_event:
|
||||||
if method == 'snapshot':
|
if method == 'snapshot':
|
||||||
self.assertRaises(test.TestingException,
|
future = self.compute.snapshot_instance(
|
||||||
self.compute.snapshot_instance,
|
self.context, image_id=uuids.snapshot, instance=inst_obj)
|
||||||
self.context, image_id=uuids.snapshot,
|
self.assertRaises(test.TestingException, future.result)
|
||||||
instance=inst_obj)
|
|
||||||
mock_event.assert_called_once_with(self.context,
|
mock_event.assert_called_once_with(self.context,
|
||||||
'compute_snapshot_instance',
|
'compute_snapshot_instance',
|
||||||
CONF.host,
|
CONF.host,
|
||||||
|
|||||||
@@ -972,15 +972,24 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase,
|
|||||||
self._test_max_concurrent_builds()
|
self._test_max_concurrent_builds()
|
||||||
|
|
||||||
def test_max_concurrent_builds_semaphore_limited(self):
|
def test_max_concurrent_builds_semaphore_limited(self):
|
||||||
|
utils.destroy_long_task_executor()
|
||||||
self.flags(max_concurrent_builds=123)
|
self.flags(max_concurrent_builds=123)
|
||||||
self.assertEqual(123,
|
compute = manager.ComputeManager()
|
||||||
manager.ComputeManager()._build_semaphore._value)
|
if utils.concurrency_mode_threading():
|
||||||
|
self.assertIsInstance(
|
||||||
|
compute._build_semaphore, compute_utils.UnlimitedSemaphore)
|
||||||
|
self.assertEqual(123, compute._long_task_executor._max_workers)
|
||||||
|
else:
|
||||||
|
self.assertEqual(123, compute._build_semaphore._value)
|
||||||
|
|
||||||
def test_max_concurrent_builds_semaphore_unlimited(self):
|
def test_max_concurrent_builds_semaphore_unlimited(self):
|
||||||
|
utils.destroy_long_task_executor()
|
||||||
self.flags(max_concurrent_builds=0)
|
self.flags(max_concurrent_builds=0)
|
||||||
compute = manager.ComputeManager()
|
compute = manager.ComputeManager()
|
||||||
if utils.concurrency_mode_threading():
|
if utils.concurrency_mode_threading():
|
||||||
self.assertEqual(10, compute._build_semaphore._value)
|
self.assertIsInstance(
|
||||||
|
compute._build_semaphore, compute_utils.UnlimitedSemaphore)
|
||||||
|
self.assertEqual(10, compute._long_task_executor._max_workers)
|
||||||
else:
|
else:
|
||||||
self.assertEqual(1000, compute._build_semaphore._value)
|
self.assertEqual(1000, compute._build_semaphore._value)
|
||||||
|
|
||||||
@@ -1007,18 +1016,47 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase,
|
|||||||
self._test_max_concurrent_snapshots()
|
self._test_max_concurrent_snapshots()
|
||||||
|
|
||||||
def test_max_concurrent_snapshots_semaphore_limited(self):
|
def test_max_concurrent_snapshots_semaphore_limited(self):
|
||||||
|
utils.destroy_long_task_executor()
|
||||||
self.flags(max_concurrent_snapshots=123)
|
self.flags(max_concurrent_snapshots=123)
|
||||||
self.assertEqual(123,
|
compute = manager.ComputeManager()
|
||||||
manager.ComputeManager()._snapshot_semaphore._value)
|
if utils.concurrency_mode_threading():
|
||||||
|
self.assertIsInstance(
|
||||||
|
compute._snapshot_semaphore, compute_utils.UnlimitedSemaphore)
|
||||||
|
self.assertEqual(123, compute._long_task_executor._max_workers)
|
||||||
|
else:
|
||||||
|
self.assertEqual(123, compute._snapshot_semaphore._value)
|
||||||
|
|
||||||
def test_max_concurrent_snapshots_semaphore_unlimited(self):
|
def test_max_concurrent_snapshots_semaphore_unlimited(self):
|
||||||
|
utils.destroy_long_task_executor()
|
||||||
self.flags(max_concurrent_snapshots=0)
|
self.flags(max_concurrent_snapshots=0)
|
||||||
compute = manager.ComputeManager()
|
compute = manager.ComputeManager()
|
||||||
if utils.concurrency_mode_threading():
|
if utils.concurrency_mode_threading():
|
||||||
self.assertEqual(5, compute._snapshot_semaphore._value)
|
self.assertIsInstance(
|
||||||
|
compute._snapshot_semaphore, compute_utils.UnlimitedSemaphore)
|
||||||
|
self.assertEqual(10, compute._long_task_executor._max_workers)
|
||||||
else:
|
else:
|
||||||
self.assertEqual(1000, compute._snapshot_semaphore._value)
|
self.assertEqual(1000, compute._snapshot_semaphore._value)
|
||||||
|
|
||||||
|
@mock.patch.object(manager.LOG, 'warning')
|
||||||
|
def test_max_c_builds_and_snapshots_different_limits(self, mock_log):
|
||||||
|
utils.destroy_long_task_executor()
|
||||||
|
self.flags(max_concurrent_builds=124)
|
||||||
|
self.flags(max_concurrent_snapshots=123)
|
||||||
|
compute = manager.ComputeManager()
|
||||||
|
if utils.concurrency_mode_threading():
|
||||||
|
self.assertEqual(124, compute._long_task_executor._max_workers)
|
||||||
|
mock_log.assert_called_once_with(
|
||||||
|
'In native threading mode the number of concurrent builds, '
|
||||||
|
'and snapshots should be limited to the same number. '
|
||||||
|
'The current configuration has differing limits: '
|
||||||
|
'max_concurrent_builds: %d, max_concurrent_snapshots: %d. '
|
||||||
|
'Nova will use a single, overall limit of %d for these tasks.',
|
||||||
|
124, 123, 124)
|
||||||
|
else:
|
||||||
|
self.assertEqual(123, compute._snapshot_semaphore._value)
|
||||||
|
self.assertEqual(124, compute._build_semaphore._value)
|
||||||
|
mock_log.assert_not_called()
|
||||||
|
|
||||||
def test_nil_out_inst_obj_host_and_node_sets_nil(self):
|
def test_nil_out_inst_obj_host_and_node_sets_nil(self):
|
||||||
instance = fake_instance.fake_instance_obj(self.context,
|
instance = fake_instance.fake_instance_obj(self.context,
|
||||||
uuid=uuids.instance,
|
uuid=uuids.instance,
|
||||||
|
|||||||
@@ -1613,3 +1613,35 @@ class StaticallyDelayingCancellableTaskExecutorWrapper:
|
|||||||
@property
|
@property
|
||||||
def is_alive(self) -> bool:
|
def is_alive(self) -> bool:
|
||||||
return self._thread.is_alive()
|
return self._thread.is_alive()
|
||||||
|
|
||||||
|
|
||||||
|
LONG_TASK_EXECUTOR: Executor | None = None
|
||||||
|
|
||||||
|
|
||||||
|
def get_long_task_executor(max_workers) -> Executor:
|
||||||
|
"""Returns the executor used for long compute operations."""
|
||||||
|
global LONG_TASK_EXECUTOR
|
||||||
|
|
||||||
|
if not LONG_TASK_EXECUTOR:
|
||||||
|
LONG_TASK_EXECUTOR = create_executor(max_workers)
|
||||||
|
|
||||||
|
pname = multiprocessing.current_process().name
|
||||||
|
executor_name = f"{pname}.long_task"
|
||||||
|
LONG_TASK_EXECUTOR.name = executor_name
|
||||||
|
|
||||||
|
LOG.info("The long task thread pool %s is initialized",
|
||||||
|
executor_name)
|
||||||
|
|
||||||
|
return LONG_TASK_EXECUTOR
|
||||||
|
|
||||||
|
|
||||||
|
def destroy_long_task_executor():
|
||||||
|
"""Closes the executor and resets the global to None"""
|
||||||
|
global LONG_TASK_EXECUTOR
|
||||||
|
if LONG_TASK_EXECUTOR:
|
||||||
|
LOG.info(
|
||||||
|
"The thread pool %s is shutting down", LONG_TASK_EXECUTOR.name)
|
||||||
|
LONG_TASK_EXECUTOR.shutdown()
|
||||||
|
LOG.info("The thread pool %s is closed", LONG_TASK_EXECUTOR.name)
|
||||||
|
|
||||||
|
LONG_TASK_EXECUTOR = None
|
||||||
|
|||||||
+8
@@ -36,6 +36,14 @@ upgrade:
|
|||||||
now reduced to 5 native threads. Please also read the `concurrency
|
now reduced to 5 native threads. Please also read the `concurrency
|
||||||
<https://docs.openstack.org/nova/latest/admin/concurrency.html>`__
|
<https://docs.openstack.org/nova/latest/admin/concurrency.html>`__
|
||||||
guide for more details.
|
guide for more details.
|
||||||
|
- |
|
||||||
|
In native threading mode the limit expressed by
|
||||||
|
``[DEFAULT]max_concurrent_builds`` and
|
||||||
|
``[DEFAULT]max_concurrent_snapshots`` configuration options are shared
|
||||||
|
across the two operation types as they are executed by the same Executor.
|
||||||
|
Therefore the two config options need to be set to the same value. If not
|
||||||
|
then nova will use the bigger value as the shared limit. The shared limit
|
||||||
|
also means that one operation type can consume the whole limit.
|
||||||
deprecations:
|
deprecations:
|
||||||
- |
|
- |
|
||||||
The possible 0 value of the configuration option
|
The possible 0 value of the configuration option
|
||||||
|
|||||||
Reference in New Issue
Block a user