Merge "Print ThreadPool statistics"
This commit is contained in:
@@ -93,6 +93,19 @@ The number of tasks that can run concurrently, one for each cell, for
|
||||
operations requires cross cell data gathering a.k.a scatter-gather, like
|
||||
listing instances across multiple cells. This is only used if the service is
|
||||
running in native thread mode.
|
||||
'''),
|
||||
cfg.IntOpt(
|
||||
'thread_pool_statistic_period',
|
||||
default=-1,
|
||||
min=-1,
|
||||
help='''
|
||||
When new work is submitted to any of the thread pools nova logs the
|
||||
statistics of the pool (work executed, threads available, work queued, etc).
|
||||
This parameter defines how frequently such logging happens from a specific
|
||||
pool in seconds. A value of 60 means that statistic will be logged
|
||||
from a pool maximum once every 60 seconds. The value 0 means that logging
|
||||
happens every time work is submitted to the pool. The value -1 means the
|
||||
logging is disabled.
|
||||
'''),
|
||||
]
|
||||
|
||||
|
||||
@@ -1572,3 +1572,114 @@ class SpawnOnTestCase(test.NoDBTestCase):
|
||||
'pool.',
|
||||
'nova.tests.unit.test_utils.SpawnOnTestCase.'
|
||||
'test_spawn_on_warns_on_full_executor.cell_worker', task)
|
||||
|
||||
|
||||
class ExecutorStatsTestCase(test.NoDBTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.work = threading.Event()
|
||||
|
||||
def _task_finishes(self):
|
||||
return
|
||||
|
||||
def _task_fails(self):
|
||||
raise ValueError()
|
||||
|
||||
def _task_running(self):
|
||||
self.work.wait()
|
||||
|
||||
@mock.patch.object(
|
||||
utils, 'concurrency_mode_threading', new=mock.Mock(return_value=False))
|
||||
@mock.patch.object(utils.LOG, 'debug')
|
||||
def test_stats_logged_eventlet(self, mock_debug):
|
||||
# ensure that each task submission triggers stats printing
|
||||
self.flags(thread_pool_statistic_period=0)
|
||||
|
||||
utils.spawn(self._task_finishes).result()
|
||||
utils.spawn(self._task_fails).exception()
|
||||
running = utils.spawn(self._task_running)
|
||||
|
||||
# avoid having a hanging thread leaking from the test case
|
||||
def cleanup():
|
||||
self.work.set()
|
||||
running.result()
|
||||
|
||||
self.addCleanup(cleanup)
|
||||
|
||||
# The stats are printed *before* the work is submitted so we need an
|
||||
# extra task submitted to get the stats from the above task.
|
||||
utils.spawn(self._task_finishes).result()
|
||||
print(mock_debug.mock_calls)
|
||||
|
||||
args = mock_debug.mock_calls[3][1]
|
||||
self.assertEqual(
|
||||
('State of %s GreenThreadPoolExecutor when submitting a new task: '
|
||||
'workers: %d, max_workers: %d, work queued length: %d, stats: %s',
|
||||
'nova.tests.unit.test_utils.ExecutorStatsTestCase.'
|
||||
'test_stats_logged_eventlet.default', 1, 1000, 0),
|
||||
args[0:5])
|
||||
stats = args[5]
|
||||
self.assertEqual(1, stats.failures)
|
||||
self.assertEqual(2, stats.executed)
|
||||
self.assertEqual(0, stats.cancelled)
|
||||
|
||||
@mock.patch.object(
|
||||
utils, 'concurrency_mode_threading', new=mock.Mock(return_value=True))
|
||||
@mock.patch.object(utils.LOG, 'debug')
|
||||
def test_stats_logged_threading(self, mock_debug):
|
||||
# ensure that each task submission triggers stats printing
|
||||
self.flags(thread_pool_statistic_period=0)
|
||||
# make the tasks sequential to help simulating queued task
|
||||
self.flags(default_thread_pool_size=1)
|
||||
|
||||
utils.spawn(self._task_finishes).result()
|
||||
utils.spawn(self._task_fails).exception()
|
||||
running = utils.spawn(self._task_running)
|
||||
|
||||
# avoid having a hanging thread leaking from the test case
|
||||
def cleanup():
|
||||
self.work.set()
|
||||
running.result()
|
||||
|
||||
self.addCleanup(cleanup)
|
||||
|
||||
# this will be queued as the only worker thread is held up by the
|
||||
# running task
|
||||
utils.spawn(self._task_finishes)
|
||||
# this is also queued so we can cancel it to dequeue it
|
||||
utils.spawn(self._task_finishes).cancel()
|
||||
# The stats are printed *before* the work is submitted so we need an
|
||||
# extra task submitted to get the stats from the above task.
|
||||
utils.spawn(self._task_finishes)
|
||||
|
||||
args = mock_debug.mock_calls[5][1]
|
||||
self.assertEqual(
|
||||
('State of %s ThreadPoolExecutor when submitting a new task: '
|
||||
'max_workers: %d, workers: %d, idle workers: %d, queued work: %d,'
|
||||
' stats: %s',
|
||||
'nova.tests.unit.test_utils.ExecutorStatsTestCase.'
|
||||
'test_stats_logged_threading.default',
|
||||
1, 1, 1, 3),
|
||||
args[0:6])
|
||||
stats = args[6]
|
||||
self.assertEqual(1, stats.failures)
|
||||
self.assertEqual(2, stats.executed)
|
||||
self.assertEqual(1, stats.cancelled)
|
||||
|
||||
@mock.patch.object(utils.LOG, 'debug')
|
||||
def test_stats_skipped_if_too_frequent(self, mock_debug):
|
||||
self.flags(thread_pool_statistic_period=10)
|
||||
utils.spawn(self._task_finishes).result()
|
||||
mock_debug.assert_called()
|
||||
mock_debug.reset_mock()
|
||||
|
||||
utils.spawn(self._task_finishes).result()
|
||||
mock_debug.assert_not_called()
|
||||
|
||||
@mock.patch.object(utils.LOG, 'debug')
|
||||
def test_stats_skipped_disabled(self, mock_info):
|
||||
self.flags(thread_pool_statistic_period=-1)
|
||||
|
||||
utils.spawn(self._task_finishes).result()
|
||||
mock_info.assert_not_called()
|
||||
|
||||
+34
-1
@@ -737,7 +737,7 @@ def spawn_on(executor, func, *args, **kwargs) -> futurist.Future:
|
||||
the store on the new thread. This allows for continuity in logging the
|
||||
context when using this method to spawn a new thread.
|
||||
"""
|
||||
|
||||
_log_executor_stats(executor)
|
||||
if _executor_is_full(executor):
|
||||
LOG.warning(
|
||||
"The %s pool does not have free threads so the task %s will be "
|
||||
@@ -1336,3 +1336,36 @@ def destroy_scatter_gather_executor():
|
||||
SCATTER_GATHER_EXECUTOR.name)
|
||||
|
||||
SCATTER_GATHER_EXECUTOR = None
|
||||
|
||||
|
||||
def _log_executor_stats(executor):
|
||||
if CONF.thread_pool_statistic_period < 0:
|
||||
return
|
||||
|
||||
last_stats = getattr(executor, "last_stats", None)
|
||||
name = getattr(executor, "name", "unknown")
|
||||
|
||||
allowed_stat_age = time.monotonic() - CONF.thread_pool_statistic_period
|
||||
if last_stats and last_stats > allowed_stat_age:
|
||||
return
|
||||
|
||||
executor.last_stats = time.monotonic()
|
||||
|
||||
stats: futurist.ExecutorStatistics = executor.statistics
|
||||
|
||||
if isinstance(executor, futurist.ThreadPoolExecutor):
|
||||
LOG.debug(
|
||||
"State of %s ThreadPoolExecutor when submitting a new task: "
|
||||
"max_workers: %d, workers: %d, idle workers: %d, queued work: %d, "
|
||||
"stats: %s",
|
||||
name,
|
||||
executor._max_workers, len(executor._workers),
|
||||
len([w for w in executor._workers if w.idle]),
|
||||
executor._work_queue.qsize(), stats)
|
||||
elif isinstance(executor, futurist.GreenThreadPoolExecutor):
|
||||
LOG.debug(
|
||||
"State of %s GreenThreadPoolExecutor when submitting a new task: "
|
||||
"workers: %d, max_workers: %d, work queued length: %d, stats: %s",
|
||||
name,
|
||||
len(executor._pool.coroutines_running), executor._pool.size,
|
||||
executor._delayed_work.unfinished_tasks, stats)
|
||||
|
||||
Reference in New Issue
Block a user