Merge "Use futurist for _get_default_green_pool()"

This commit is contained in:
Zuul
2025-07-03 17:11:12 +00:00
committed by Gerrit Code Review
8 changed files with 109 additions and 83 deletions
+1
View File
@@ -58,6 +58,7 @@ def main():
# state (i.e. number of workers idle in the pool). A long therm solution
# would be to use os.spawn instead of os.fork for the workers.
utils.destroy_scatter_gather_executor()
utils.destroy_default_green_pool()
service.serve(server, workers=workers)
service.wait()
+4 -4
View File
@@ -577,7 +577,7 @@ class NetworkInfoAsyncWrapper(NetworkInfo):
def __init__(self, async_method, *args, **kwargs):
super(NetworkInfoAsyncWrapper, self).__init__()
self._gt = utils.spawn(async_method, *args, **kwargs)
self._future = utils.spawn(async_method, *args, **kwargs)
methods = ['json', 'fixed_ips', 'floating_ips']
for method in methods:
fn = getattr(self, method)
@@ -612,16 +612,16 @@ class NetworkInfoAsyncWrapper(NetworkInfo):
def wait(self, do_raise=True):
"""Wait for asynchronous call to finish."""
if self._gt is not None:
if self._future is not None:
try:
# NOTE(comstud): This looks funky, but this object is
# subclassed from list. In other words, 'self' is really
# just a list with a bunch of extra methods. So this
# line just replaces the current list (which should be
# empty) with the result.
self[:] = self._gt.wait()
self[:] = self._future.result()
except Exception:
if do_raise:
raise
finally:
self._gt = None
self._future = None
+17 -32
View File
@@ -1208,6 +1208,7 @@ class IsolatedGreenPoolFixture(fixtures.Fixture):
def _get_default_green_pool():
self.greenpool = origi_default_green_pool()
self.greenpool.name = f"{self.test_case_id}.default"
return self.greenpool
# NOTE(sean-k-mooney): greenpools use eventlet.spawn and
# eventlet.spawn_n so we can't stub out all calls to those functions.
@@ -1216,7 +1217,7 @@ class IsolatedGreenPoolFixture(fixtures.Fixture):
# Greenthreads created via the standard lib threading module.
self.useFixture(fixtures.MonkeyPatch(
'nova.utils._get_default_green_pool', _get_default_green_pool))
self.addCleanup(self.do_cleanup_default)
self.addCleanup(lambda: self.do_cleanup_executor(self.greenpool))
def _get_scatter_gather_executor():
self.scatter_gather_executor = origi_get_scatter_gather()
@@ -1228,11 +1229,16 @@ class IsolatedGreenPoolFixture(fixtures.Fixture):
'nova.utils.get_scatter_gather_executor',
_get_scatter_gather_executor))
self.addCleanup(self.do_cleanup_scatter_gather)
self.addCleanup(
lambda: self.do_cleanup_executor(self.scatter_gather_executor))
def do_cleanup_scatter_gather(self):
self.addCleanup(self.reset_globals)
def reset_globals(self):
utils.SCATTER_GATHER_EXECUTOR = None
executor = self.scatter_gather_executor
utils.DEFAULT_GREEN_POOL = None
def do_cleanup_executor(self, executor):
# NOTE(gibi): we cannot rely on utils.concurrency_mode_threading
# as that might have been mocked during the test when the executor
# was created, but during cleanup the mock is already removed.
@@ -1287,19 +1293,8 @@ class IsolatedGreenPoolFixture(fixtures.Fixture):
'and therefore are not expected to return or raise.'
)
def do_cleanup_default(self):
if self.greenpool and self.greenpool.running:
# kill all greenthreads in the pool before raising to prevent
# them from interfering with other tests.
for gt in list(self.greenpool.coroutines_running):
if isinstance(gt, eventlet.greenthread.GreenThread):
gt.kill()
# reset the global greenpool just in case.
utils.DEFAULT_GREEN_POOL = None
self._raise_on_green_pool(self.greenpool)
class _FakeGreenThread(object):
class _FakeFuture(object):
def __init__(self, func, *args, **kwargs):
try:
self._result = func(*args, **kwargs)
@@ -1313,20 +1308,10 @@ class _FakeGreenThread(object):
# defined to satisfy the interface.
pass
def kill(self, *args, **kwargs):
# This method doesn't make sense for a synchronous call, it's just
# defined to satisfy the interface.
pass
def add_done_callback(self, func):
func(self)
def link(self, func, *args, **kwargs):
func(self, *args, **kwargs)
def unlink(self, func, *args, **kwargs):
# This method doesn't make sense for a synchronous call, it's just
# defined to satisfy the interface.
pass
def wait(self):
def result(self):
if self.raised:
raise self._result
@@ -1334,14 +1319,14 @@ class _FakeGreenThread(object):
class SpawnIsSynchronousFixture(fixtures.Fixture):
"""Patch and restore the spawn_n utility method to be synchronous"""
"""Patch and restore the spawn_* utility methods to be synchronous"""
def setUp(self):
super(SpawnIsSynchronousFixture, self).setUp()
self.useFixture(fixtures.MonkeyPatch(
'nova.utils.spawn_n', _FakeGreenThread))
'nova.utils.spawn_n', _FakeFuture))
self.useFixture(fixtures.MonkeyPatch(
'nova.utils.spawn', _FakeGreenThread))
'nova.utils.spawn', _FakeFuture))
class BannedDBSchemaOperations(fixtures.Fixture):
+4 -1
View File
@@ -55,7 +55,8 @@ class TestScheduler(test.NoDBTestCase):
self, mock_wait, mock_serve, service_create
):
# simulate that the thread pool is initialized before the fork
executor = utils.get_scatter_gather_executor()
executor = utils._get_default_green_pool()
sc_executor = utils.get_scatter_gather_executor()
scheduler.main()
mock_serve.assert_called_once_with(
@@ -63,4 +64,6 @@ class TestScheduler(test.NoDBTestCase):
mock_wait.assert_called_once_with()
# check that the executor was properly destroyed
self.assertFalse(executor.alive)
self.assertIsNone(utils.DEFAULT_GREEN_POOL)
self.assertFalse(sc_executor.alive)
self.assertIsNone(utils.SCATTER_GATHER_EXECUTOR)
+8 -30
View File
@@ -290,44 +290,22 @@ class TestSpawnIsSynchronousFixture(testtools.TestCase):
utils.spawn_n(tester.function, 'foo', bar='bar')
tester.function.assert_called_once_with('foo', bar='bar')
def test_spawn_return_has_wait(self):
def test_spawn_return_has_result(self):
self.useFixture(fixtures.SpawnIsSynchronousFixture())
gt = utils.spawn(lambda x: '%s' % x, 'foo')
foo = gt.wait()
future = utils.spawn(lambda x: '%s' % x, 'foo')
foo = future.result()
self.assertEqual('foo', foo)
def test_spawn_n_return_has_wait(self):
def test_spawn_callback(self):
self.useFixture(fixtures.SpawnIsSynchronousFixture())
gt = utils.spawn_n(lambda x: '%s' % x, 'foo')
foo = gt.wait()
self.assertEqual('foo', foo)
def test_spawn_has_link(self):
self.useFixture(fixtures.SpawnIsSynchronousFixture())
gt = utils.spawn(mock.MagicMock)
passed_arg = 'test'
future = utils.spawn(mock.MagicMock)
call_count = []
def fake(thread, param):
self.assertEqual(gt, thread)
self.assertEqual(passed_arg, param)
def fake(thread):
self.assertEqual(future, thread)
call_count.append(1)
gt.link(fake, passed_arg)
self.assertEqual(1, len(call_count))
def test_spawn_n_has_link(self):
self.useFixture(fixtures.SpawnIsSynchronousFixture())
gt = utils.spawn_n(mock.MagicMock)
passed_arg = 'test'
call_count = []
def fake(thread, param):
self.assertEqual(gt, thread)
self.assertEqual(passed_arg, param)
call_count.append(1)
gt.link(fake, passed_arg)
future.add_done_callback(fake)
self.assertEqual(1, len(call_count))
+36 -5
View File
@@ -772,7 +772,7 @@ class SpawnNTestCase(test.NoDBTestCase):
def fake(arg):
pass
pool = utils._get_default_green_pool()
with mock.patch.object(pool, self.spawn_name, _fake_spawn):
with mock.patch.object(pool, "submit", _fake_spawn):
getattr(utils, self.spawn_name)(fake, 'test')
self.assertIsNone(common_context.get_current())
@@ -790,7 +790,7 @@ class SpawnNTestCase(test.NoDBTestCase):
pass
pool = utils._get_default_green_pool()
with mock.patch.object(pool, self.spawn_name, _fake_spawn):
with mock.patch.object(pool, "submit", _fake_spawn):
getattr(utils, self.spawn_name)(fake, ctxt, kwarg1='test')
self.assertEqual(ctxt, common_context.get_current())
@@ -811,7 +811,7 @@ class SpawnNTestCase(test.NoDBTestCase):
pass
pool = utils._get_default_green_pool()
with mock.patch.object(pool, self.spawn_name, _fake_spawn):
with mock.patch.object(pool, "submit", _fake_spawn):
getattr(utils, self.spawn_name)(fake, ctxt_passed, kwarg1='test')
self.assertEqual(ctxt, common_context.get_current())
@@ -1458,8 +1458,11 @@ class LatchErrorOnRaiseTests(test.NoDBTestCase):
class ScatterGatherExecutorTestCase(test.NoDBTestCase):
def test_executor_is_named(self):
executor = utils.get_scatter_gather_executor()
# NOTE(gibi): during test we use a test-case-specific name, outside
# of test we use process name specific name instead.
# NOTE(gibi): The executor is name both in normal run and in the test
# env. During testing we use a test-case-specific name, outside
# of test we use process name specific name instead. The test case
# specific name is added to help troubleshooting leaked executors
# between test case.
self.assertRegex(executor.name,
"nova.tests.unit.test_utils.ScatterGatherExecutor.*"
"test_executor_is_named.cell_worker")
@@ -1487,3 +1490,31 @@ class ScatterGatherExecutorTestCase(test.NoDBTestCase):
utils.destroy_scatter_gather_executor()
self.assertIsNone(utils.SCATTER_GATHER_EXECUTOR)
self.assertFalse(executor.alive)
class DefaultExecutorTestCase(test.NoDBTestCase):
def test_executor_is_named(self):
executor = utils._get_default_green_pool()
# NOTE(gibi): The executor is name both in normal run and in the test
# env. During testing we use a test-case-specific name, outside
# of test we use process name specific name instead. The test case
# specific name is added to help troubleshooting leaked executors
# between test case.
self.assertRegex(executor.name,
"nova.tests.unit.test_utils.DefaultExecutor.*"
"test_executor_is_named.default")
@mock.patch.object(
utils, 'concurrency_mode_threading', new=mock.Mock(return_value=False))
def test_executor_type_eventlet(self):
executor = utils._get_default_green_pool()
self.assertEqual('GreenThreadPoolExecutor', type(executor).__name__)
def test_executor_destroy(self):
executor = utils._get_default_green_pool()
self.assertIsNotNone(utils.DEFAULT_GREEN_POOL)
utils.destroy_default_green_pool()
self.assertIsNone(utils.DEFAULT_GREEN_POOL)
self.assertFalse(executor.alive)
+34 -7
View File
@@ -29,7 +29,6 @@ import shutil
import tempfile
import time
import eventlet
from eventlet import tpool
import futurist
from keystoneauth1 import loading as ks_loading
@@ -92,12 +91,36 @@ def cooperative_yield():
time.sleep(0)
def destroy_default_green_pool():
"""Closes the executor and resets the global to None to allow forked worker
processes to properly init it.
"""
global DEFAULT_GREEN_POOL
if DEFAULT_GREEN_POOL:
LOG.info(
"The default thread pool %s is shutting down",
DEFAULT_GREEN_POOL.name)
DEFAULT_GREEN_POOL.shutdown()
LOG.info(
"The default thread pool %s is closed", DEFAULT_GREEN_POOL.name)
DEFAULT_GREEN_POOL = None
def _get_default_green_pool():
global DEFAULT_GREEN_POOL
if DEFAULT_GREEN_POOL is None:
DEFAULT_GREEN_POOL = eventlet.greenpool.GreenPool(
if not DEFAULT_GREEN_POOL:
DEFAULT_GREEN_POOL = futurist.GreenThreadPoolExecutor(
CONF.default_green_pool_size
)
pname = multiprocessing.current_process().name
executor_name = f"{pname}.default"
DEFAULT_GREEN_POOL.name = executor_name
LOG.info("The default thread pool %s is initialized", executor_name)
return DEFAULT_GREEN_POOL
@@ -685,7 +708,7 @@ def pass_context(runner, func, *args, **kwargs):
return runner(pass_context_wrapper(func), *args, **kwargs)
def spawn(func, *args, **kwargs):
def spawn(func, *args, **kwargs) -> futurist.Future:
"""Passthrough method for eventlet.spawn.
This utility exists so that it can be stubbed for testing without
@@ -696,11 +719,12 @@ def spawn(func, *args, **kwargs):
context when using this method to spawn a new thread.
"""
return pass_context(_get_default_green_pool().spawn, func, *args, **kwargs)
return pass_context(
_get_default_green_pool().submit, func, *args, **kwargs)
def spawn_n(func, *args, **kwargs):
"""Passthrough method for eventlet.greenpool.spawn_n.
"""Passthrough method for eventlet.greenpool.spawn.
This utility exists so that it can be stubbed for testing without
interfering with the service spawns.
@@ -708,9 +732,12 @@ def spawn_n(func, *args, **kwargs):
It will also grab the context from the threadlocal store and add it to
the store on the new thread. This allows for continuity in logging the
context when using this method to spawn a new thread.
Note that this is not different from calling spawn. Both spawn and
spawn_n uses the eventlet.spawn.
"""
pass_context(_get_default_green_pool().spawn_n, func, *args, **kwargs)
spawn(func, *args, **kwargs)
def tpool_execute(func, *args, **kwargs):
+5 -4
View File
@@ -11319,7 +11319,7 @@ class LibvirtDriver(driver.ComputeDriver):
disk_paths, device_names = self._live_migration_copy_disk_paths(
context, instance, guest)
opthread = utils.spawn(self._live_migration_operation,
future = utils.spawn(self._live_migration_operation,
context, instance, dest,
block_migration,
migrate_data, guest,
@@ -11328,11 +11328,12 @@ class LibvirtDriver(driver.ComputeDriver):
finish_event = eventlet.event.Event()
self.active_migrations[instance.uuid] = deque()
def thread_finished(thread, event):
def thread_finished(_):
LOG.debug("Migration operation thread notification",
instance=instance)
event.send()
opthread.link(thread_finished, finish_event)
finish_event.send()
future.add_done_callback(thread_finished)
# Let eventlet schedule the new thread right away
time.sleep(0)