Merge "Add spawn_on"
This commit is contained in:
+3
-3
@@ -430,9 +430,9 @@ def scatter_gather_cells(context, cell_mappings, timeout, fn, *args, **kwargs):
|
||||
|
||||
for cell_mapping in cell_mappings:
|
||||
with target_cell(context, cell_mapping) as cctxt:
|
||||
future = executor.submit(
|
||||
utils.pass_context_wrapper(gather_result),
|
||||
cell_mapping.uuid, fn, cctxt, *args, **kwargs)
|
||||
future = utils.spawn_on(
|
||||
executor,
|
||||
gather_result, cell_mapping.uuid, fn, cctxt, *args, **kwargs)
|
||||
tasks[cell_mapping.uuid] = future
|
||||
|
||||
futurist.waiters.wait_for_all(tasks.values(), timeout)
|
||||
|
||||
Vendored
+13
-26
@@ -1294,37 +1294,24 @@ class IsolatedGreenPoolFixture(fixtures.Fixture):
|
||||
)
|
||||
|
||||
|
||||
class _FakeFuture(object):
|
||||
def __init__(self, func, *args, **kwargs):
|
||||
try:
|
||||
self._result = func(*args, **kwargs)
|
||||
self.raised = False
|
||||
except Exception as e:
|
||||
self.raised = True
|
||||
self._result = e
|
||||
|
||||
def cancel(self, *args, **kwargs):
|
||||
# This method doesn't make sense for a synchronous call, it's just
|
||||
# defined to satisfy the interface.
|
||||
pass
|
||||
|
||||
def add_done_callback(self, func):
|
||||
func(self)
|
||||
|
||||
def result(self):
|
||||
if self.raised:
|
||||
raise self._result
|
||||
|
||||
return self._result
|
||||
|
||||
|
||||
class SpawnIsSynchronousFixture(fixtures.Fixture):
|
||||
"""Patch and restore the spawn_* utility methods to be synchronous"""
|
||||
|
||||
def setUp(self):
|
||||
super(SpawnIsSynchronousFixture, self).setUp()
|
||||
self.useFixture(fixtures.MonkeyPatch(
|
||||
'nova.utils.spawn', _FakeFuture))
|
||||
executor = futurist.SynchronousExecutor()
|
||||
self.addCleanup(executor.shutdown)
|
||||
|
||||
def spawn(*args, **kwargs):
|
||||
return executor.submit(*args, **kwargs)
|
||||
|
||||
# Just ignore the first arg that is the original executor instance
|
||||
# and use our test internal synchronous executor.
|
||||
def spawn_on(_, *args, **kwargs):
|
||||
return executor.submit(*args, **kwargs)
|
||||
|
||||
self.useFixture(fixtures.MonkeyPatch('nova.utils.spawn', spawn))
|
||||
self.useFixture(fixtures.MonkeyPatch('nova.utils.spawn_on', spawn_on))
|
||||
|
||||
|
||||
class BannedDBSchemaOperations(fixtures.Fixture):
|
||||
|
||||
@@ -17,6 +17,7 @@ import hashlib
|
||||
import os
|
||||
import os.path
|
||||
import tempfile
|
||||
import threading
|
||||
from unittest import mock
|
||||
|
||||
import fixtures
|
||||
@@ -1511,3 +1512,52 @@ class DefaultExecutorTestCase(test.NoDBTestCase):
|
||||
utils.destroy_default_green_pool()
|
||||
self.assertIsNone(utils.DEFAULT_GREEN_POOL)
|
||||
self.assertFalse(executor.alive)
|
||||
|
||||
|
||||
class SpawnOnTestCase(test.NoDBTestCase):
|
||||
def test_spawn_on_submits_work(self):
|
||||
executor = utils.get_scatter_gather_executor()
|
||||
task = mock.MagicMock()
|
||||
|
||||
future = utils.spawn_on(executor, task, 13, foo='bar')
|
||||
future.result()
|
||||
|
||||
task.assert_called_once_with(13, foo='bar')
|
||||
|
||||
@mock.patch.object(
|
||||
utils, 'concurrency_mode_threading', new=mock.Mock(return_value=True))
|
||||
@mock.patch.object(utils.LOG, 'warning')
|
||||
def test_spawn_on_warns_on_full_executor(self, mock_warning):
|
||||
# Ensure we have executor for a single task only at a time
|
||||
self.flags(cell_worker_thread_pool_size=1)
|
||||
executor = utils.get_scatter_gather_executor()
|
||||
|
||||
work = threading.Event()
|
||||
started = threading.Event()
|
||||
|
||||
# let the blocked tasks finish after the test case so that the leaked
|
||||
# thread check is not triggered during cleanup
|
||||
self.addCleanup(work.set)
|
||||
|
||||
def task():
|
||||
started.set()
|
||||
work.wait()
|
||||
|
||||
# Start two tasks that will wait, the first will execute the second
|
||||
# will wait in the queue
|
||||
utils.spawn_on(executor, task)
|
||||
utils.spawn_on(executor, task)
|
||||
# wait for the first task to consume the single executor thread
|
||||
started.wait()
|
||||
# start one more task to trigger the fullness check.
|
||||
utils.spawn_on(executor, task)
|
||||
|
||||
# We expect that spawn_on will warn due to the second task being is
|
||||
# waiting in the queue, and no idle worker thread exists.
|
||||
mock_warning.assert_called_once_with(
|
||||
'The %s pool does not have free threads so the task %s will be '
|
||||
'queued. If this happens repeatedly then the size of the pool is '
|
||||
'too small for the load or there are stuck threads filling the '
|
||||
'pool.',
|
||||
'nova.tests.unit.test_utils.SpawnOnTestCase.'
|
||||
'test_spawn_on_warns_on_full_executor.cell_worker', task)
|
||||
|
||||
+32
-4
@@ -675,7 +675,7 @@ def _serialize_profile_info():
|
||||
return trace_info
|
||||
|
||||
|
||||
def pass_context_wrapper(func):
|
||||
def _pass_context_wrapper(func):
|
||||
"""Generalised passthrough method
|
||||
It will grab the context from the threadlocal store and add it to
|
||||
the store on the new thread. This allows for continuity in logging the
|
||||
@@ -705,7 +705,7 @@ def pass_context(runner, func, *args, **kwargs):
|
||||
runner function
|
||||
"""
|
||||
|
||||
return runner(pass_context_wrapper(func), *args, **kwargs)
|
||||
return runner(_pass_context_wrapper(func), *args, **kwargs)
|
||||
|
||||
|
||||
def spawn(func, *args, **kwargs) -> futurist.Future:
|
||||
@@ -719,8 +719,36 @@ def spawn(func, *args, **kwargs) -> futurist.Future:
|
||||
context when using this method to spawn a new thread.
|
||||
"""
|
||||
|
||||
return pass_context(
|
||||
_get_default_green_pool().submit, func, *args, **kwargs)
|
||||
return spawn_on(_get_default_green_pool(), func, *args, **kwargs)
|
||||
|
||||
|
||||
def _executor_is_full(executor):
|
||||
if concurrency_mode_threading():
|
||||
# TODO(gibi): Move this whole logic to futurist ThreadPoolExecutor
|
||||
# so that we can avoid accessing the internals of the executor
|
||||
with executor._shutdown_lock:
|
||||
idle_workers = len([w for w in executor._workers if w.idle]) > 0
|
||||
queued_tasks = executor._work_queue.qsize() > 0
|
||||
return queued_tasks and not idle_workers
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def spawn_on(executor, func, *args, **kwargs) -> futurist.Future:
|
||||
"""Passthrough method to run func on a thread in a given executor.
|
||||
|
||||
It will also grab the context from the threadlocal store and add it to
|
||||
the store on the new thread. This allows for continuity in logging the
|
||||
context when using this method to spawn a new thread.
|
||||
"""
|
||||
|
||||
if _executor_is_full(executor):
|
||||
LOG.warning(
|
||||
"The %s pool does not have free threads so the task %s will be "
|
||||
"queued. If this happens repeatedly then the size of the pool is "
|
||||
"too small for the load or there are stuck threads filling the "
|
||||
"pool.", executor.name, func)
|
||||
return pass_context(executor.submit, func, *args, **kwargs)
|
||||
|
||||
|
||||
def tpool_execute(func, *args, **kwargs):
|
||||
|
||||
Reference in New Issue
Block a user