Enable mypy on nova/utils.py
As a follow up for a review comment in [1] this patch enables mypy for nova/utils, fixes the existing mypy findings, and adds some trivial type annotations where make sense. [1]https://review.opendev.org/c/openstack/nova/+/956089/comment/caec94ed_4fdb16bf/ Change-Id: I29ca69bd1e583adc1b1f408bd45de183649986d2 Signed-off-by: Balazs Gibizer <gibi@redhat.com>
This commit is contained in:
+31
-13
@@ -28,6 +28,7 @@ import re
|
|||||||
import shutil
|
import shutil
|
||||||
import tempfile
|
import tempfile
|
||||||
import time
|
import time
|
||||||
|
import typing as ty
|
||||||
|
|
||||||
from eventlet import tpool
|
from eventlet import tpool
|
||||||
import futurist
|
import futurist
|
||||||
@@ -75,11 +76,16 @@ SM_SKIP_KEYS = (
|
|||||||
'img_mappings', 'img_block_device_mapping',
|
'img_mappings', 'img_block_device_mapping',
|
||||||
)
|
)
|
||||||
|
|
||||||
_FILE_CACHE = {}
|
_FILE_CACHE: dict[str, dict] = {}
|
||||||
|
|
||||||
_SERVICE_TYPES = service_types.ServiceTypes()
|
_SERVICE_TYPES = service_types.ServiceTypes()
|
||||||
|
|
||||||
DEFAULT_EXECUTOR = None
|
# NOTE(gibi): futurist does not expose the common based class.
|
||||||
|
# NOTE(gibi): we can simplify this when eventlet is removed.
|
||||||
|
Executor: ty.TypeAlias = (
|
||||||
|
futurist.GreenThreadPoolExecutor | futurist.ThreadPoolExecutor)
|
||||||
|
|
||||||
|
DEFAULT_EXECUTOR: Executor | None = None
|
||||||
|
|
||||||
|
|
||||||
def cooperative_yield():
|
def cooperative_yield():
|
||||||
@@ -105,7 +111,7 @@ def destroy_default_executor():
|
|||||||
DEFAULT_EXECUTOR = None
|
DEFAULT_EXECUTOR = None
|
||||||
|
|
||||||
|
|
||||||
def create_executor(max_workers):
|
def create_executor(max_workers) -> Executor:
|
||||||
if concurrency_mode_threading():
|
if concurrency_mode_threading():
|
||||||
executor = futurist.ThreadPoolExecutor(max_workers)
|
executor = futurist.ThreadPoolExecutor(max_workers)
|
||||||
else:
|
else:
|
||||||
@@ -113,7 +119,7 @@ def create_executor(max_workers):
|
|||||||
return executor
|
return executor
|
||||||
|
|
||||||
|
|
||||||
def _get_default_executor():
|
def _get_default_executor() -> Executor:
|
||||||
global DEFAULT_EXECUTOR
|
global DEFAULT_EXECUTOR
|
||||||
|
|
||||||
if not DEFAULT_EXECUTOR:
|
if not DEFAULT_EXECUTOR:
|
||||||
@@ -563,7 +569,9 @@ def _serialize_profile_info():
|
|||||||
return trace_info
|
return trace_info
|
||||||
|
|
||||||
|
|
||||||
def spawn(func, *args, **kwargs) -> futurist.Future:
|
def spawn(
|
||||||
|
func: ty.Callable[..., ty.Any], *args: ty.Any, **kwargs: ty.Any
|
||||||
|
) -> futurist.Future:
|
||||||
"""Passthrough method for eventlet.spawn.
|
"""Passthrough method for eventlet.spawn.
|
||||||
|
|
||||||
This utility exists so that it can be stubbed for testing without
|
This utility exists so that it can be stubbed for testing without
|
||||||
@@ -577,7 +585,11 @@ def spawn(func, *args, **kwargs) -> futurist.Future:
|
|||||||
return spawn_on(_get_default_executor(), func, *args, **kwargs)
|
return spawn_on(_get_default_executor(), func, *args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
def spawn_after(seconds, func, *args, **kwargs) -> futurist.Future:
|
def spawn_after(
|
||||||
|
seconds: float,
|
||||||
|
func: ty.Callable[..., ty.Any],
|
||||||
|
*args: ty.Any, **kwargs: ty.Any
|
||||||
|
) -> futurist.Future:
|
||||||
"""Executing the function asynchronously after the given time."""
|
"""Executing the function asynchronously after the given time."""
|
||||||
|
|
||||||
def delayed(*args, **kwargs):
|
def delayed(*args, **kwargs):
|
||||||
@@ -587,7 +599,7 @@ def spawn_after(seconds, func, *args, **kwargs) -> futurist.Future:
|
|||||||
return spawn(delayed, *args, **kwargs)
|
return spawn(delayed, *args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
def _executor_is_full(executor):
|
def _executor_is_full(executor: Executor) -> bool:
|
||||||
if concurrency_mode_threading():
|
if concurrency_mode_threading():
|
||||||
# TODO(gibi): Move this whole logic to futurist ThreadPoolExecutor
|
# TODO(gibi): Move this whole logic to futurist ThreadPoolExecutor
|
||||||
# so that we can avoid accessing the internals of the executor
|
# so that we can avoid accessing the internals of the executor
|
||||||
@@ -599,7 +611,11 @@ def _executor_is_full(executor):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
def spawn_on(executor, func, *args, **kwargs) -> futurist.Future:
|
def spawn_on(
|
||||||
|
executor: Executor,
|
||||||
|
func: ty.Callable[..., ty.Any],
|
||||||
|
*args: ty.Any, **kwargs: ty.Any,
|
||||||
|
) -> futurist.Future:
|
||||||
"""Passthrough method to run func on a thread in a given executor.
|
"""Passthrough method to run func on a thread in a given executor.
|
||||||
|
|
||||||
It will also grab the context from the threadlocal store and add it to
|
It will also grab the context from the threadlocal store and add it to
|
||||||
@@ -1089,6 +1105,7 @@ def run_once(message, logger, cleanup=None):
|
|||||||
logger and cleanup function will be propagated to the caller.
|
logger and cleanup function will be propagated to the caller.
|
||||||
"""
|
"""
|
||||||
def outer_wrapper(func):
|
def outer_wrapper(func):
|
||||||
|
@ty.no_type_check
|
||||||
@functools.wraps(func)
|
@functools.wraps(func)
|
||||||
def wrapper(*args, **kwargs):
|
def wrapper(*args, **kwargs):
|
||||||
if not wrapper.called:
|
if not wrapper.called:
|
||||||
@@ -1138,6 +1155,7 @@ def latch_error_on_raise(retryable=(_SentinelException,)):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
def outer_wrapper(func):
|
def outer_wrapper(func):
|
||||||
|
@ty.no_type_check
|
||||||
@functools.wraps(func)
|
@functools.wraps(func)
|
||||||
def wrapper(*args, **kwargs):
|
def wrapper(*args, **kwargs):
|
||||||
if wrapper.error:
|
if wrapper.error:
|
||||||
@@ -1272,10 +1290,10 @@ def concurrency_mode_threading():
|
|||||||
return not monkey_patch.is_patched()
|
return not monkey_patch.is_patched()
|
||||||
|
|
||||||
|
|
||||||
SCATTER_GATHER_EXECUTOR = None
|
SCATTER_GATHER_EXECUTOR: Executor | None = None
|
||||||
|
|
||||||
|
|
||||||
def get_scatter_gather_executor():
|
def get_scatter_gather_executor() -> Executor:
|
||||||
"""Returns the executor used for scatter/gather operations."""
|
"""Returns the executor used for scatter/gather operations."""
|
||||||
global SCATTER_GATHER_EXECUTOR
|
global SCATTER_GATHER_EXECUTOR
|
||||||
|
|
||||||
@@ -1313,10 +1331,10 @@ def destroy_scatter_gather_executor():
|
|||||||
SCATTER_GATHER_EXECUTOR = None
|
SCATTER_GATHER_EXECUTOR = None
|
||||||
|
|
||||||
|
|
||||||
CACHE_IMAGES_EXECUTOR = None
|
CACHE_IMAGES_EXECUTOR: Executor | None = None
|
||||||
|
|
||||||
|
|
||||||
def get_cache_images_executor():
|
def get_cache_images_executor() -> Executor:
|
||||||
"""Returns the executor used for cache images operations."""
|
"""Returns the executor used for cache images operations."""
|
||||||
global CACHE_IMAGES_EXECUTOR
|
global CACHE_IMAGES_EXECUTOR
|
||||||
|
|
||||||
@@ -1350,7 +1368,7 @@ def destroy_cache_images_executor():
|
|||||||
CACHE_IMAGES_EXECUTOR = None
|
CACHE_IMAGES_EXECUTOR = None
|
||||||
|
|
||||||
|
|
||||||
def _log_executor_stats(executor):
|
def _log_executor_stats(executor: Executor) -> None:
|
||||||
if CONF.thread_pool_statistic_period < 0:
|
if CONF.thread_pool_statistic_period < 0:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|||||||
@@ -137,6 +137,7 @@ files = [
|
|||||||
"nova/scheduler/client/report.py",
|
"nova/scheduler/client/report.py",
|
||||||
"nova/scheduler/request_filter.py",
|
"nova/scheduler/request_filter.py",
|
||||||
"nova/scheduler/utils.py",
|
"nova/scheduler/utils.py",
|
||||||
|
"nova/utils.py",
|
||||||
"nova/virt/driver.py",
|
"nova/virt/driver.py",
|
||||||
"nova/virt/hardware.py",
|
"nova/virt/hardware.py",
|
||||||
"nova/virt/libvirt/machine_type_utils.py",
|
"nova/virt/libvirt/machine_type_utils.py",
|
||||||
|
|||||||
Reference in New Issue
Block a user