From d90e7726c0b72966a0d3df9e922e0df5df04b114 Mon Sep 17 00:00:00 2001 From: Balazs Gibizer Date: Thu, 24 Apr 2025 14:48:24 +0200 Subject: [PATCH] Use futurist for _get_default_green_pool() Nova uses nova.utils.spawn* to create new threads. This so far relied on a GreenPool to provide GreenThreads. We changes this pool to a futurist.GreenThreadPoolExecutor to have an interface where the implementation can be swapped out to futurist.ThreadPoolExecutor to get native threads instead. This is an interface change on utils.spawn as it will return futurist.Future instead of GreenThread. So couple of fixes needed across nova to use: * .result() instead of .wait() * .add_done_callback() instead of .link(). Here we needed to change the usage as the new callback does not forward args, so we rely on closures instead. This is also an interface and a behavior change for utils.spawn_n as it now calls utils.spawn internally. This means that top of the above detailed interface change there is behavior change for spawn_n. The spawn creates GreenThread a wrapper around greenlet while spawn_n created only the underlying greenlet. The greenlet cannot be managed the same way as a more intelligent GreenThread, including the return value but not limited to it, e.g. the whole cancellation mechanism is missing from greenlet too. After this patch spawn_n will also use GreenThread instead of naked greenlet. We consider the resulting small performance change negligible. Also the way we implement SpawnIsSynchronousFixture in our test is adapted along with other test fixture adaptation to call / mock the right functions. Change-Id: I3494660e1aaa1db46f9f08494cb5817ec7020cc5 Signed-off-by: Balazs Gibizer --- nova/cmd/scheduler.py | 1 + nova/network/model.py | 8 ++--- nova/tests/fixtures/nova.py | 49 ++++++++++----------------- nova/tests/unit/cmd/test_scheduler.py | 5 ++- nova/tests/unit/test_fixtures.py | 38 +++++---------------- nova/tests/unit/test_utils.py | 41 +++++++++++++++++++--- nova/utils.py | 41 ++++++++++++++++++---- nova/virt/libvirt/driver.py | 9 ++--- 8 files changed, 109 insertions(+), 83 deletions(-) diff --git a/nova/cmd/scheduler.py b/nova/cmd/scheduler.py index a9cc3f2b96..6e2017b3d7 100644 --- a/nova/cmd/scheduler.py +++ b/nova/cmd/scheduler.py @@ -58,6 +58,7 @@ def main(): # state (i.e. number of workers idle in the pool). A long therm solution # would be to use os.spawn instead of os.fork for the workers. utils.destroy_scatter_gather_executor() + utils.destroy_default_green_pool() service.serve(server, workers=workers) service.wait() diff --git a/nova/network/model.py b/nova/network/model.py index a7d5849a1a..16c925f6af 100644 --- a/nova/network/model.py +++ b/nova/network/model.py @@ -577,7 +577,7 @@ class NetworkInfoAsyncWrapper(NetworkInfo): def __init__(self, async_method, *args, **kwargs): super(NetworkInfoAsyncWrapper, self).__init__() - self._gt = utils.spawn(async_method, *args, **kwargs) + self._future = utils.spawn(async_method, *args, **kwargs) methods = ['json', 'fixed_ips', 'floating_ips'] for method in methods: fn = getattr(self, method) @@ -612,16 +612,16 @@ class NetworkInfoAsyncWrapper(NetworkInfo): def wait(self, do_raise=True): """Wait for asynchronous call to finish.""" - if self._gt is not None: + if self._future is not None: try: # NOTE(comstud): This looks funky, but this object is # subclassed from list. In other words, 'self' is really # just a list with a bunch of extra methods. So this # line just replaces the current list (which should be # empty) with the result. - self[:] = self._gt.wait() + self[:] = self._future.result() except Exception: if do_raise: raise finally: - self._gt = None + self._future = None diff --git a/nova/tests/fixtures/nova.py b/nova/tests/fixtures/nova.py index 557a784e9a..dd7ee0ca22 100644 --- a/nova/tests/fixtures/nova.py +++ b/nova/tests/fixtures/nova.py @@ -1208,6 +1208,7 @@ class IsolatedGreenPoolFixture(fixtures.Fixture): def _get_default_green_pool(): self.greenpool = origi_default_green_pool() + self.greenpool.name = f"{self.test_case_id}.default" return self.greenpool # NOTE(sean-k-mooney): greenpools use eventlet.spawn and # eventlet.spawn_n so we can't stub out all calls to those functions. @@ -1216,7 +1217,7 @@ class IsolatedGreenPoolFixture(fixtures.Fixture): # Greenthreads created via the standard lib threading module. self.useFixture(fixtures.MonkeyPatch( 'nova.utils._get_default_green_pool', _get_default_green_pool)) - self.addCleanup(self.do_cleanup_default) + self.addCleanup(lambda: self.do_cleanup_executor(self.greenpool)) def _get_scatter_gather_executor(): self.scatter_gather_executor = origi_get_scatter_gather() @@ -1228,11 +1229,16 @@ class IsolatedGreenPoolFixture(fixtures.Fixture): 'nova.utils.get_scatter_gather_executor', _get_scatter_gather_executor)) - self.addCleanup(self.do_cleanup_scatter_gather) + self.addCleanup( + lambda: self.do_cleanup_executor(self.scatter_gather_executor)) - def do_cleanup_scatter_gather(self): + self.addCleanup(self.reset_globals) + + def reset_globals(self): utils.SCATTER_GATHER_EXECUTOR = None - executor = self.scatter_gather_executor + utils.DEFAULT_GREEN_POOL = None + + def do_cleanup_executor(self, executor): # NOTE(gibi): we cannot rely on utils.concurrency_mode_threading # as that might have been mocked during the test when the executor # was created, but during cleanup the mock is already removed. @@ -1287,19 +1293,8 @@ class IsolatedGreenPoolFixture(fixtures.Fixture): 'and therefore are not expected to return or raise.' ) - def do_cleanup_default(self): - if self.greenpool and self.greenpool.running: - # kill all greenthreads in the pool before raising to prevent - # them from interfering with other tests. - for gt in list(self.greenpool.coroutines_running): - if isinstance(gt, eventlet.greenthread.GreenThread): - gt.kill() - # reset the global greenpool just in case. - utils.DEFAULT_GREEN_POOL = None - self._raise_on_green_pool(self.greenpool) - -class _FakeGreenThread(object): +class _FakeFuture(object): def __init__(self, func, *args, **kwargs): try: self._result = func(*args, **kwargs) @@ -1313,20 +1308,10 @@ class _FakeGreenThread(object): # defined to satisfy the interface. pass - def kill(self, *args, **kwargs): - # This method doesn't make sense for a synchronous call, it's just - # defined to satisfy the interface. - pass + def add_done_callback(self, func): + func(self) - def link(self, func, *args, **kwargs): - func(self, *args, **kwargs) - - def unlink(self, func, *args, **kwargs): - # This method doesn't make sense for a synchronous call, it's just - # defined to satisfy the interface. - pass - - def wait(self): + def result(self): if self.raised: raise self._result @@ -1334,14 +1319,14 @@ class _FakeGreenThread(object): class SpawnIsSynchronousFixture(fixtures.Fixture): - """Patch and restore the spawn_n utility method to be synchronous""" + """Patch and restore the spawn_* utility methods to be synchronous""" def setUp(self): super(SpawnIsSynchronousFixture, self).setUp() self.useFixture(fixtures.MonkeyPatch( - 'nova.utils.spawn_n', _FakeGreenThread)) + 'nova.utils.spawn_n', _FakeFuture)) self.useFixture(fixtures.MonkeyPatch( - 'nova.utils.spawn', _FakeGreenThread)) + 'nova.utils.spawn', _FakeFuture)) class BannedDBSchemaOperations(fixtures.Fixture): diff --git a/nova/tests/unit/cmd/test_scheduler.py b/nova/tests/unit/cmd/test_scheduler.py index 63c388b791..7f7ad95f8b 100644 --- a/nova/tests/unit/cmd/test_scheduler.py +++ b/nova/tests/unit/cmd/test_scheduler.py @@ -55,7 +55,8 @@ class TestScheduler(test.NoDBTestCase): self, mock_wait, mock_serve, service_create ): # simulate that the thread pool is initialized before the fork - executor = utils.get_scatter_gather_executor() + executor = utils._get_default_green_pool() + sc_executor = utils.get_scatter_gather_executor() scheduler.main() mock_serve.assert_called_once_with( @@ -63,4 +64,6 @@ class TestScheduler(test.NoDBTestCase): mock_wait.assert_called_once_with() # check that the executor was properly destroyed self.assertFalse(executor.alive) + self.assertIsNone(utils.DEFAULT_GREEN_POOL) + self.assertFalse(sc_executor.alive) self.assertIsNone(utils.SCATTER_GATHER_EXECUTOR) diff --git a/nova/tests/unit/test_fixtures.py b/nova/tests/unit/test_fixtures.py index 01505cdfca..2ac4e8b922 100644 --- a/nova/tests/unit/test_fixtures.py +++ b/nova/tests/unit/test_fixtures.py @@ -290,44 +290,22 @@ class TestSpawnIsSynchronousFixture(testtools.TestCase): utils.spawn_n(tester.function, 'foo', bar='bar') tester.function.assert_called_once_with('foo', bar='bar') - def test_spawn_return_has_wait(self): + def test_spawn_return_has_result(self): self.useFixture(fixtures.SpawnIsSynchronousFixture()) - gt = utils.spawn(lambda x: '%s' % x, 'foo') - foo = gt.wait() + future = utils.spawn(lambda x: '%s' % x, 'foo') + foo = future.result() self.assertEqual('foo', foo) - def test_spawn_n_return_has_wait(self): + def test_spawn_callback(self): self.useFixture(fixtures.SpawnIsSynchronousFixture()) - gt = utils.spawn_n(lambda x: '%s' % x, 'foo') - foo = gt.wait() - self.assertEqual('foo', foo) - - def test_spawn_has_link(self): - self.useFixture(fixtures.SpawnIsSynchronousFixture()) - gt = utils.spawn(mock.MagicMock) - passed_arg = 'test' + future = utils.spawn(mock.MagicMock) call_count = [] - def fake(thread, param): - self.assertEqual(gt, thread) - self.assertEqual(passed_arg, param) + def fake(thread): + self.assertEqual(future, thread) call_count.append(1) - gt.link(fake, passed_arg) - self.assertEqual(1, len(call_count)) - - def test_spawn_n_has_link(self): - self.useFixture(fixtures.SpawnIsSynchronousFixture()) - gt = utils.spawn_n(mock.MagicMock) - passed_arg = 'test' - call_count = [] - - def fake(thread, param): - self.assertEqual(gt, thread) - self.assertEqual(passed_arg, param) - call_count.append(1) - - gt.link(fake, passed_arg) + future.add_done_callback(fake) self.assertEqual(1, len(call_count)) diff --git a/nova/tests/unit/test_utils.py b/nova/tests/unit/test_utils.py index e1ba807cfd..eadd555b6a 100644 --- a/nova/tests/unit/test_utils.py +++ b/nova/tests/unit/test_utils.py @@ -772,7 +772,7 @@ class SpawnNTestCase(test.NoDBTestCase): def fake(arg): pass pool = utils._get_default_green_pool() - with mock.patch.object(pool, self.spawn_name, _fake_spawn): + with mock.patch.object(pool, "submit", _fake_spawn): getattr(utils, self.spawn_name)(fake, 'test') self.assertIsNone(common_context.get_current()) @@ -790,7 +790,7 @@ class SpawnNTestCase(test.NoDBTestCase): pass pool = utils._get_default_green_pool() - with mock.patch.object(pool, self.spawn_name, _fake_spawn): + with mock.patch.object(pool, "submit", _fake_spawn): getattr(utils, self.spawn_name)(fake, ctxt, kwarg1='test') self.assertEqual(ctxt, common_context.get_current()) @@ -811,7 +811,7 @@ class SpawnNTestCase(test.NoDBTestCase): pass pool = utils._get_default_green_pool() - with mock.patch.object(pool, self.spawn_name, _fake_spawn): + with mock.patch.object(pool, "submit", _fake_spawn): getattr(utils, self.spawn_name)(fake, ctxt_passed, kwarg1='test') self.assertEqual(ctxt, common_context.get_current()) @@ -1458,8 +1458,11 @@ class LatchErrorOnRaiseTests(test.NoDBTestCase): class ScatterGatherExecutorTestCase(test.NoDBTestCase): def test_executor_is_named(self): executor = utils.get_scatter_gather_executor() - # NOTE(gibi): during test we use a test-case-specific name, outside - # of test we use process name specific name instead. + # NOTE(gibi): The executor is name both in normal run and in the test + # env. During testing we use a test-case-specific name, outside + # of test we use process name specific name instead. The test case + # specific name is added to help troubleshooting leaked executors + # between test case. self.assertRegex(executor.name, "nova.tests.unit.test_utils.ScatterGatherExecutor.*" "test_executor_is_named.cell_worker") @@ -1487,3 +1490,31 @@ class ScatterGatherExecutorTestCase(test.NoDBTestCase): utils.destroy_scatter_gather_executor() self.assertIsNone(utils.SCATTER_GATHER_EXECUTOR) self.assertFalse(executor.alive) + + +class DefaultExecutorTestCase(test.NoDBTestCase): + def test_executor_is_named(self): + executor = utils._get_default_green_pool() + # NOTE(gibi): The executor is name both in normal run and in the test + # env. During testing we use a test-case-specific name, outside + # of test we use process name specific name instead. The test case + # specific name is added to help troubleshooting leaked executors + # between test case. + self.assertRegex(executor.name, + "nova.tests.unit.test_utils.DefaultExecutor.*" + "test_executor_is_named.default") + + @mock.patch.object( + utils, 'concurrency_mode_threading', new=mock.Mock(return_value=False)) + def test_executor_type_eventlet(self): + executor = utils._get_default_green_pool() + + self.assertEqual('GreenThreadPoolExecutor', type(executor).__name__) + + def test_executor_destroy(self): + executor = utils._get_default_green_pool() + self.assertIsNotNone(utils.DEFAULT_GREEN_POOL) + + utils.destroy_default_green_pool() + self.assertIsNone(utils.DEFAULT_GREEN_POOL) + self.assertFalse(executor.alive) diff --git a/nova/utils.py b/nova/utils.py index 94f6df5853..c05b68a6a0 100644 --- a/nova/utils.py +++ b/nova/utils.py @@ -29,7 +29,6 @@ import shutil import tempfile import time -import eventlet from eventlet import tpool import futurist from keystoneauth1 import loading as ks_loading @@ -92,12 +91,36 @@ def cooperative_yield(): time.sleep(0) +def destroy_default_green_pool(): + """Closes the executor and resets the global to None to allow forked worker + processes to properly init it. + """ + global DEFAULT_GREEN_POOL + if DEFAULT_GREEN_POOL: + LOG.info( + "The default thread pool %s is shutting down", + DEFAULT_GREEN_POOL.name) + DEFAULT_GREEN_POOL.shutdown() + LOG.info( + "The default thread pool %s is closed", DEFAULT_GREEN_POOL.name) + + DEFAULT_GREEN_POOL = None + + def _get_default_green_pool(): global DEFAULT_GREEN_POOL - if DEFAULT_GREEN_POOL is None: - DEFAULT_GREEN_POOL = eventlet.greenpool.GreenPool( + + if not DEFAULT_GREEN_POOL: + DEFAULT_GREEN_POOL = futurist.GreenThreadPoolExecutor( CONF.default_green_pool_size ) + + pname = multiprocessing.current_process().name + executor_name = f"{pname}.default" + DEFAULT_GREEN_POOL.name = executor_name + + LOG.info("The default thread pool %s is initialized", executor_name) + return DEFAULT_GREEN_POOL @@ -685,7 +708,7 @@ def pass_context(runner, func, *args, **kwargs): return runner(pass_context_wrapper(func), *args, **kwargs) -def spawn(func, *args, **kwargs): +def spawn(func, *args, **kwargs) -> futurist.Future: """Passthrough method for eventlet.spawn. This utility exists so that it can be stubbed for testing without @@ -696,11 +719,12 @@ def spawn(func, *args, **kwargs): context when using this method to spawn a new thread. """ - return pass_context(_get_default_green_pool().spawn, func, *args, **kwargs) + return pass_context( + _get_default_green_pool().submit, func, *args, **kwargs) def spawn_n(func, *args, **kwargs): - """Passthrough method for eventlet.greenpool.spawn_n. + """Passthrough method for eventlet.greenpool.spawn. This utility exists so that it can be stubbed for testing without interfering with the service spawns. @@ -708,9 +732,12 @@ def spawn_n(func, *args, **kwargs): It will also grab the context from the threadlocal store and add it to the store on the new thread. This allows for continuity in logging the context when using this method to spawn a new thread. + + Note that this is not different from calling spawn. Both spawn and + spawn_n uses the eventlet.spawn. """ - pass_context(_get_default_green_pool().spawn_n, func, *args, **kwargs) + spawn(func, *args, **kwargs) def tpool_execute(func, *args, **kwargs): diff --git a/nova/virt/libvirt/driver.py b/nova/virt/libvirt/driver.py index c06142f9ba..17e6b3fc76 100644 --- a/nova/virt/libvirt/driver.py +++ b/nova/virt/libvirt/driver.py @@ -11319,7 +11319,7 @@ class LibvirtDriver(driver.ComputeDriver): disk_paths, device_names = self._live_migration_copy_disk_paths( context, instance, guest) - opthread = utils.spawn(self._live_migration_operation, + future = utils.spawn(self._live_migration_operation, context, instance, dest, block_migration, migrate_data, guest, @@ -11328,11 +11328,12 @@ class LibvirtDriver(driver.ComputeDriver): finish_event = eventlet.event.Event() self.active_migrations[instance.uuid] = deque() - def thread_finished(thread, event): + def thread_finished(_): LOG.debug("Migration operation thread notification", instance=instance) - event.send() - opthread.link(thread_finished, finish_event) + finish_event.send() + + future.add_done_callback(thread_finished) # Let eventlet schedule the new thread right away time.sleep(0)