diff --git a/doc/ext/extra_specs.py b/doc/ext/extra_specs.py index ddd233d503..15405afa6b 100644 --- a/doc/ext/extra_specs.py +++ b/doc/ext/extra_specs.py @@ -18,7 +18,6 @@ Provides a single directive that can be used to list all extra specs validators and, thus, document all extra specs that nova recognizes and supports. """ -import typing as ty from docutils import nodes from docutils.parsers import rst @@ -90,7 +89,7 @@ def _indent(text, count=1): def _format_validator_group_help( - validators: ty.Dict[str, base.ExtraSpecValidator], + validators: dict[str, base.ExtraSpecValidator], summary: bool, ): """Generate reStructuredText snippets for a group of validators.""" diff --git a/nova/api/openstack/wsgi.py b/nova/api/openstack/wsgi.py index cf91492979..53b3888fa3 100644 --- a/nova/api/openstack/wsgi.py +++ b/nova/api/openstack/wsgi.py @@ -15,7 +15,6 @@ # under the License. import functools -import typing as ty import microversion_parse from oslo_log import log as logging @@ -228,10 +227,10 @@ class WSGICodes: """ def __init__(self) -> None: - self._codes: list[tuple[int, ty.Optional[str], ty.Optional[str]]] = [] + self._codes: list[tuple[int, str | None, str | None]] = [] def add_code( - self, code: tuple[int, ty.Optional[str], ty.Optional[str]] + self, code: tuple[int, str | None, str | None] ) -> None: self._codes.append(code) @@ -251,8 +250,8 @@ class WSGICodes: def response( code: int, - min_version: ty.Optional[str] = None, - max_version: ty.Optional[str] = None, + min_version: str | None = None, + max_version: str | None = None, ): """Attaches response code to a method. @@ -697,8 +696,8 @@ def removed(version: str, reason: str): def api_version( - min_version: ty.Optional[str] = None, - max_version: ty.Optional[str] = None, + min_version: str | None = None, + max_version: str | None = None, ): """Mark an API as supporting lower and upper version bounds. @@ -737,9 +736,9 @@ def api_version( def expected_errors( - errors: ty.Union[int, tuple[int, ...]], - min_version: ty.Optional[str] = None, - max_version: ty.Optional[str] = None, + errors: int | tuple[int, ...], + min_version: str | None = None, + max_version: str | None = None, ): """Decorator for v2.1 API methods which specifies expected exceptions. diff --git a/nova/api/validation/__init__.py b/nova/api/validation/__init__.py index 27afdbeab8..6bf804f267 100644 --- a/nova/api/validation/__init__.py +++ b/nova/api/validation/__init__.py @@ -58,8 +58,8 @@ class Schemas: def add_schema( self, schema: tuple[dict[str, object]], - min_version: ty.Optional[str], - max_version: ty.Optional[str], + min_version: str | None, + max_version: str | None, ) -> None: # we'd like to use bisect.insort but that doesn't accept a 'key' arg # until Python 3.10, so we need to sort after insertion instead :( @@ -76,9 +76,7 @@ class Schemas: def validate_schemas(self) -> None: """Ensure there are no overlapping schemas.""" - prev_max_version: ty.Optional[ - api_version_request.APIVersionRequest - ] = None + prev_max_version: api_version_request.APIVersionRequest | None = None for schema, min_version, max_version in self._schemas: if prev_max_version: @@ -91,7 +89,7 @@ class Schemas: prev_max_version = max_version - def __call__(self, req: wsgi.Request) -> ty.Optional[dict[str, object]]: + def __call__(self, req: wsgi.Request) -> dict[str, object] | None: ver = req.api_version_request for schema, min_version, max_version in self._schemas: @@ -187,9 +185,9 @@ def _schema_validation_helper( # response headers. As things stand, we're going to need five separate # decorators. def schema( - request_body_schema: ty.Dict[str, ty.Any], - min_version: ty.Optional[str] = None, - max_version: ty.Optional[str] = None, + request_body_schema: dict[str, ty.Any], + min_version: str | None = None, + max_version: str | None = None, ): """Register a schema to validate request body. @@ -229,9 +227,9 @@ def schema( def response_body_schema( - response_body_schema: ty.Dict[str, ty.Any], - min_version: ty.Optional[str] = None, - max_version: ty.Optional[str] = None, + response_body_schema: dict[str, ty.Any], + min_version: str | None = None, + max_version: str | None = None, ): """Register a schema to validate response body. diff --git a/nova/api/validation/extra_specs/base.py b/nova/api/validation/extra_specs/base.py index 2597070127..c9118b5094 100644 --- a/nova/api/validation/extra_specs/base.py +++ b/nova/api/validation/extra_specs/base.py @@ -25,9 +25,9 @@ from nova import exception class ExtraSpecValidator: name: str description: str - value: ty.Dict[str, ty.Any] + value: dict[str, ty.Any] deprecated: bool = False - parameters: ty.List[ty.Dict[str, ty.Any]] = dataclasses.field( + parameters: list[dict[str, ty.Any]] = dataclasses.field( default_factory=list ) diff --git a/nova/api/validation/extra_specs/validators.py b/nova/api/validation/extra_specs/validators.py index 2163892d71..361d35b108 100644 --- a/nova/api/validation/extra_specs/validators.py +++ b/nova/api/validation/extra_specs/validators.py @@ -15,7 +15,6 @@ """Validators for all extra specs known by nova.""" import re -import typing as ty from oslo_log import log as logging from stevedore import extension @@ -25,8 +24,8 @@ from nova import exception LOG = logging.getLogger(__name__) -VALIDATORS: ty.Dict[str, base.ExtraSpecValidator] = {} -NAMESPACES: ty.Set[str] = set() +VALIDATORS: dict[str, base.ExtraSpecValidator] = {} +NAMESPACES: set[str] = set() def validate(name: str, value: str): diff --git a/nova/cmd/manage.py b/nova/cmd/manage.py index 0c21ac7605..0187423036 100644 --- a/nova/cmd/manage.py +++ b/nova/cmd/manage.py @@ -758,8 +758,13 @@ class CellV2Commands(object): return CONF.database.connection return database_connection - def _non_unique_transport_url_database_connection_checker(self, ctxt, - cell_mapping, transport_url, database_connection): + def _non_unique_transport_url_database_connection_checker( + self, + ctxt: context.RequestContext, + cell_mapping: 'objects.CellMapping | None', + transport_url: str | None, + database_connection: str | None, + ) -> bool: for cell in objects.CellMappingList.get_all(ctxt): if cell_mapping and cell.uuid == cell_mapping.uuid: # If we're looking for a specific cell, then don't check @@ -1585,9 +1590,9 @@ class PlacementCommands(object): @staticmethod def _get_resource_request_from_ports( ctxt: context.RequestContext, - ports: ty.List[ty.Dict[str, ty.Any]] - ) -> ty.Tuple[ - ty.Dict[str, ty.List['objects.RequestGroup']], + ports: list[dict[str, ty.Any]] + ) -> tuple[ + dict[str, list['objects.RequestGroup']], 'objects.RequestLevelParams']: """Collect RequestGroups and RequestLevelParams for all ports @@ -1634,10 +1639,10 @@ class PlacementCommands(object): def _get_port_binding_profile_allocation( ctxt: context.RequestContext, neutron: neutron_api.ClientWrapper, - port: ty.Dict[str, ty.Any], - request_groups: ty.List['objects.RequestGroup'], - resource_provider_mapping: ty.Dict[str, ty.List[str]], - ) -> ty.Dict[str, str]: + port: dict[str, ty.Any], + request_groups: list['objects.RequestGroup'], + resource_provider_mapping: dict[str, list[str]], + ) -> dict[str, str]: """Generate the value of the allocation key of the port binding profile based on the provider mapping returned from placement diff --git a/nova/compute/api.py b/nova/compute/api.py index a291610246..c2f51dd2bc 100644 --- a/nova/compute/api.py +++ b/nova/compute/api.py @@ -20,6 +20,7 @@ networking and storage of VMs, and compute hosts on which they run).""" import collections +from collections.abc import Mapping import functools import re import typing as ty @@ -1629,7 +1630,7 @@ class API: def _update_ephemeral_encryption_bdms( self, flavor: 'objects.Flavor', - image_meta_dict: ty.Dict[str, ty.Any], + image_meta_dict: dict[str, ty.Any], block_device_mapping: 'objects.BlockDeviceMappingList', ) -> None: """Update local BlockDeviceMappings when ephemeral encryption requested @@ -5030,7 +5031,7 @@ class API: self, context: nova_context.RequestContext, instance: objects.Instance, - volume: ty.Mapping[str, ty.Any], + volume: Mapping[str, ty.Any], ): """Avoid duplicate volume attachments. diff --git a/nova/compute/manager.py b/nova/compute/manager.py index 9961838e45..5d43ce370d 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -26,6 +26,7 @@ terminating it. import base64 import binascii +from collections.abc import Callable, Iterator import contextlib import copy import functools @@ -35,7 +36,6 @@ import sys import threading import time import traceback -import typing as ty from cinderclient import exceptions as cinder_exception from cursive import exception as cursive_exception @@ -263,12 +263,12 @@ class ThreadingEventWithResult(threading.Event): # Each collection of events is a dict of eventlet Events keyed by a tuple of # event name and associated tag -_InstanceEvents = ty.Dict[ty.Tuple[str, str], ThreadingEventWithResult] +_InstanceEvents = dict[tuple[str, str], ThreadingEventWithResult] class InstanceEvents(object): def __init__(self): - self._events: ty.Optional[ty.Dict[str, _InstanceEvents]] = {} + self._events: dict[str, _InstanceEvents] | None = {} @staticmethod def _lock_name(instance) -> str: @@ -483,7 +483,7 @@ class ComputeVirtAPI(virtapi.VirtAPI): def _wait_for_instance_events( instance: 'objects.Instance', events: dict, - error_callback: ty.Callable, + error_callback: Callable[[str, 'objects.Instance'], bool], timeout: int, ) -> None: deadline = time.monotonic() + timeout @@ -723,7 +723,7 @@ class ComputeManager(manager.Manager): self.host, self.driver, reportclient=self.reportclient) @contextlib.contextmanager - def syncs_in_progress(self) -> ty.Iterator[set[str]]: + def syncs_in_progress(self) -> Iterator[set[str]]: with self._syncs_in_progress_lock: yield self._syncs_in_progress @@ -3349,7 +3349,7 @@ class ComputeManager(manager.Manager): raise original_exception def _get_multiattach_volume_lock_names_bdms( - self, bdms: objects.BlockDeviceMappingList) -> ty.List[str]: + self, bdms: objects.BlockDeviceMappingList) -> list[str]: """Get the lock names for multiattach volumes. :param bdms: BlockDeviceMappingList object @@ -8639,7 +8639,7 @@ class ComputeManager(manager.Manager): context: nova.context.RequestContext, instance: 'objects.Instance', port_id: str, - port_allocation: ty.Dict[str, ty.Dict[str, ty.Dict[str, int]]], + port_allocation: dict[str, dict[str, dict[str, int]]], ) -> None: if not port_allocation: @@ -8691,7 +8691,7 @@ class ComputeManager(manager.Manager): context: nova.context.RequestContext, instance: 'objects.Instance', pci_reqs: 'objects.InstancePCIRequests', - ) -> ty.Optional['objects.PciDevice']: + ) -> 'objects.PciDevice | None': """Claim PCI devices if there are PCI requests :param context: nova.context.RequestContext @@ -8728,10 +8728,12 @@ class ComputeManager(manager.Manager): context: nova.context.RequestContext, instance: 'objects.Instance', pci_reqs: 'objects.InstancePCIRequests', - request_groups: ty.List['objects.RequestGroup'], + request_groups: list['objects.RequestGroup'], request_level_params: 'objects.RequestLevelParams', - ) -> ty.Tuple[ty.Optional[ty.Dict[str, ty.List[str]]], - ty.Optional[ty.Dict[str, ty.Dict[str, ty.Dict[str, int]]]]]: + ) -> tuple[ + dict[str, list[str]] | None, + dict[str, dict[str, dict[str, int]]] | None + ]: """Allocate resources for the request in placement :param context: nova.context.RequestContext diff --git a/nova/compute/pci_placement_translator.py b/nova/compute/pci_placement_translator.py index 5f7bf12151..b20742ede2 100644 --- a/nova/compute/pci_placement_translator.py +++ b/nova/compute/pci_placement_translator.py @@ -11,9 +11,9 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. + import collections import copy -import typing as ty import os_resource_classes import os_traits @@ -48,7 +48,7 @@ def _is_placement_tracking_enabled() -> bool: return CONF.pci.report_in_placement -def _normalize_traits(traits: ty.List[str]) -> ty.List[str]: +def _normalize_traits(traits: list[str]) -> list[str]: """Make the trait names acceptable for placement. It keeps the already valid standard or custom traits but normalizes trait @@ -66,7 +66,7 @@ def _normalize_traits(traits: ty.List[str]) -> ty.List[str]: return list(standard_traits) + custom_traits -def get_traits(traits_str: str) -> ty.Set[str]: +def get_traits(traits_str: str) -> set[str]: """Return a normalized set of placement standard and custom traits from a string of comma separated trait names. """ @@ -77,8 +77,8 @@ def get_traits(traits_str: str) -> ty.Set[str]: def _get_traits_for_dev( - dev_spec_tags: ty.Dict[str, str], -) -> ty.Set[str]: + dev_spec_tags: dict[str, str], +) -> set[str]: return get_traits(dev_spec_tags.get("traits", "")) | { os_traits.COMPUTE_MANAGED_PCI_DEVICE } @@ -98,7 +98,7 @@ def _normalize_resource_class(rc: str) -> str: def get_resource_class( - requested_name: ty.Optional[str], vendor_id: str, product_id: str + requested_name: str | None, vendor_id: str, product_id: str ) -> str: """Return the normalized resource class name based on what is requested or if nothing is requested then generated from the vendor_id and product_id @@ -112,7 +112,7 @@ def get_resource_class( def _get_rc_for_dev( dev: pci_device.PciDevice, - dev_spec_tags: ty.Dict[str, str], + dev_spec_tags: dict[str, str], ) -> str: """Return the resource class to represent the device. @@ -132,9 +132,9 @@ class PciResourceProvider: def __init__(self, name: str) -> None: self.name = name self.parent_dev = None - self.children_devs: ty.List[pci_device.PciDevice] = [] - self.resource_class: ty.Optional[str] = None - self.traits: ty.Optional[ty.Set[str]] = None + self.children_devs: list[pci_device.PciDevice] = [] + self.resource_class: str | None = None + self.traits: set[str] | None = None self.is_otu = False # This is an adjustment for the total inventory based on normal device # due to possibility of devices held in the tracker even though they @@ -144,7 +144,7 @@ class PciResourceProvider: self.adjustment = 0 @property - def devs(self) -> ty.List[pci_device.PciDevice]: + def devs(self) -> list[pci_device.PciDevice]: return [self.parent_dev] if self.parent_dev else self.children_devs @property @@ -155,7 +155,7 @@ class PciResourceProvider: def to_be_deleted(self): return self.total == 0 - def add_child(self, dev, dev_spec_tags: ty.Dict[str, str]) -> None: + def add_child(self, dev, dev_spec_tags: dict[str, str]) -> None: if self.parent_dev: raise exception.PlacementPciDependentDeviceException( parent_dev=dev.address, @@ -192,7 +192,7 @@ class PciResourceProvider: self.resource_class = rc self.traits = traits - def add_parent(self, dev, dev_spec_tags: ty.Dict[str, str]) -> None: + def add_parent(self, dev, dev_spec_tags: dict[str, str]) -> None: if self.parent_dev or self.children_devs: raise exception.PlacementPciDependentDeviceException( parent_dev=dev.address, @@ -222,7 +222,7 @@ class PciResourceProvider: # Nothing to do here. The update_provider_tree we handle full RP pass - def _get_allocations(self) -> ty.Mapping[str, int]: + def _get_allocations(self) -> collections.Counter[str]: """Return a dict of used resources keyed by consumer UUID. Note that: @@ -279,7 +279,7 @@ class PciResourceProvider: def _adjust_for_removals_and_held_devices( self, provider_tree: provider_tree.ProviderTree, - rp_rc_usage: ty.Dict[str, ty.Dict[str, int]], + rp_rc_usage: dict[str, dict[str, int]], ) -> None: rp_uuid = provider_tree.data(self.name).uuid @@ -323,7 +323,7 @@ class PciResourceProvider: self, provider_tree: provider_tree.ProviderTree, parent_rp_name: str, - rp_rc_usage: ty.Dict[str, ty.Dict[str, int]], + rp_rc_usage: dict[str, dict[str, int]], ) -> None: if not provider_tree.exists(self.name): @@ -362,7 +362,7 @@ class PciResourceProvider: self, allocations: dict, provider_tree: provider_tree.ProviderTree, - same_host_instances: ty.List[str], + same_host_instances: list[str], ) -> bool: updated = False @@ -452,9 +452,9 @@ class PlacementView: def __init__( self, hypervisor_hostname: str, - instances_under_same_host_resize: ty.List[str], + instances_under_same_host_resize: list[str], ) -> None: - self.rps: ty.Dict[str, PciResourceProvider] = {} + self.rps: dict[str, PciResourceProvider] = {} self.root_rp_name = hypervisor_hostname self.same_host_instances = instances_under_same_host_resize @@ -478,7 +478,7 @@ class PlacementView: return self._get_rp_name_for_address(dev.parent_addr) def _add_dev( - self, dev: pci_device.PciDevice, dev_spec_tags: ty.Dict[str, str] + self, dev: pci_device.PciDevice, dev_spec_tags: dict[str, str] ) -> None: if dev_spec_tags.get("physical_network"): # NOTE(gibi): We ignore devices that has physnet configured as @@ -533,7 +533,7 @@ class PlacementView: def process_dev( self, dev: pci_device.PciDevice, - dev_spec: ty.Optional[devspec.PciDeviceSpec], + dev_spec: devspec.PciDeviceSpec | None, ) -> None: # NOTE(gibi): We never observer dev.status DELETED as when that is set # the device is also removed from the PCI tracker. So we can ignore @@ -610,12 +610,12 @@ class PlacementView: @staticmethod def get_usage_per_rc_and_rp( allocations - ) -> ty.Dict[str, ty.Dict[str, int]]: + ) -> dict[str, dict[str, int]]: """Returns a dict keyed by RP uuid and the value is a dict of resource class: usage pairs telling how much total usage the given RP has from the given resource class across all the allocations. """ - rp_rc_usage: ty.Dict[str, ty.Dict[str, int]] = ( + rp_rc_usage: dict[str, dict[str, int]] = ( collections.defaultdict(lambda: collections.defaultdict(int))) for consumer in allocations.values(): for rp_uuid, alloc in consumer["allocations"].items(): @@ -672,7 +672,7 @@ class PlacementView: return updated -def ensure_no_dev_spec_with_devname(dev_specs: ty.List[devspec.PciDeviceSpec]): +def ensure_no_dev_spec_with_devname(dev_specs: list[devspec.PciDeviceSpec]): for dev_spec in dev_specs: if dev_spec.dev_spec_conf.get("devname"): msg = _( @@ -709,7 +709,7 @@ def update_provider_tree_for_pci( nodename: str, pci_tracker: pci_manager.PciDevTracker, allocations: dict, - instances_under_same_host_resize: ty.List[str], + instances_under_same_host_resize: list[str], ) -> bool: """Based on the PciDevice objects in the pci_tracker it calculates what inventories and allocations needs to exist in placement and create the diff --git a/nova/conductor/manager.py b/nova/conductor/manager.py index f407ed4d67..3b872985dc 100644 --- a/nova/conductor/manager.py +++ b/nova/conductor/manager.py @@ -21,7 +21,6 @@ import copy import functools import sys import threading -import typing as ty from keystoneauth1 import exceptions as ks_exc from oslo_config import cfg @@ -979,8 +978,8 @@ class ComputeTaskManager: flavor: 'objects.Flavor', request_spec: 'objects.RequestSpec', orig_num_req: int, - project_id: ty.Optional[str] = None, - user_id: ty.Optional[str] = None + project_id: str | None = None, + user_id: str | None = None ) -> None: # A quota "recheck" is a quota check that is performed *after* quota # limited resources are consumed. It is meant to address race diff --git a/nova/crypto.py b/nova/crypto.py index 98ffb7b936..cb7a57d4e3 100644 --- a/nova/crypto.py +++ b/nova/crypto.py @@ -24,7 +24,6 @@ import binascii import hashlib import io import os -import typing as ty from castellan.common import exception as castellan_exception from castellan.common.objects import passphrase @@ -82,7 +81,7 @@ def generate_fingerprint(public_key: str) -> str: reason=_('failed to generate fingerprint')) -def generate_x509_fingerprint(pem_key: ty.Union[bytes, str]) -> str: +def generate_x509_fingerprint(pem_key: bytes | str) -> str: try: if isinstance(pem_key, str): pem_key = pem_key.encode('utf-8') @@ -98,7 +97,7 @@ def generate_x509_fingerprint(pem_key: ty.Union[bytes, str]) -> str: 'Error message: %s') % ex) -def generate_key_pair(bits: int = 2048) -> ty.Tuple[str, str, str]: +def generate_key_pair(bits: int = 2048) -> tuple[str, str, str]: key = paramiko.RSAKey.generate(bits) keyout = io.StringIO() key.write_private_key(keyout) @@ -108,7 +107,7 @@ def generate_key_pair(bits: int = 2048) -> ty.Tuple[str, str, str]: return (private_key, public_key, fingerprint) -def ssh_encrypt_text(ssh_public_key: str, text: ty.Union[str, bytes]) -> bytes: +def ssh_encrypt_text(ssh_public_key: str, text: str | bytes) -> bytes: """Encrypt text with an ssh public key. If text is a Unicode string, encode it to UTF-8. @@ -127,7 +126,7 @@ def ssh_encrypt_text(ssh_public_key: str, text: ty.Union[str, bytes]) -> bytes: def generate_winrm_x509_cert( user_id: str, bits: int = 2048 -) -> ty.Tuple[str, str, str]: +) -> tuple[str, str, str]: """Generate a cert for passwordless auth for user in project.""" subject = '/CN=%s' % user_id upn = '%s@localhost' % user_id @@ -171,7 +170,7 @@ def _create_x509_openssl_config(conffile: str, upn: str): def ensure_vtpm_secret( context: nova_context.RequestContext, instance: 'objects.Instance', -) -> ty.Tuple[str, bytes]: +) -> tuple[str, bytes]: """Communicates with the key manager service to retrieve or create a secret for an instance's emulated TPM. @@ -280,7 +279,7 @@ def create_encryption_secret( context: nova_context.RequestContext, instance: 'objects.Instance', driver_bdm: 'driver_block_device.DriverBlockDevice', - for_detail: ty.Optional[str] = None, + for_detail: str | None = None, ): # Use oslo.serialization to encode some random data as passphrase secret = oslo_base64.encode_as_text( @@ -301,7 +300,7 @@ def create_encryption_secret( def get_encryption_secret( context: nova_context.RequestContext, secret_uuid: str, -) -> ty.Optional[str]: +) -> str | None: key_mgr = _get_key_manager() try: key = key_mgr.get(context, secret_uuid) diff --git a/nova/limit/local.py b/nova/limit/local.py index b5472925c2..d5cc8ff55e 100644 --- a/nova/limit/local.py +++ b/nova/limit/local.py @@ -81,7 +81,7 @@ LEGACY_LIMITS = { def get_in_use( context: 'nova.context.RequestContext', project_id: str -) -> ty.Dict[str, int]: +) -> dict[str, int]: """Returns in use counts for each resource, for given project. This sounds simple but many resources can't be counted per project, @@ -107,8 +107,8 @@ def get_in_use( def always_zero_usage( - project_id: str, resource_names: ty.List[str] -) -> ty.Dict[str, int]: + project_id: str, resource_names: list[str] +) -> dict[str, int]: """Called by oslo_limit's enforcer""" # Return usage of 0 for API limits. Values in API requests will be used as # the deltas. @@ -196,8 +196,8 @@ def enforce_db_limit( def _convert_keys_to_legacy_name( - new_dict: ty.Dict[str, int] -) -> ty.Dict[str, int]: + new_dict: dict[str, int] +) -> dict[str, int]: legacy = {} for new_name, old_name in LEGACY_LIMITS.items(): # defensive in case oslo or keystone doesn't give us an answer @@ -205,7 +205,7 @@ def _convert_keys_to_legacy_name( return legacy -def get_legacy_default_limits() -> ty.Dict[str, int]: +def get_legacy_default_limits() -> dict[str, int]: # TODO(johngarbutt): need oslo.limit API for this, it should do caching enforcer = limit.Enforcer(lambda: None) new_limits = enforcer.get_registered_limits(LEGACY_LIMITS.keys()) diff --git a/nova/limit/placement.py b/nova/limit/placement.py index dddf337728..041b11f81f 100644 --- a/nova/limit/placement.py +++ b/nova/limit/placement.py @@ -12,8 +12,6 @@ # License for the specific language governing permissions and limitations # under the License. -import typing as ty - import os_resource_classes as orc from oslo_limit import exception as limit_exceptions from oslo_limit import limit @@ -42,7 +40,7 @@ LEGACY_LIMITS = { def _get_placement_usages( context: 'nova.context.RequestContext', project_id: str -) -> ty.Dict[str, int]: +) -> dict[str, int]: return report.report_client_singleton().get_usages_counts_for_limits( context, project_id) @@ -50,8 +48,8 @@ def _get_placement_usages( def _get_usage( context: 'nova.context.RequestContext', project_id: str, - resource_names: ty.List[str], -) -> ty.Dict[str, int]: + resource_names: list[str], +) -> dict[str, int]: """Called by oslo_limit's enforcer""" if not limit_utils.use_unified_limits(): raise NotImplementedError("Unified limits support is disabled") @@ -120,7 +118,7 @@ def _get_usage( def _get_deltas_by_flavor( flavor: 'objects.Flavor', is_bfv: bool, count: int -) -> ty.Dict[str, int]: +) -> dict[str, int]: if flavor is None: raise ValueError("flavor") if count < 0: @@ -156,8 +154,8 @@ def enforce_num_instances_and_flavor( is_bfvm: bool, min_count: int, max_count: int, - enforcer: ty.Optional[limit.Enforcer] = None, - delta_updates: ty.Optional[ty.Dict[str, int]] = None, + enforcer: limit.Enforcer | None = None, + delta_updates: dict[str, int] | None = None, ) -> int: """Return max instances possible, else raise TooManyInstances exception.""" if not limit_utils.use_unified_limits(): diff --git a/nova/network/neutron.py b/nova/network/neutron.py index 33ad71306a..8baf65a4fa 100644 --- a/nova/network/neutron.py +++ b/nova/network/neutron.py @@ -645,14 +645,14 @@ class API: # it is a dict of network dicts as returned by the neutron client keyed # by network UUID - networks: ty.Dict[str, ty.Dict] = {} + networks: dict[str, dict] = {} for port_id in ports: # A port_id is optional in the NetworkRequest object so check here # in case the caller forgot to filter the list. if port_id is None: continue - port_req_body: ty.Dict[str, ty.Any] = { + port_req_body: dict[str, ty.Any] = { 'port': { constants.BINDING_HOST_ID: None, } @@ -1120,8 +1120,8 @@ class API: self, context: nova_context.RequestContext, port_id: str, - resource_provider_mapping: ty.Dict[str, ty.List[str]], - ) -> ty.Union[None, str, ty.Dict[str, str]]: + resource_provider_mapping: dict[str, list[str]], + ) -> None | str | dict[str, str]: """Calculate the value of the allocation key of the binding:profile based on the allocated resources. @@ -1579,7 +1579,7 @@ class API: client = get_client(context, admin=True) - bindings_by_port_id: ty.Dict[str, ty.Any] = {} + bindings_by_port_id: dict[str, ty.Any] = {} for vif in network_info: # Now bind each port to the destination host and keep track of each # port that is bound to the resulting binding so we can rollback in @@ -1650,7 +1650,7 @@ class API: need to do the necessary plumbing in order to set a VF up for packet forwarding. """ - vf_profile: ty.Dict[str, ty.Union[str, int]] = {} + vf_profile: dict[str, str | int] = {} pf_mac = pci_dev.sriov_cap.get('pf_mac_address') vf_num = pci_dev.sriov_cap.get('vf_num') @@ -1919,7 +1919,7 @@ class API: # only neutron_admin = get_client(context, admin=True) neutron = get_client(context) - port_allocation: ty.Dict = {} + port_allocation: dict = {} try: # NOTE(gibi): we need to read the port resource information from # neutron here as we might delete the port below @@ -2652,8 +2652,8 @@ class API: self, context: nova_context.RequestContext, instance_uuid: str - ) -> ty.Tuple[ - ty.List['objects.RequestGroup'], 'objects.RequestLevelParams']: + ) -> tuple[ + list['objects.RequestGroup'], 'objects.RequestLevelParams']: """Collect resource requests from the ports associated to the instance :param context: nova request context @@ -3945,7 +3945,7 @@ class API: self, context: nova.context.RequestContext, network_id: str, - ) -> ty.List[str]: + ) -> list[str]: """Query the segmentation ids for the given network. :param context: The request context. @@ -3976,7 +3976,7 @@ class API: self, context: nova.context.RequestContext, subnet_id: str, - ) -> ty.Optional[str]: + ) -> str | None: """Query the segmentation id for the given subnet. :param context: The request context. diff --git a/nova/objects/instance.py b/nova/objects/instance.py index e16528bffa..96f2ab3cb8 100644 --- a/nova/objects/instance.py +++ b/nova/objects/instance.py @@ -13,7 +13,6 @@ # under the License. import contextlib -import typing as ty from oslo_config import cfg from oslo_db import exception as db_exc @@ -1282,9 +1281,9 @@ class Instance(base.NovaPersistentObject, base.NovaObject, def get_pci_devices( self, - source: ty.Optional[int] = None, - request_id: ty.Optional[str] = None, - ) -> ty.List["objects.PciDevice"]: + source: int | None = None, + request_id: str | None = None, + ) -> list["objects.PciDevice"]: """Return the PCI devices allocated to the instance :param source: Filter by source. It can be diff --git a/nova/objects/request_spec.py b/nova/objects/request_spec.py index 7150bd44fd..06fa1849cc 100644 --- a/nova/objects/request_spec.py +++ b/nova/objects/request_spec.py @@ -489,7 +489,7 @@ class RequestSpec(base.NovaObject): return filt_props @staticmethod - def _rc_from_request(spec: ty.Dict[str, ty.Any]) -> str: + def _rc_from_request(spec: dict[str, ty.Any]) -> str: return pci_placement_translator.get_resource_class( spec.get("resource_class"), spec.get("vendor_id"), @@ -497,7 +497,7 @@ class RequestSpec(base.NovaObject): ) @staticmethod - def _traits_from_request(spec: ty.Dict[str, ty.Any]) -> ty.Set[str]: + def _traits_from_request(spec: dict[str, ty.Any]) -> set[str]: return pci_placement_translator.get_traits(spec.get("traits", "")) def generate_request_groups_from_pci_requests(self): diff --git a/nova/pci/devspec.py b/nova/pci/devspec.py index 2c83b1ac7c..961cfd033e 100644 --- a/nova/pci/devspec.py +++ b/nova/pci/devspec.py @@ -38,7 +38,7 @@ REGEX_ANY = '.*' LOG = logging.getLogger(__name__) CONF = nova.conf.CONF -PCISpecAddressType = ty.Union[ty.Dict[str, str], str] +PCISpecAddressType = dict[str, str] | str class PciAddressSpec(metaclass=abc.ABCMeta): @@ -242,7 +242,7 @@ class WhitelistPciAddress(object): else: self.pci_address_spec = PhysicalPciAddress(pci_addr) - def match(self, pci_addr: str, pci_phys_addr: ty.Optional[str]) -> bool: + def match(self, pci_addr: str, pci_phys_addr: str | None) -> bool: """Match a device to this PciAddress. Assume this is called with a ``pci_addr`` and ``pci_phys_addr`` @@ -269,7 +269,7 @@ class WhitelistPciAddress(object): class PciDeviceSpec(PciAddressSpec): - def __init__(self, dev_spec: ty.Dict[str, str]) -> None: + def __init__(self, dev_spec: dict[str, str]) -> None: # stored for better error reporting self.dev_spec_conf = copy.deepcopy(dev_spec) # the non tag fields (i.e. address, devname) will be removed by @@ -277,7 +277,7 @@ class PciDeviceSpec(PciAddressSpec): self.tags = dev_spec self._init_dev_details() - def _address_obj(self) -> ty.Optional[WhitelistPciAddress]: + def _address_obj(self) -> WhitelistPciAddress | None: address_obj = None if self.dev_name: address_str, pf = utils.get_function_by_ifname(self.dev_name) @@ -295,7 +295,7 @@ class PciDeviceSpec(PciAddressSpec): self.vendor_id = self.tags.pop("vendor_id", ANY) self.product_id = self.tags.pop("product_id", ANY) self.dev_name = self.tags.pop("devname", None) - self.address: ty.Optional[WhitelistPciAddress] = None + self.address: WhitelistPciAddress | None = None # Note(moshele): The address attribute can be a string or a dict. # For glob syntax or specific pci it is a string and for regex syntax # it is a dict. The WhitelistPciAddress class handles both types. @@ -371,7 +371,7 @@ class PciDeviceSpec(PciAddressSpec): 'pf_addr': pf_addr}) def _ensure_remote_managed_dev_vpd_serial( - self, dev_dict: ty.Dict[str, ty.Any]) -> bool: + self, dev_dict: dict[str, ty.Any]) -> bool: """Ensure the presence of a serial number field in PCI VPD. A card serial number extracted from PCI VPD is required to allow a @@ -388,8 +388,8 @@ class PciDeviceSpec(PciAddressSpec): # an empty string which is not useful for device identification. return bool(card_sn) - def match(self, dev_dict: ty.Dict[str, ty.Any]) -> bool: - address_obj: ty.Optional[WhitelistPciAddress] = self._address_obj() + def match(self, dev_dict: dict[str, ty.Any]) -> bool: + address_obj: WhitelistPciAddress | None = self._address_obj() if not address_obj: return False @@ -412,7 +412,7 @@ class PciDeviceSpec(PciAddressSpec): } return self.match(dev_dict) - def get_tags(self) -> ty.Dict[str, str]: + def get_tags(self) -> dict[str, str]: return self.tags def _normalize_device_spec_tag(self, tag): @@ -426,7 +426,7 @@ class PciDeviceSpec(PciAddressSpec): reason=f"Cannot parse tag '{tag}': " + str(e) ) - def enhanced_pci_device_with_spec_tags(self, dev: ty.Dict[str, ty.Any]): + def enhanced_pci_device_with_spec_tags(self, dev: dict[str, ty.Any]): spec_tags = ["managed", "live_migratable"] for tag in spec_tags: tag_value = self.tags.get(tag) diff --git a/nova/pci/manager.py b/nova/pci/manager.py index 9efa98a016..b2d365df45 100644 --- a/nova/pci/manager.py +++ b/nova/pci/manager.py @@ -32,8 +32,8 @@ from nova.pci import whitelist CONF = cfg.CONF LOG = logging.getLogger(__name__) -MappingType = ty.Dict[str, ty.List['objects.PciDevice']] -PCIInvType = ty.DefaultDict[str, ty.List['objects.PciDevice']] +MappingType = dict[str, list['objects.PciDevice']] +PCIInvType = collections.defaultdict[str, list['objects.PciDevice']] class PciDevTracker(object): @@ -68,8 +68,8 @@ class PciDevTracker(object): :param compute_node: The object.ComputeNode whose PCI devices we're tracking. """ - self.stale: ty.Dict[str, objects.PciDevice] = {} - self.to_be_removed_when_freed: ty.Dict[str, objects.PciDevice] = {} + self.stale: dict[str, objects.PciDevice] = {} + self.to_be_removed_when_freed: dict[str, objects.PciDevice] = {} self.node_id: str = compute_node.id self.dev_filter = whitelist.Whitelist(CONF.pci.device_spec) numa_topology = compute_node.numa_topology @@ -177,7 +177,7 @@ class PciDevTracker(object): self._set_hvdevs(devices) @staticmethod - def _build_device_tree(all_devs: ty.List['objects.PciDevice']) -> None: + def _build_device_tree(all_devs: list['objects.PciDevice']) -> None: """Build a tree of devices that represents parent-child relationships. We need to have the relationships set up so that we can easily make @@ -214,7 +214,7 @@ class PciDevTracker(object): if dev.parent_device: parents[dev.parent_addr].child_devices.append(dev) - def _set_hvdevs(self, devices: ty.List[ty.Dict[str, ty.Any]]) -> None: + def _set_hvdevs(self, devices: list[dict[str, ty.Any]]) -> None: exist_addrs = set([dev.address for dev in self.pci_devs]) new_addrs = set([dev['address'] for dev in devices]) @@ -273,7 +273,7 @@ class PciDevTracker(object): self.stats.remove_device(existed) else: # Update tracked devices. - new_value: ty.Dict[str, ty.Any] + new_value: dict[str, ty.Any] new_value = next((dev for dev in devices if dev['address'] == existed.address)) new_value['compute_node_id'] = self.node_id @@ -312,7 +312,7 @@ class PciDevTracker(object): context: ctx.RequestContext, pci_requests: 'objects.InstancePCIRequests', instance_numa_topology: 'objects.InstanceNUMATopology', - ) -> ty.List['objects.PciDevice']: + ) -> list['objects.PciDevice']: instance_cells = None if instance_numa_topology: instance_cells = instance_numa_topology.cells @@ -337,7 +337,7 @@ class PciDevTracker(object): context: ctx.RequestContext, pci_requests: 'objects.InstancePCIRequests', instance_numa_topology: 'objects.InstanceNUMATopology', - ) -> ty.List['objects.PciDevice']: + ) -> list['objects.PciDevice']: devs = [] @@ -350,7 +350,7 @@ class PciDevTracker(object): return devs def _allocate_instance( - self, instance: 'objects.Instance', devs: ty.List['objects.PciDevice'], + self, instance: 'objects.Instance', devs: list['objects.PciDevice'], ) -> None: for dev in devs: dev.allocate(instance) @@ -398,7 +398,9 @@ class PciDevTracker(object): pci_mapping.pop(instance_uuid, None) def _free_device( - self, dev: 'objects.PciDevice', instance: 'objects.Instance' = None, + self, + dev: 'objects.PciDevice', + instance: 'objects.Instance | None' = None, ) -> None: freed_devs = dev.free(instance) stale = self.stale.pop(dev.address, None) diff --git a/nova/pci/request.py b/nova/pci/request.py index 0e61be9a1e..938dcff738 100644 --- a/nova/pci/request.py +++ b/nova/pci/request.py @@ -24,7 +24,7 @@ | "numa_policy": "legacy" | }' - Aliases with the same name, device_type and numa_policy are ORed:: +Aliases with the same name, device_type and numa_policy are ORed:: | [pci] | alias = '{ @@ -34,11 +34,11 @@ | "device_type": "type-PCI", | }' - These two aliases define a device request meaning: vendor_id is "8086" and - product_id is "0442" or "0443". - """ +These two aliases define a device request meaning: vendor_id is "8086" and +product_id is "0442" or "0443". +""" + import functools -import typing as ty import jsonschema from oslo_log import log as logging @@ -55,7 +55,7 @@ from nova.objects import fields as obj_fields from nova.pci import utils from oslo_utils import strutils -Alias = ty.Dict[str, ty.Tuple[str, ty.List[ty.Dict[str, str]]]] +Alias = dict[str, tuple[str, list[dict[str, str]]]] PCI_NET_TAG = 'physical_network' PCI_TRUSTED_TAG = 'trusted' @@ -235,12 +235,12 @@ def get_alias_from_config() -> Alias: def _translate_alias_to_requests( - alias_spec: str, affinity_policy: ty.Optional[str] = None, -) -> ty.List['objects.InstancePCIRequest']: + alias_spec: str, affinity_policy: str | None = None, +) -> list['objects.InstancePCIRequest']: """Generate complete pci requests from pci aliases in extra_spec.""" pci_aliases = get_alias_from_config() - pci_requests: ty.List[objects.InstancePCIRequest] = [] + pci_requests: list[objects.InstancePCIRequest] = [] for name, count in [spec.split(':') for spec in alias_spec.split(',')]: name = name.strip() if name not in pci_aliases: @@ -267,7 +267,7 @@ def get_instance_pci_request_from_vif( context: ctx.RequestContext, instance: 'objects.Instance', vif: network_model.VIF, -) -> ty.Optional['objects.InstancePCIRequest']: +) -> 'objects.InstancePCIRequest | None': """Given an Instance, return the PCI request associated to the PCI device related to the given VIF (if any) on the compute node the instance is currently running. @@ -322,7 +322,7 @@ def get_instance_pci_request_from_vif( def get_pci_requests_from_flavor( - flavor: 'objects.Flavor', affinity_policy: ty.Optional[str] = None, + flavor: 'objects.Flavor', affinity_policy: str | None = None, ) -> 'objects.InstancePCIRequests': """Validate and return PCI requests. @@ -369,7 +369,7 @@ def get_pci_requests_from_flavor( :raises: exception.PciInvalidAlias if the configuration contains invalid aliases. """ - pci_requests: ty.List[objects.InstancePCIRequest] = [] + pci_requests: list[objects.InstancePCIRequest] = [] if ('extra_specs' in flavor and 'pci_passthrough:alias' in flavor['extra_specs']): pci_requests = _translate_alias_to_requests( diff --git a/nova/pci/stats.py b/nova/pci/stats.py index a7a33da787..5fa3ee615c 100644 --- a/nova/pci/stats.py +++ b/nova/pci/stats.py @@ -13,7 +13,9 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. + import collections +import collections.abc import copy import typing as ty @@ -36,7 +38,7 @@ LOG = logging.getLogger(__name__) # TODO(stephenfin): We might want to use TypedDict here. Refer to # https://mypy.readthedocs.io/en/latest/kinds_of_types.html#typeddict for # more information. -Pool = ty.Dict[str, ty.Any] +Pool = dict[str, ty.Any] class PciDeviceStats(object): @@ -81,8 +83,8 @@ class PciDeviceStats(object): def __init__( self, numa_topology: 'objects.NUMATopology', - stats: 'objects.PciDevicePoolList' = None, - dev_filter: ty.Optional[whitelist.Whitelist] = None, + stats: 'objects.PciDevicePoolList | None' = None, + dev_filter: whitelist.Whitelist | None = None, ) -> None: self.numa_topology = numa_topology self.pools = ( @@ -93,12 +95,12 @@ class PciDeviceStats(object): CONF.pci.device_spec) def _equal_properties( - self, dev: Pool, entry: Pool, matching_keys: ty.List[str], + self, dev: Pool, entry: Pool, matching_keys: list[str], ) -> bool: return all(dev.get(prop) == entry.get(prop) for prop in matching_keys) - def _find_pool(self, dev_pool: Pool) -> ty.Optional[Pool]: + def _find_pool(self, dev_pool: Pool) -> Pool | None: """Return the first pool that matches dev.""" for pool in self.pools: pool_keys = pool.copy() @@ -137,7 +139,7 @@ class PciDeviceStats(object): def _create_pool_keys_from_dev( self, dev: 'objects.PciDevice', - ) -> ty.Optional[Pool]: + ) -> Pool | None: """Create a stats pool dict that this dev is supposed to be part of Note that this pool dict contains the stats pool's keys and their @@ -181,7 +183,7 @@ class PciDeviceStats(object): def _get_pool_with_device_type_mismatch( self, dev: 'objects.PciDevice', - ) -> ty.Optional[ty.Tuple[Pool, 'objects.PciDevice']]: + ) -> tuple[Pool, 'objects.PciDevice'] | None: """Check for device type mismatch in the pools for a given device. Return (pool, device) if device type does not match or a single None @@ -223,7 +225,7 @@ class PciDeviceStats(object): @staticmethod def _decrease_pool_count( - pool_list: ty.List[Pool], pool: Pool, count: int = 1, + pool_list: list[Pool], pool: Pool, count: int = 1, ) -> int: """Decrement pool's size by count. @@ -248,15 +250,15 @@ class PciDeviceStats(object): pool['devices'].remove(dev) self._decrease_pool_count(self.pools, pool) - def get_free_devs(self) -> ty.List['objects.PciDevice']: - free_devs: ty.List[objects.PciDevice] = [] + def get_free_devs(self) -> list['objects.PciDevice']: + free_devs: list[objects.PciDevice] = [] for pool in self.pools: free_devs.extend(pool['devices']) return free_devs def _allocate_devs( self, pool: Pool, num: int, request_id: str - ) -> ty.List["objects.PciDevice"]: + ) -> list["objects.PciDevice"]: alloc_devices = [] for _ in range(num): pci_dev = pool['devices'].pop() @@ -268,10 +270,10 @@ class PciDeviceStats(object): def consume_requests( self, pci_requests: 'objects.InstancePCIRequests', - numa_cells: ty.Optional[ty.List['objects.InstanceNUMACell']] = None, - ) -> ty.Optional[ty.List['objects.PciDevice']]: + numa_cells: list['objects.InstanceNUMACell'] | None = None, + ) -> list['objects.PciDevice'] | None: - alloc_devices: ty.List[objects.PciDevice] = [] + alloc_devices: list[objects.PciDevice] = [] for request in pci_requests: count = request.count @@ -360,8 +362,8 @@ class PciDeviceStats(object): return def _filter_pools_for_spec( - self, pools: ty.List[Pool], request: 'objects.InstancePCIRequest', - ) -> ty.List[Pool]: + self, pools: list[Pool], request: 'objects.InstancePCIRequest', + ) -> list[Pool]: """Filter out pools that don't match the request's device spec. Exclude pools that do not match the specified ``vendor_id``, @@ -390,10 +392,10 @@ class PciDeviceStats(object): def _filter_pools_for_numa_cells( self, - pools: ty.List[Pool], + pools: list[Pool], request: 'objects.InstancePCIRequest', - numa_cells: ty.Optional[ty.List['objects.InstanceNUMACell']], - ) -> ty.List[Pool]: + numa_cells: list['objects.InstanceNUMACell'] | None, + ) -> list[Pool]: """Filter out pools with the wrong NUMA affinity, if required. Exclude pools that do not have *suitable* PCI NUMA affinity. @@ -473,9 +475,9 @@ class PciDeviceStats(object): def _filter_pools_for_socket_affinity( self, - pools: ty.List[Pool], - numa_cells: ty.List['objects.InstanceNUMACell'], - ) -> ty.List[Pool]: + pools: list[Pool], + numa_cells: list['objects.InstanceNUMACell'], + ) -> list[Pool]: host_cells = self.numa_topology.cells # bail early if we don't have socket information for all host_cells. # This could happen if we're running on an weird older system with @@ -509,8 +511,8 @@ class PciDeviceStats(object): ] def _filter_pools_for_unrequested_pfs( - self, pools: ty.List[Pool], request: 'objects.InstancePCIRequest', - ) -> ty.List[Pool]: + self, pools: list[Pool], request: 'objects.InstancePCIRequest', + ) -> list[Pool]: """Filter out pools with PFs, unless these are required. This is necessary in cases where PFs and VFs have the same product_id @@ -534,9 +536,9 @@ class PciDeviceStats(object): def _filter_pools_for_unrequested_vdpa_devices( self, - pools: ty.List[Pool], + pools: list[Pool], request: 'objects.InstancePCIRequest', - ) -> ty.List[Pool]: + ) -> list[Pool]: """Filter out pools with VDPA devices, unless these are required. This is necessary as vdpa devices require special handling and @@ -559,8 +561,8 @@ class PciDeviceStats(object): return pools def _filter_pools_for_unrequested_remote_managed_devices( - self, pools: ty.List[Pool], request: 'objects.InstancePCIRequest', - ) -> ty.List[Pool]: + self, pools: list[Pool], request: 'objects.InstancePCIRequest', + ) -> list[Pool]: """Filter out pools with remote_managed devices, unless requested. Remote-managed devices are not usable for legacy SR-IOV or hardware @@ -581,10 +583,10 @@ class PciDeviceStats(object): def _filter_pools_based_on_placement_allocation( self, - pools: ty.List[Pool], + pools: list[Pool], request: 'objects.InstancePCIRequest', - rp_uuids: ty.List[str], - ) -> ty.List[Pool]: + rp_uuids: list[str], + ) -> list[Pool]: if not rp_uuids: # If there is no placement allocation then we don't need to filter # by it. This could happen if the instance only has neutron port @@ -620,8 +622,8 @@ class PciDeviceStats(object): return matching_pools def _filter_pools_for_live_migratable_devices( - self, pools: ty.List[Pool], request: 'objects.InstancePCIRequest', - ) -> ty.List[Pool]: + self, pools: list[Pool], request: 'objects.InstancePCIRequest', + ) -> list[Pool]: """Filter out pools with non live_migratable devices. :param pools: A list of PCI device pool dicts @@ -653,11 +655,11 @@ class PciDeviceStats(object): def _filter_pools( self, - pools: ty.List[Pool], + pools: list[Pool], request: 'objects.InstancePCIRequest', - numa_cells: ty.Optional[ty.List['objects.InstanceNUMACell']], - rp_uuids: ty.List[str], - ) -> ty.Optional[ty.List[Pool]]: + numa_cells: list['objects.InstanceNUMACell'] | None, + rp_uuids: list[str], + ) -> list[Pool] | None: """Determine if an individual PCI request can be met. Filter pools, which are collections of devices with similar traits, to @@ -789,9 +791,9 @@ class PciDeviceStats(object): def support_requests( self, - requests: ty.List['objects.InstancePCIRequest'], - provider_mapping: ty.Optional[ty.Dict[str, ty.List[str]]], - numa_cells: ty.Optional[ty.List['objects.InstanceNUMACell']] = None, + requests: list['objects.InstancePCIRequest'], + provider_mapping: dict[str, list[str]] | None, + numa_cells: list['objects.InstanceNUMACell'] | None = None, ) -> bool: """Determine if the PCI requests can be met. @@ -833,10 +835,10 @@ class PciDeviceStats(object): def _apply_request( self, - pools: ty.List[Pool], + pools: list[Pool], request: 'objects.InstancePCIRequest', - rp_uuids: ty.List[str], - numa_cells: ty.Optional[ty.List['objects.InstanceNUMACell']] = None, + rp_uuids: list[str], + numa_cells: list['objects.InstanceNUMACell'] | None = None, ) -> bool: """Apply an individual PCI request. @@ -895,9 +897,9 @@ class PciDeviceStats(object): def _get_rp_uuids_for_request( self, - provider_mapping: ty.Optional[ty.Dict[str, ty.List[str]]], + provider_mapping: dict[str, list[str]] | None, request: 'objects.InstancePCIRequest' - ) -> ty.List[str]: + ) -> list[str]: """Return the list of RP uuids that are fulfilling the request. An RP will be in the list as many times as many devices needs to @@ -937,9 +939,9 @@ class PciDeviceStats(object): def apply_requests( self, - requests: ty.List['objects.InstancePCIRequest'], - provider_mapping: ty.Optional[ty.Dict[str, ty.List[str]]], - numa_cells: ty.Optional[ty.List['objects.InstanceNUMACell']] = None, + requests: list['objects.InstancePCIRequest'], + provider_mapping: dict[str, list[str]] | None, + numa_cells: list['objects.InstanceNUMACell'] | None = None, ) -> None: """Apply PCI requests to the PCI stats. @@ -970,8 +972,8 @@ class PciDeviceStats(object): if not self._apply_request(self.pools, r, rp_uuids, numa_cells): raise exception.PciDeviceRequestFailed(requests=requests) - def __iter__(self) -> ty.Iterator[Pool]: - pools: ty.List[Pool] = [] + def __iter__(self) -> collections.abc.Iterator[Pool]: + pools: list[Pool] = [] for pool in self.pools: pool = copy.deepcopy(pool) # 'devices' shouldn't be part of stats @@ -1045,7 +1047,7 @@ class PciDeviceStats(object): pool['rp_uuid'] = next(iter(pool_rps)) @staticmethod - def _assert_one_pool_per_rp_uuid(pools: ty.List[Pool]) -> bool: + def _assert_one_pool_per_rp_uuid(pools: list[Pool]) -> bool: """Asserts that each pool has a unique rp_uuid if any :param pools: A list of Pool objects. diff --git a/nova/pci/utils.py b/nova/pci/utils.py index 51716c9d98..9995b75573 100644 --- a/nova/pci/utils.py +++ b/nova/pci/utils.py @@ -38,7 +38,7 @@ _SRIOV_TOTALVFS = "sriov_totalvfs" def pci_device_prop_match( - pci_dev: 'stats.Pool', specs: ty.List[ty.Dict[str, str]], + pci_dev: 'stats.Pool', specs: list[dict[str, str]], ) -> bool: """Check if the pci_dev meet spec requirement @@ -53,7 +53,7 @@ def pci_device_prop_match( """ - def _matching_devices(spec: ty.Dict[str, str]) -> bool: + def _matching_devices(spec: dict[str, str]) -> bool: for k, v in spec.items(): pci_dev_v = pci_dev.get(k) if isinstance(v, list) and isinstance(pci_dev_v, list): @@ -87,7 +87,7 @@ def parse_address(address: str) -> ty.Sequence[str]: return m.groups() -def get_pci_address_fields(pci_addr: str) -> ty.Tuple[str, str, str, str]: +def get_pci_address_fields(pci_addr: str) -> tuple[str, str, str, str]: """Parse a fully-specified PCI device address. Does not validate that the components are valid hex or wildcard values. @@ -111,7 +111,7 @@ def get_pci_address(domain: str, bus: str, slot: str, func: str) -> str: return '%s:%s:%s.%s' % (domain, bus, slot, func) -def get_function_by_ifname(ifname: str) -> ty.Tuple[ty.Optional[str], bool]: +def get_function_by_ifname(ifname: str) -> tuple[str | None, bool]: """Given the device name, returns the PCI address of a device and returns True if the address is in a physical function. """ @@ -246,14 +246,14 @@ def get_vf_product_id_by_pf_addr(pci_addr: str) -> str: return vf_product_id -def get_pci_ids_by_pci_addr(pci_addr: str) -> ty.Tuple[str, ...]: +def get_pci_ids_by_pci_addr(pci_addr: str) -> tuple[str, ...]: """Get the product ID and vendor ID for a given PCI device. :param pci_addr: A string of the form "::.". :return: A list containing a vendor and product ids. """ id_prefix = f"/sys/bus/pci/devices/{pci_addr}" - ids: ty.List[str] = [] + ids: list[str] = [] for id_name in ("vendor", "product"): try: with open(os.path.join(id_prefix, id_name)) as f: diff --git a/nova/pci/whitelist.py b/nova/pci/whitelist.py index 5847640ba7..b144359819 100644 --- a/nova/pci/whitelist.py +++ b/nova/pci/whitelist.py @@ -33,7 +33,7 @@ class Whitelist(object): assignable. """ - def __init__(self, whitelist_spec: ty.Optional[str] = None) -> None: + def __init__(self, whitelist_spec: str | None = None) -> None: """White list constructor For example, the following json string specifies that devices whose @@ -55,7 +55,7 @@ class Whitelist(object): @staticmethod def _parse_white_list_from_config( whitelists: str, - ) -> ty.List[devspec.PciDeviceSpec]: + ) -> list[devspec.PciDeviceSpec]: """Parse and validate the pci whitelist from the nova config.""" specs = [] for jsonspec in whitelists: @@ -83,8 +83,8 @@ class Whitelist(object): return specs def device_assignable( - self, dev: ty.Dict[str, ty.Any] - ) -> ty.Optional[devspec.PciDeviceSpec]: + self, dev: dict[str, ty.Any] + ) -> devspec.PciDeviceSpec | None: """Check if a device is part of pci device_spec (whitelist) and so can be assigned to a guest. If yes return the spec, else return None @@ -99,7 +99,7 @@ class Whitelist(object): def get_devspec( self, pci_dev: 'objects.PciDevice', - ) -> ty.Optional[devspec.PciDeviceSpec]: + ) -> devspec.PciDeviceSpec | None: for spec in self.specs: if spec.match_pci_obj(pci_dev): return spec diff --git a/nova/privsep/qemu.py b/nova/privsep/qemu.py index 40cdd38733..f1334b9dbb 100644 --- a/nova/privsep/qemu.py +++ b/nova/privsep/qemu.py @@ -54,12 +54,12 @@ def convert_image(source, dest, in_format, out_format, instances_path, def unprivileged_convert_image( source: str, dest: str, - in_format: ty.Optional[str], + in_format: str | None, out_format: str, instances_path: str, compress: bool, - src_encryption: ty.Optional[EncryptionOptions] = None, - dest_encryption: ty.Optional[EncryptionOptions] = None, + src_encryption: EncryptionOptions | None = None, + dest_encryption: EncryptionOptions | None = None, ) -> None: """Disk image conversion with qemu-img @@ -126,7 +126,7 @@ def unprivileged_convert_image( src_secret_file = None dest_secret_file = None - encryption_opts: ty.List[str] = [] + encryption_opts: list[str] = [] with contextlib.ExitStack() as stack: if src_encryption: src_secret_file = stack.enter_context( diff --git a/nova/scheduler/client/report.py b/nova/scheduler/client/report.py index dfa371baae..8f3600984a 100644 --- a/nova/scheduler/client/report.py +++ b/nova/scheduler/client/report.py @@ -14,12 +14,12 @@ # under the License. import collections +import collections.abc import contextlib import copy import functools import random import time -import typing as ty from keystoneauth1 import exceptions as ks_exc import os_resource_classes as orc @@ -233,7 +233,7 @@ class SchedulerReportClient(object): # provider and inventory information self._provider_tree: provider_tree.ProviderTree = None # Track the last time we updated providers' aggregates and traits - self._association_refresh_time: ty.Dict[str, float] = {} + self._association_refresh_time: dict[str, float] = {} self._client = self._create_client() # NOTE(danms): Keep track of how naggy we've been self._warn_count = 0 @@ -1064,8 +1064,8 @@ class SchedulerReportClient(object): self, context: nova_context.RequestContext, rp_uuid: str, - traits: ty.Iterable[str], - generation: ty.Optional[int] = None + traits: collections.abc.Iterable[str], + generation: int | None = None ): """Replace a provider's traits with those specified. @@ -1699,7 +1699,7 @@ class SchedulerReportClient(object): self, context: nova_context.RequestContext, consumer_uuid: str, - resources: ty.Dict[str, ty.Dict[str, ty.Dict[str, int]]], + resources: dict[str, dict[str, dict[str, int]]], ) -> None: """Adds certain resources to the current allocation of the consumer. @@ -1750,7 +1750,7 @@ class SchedulerReportClient(object): self, context: nova_context.RequestContext, consumer_uuid: str, - resources: ty.Dict[str, ty.Dict[str, ty.Dict[str, int]]], + resources: dict[str, dict[str, dict[str, int]]], ) -> bool: current_allocs = self.get_allocs_for_consumer(context, consumer_uuid) @@ -1790,7 +1790,7 @@ class SchedulerReportClient(object): self, context: nova_context.RequestContext, consumer_uuid: str, - resources: ty.Dict[str, ty.Dict[str, ty.Dict[str, int]]] + resources: dict[str, dict[str, dict[str, int]]] ) -> None: """Removes certain resources from the current allocation of the consumer. @@ -1840,7 +1840,7 @@ class SchedulerReportClient(object): self, context: nova_context.RequestContext, consumer_uuid: str, - resources: ty.Dict[str, ty.Dict[str, ty.Dict[str, int]]] + resources: dict[str, dict[str, dict[str, int]]] ) -> bool: if not resources: # Nothing to remove so do not query or update allocation in @@ -2613,7 +2613,7 @@ class SchedulerReportClient(object): pcpus = usages['usages'].get(orc.PCPU, 0) return vcpus + pcpus - total_counts: ty.Dict[str, ty.Dict[str, int]] = {'project': {}} + total_counts: dict[str, dict[str, int]] = {'project': {}} # First query counts across all users of a project LOG.debug('Getting usages for project_id %s from placement', project_id) diff --git a/nova/scheduler/utils.py b/nova/scheduler/utils.py index 58a52ab02d..1af6654d8f 100644 --- a/nova/scheduler/utils.py +++ b/nova/scheduler/utils.py @@ -17,7 +17,6 @@ import collections import re import sys -import typing as ty from urllib import parse import os_resource_classes as orc @@ -63,14 +62,14 @@ class ResourceRequest(object): Do not call this directly, use the existing static factory methods from_*() """ - self._rg_by_id: ty.Dict[str, objects.RequestGroup] = {} - self._group_policy: ty.Optional[str] = None + self._rg_by_id: dict[str, objects.RequestGroup] = {} + self._group_policy: str | None = None # Default to the configured limit but _limit can be # set to None to indicate "no limit". self._limit = CONF.scheduler.max_placement_results - self._root_required: ty.Set[str] = set() - self._root_forbidden: ty.Set[str] = set() - self._same_subtree: ty.List[ty.List[str]] = [] + self._root_required: set[str] = set() + self._root_forbidden: set[str] = set() + self._same_subtree: list[list[str]] = [] self.suffixed_groups_from_flavor = 0 # TODO(stephenfin): Remove this parameter once we drop support for # 'vcpu_pin_set' @@ -205,7 +204,7 @@ class ResourceRequest(object): @classmethod def from_request_groups( cls, - request_groups: ty.List['objects.RequestGroup'], + request_groups: list['objects.RequestGroup'], request_level_params: 'objects.RequestLevelParams', group_policy: str, ) -> 'ResourceRequest': @@ -351,7 +350,8 @@ class ResourceRequest(object): if not vpmem_labels: # No vpmems required return - amount_by_rc: ty.DefaultDict[str, int] = collections.defaultdict(int) + amount_by_rc: collections.defaultdict[ + str, int] = collections.defaultdict(int) for vpmem_label in vpmem_labels: resource_class = orc.normalize_name( "PMEM_NAMESPACE_" + vpmem_label) @@ -535,7 +535,8 @@ class ResourceRequest(object): :return: A dict of the form {resource_class: amount} """ - ret: ty.DefaultDict[str, int] = collections.defaultdict(lambda: 0) + ret: collections.defaultdict[ + str, int] = collections.defaultdict(lambda: 0) for rg in self._rg_by_id.values(): for resource_class, amount in rg.resources.items(): ret[resource_class] += amount @@ -579,7 +580,7 @@ class ResourceRequest(object): @property def all_required_traits(self): - traits: ty.Set[str] = set() + traits: set[str] = set() for rr in self._rg_by_id.values(): traits = traits.union(rr.required_traits) return traits diff --git a/nova/share/manila.py b/nova/share/manila.py index 1f126462ca..0001e83932 100644 --- a/nova/share/manila.py +++ b/nova/share/manila.py @@ -16,7 +16,6 @@ Handles all requests relating to shares + manila. from dataclasses import dataclass import functools -from typing import Optional from openstack import exceptions as sdk_exc from oslo_log import log as logging @@ -53,18 +52,18 @@ def _manilaclient(context, admin=False): class Share(): id: str size: int - availability_zone: Optional[str] + availability_zone: str | None created_at: str status: str - name: Optional[str] - description: Optional[str] + name: str | None + description: str | None project_id: str - snapshot_id: Optional[str] - share_network_id: Optional[str] + snapshot_id: str | None + share_network_id: str | None share_proto: str export_location: str metadata: dict - share_type: Optional[str] + share_type: str | None is_public: bool @classmethod @@ -95,7 +94,7 @@ class Access(): state: str access_type: str access_to: str - access_key: Optional[str] + access_key: str | None @classmethod def from_manila_access(cls, manila_access): diff --git a/nova/tests/fixtures/libvirt.py b/nova/tests/fixtures/libvirt.py index 41885f3245..9818526f95 100644 --- a/nova/tests/fixtures/libvirt.py +++ b/nova/tests/fixtures/libvirt.py @@ -17,7 +17,6 @@ import os import sys import textwrap import time -import typing as ty from unittest import mock import fixtures @@ -1126,7 +1125,7 @@ class NodeDevice(object): def name(self) -> str: return self._name - def listCaps(self) -> ty.List[str]: + def listCaps(self) -> list[str]: return [self.name().split('_')[0]] diff --git a/nova/tests/functional/integrated_helpers.py b/nova/tests/functional/integrated_helpers.py index 92d97cb223..3898ea0afa 100644 --- a/nova/tests/functional/integrated_helpers.py +++ b/nova/tests/functional/integrated_helpers.py @@ -44,8 +44,6 @@ from nova.tests import fixtures as nova_fixtures from nova.tests.functional.api import client as api_client from nova.tests.functional import fixtures as func_fixtures from nova import utils -import typing as ty - CONF = nova.conf.CONF LOG = logging.getLogger(__name__) @@ -728,7 +726,7 @@ class InstanceHelperMixin: {'createImage': {'name': snapshot_name}} ) - def _attach_volumes(self, server, vol_ids: ty.List[str]): + def _attach_volumes(self, server, vol_ids: list[str]): # attach volumes to server # these attachments are done by nova api, that means # nova know about these attachments and so they are valid ones. diff --git a/nova/tests/functional/libvirt/test_pci_sriov_servers.py b/nova/tests/functional/libvirt/test_pci_sriov_servers.py index fff706320e..35fc589487 100644 --- a/nova/tests/functional/libvirt/test_pci_sriov_servers.py +++ b/nova/tests/functional/libvirt/test_pci_sriov_servers.py @@ -15,7 +15,6 @@ import copy import pprint -import typing as ty from unittest import mock from urllib import parse as urlparse @@ -87,7 +86,7 @@ class PciPlacementHealingFixture(fixtures.Fixture): ) ) - def last_healing(self, hostname: str) -> ty.Optional[ty.Tuple[dict, dict]]: + def last_healing(self, hostname: str) -> tuple[dict, dict] | None: for h, updated, before, after in self.calls: if h == hostname and updated: return before, after diff --git a/nova/virt/driver.py b/nova/virt/driver.py index f51e3e5187..fc91a3626e 100644 --- a/nova/virt/driver.py +++ b/nova/virt/driver.py @@ -20,6 +20,7 @@ Driver base-classes: types that support that contract """ +from collections.abc import Mapping import dataclasses import itertools import sys @@ -190,8 +191,8 @@ def block_device_info_get_mapping(block_device_info): def block_device_info_get_encrypted_disks( - block_device_info: ty.Mapping[str, ty.Any], -) -> ty.List['nova.virt.block_device.DriverBlockDevice']: + block_device_info: Mapping[str, ty.Any], +) -> list['nova.virt.block_device.DriverBlockDevice']: block_device_info = block_device_info or {} # swap is a single device, not a list diff --git a/nova/virt/hardware.py b/nova/virt/hardware.py index 6e3bcbcf2d..2f0c017c5c 100644 --- a/nova/virt/hardware.py +++ b/nova/virt/hardware.py @@ -106,7 +106,7 @@ def get_cpu_shared_set(): return shared_ids -def parse_cpu_spec(spec: str) -> ty.Set[int]: +def parse_cpu_spec(spec: str) -> set[int]: """Parse a CPU set specification. Each element in the list is either a single CPU number, a range of @@ -117,8 +117,8 @@ def parse_cpu_spec(spec: str) -> ty.Set[int]: :returns: a set of CPU indexes """ - cpuset_ids: ty.Set[int] = set() - cpuset_reject_ids: ty.Set[int] = set() + cpuset_ids: set[int] = set() + cpuset_reject_ids: set[int] = set() for rule in spec.split(','): rule = rule.strip() # Handle multi ',' @@ -169,7 +169,7 @@ def parse_cpu_spec(spec: str) -> ty.Set[int]: def format_cpu_spec( - cpuset: ty.Set[int], + cpuset: set[int], allow_ranges: bool = True, ) -> str: """Format a libvirt CPU range specification. @@ -189,7 +189,7 @@ def format_cpu_spec( # trying to do range negations to minimize the overall # spec string length if allow_ranges: - ranges: ty.List[ty.List[int]] = [] + ranges: list[list[int]] = [] previndex = None for cpuindex in sorted(cpuset): if previndex is None or previndex != (cpuindex - 1): @@ -535,7 +535,7 @@ def _sort_possible_cpu_topologies(possible, wanttopology): # We don't use python's sort(), since we want to # preserve the sorting done when populating the # 'possible' list originally - scores: ty.Dict[int, ty.List['objects.VirtCPUTopology']] = ( + scores: dict[int, list['objects.VirtCPUTopology']] = ( collections.defaultdict(list) ) for topology in possible: @@ -683,7 +683,7 @@ def _pack_instance_onto_cores(host_cell, instance_cell, # We build up a data structure that answers the question: 'Given the # number of threads I want to pack, give me a list of all the available # sibling sets (or groups thereof) that can accommodate it' - sibling_sets: ty.Dict[int, ty.List[ty.Set[int]]] = ( + sibling_sets: dict[int, list[set[int]]] = ( collections.defaultdict(list) ) for sib in host_cell.free_siblings: @@ -922,9 +922,9 @@ def _pack_instance_onto_cores(host_cell, instance_cell, def _numa_fit_instance_cell( host_cell: 'objects.NUMACell', instance_cell: 'objects.InstanceNUMACell', - limits: ty.Optional['objects.NUMATopologyLimits'] = None, + limits: 'objects.NUMATopologyLimits | None' = None, cpuset_reserved: int = 0, -) -> ty.Optional['objects.InstanceNUMACell']: +) -> 'objects.InstanceNUMACell | None': """Ensure an instance cell can fit onto a host cell Ensure an instance cell can fit onto a host cell and, if so, return @@ -1114,7 +1114,7 @@ def _get_flavor_image_meta( image_meta: 'objects.ImageMeta', default: ty.Any = None, prefix: str = 'hw', -) -> ty.Tuple[ty.Any, ty.Any]: +) -> tuple[ty.Any, ty.Any]: """Extract both flavor- and image-based variants of metadata.""" flavor_key = ':'.join([prefix, key]) image_key = '_'.join([prefix, key]) @@ -1160,8 +1160,8 @@ def _get_unique_flavor_image_meta( def get_mem_encryption_constraint( flavor: 'objects.Flavor', image_meta: 'objects.ImageMeta', - machine_type: ty.Optional[str] = None, -) -> ty.Optional[MemEncryptionConfig]: + machine_type: str | None = None, +) -> MemEncryptionConfig | None: """Return memory encryption context requested either via flavor extra specs or image properties (or both). @@ -1374,7 +1374,7 @@ def _check_mem_encryption_machine_type(image_meta, machine_type=None): def _get_numa_pagesize_constraint( flavor: 'objects.Flavor', image_meta: 'objects.ImageMeta', -) -> ty.Optional[int]: +) -> int | None: """Return the requested memory page size :param flavor: a Flavor object to read extra specs from @@ -1442,7 +1442,7 @@ def _get_constraint_mappings_from_flavor(flavor, key, func): def get_locked_memory_constraint( flavor: 'objects.Flavor', image_meta: 'objects.ImageMeta', -) -> ty.Optional[bool]: +) -> bool | None: """Validate and return the requested locked memory. :param flavor: ``nova.objects.Flavor`` instance @@ -1484,7 +1484,7 @@ def get_locked_memory_constraint( def _get_numa_cpu_constraint( flavor: 'objects.Flavor', image_meta: 'objects.ImageMeta', -) -> ty.Optional[ty.List[ty.Set[int]]]: +) -> list[set[int]] | None: """Validate and return the requested guest NUMA-guest CPU mapping. Extract the user-provided mapping of guest CPUs to guest NUMA nodes. For @@ -1516,7 +1516,7 @@ def _get_numa_cpu_constraint( def _get_numa_mem_constraint( flavor: 'objects.Flavor', image_meta: 'objects.ImageMeta', -) -> ty.Optional[ty.List[int]]: +) -> list[int] | None: """Validate and return the requested guest NUMA-guest memory mapping. Extract the user-provided mapping of guest memory to guest NUMA nodes. For @@ -1548,7 +1548,7 @@ def _get_numa_mem_constraint( def _get_numa_node_count_constraint( flavor: 'objects.Flavor', image_meta: 'objects.ImageMeta', -) -> ty.Optional[int]: +) -> int | None: """Validate and return the requested NUMA nodes. :param flavor: ``nova.objects.Flavor`` instance @@ -1577,7 +1577,7 @@ def _get_numa_node_count_constraint( def get_cpu_policy_constraint( flavor: 'objects.Flavor', image_meta: 'objects.ImageMeta', -) -> ty.Optional[str]: +) -> str | None: """Validate and return the requested CPU policy. :param flavor: ``nova.objects.Flavor`` instance @@ -1628,7 +1628,7 @@ def get_cpu_policy_constraint( def get_cpu_thread_policy_constraint( flavor: 'objects.Flavor', image_meta: 'objects.ImageMeta', -) -> ty.Optional[str]: +) -> str | None: """Validate and return the requested CPU thread policy. :param flavor: ``nova.objects.Flavor`` instance @@ -1669,8 +1669,8 @@ def get_cpu_thread_policy_constraint( def _get_numa_topology_auto( nodes: int, flavor: 'objects.Flavor', - vcpus: ty.Set[int], - pcpus: ty.Set[int], + vcpus: set[int], + pcpus: set[int], ) -> 'objects.InstanceNUMATopology': """Generate a NUMA topology automatically based on CPUs and memory. @@ -1702,10 +1702,10 @@ def _get_numa_topology_auto( def _get_numa_topology_manual( nodes: int, flavor: 'objects.Flavor', - vcpus: ty.Set[int], - pcpus: ty.Set[int], - cpu_list: ty.List[ty.Set[int]], - mem_list: ty.List[int], + vcpus: set[int], + pcpus: set[int], + cpu_list: list[set[int]], + mem_list: list[int], ) -> 'objects.InstanceNUMATopology': """Generate a NUMA topology based on user-provided NUMA topology hints. @@ -1763,7 +1763,7 @@ def is_realtime_enabled(flavor): def _get_vcpu_pcpu_resources( flavor: 'objects.Flavor', -) -> ty.Tuple[int, int]: +) -> tuple[int, int]: requested_vcpu = 0 requested_pcpu = 0 @@ -1787,7 +1787,7 @@ def _get_vcpu_pcpu_resources( def _get_hyperthreading_trait( flavor: 'objects.Flavor', image_meta: 'objects.ImageMeta', -) -> ty.Optional[str]: +) -> str | None: for key, val in flavor.get('extra_specs', {}).items(): if re.match('trait([1-9][0-9]*)?:%s' % os_traits.HW_CPU_HYPERTHREADING, key): @@ -1803,7 +1803,7 @@ def _get_hyperthreading_trait( # NOTE(stephenfin): This must be public as it's used elsewhere def get_dedicated_cpu_constraint( flavor: 'objects.Flavor', -) -> ty.Optional[ty.Set[int]]: +) -> set[int] | None: """Validate and return the requested dedicated CPU mask. :param flavor: ``nova.objects.Flavor`` instance @@ -1835,7 +1835,7 @@ def get_dedicated_cpu_constraint( def get_realtime_cpu_constraint( flavor: 'objects.Flavor', image_meta: 'objects.ImageMeta', -) -> ty.Optional[ty.Set[int]]: +) -> set[int] | None: """Validate and return the requested realtime CPU mask. :param flavor: ``nova.objects.Flavor`` instance @@ -1881,7 +1881,7 @@ def get_realtime_cpu_constraint( # NOTE(stephenfin): This must be public as it's used elsewhere def get_emulator_thread_policy_constraint( flavor: 'objects.Flavor', -) -> ty.Optional[str]: +) -> str | None: """Validate and return the requested emulator threads policy. :param flavor: ``nova.objects.Flavor`` instance @@ -1931,7 +1931,7 @@ def get_pci_numa_policy_constraint( def get_pmu_constraint( flavor: 'objects.Flavor', image_meta: 'objects.ImageMeta', -) -> ty.Optional[bool]: +) -> bool | None: """Validate and return the requested vPMU configuration. This one's a little different since we don't return False in the default @@ -2085,7 +2085,7 @@ def get_packed_virtqueue_constraint( def get_vtpm_constraint( flavor: 'objects.Flavor', image_meta: 'objects.ImageMeta', -) -> ty.Optional[VTPMConfig]: +) -> VTPMConfig | None: """Validate and return the requested vTPM configuration. :param flavor: ``nova.objects.Flavor`` instance @@ -2125,7 +2125,7 @@ def get_vtpm_constraint( def get_tpm_secret_security_constraint( flavor: 'objects.Flavor', -) -> ty.Optional[str]: +) -> str | None: # NOTE(melwitt): An image property for TPM secret security is intentionally # not provided because server rebuild is blocked in the API. If a user were # to create a server with a given TPM secret security policy via an image @@ -2140,7 +2140,7 @@ def get_tpm_secret_security_constraint( def get_secure_boot_constraint( flavor: 'objects.Flavor', image_meta: 'objects.ImageMeta', -) -> ty.Optional[str]: +) -> str | None: """Validate and return the requested secure boot policy. :param flavor: ``nova.objects.Flavor`` instance @@ -2406,7 +2406,7 @@ def numa_get_constraints(flavor, image_meta): def _numa_cells_support_network_metadata( host_topology: 'objects.NUMATopology', - chosen_host_cells: ty.List['objects.NUMACell'], + chosen_host_cells: list['objects.NUMACell'], network_metadata: 'objects.NetworkMetadata', ) -> bool: """Determine whether the cells can accept the network requests. @@ -2424,7 +2424,7 @@ def _numa_cells_support_network_metadata( if not network_metadata: return True - required_physnets: ty.Set[str] = set() + required_physnets: set[str] = set() if 'physnets' in network_metadata: # use set() to avoid modifying the original data structure required_physnets = set(network_metadata.physnets) @@ -2509,10 +2509,10 @@ def _numa_cells_support_network_metadata( def numa_fit_instance_to_host( host_topology: 'objects.NUMATopology', instance_topology: 'objects.InstanceNUMATopology', - provider_mapping: ty.Optional[ty.Dict[str, ty.List[str]]], - limits: ty.Optional['objects.NUMATopologyLimits'] = None, - pci_requests: ty.Optional['objects.InstancePCIRequests'] = None, - pci_stats: ty.Optional[stats.PciDeviceStats] = None, + provider_mapping: dict[str, list[str]] | None, + limits: 'objects.NUMATopologyLimits | None' = None, + pci_requests: 'objects.InstancePCIRequests | None' = None, + pci_stats: stats.PciDeviceStats | None = None, ): """Fit the instance topology onto the host topology. @@ -2602,7 +2602,7 @@ def numa_fit_instance_to_host( if pci_stats: # Create dict with numa cell id as key # and total number of free pci devices as value. - total_pci_in_cell: ty.Dict[int, int] = {} + total_pci_in_cell: dict[int, int] = {} for pool in pci_stats.pools: if pool['numa_node'] in list(total_pci_in_cell): total_pci_in_cell[pool['numa_node']] += pool['count'] @@ -2630,8 +2630,8 @@ def numa_fit_instance_to_host( fit_cache = set() for host_cell_perm in itertools.permutations( host_cells, len(instance_topology)): - chosen_instance_cells: ty.List['objects.InstanceNUMACell'] = [] - chosen_host_cells: ty.List['objects.NUMACell'] = [] + chosen_instance_cells: list['objects.InstanceNUMACell'] = [] + chosen_host_cells: list['objects.NUMACell'] = [] for host_cell, instance_cell in zip( host_cell_perm, instance_topology.cells): @@ -2714,7 +2714,7 @@ def numa_get_reserved_huge_pages(): return {} try: - bucket: ty.Dict[int, ty.Dict[int, int]] = collections.defaultdict(dict) + bucket: dict[int, dict[int, int]] = collections.defaultdict(dict) for cfg in CONF.reserved_huge_pages: try: pagesize = int(cfg['size']) @@ -2864,7 +2864,7 @@ def get_vpmems(flavor): def get_maxphysaddr_mode( flavor: 'objects.Flavor', image_meta: 'objects.ImageMeta', -) -> ty.Optional[str]: +) -> str | None: """Return maxphysaddr mode. :param flavor: a flavor object to read extra specs from @@ -2944,7 +2944,7 @@ def get_ephemeral_encryption_constraint( def get_ephemeral_encryption_format( flavor: 'objects.Flavor', image_meta: 'objects.ImageMeta', -) -> ty.Optional[str]: +) -> str | None: """Get the ephemeral encryption format. :param flavor: an objects.Flavor object @@ -2995,7 +2995,7 @@ def check_shares_supported(context, instance): def get_sound_model( flavor: 'objects.Flavor', image_meta: 'objects.ImageMeta', -) -> ty.Optional[str]: +) -> str | None: """Get the sound device model, if any. :param flavor: ``nova.objects.Flavor`` instance @@ -3019,7 +3019,7 @@ def get_sound_model( def get_usb_model( flavor: 'objects.Flavor', image_meta: 'objects.ImageMeta', -) -> ty.Optional[str]: +) -> str | None: """Get the USB controller model, if any. :param flavor: ``nova.objects.Flavor`` instance diff --git a/nova/virt/libvirt/config.py b/nova/virt/libvirt/config.py index 33a5bad4ef..0b27382f34 100644 --- a/nova/virt/libvirt/config.py +++ b/nova/virt/libvirt/config.py @@ -24,7 +24,6 @@ helpers for populating up config object instances. """ import time -import typing as ty from collections import OrderedDict from lxml import etree @@ -104,7 +103,7 @@ class LibvirtConfigObject(object): return xml_str @classmethod - def parse_on_off_str(self, value: ty.Optional[str]) -> bool: + def parse_on_off_str(self, value: str | None) -> bool: if value is not None and value not in ('on', 'off'): msg = _( "Element should contain either 'on' or 'off'; " diff --git a/nova/virt/libvirt/cpu/api.py b/nova/virt/libvirt/cpu/api.py index 5d7dccd47e..ca25dea7ad 100644 --- a/nova/virt/libvirt/cpu/api.py +++ b/nova/virt/libvirt/cpu/api.py @@ -11,7 +11,6 @@ # under the License. from dataclasses import dataclass -import typing as ty from oslo_log import log as logging @@ -60,7 +59,7 @@ class Core: return str(self.ident) @property - def governor(self) -> ty.Optional[str]: + def governor(self) -> str | None: try: return core.get_governor(self.ident) # NOTE(sbauza): cpufreq/scaling_governor is not enabled for some OS @@ -91,7 +90,7 @@ class API(object): """ return Core(i) - def power_up(self, cpus: ty.Set[int]) -> None: + def power_up(self, cpus: set[int]) -> None: if not CONF.libvirt.cpu_power_management: return cpu_dedicated_set = hardware.get_cpu_dedicated_set_nozero() or set() @@ -123,7 +122,7 @@ class API(object): pcpus = pcpus.union(pins) self.power_up(pcpus) - def _power_down(self, cpus: ty.Set[int]) -> None: + def _power_down(self, cpus: set[int]) -> None: if not CONF.libvirt.cpu_power_management: return cpu_dedicated_set = hardware.get_cpu_dedicated_set_nozero() or set() diff --git a/nova/virt/libvirt/cpu/core.py b/nova/virt/libvirt/cpu/core.py index 2d71bd60e4..f484f56ed0 100644 --- a/nova/virt/libvirt/cpu/core.py +++ b/nova/virt/libvirt/cpu/core.py @@ -11,7 +11,6 @@ # under the License. import os -import typing as ty from oslo_log import log as logging @@ -27,7 +26,7 @@ AVAILABLE_PATH = '/sys/devices/system/cpu/present' CPU_PATH_TEMPLATE = '/sys/devices/system/cpu/cpu%(core)s' -def get_available_cores() -> ty.Set[int]: +def get_available_cores() -> set[int]: cores = filesystem.read_sys(AVAILABLE_PATH) return hardware.parse_cpu_spec(cores) if cores else set() diff --git a/nova/virt/libvirt/driver.py b/nova/virt/libvirt/driver.py index d6f089bdf4..eede39b857 100644 --- a/nova/virt/libvirt/driver.py +++ b/nova/virt/libvirt/driver.py @@ -26,6 +26,7 @@ Supports KVM, LXC, QEMU, and Parallels. import binascii import collections +from collections.abc import Callable from collections import deque import contextlib import copy @@ -260,6 +261,12 @@ REGISTER_IMAGE_PROPERTY_DEFAULTS = [ 'hw_vif_model', ] +# Type Aliases + +DetachableDevice: ty.TypeAlias = ( + vconfig.LibvirtConfigGuestDisk | vconfig.LibvirtConfigGuestInterface +) + class AsyncDeviceEventsHandler: """A synchornization point between libvirt events an clients waiting for @@ -278,13 +285,13 @@ class AsyncDeviceEventsHandler: self, instance_uuid: str, device_name: str, - event_types: ty.Set[ty.Type[libvirtevent.DeviceEvent]] + event_types: set[type[libvirtevent.DeviceEvent]] ): self.instance_uuid = instance_uuid self.device_name = device_name self.event_types = event_types self.threading_event = threading.Event() - self.result: ty.Optional[libvirtevent.DeviceEvent] = None + self.result: libvirtevent.DeviceEvent | None = None def matches(self, event: libvirtevent.DeviceEvent) -> bool: """Returns true if the event is one of the expected event types @@ -306,13 +313,13 @@ class AsyncDeviceEventsHandler: self._lock = threading.Lock() # Ongoing device operations in libvirt where we wait for the events # about success or failure. - self._waiters: ty.Set[AsyncDeviceEventsHandler.Waiter] = set() + self._waiters: set[AsyncDeviceEventsHandler.Waiter] = set() def create_waiter( self, instance_uuid: str, device_name: str, - event_types: ty.Set[ty.Type[libvirtevent.DeviceEvent]] + event_types: set[type[libvirtevent.DeviceEvent]] ) -> 'AsyncDeviceEventsHandler.Waiter': """Returns an opaque token the caller can use in wait() to wait for the libvirt event @@ -344,7 +351,7 @@ class AsyncDeviceEventsHandler: def wait( self, token: 'AsyncDeviceEventsHandler.Waiter', timeout: float, - ) -> ty.Optional[libvirtevent.DeviceEvent]: + ) -> libvirtevent.DeviceEvent | None: """Blocks waiting for the libvirt event represented by the opaque token :param token: A token created by calling create_waiter() @@ -398,6 +405,13 @@ class AsyncDeviceEventsHandler: instance_waiters) +class GetDeviceConfFunction(ty.Protocol): + def __call__( + self, from_persistent_config: bool = ..., **kwargs, + ) -> vconfig.LibvirtConfigGuestDevice: + ... + + class LibvirtDriver(driver.ComputeDriver): def __init__(self, virtapi, read_only=False): if libvirt is None: @@ -467,7 +481,7 @@ class LibvirtDriver(driver.ComputeDriver): self.vif_driver = libvirt_vif.LibvirtGenericVIFDriver(self._host) # NOTE(lyarwood): Volume drivers are loaded on-demand - self.volume_drivers: ty.Dict[str, volume.LibvirtBaseVolumeDriver] = {} + self.volume_drivers: dict[str, volume.LibvirtBaseVolumeDriver] = {} self._disk_cachemode = None self.image_cache_manager = imagecache.ImageCacheManager() @@ -535,7 +549,7 @@ class LibvirtDriver(driver.ComputeDriver): # This dict is for knowing which mdev class is supported by a specific # PCI device like we do (the key being the PCI address and the value # the mdev class) - self.mdev_class_mapping: ty.Dict[str, str] = ( + self.mdev_class_mapping: dict[str, str] = ( collections.defaultdict(lambda: orc.VGPU) ) # This set is for knowing all the mdev classes the operator provides @@ -577,10 +591,10 @@ class LibvirtDriver(driver.ComputeDriver): return {}, {} # vpmem keyed by name {name: objects.LibvirtVPMEMDevice,...} - vpmems_by_name: ty.Dict[str, 'objects.LibvirtVPMEMDevice'] = {} + vpmems_by_name: dict[str, 'objects.LibvirtVPMEMDevice'] = {} # vpmem list keyed by resource class # {'RC_0': [objects.LibvirtVPMEMDevice, ...], 'RC_1': [...]} - vpmems_by_rc: ty.Dict[str, ty.List['objects.LibvirtVPMEMDevice']] = ( + vpmems_by_rc: dict[str, list['objects.LibvirtVPMEMDevice']] = ( collections.defaultdict(list) ) @@ -1022,9 +1036,9 @@ class LibvirtDriver(driver.ComputeDriver): self, instance: 'objects.Instance', image_property: str, - disk_info: ty.Optional[ty.Dict[str, ty.Any]], - guest_config: ty.Optional[vconfig.LibvirtConfigGuest], - ) -> ty.Optional[str]: + disk_info: dict[str, ty.Any] | None, + guest_config: vconfig.LibvirtConfigGuest | None, + ) -> str | None: if image_property == 'hw_machine_type': return libvirt_utils.get_machine_type(instance.image_meta) @@ -1998,7 +2012,7 @@ class LibvirtDriver(driver.ComputeDriver): LOG.debug(e, instance=instance) def _get_volume_driver( - self, connection_info: ty.Dict[str, ty.Any] + self, connection_info: dict[str, ty.Any] ) -> 'volume.LibvirtBaseVolumeDriver': """Fetch the nova.virt.libvirt.volume driver @@ -2493,9 +2507,7 @@ class LibvirtDriver(driver.ComputeDriver): self, guest: libvirt_guest.Guest, instance_uuid: str, - # to properly typehint this param we would need typing.Protocol but - # that is only available since python 3.8 - get_device_conf_func: ty.Callable, + get_device_conf_func: GetDeviceConfFunction, device_name: str, ) -> None: """Detaches a device from the guest @@ -2579,12 +2591,10 @@ class LibvirtDriver(driver.ComputeDriver): self, guest: libvirt_guest.Guest, instance_uuid: str, - persistent_dev: ty.Union[ - vconfig.LibvirtConfigGuestDisk, - vconfig.LibvirtConfigGuestInterface], - get_device_conf_func, + persistent_dev: DetachableDevice, + get_device_conf_func: GetDeviceConfFunction, device_name: str, - ): + ) -> None: LOG.debug( 'Attempting to detach device %s from instance %s from ' 'the persistent domain config.', device_name, instance_uuid) @@ -2613,12 +2623,10 @@ class LibvirtDriver(driver.ComputeDriver): self, guest: libvirt_guest.Guest, instance_uuid: str, - live_dev: ty.Union[ - vconfig.LibvirtConfigGuestDisk, - vconfig.LibvirtConfigGuestInterface], - get_device_conf_func, + live_dev: DetachableDevice, + get_device_conf_func: GetDeviceConfFunction, device_name: str, - ): + ) -> None: max_attempts = CONF.libvirt.device_detach_attempts for attempt in range(max_attempts): LOG.debug( @@ -2656,9 +2664,7 @@ class LibvirtDriver(driver.ComputeDriver): def _detach_from_live_and_wait_for_event( self, - dev: ty.Union[ - vconfig.LibvirtConfigGuestDisk, - vconfig.LibvirtConfigGuestInterface], + dev: DetachableDevice, guest: libvirt_guest.Guest, instance_uuid: str, device_name: str, @@ -2738,9 +2744,7 @@ class LibvirtDriver(driver.ComputeDriver): @staticmethod def _detach_sync( - dev: ty.Union[ - vconfig.LibvirtConfigGuestDisk, - vconfig.LibvirtConfigGuestInterface], + dev: DetachableDevice, guest: libvirt_guest.Guest, instance_uuid: str, device_name: str, @@ -4690,8 +4694,8 @@ class LibvirtDriver(driver.ComputeDriver): self, context: nova_context.RequestContext, instance: 'objects.Instance', - block_device_info: ty.Dict[str, ty.Any], - ) -> ty.Optional[ty.Dict[str, ty.Any]]: + block_device_info: dict[str, ty.Any], + ) -> dict[str, ty.Any] | None: """Add ephemeral encryption attributes to driver BDMs before use.""" encrypted_bdms = driver.block_device_info_get_encrypted_disks( block_device_info) @@ -6261,7 +6265,7 @@ class LibvirtDriver(driver.ComputeDriver): return idmaps def _get_guest_idmaps(self): - id_maps: ty.List[vconfig.LibvirtConfigGuestIDMap] = [] + id_maps: list[vconfig.LibvirtConfigGuestIDMap] = [] if CONF.libvirt.virt_type == 'lxc' and CONF.libvirt.uid_maps: uid_maps = self._create_idmaps(vconfig.LibvirtConfigGuestUIDMap, CONF.libvirt.uid_maps) @@ -6754,7 +6758,7 @@ class LibvirtDriver(driver.ComputeDriver): def _get_video_type( self, image_meta: objects.ImageMeta, - ) -> ty.Optional[str]: + ) -> str | None: # NOTE(ldbragst): The following logic returns the video type # depending on supported defaults given the architecture, # virtualization type, and features. The video type can @@ -7186,7 +7190,7 @@ class LibvirtDriver(driver.ComputeDriver): instance: 'objects.Instance', inst_path: str, image_meta: 'objects.ImageMeta', - disk_info: ty.Dict[str, ty.Any], + disk_info: dict[str, ty.Any], ): if rescue: self._set_guest_for_rescue( @@ -7865,7 +7869,7 @@ class LibvirtDriver(driver.ComputeDriver): self, guest: vconfig.LibvirtConfigGuest, image_meta: objects.ImageMeta, - ) -> ty.Tuple[ty.Optional[str], ty.Optional[str]]: + ) -> tuple[str | None, str | None]: pointer_bus = image_meta.properties.get('hw_input_bus') pointer_model = image_meta.properties.get('hw_pointer_model') @@ -7959,7 +7963,7 @@ class LibvirtDriver(driver.ComputeDriver): guest: vconfig.LibvirtConfigGuest, image_meta: 'objects.ImageMeta', flavor: 'objects.Flavor', - ) -> ty.Optional[str]: + ) -> str | None: model = flavor.extra_specs.get( 'hw:viommu_model') or image_meta.properties.get( 'hw_viommu_model') @@ -8170,7 +8174,7 @@ class LibvirtDriver(driver.ComputeDriver): self, context: nova_context.RequestContext, instance: 'objects.Instance', - ) -> ty.Tuple[ty.Any, ty.Optional[str]]: + ) -> tuple[ty.Any, str | None]: """Get or create a libvirt vTPM secret. For 'host' TPM secret security, this will look for a local libvirt @@ -8210,7 +8214,7 @@ class LibvirtDriver(driver.ComputeDriver): instance: 'objects.Instance', power_on: bool = True, pause: bool = False, - post_xml_callback: ty.Optional[ty.Callable] = None, + post_xml_callback: Callable[[], None] | None = None, ) -> libvirt_guest.Guest: """Create a Guest from XML. @@ -8266,11 +8270,11 @@ class LibvirtDriver(driver.ComputeDriver): xml: str, instance: 'objects.Instance', network_info: network_model.NetworkInfo, - block_device_info: ty.Optional[ty.Dict[str, ty.Any]], + block_device_info: dict[str, ty.Any] | None, power_on: bool = True, vifs_already_plugged: bool = False, - post_xml_callback: ty.Optional[ty.Callable] = None, - external_events: ty.Optional[ty.List[ty.Tuple[str, str]]] = None, + post_xml_callback: Callable[[], None] | None = None, + external_events: list[tuple[str, str]] | None = None, cleanup_instance_dir: bool = False, cleanup_instance_disks: bool = False, ) -> libvirt_guest.Guest: @@ -8519,7 +8523,7 @@ class LibvirtDriver(driver.ComputeDriver): @staticmethod def _get_pci_id_from_libvirt_name( libvirt_address: str - ) -> ty.Optional[str]: + ) -> str | None: """Returns a PCI ID from a libvirt pci address name. :param libvirt_address: the libvirt PCI device name, @@ -8586,7 +8590,7 @@ class LibvirtDriver(driver.ComputeDriver): mdev device handles for that GPU """ - counts_per_parent: ty.Dict[str, int] = collections.defaultdict(int) + counts_per_parent: dict[str, int] = collections.defaultdict(int) mediated_devices = self._get_mediated_devices(types=enabled_mdev_types) for mdev in mediated_devices: parent_vgpu_type = self._get_vgpu_type_per_pgpu(mdev['parent']) @@ -8608,7 +8612,7 @@ class LibvirtDriver(driver.ComputeDriver): """ mdev_capable_devices = self._get_mdev_capable_devices( types=enabled_mdev_types) - counts_per_dev: ty.Dict[str, int] = collections.defaultdict(int) + counts_per_dev: dict[str, int] = collections.defaultdict(int) for dev in mdev_capable_devices: # dev_id is the libvirt name for the PCI device, # eg. pci_0000_84_00_0 which matches a PCI address of 0000:84:00.0 @@ -8647,7 +8651,7 @@ class LibvirtDriver(driver.ComputeDriver): return {} inventories = {} # counting how many mdevs we are currently supporting per type - type_limit_mapping: ty.Dict[str, int] = collections.defaultdict(int) + type_limit_mapping: dict[str, int] = collections.defaultdict(int) count_per_parent = self._count_mediated_devices(enabled_mdev_types) for dev_name, count in count_per_parent.items(): mdev_type = self._get_vgpu_type_per_pgpu(dev_name) @@ -9286,7 +9290,7 @@ class LibvirtDriver(driver.ComputeDriver): return cell.get(page_size, 0) def _get_physnet_numa_affinity(): - affinities: ty.Dict[int, ty.Set[str]] = { + affinities: dict[int, set[str]] = { cell.id: set() for cell in topology.cells } for physnet in CONF.neutron.physnets: @@ -9504,7 +9508,7 @@ class LibvirtDriver(driver.ComputeDriver): # otherwise. inv = provider_tree.data(nodename).inventory ratios = self._get_allocation_ratios(inv) - resources: ty.Dict[str, ty.Set['objects.Resource']] = ( + resources: dict[str, set['objects.Resource']] = ( collections.defaultdict(set) ) @@ -9862,11 +9866,11 @@ class LibvirtDriver(driver.ComputeDriver): return inventories @property - def static_traits(self) -> ty.Dict[str, bool]: + def static_traits(self) -> dict[str, bool]: if self._static_traits is not None: return self._static_traits - traits: ty.Dict[str, bool] = {} + traits: dict[str, bool] = {} traits.update(self._get_cpu_traits()) traits.update(self._get_packed_virtqueue_traits()) traits.update(self._get_storage_bus_traits()) @@ -10035,7 +10039,7 @@ class LibvirtDriver(driver.ComputeDriver): :return: dict, keyed by PGPU device ID, to count of VGPUs on that device """ - vgpu_count_per_pgpu: ty.Dict[str, int] = collections.defaultdict(int) + vgpu_count_per_pgpu: dict[str, int] = collections.defaultdict(int) for mdev_uuid in mdev_uuids: # libvirt name is like mdev_00ead764_fdc0_46b6_8db9_2963f5c815b4 dev_name = libvirt_utils.mdev_uuid2name(mdev_uuid) @@ -10589,7 +10593,7 @@ class LibvirtDriver(driver.ComputeDriver): raise exception.MigrationPreCheckError(reason) dst_mdevs = self._allocate_mdevs(allocs) dst_mdev_types = self._get_mdev_types_from_uuids(dst_mdevs) - target_mdevs: ty.Dict[str, str] = {} + target_mdevs: dict[str, str] = {} for src_mdev, src_type in src_mdev_types.items(): for dst_mdev, dst_type in dst_mdev_types.items(): # we want to associate by 1:1 between dst and src mdevs @@ -11373,7 +11377,7 @@ class LibvirtDriver(driver.ComputeDriver): migrate_data, future, disk_paths): - on_migration_failure: ty.Deque[str] = deque() + on_migration_failure: deque[str] = deque() data_gb = self._live_migration_data_gb(instance, disk_paths) downtime_steps = list(libvirt_migrate.downtime_steps(data_gb)) migration = migrate_data.migration @@ -12645,8 +12649,8 @@ class LibvirtDriver(driver.ComputeDriver): network_info: network_model.NetworkInfo, image_meta: 'objects.ImageMeta', resize_instance: bool, - allocations: ty.Dict[str, ty.Any], - block_device_info: ty.Optional[ty.Dict[str, ty.Any]] = None, + allocations: dict[str, ty.Any], + block_device_info: dict[str, ty.Any] | None = None, power_on: bool = True, ) -> None: """Complete the migration process on the destination host.""" @@ -12791,7 +12795,7 @@ class LibvirtDriver(driver.ComputeDriver): instance: 'objects.Instance', network_info: network_model.NetworkInfo, migration: 'objects.Migration', - block_device_info: ty.Optional[ty.Dict[str, ty.Any]] = None, + block_device_info: dict[str, ty.Any] | None = None, power_on: bool = True, ) -> None: """Finish the second half of reverting a resize on the source host.""" @@ -12849,7 +12853,7 @@ class LibvirtDriver(driver.ComputeDriver): @staticmethod def _get_io_devices(xml_doc): """get the list of io devices from the xml document.""" - result: ty.Dict[str, ty.List[str]] = {"volumes": [], "ifaces": []} + result: dict[str, list[str]] = {"volumes": [], "ifaces": []} try: doc = etree.fromstring(xml_doc) except Exception: @@ -13320,7 +13324,7 @@ class LibvirtDriver(driver.ComputeDriver): nova.privsep.fs.FS_FORMAT_EXT4, nova.privsep.fs.FS_FORMAT_XFS] - def _get_tpm_traits(self) -> ty.Dict[str, bool]: + def _get_tpm_traits(self) -> dict[str, bool]: # Assert or deassert TPM support traits if not CONF.libvirt.swtpm_enabled: return { @@ -13374,7 +13378,7 @@ class LibvirtDriver(driver.ComputeDriver): return tr - def _get_vif_model_traits(self) -> ty.Dict[str, bool]: + def _get_vif_model_traits(self) -> dict[str, bool]: """Get vif model traits based on the currently enabled virt_type. Not all traits generated by this function may be valid and the result @@ -13403,14 +13407,14 @@ class LibvirtDriver(driver.ComputeDriver): in supported_models for model in all_models } - def _get_iommu_model_traits(self) -> ty.Dict[str, bool]: + def _get_iommu_model_traits(self) -> dict[str, bool]: """Get iommu model traits based on the currently enabled virt_type. Not all traits generated by this function may be valid and the result should be validated. :return: A dict of trait names mapped to boolean values. """ dom_caps = self._host.get_domain_capabilities() - supported_models: ty.Set[str] = {fields.VIOMMUModel.AUTO} + supported_models: set[str] = {fields.VIOMMUModel.AUTO} # our min version of qemu/libvirt support q35 and virt machine types. # They also support the smmuv3 and intel iommu modeles so if the qemu # binary is available we can report the trait. @@ -13427,7 +13431,7 @@ class LibvirtDriver(driver.ComputeDriver): in supported_models for model in fields.VIOMMUModel.ALL } - def _get_storage_bus_traits(self) -> ty.Dict[str, bool]: + def _get_storage_bus_traits(self) -> dict[str, bool]: """Get storage bus traits based on the currently enabled virt_type. For QEMU and KVM this function uses the information returned by the @@ -13445,7 +13449,7 @@ class LibvirtDriver(driver.ComputeDriver): if CONF.libvirt.virt_type in ('qemu', 'kvm'): dom_caps = self._host.get_domain_capabilities() - supported_buses: ty.Set[str] = set() + supported_buses: set[str] = set() for arch_type in dom_caps: for machine_type in dom_caps[arch_type]: supported_buses.update( @@ -13462,7 +13466,7 @@ class LibvirtDriver(driver.ComputeDriver): supported_buses for bus in all_buses } - def _get_video_model_traits(self) -> ty.Dict[str, bool]: + def _get_video_model_traits(self) -> dict[str, bool]: """Get video model traits from libvirt. Not all traits generated by this function may be valid and the result @@ -13473,7 +13477,7 @@ class LibvirtDriver(driver.ComputeDriver): all_models = fields.VideoModel.ALL dom_caps = self._host.get_domain_capabilities() - supported_models: ty.Set[str] = set() + supported_models: set[str] = set() for arch_type in dom_caps: for machine_type in dom_caps[arch_type]: supported_models.update( @@ -13486,7 +13490,7 @@ class LibvirtDriver(driver.ComputeDriver): in supported_models for model in all_models } - def _get_packed_virtqueue_traits(self) -> ty.Dict[str, bool]: + def _get_packed_virtqueue_traits(self) -> dict[str, bool]: """Get Virtio Packed Ring traits to be set on the host's resource provider. @@ -13494,7 +13498,7 @@ class LibvirtDriver(driver.ComputeDriver): """ return {ot.COMPUTE_NET_VIRTIO_PACKED: True} - def _get_cpu_traits(self) -> ty.Dict[str, bool]: + def _get_cpu_traits(self) -> dict[str, bool]: """Get CPU-related traits to be set and unset on the host's resource provider. @@ -13507,7 +13511,7 @@ class LibvirtDriver(driver.ComputeDriver): return traits - def _get_cpu_feature_traits(self) -> ty.Dict[str, bool]: + def _get_cpu_feature_traits(self) -> dict[str, bool]: """Get CPU traits of VMs based on guest CPU model config. 1. If mode is 'host-model' or 'host-passthrough', use host's @@ -13532,7 +13536,7 @@ class LibvirtDriver(driver.ComputeDriver): caps = deepcopy(self._host.get_capabilities()) if cpu.mode in ('host-model', 'host-passthrough'): # Account for features in cpu_model_extra_flags conf - host_features: ty.Set[str] = { + host_features: set[str] = { f.name for f in caps.host.cpu.features | cpu.features } return libvirt_utils.cpu_features_to_traits(host_features) @@ -13547,7 +13551,7 @@ class LibvirtDriver(driver.ComputeDriver): feature_names = [f.name for f in cpu.features] return feature_names - features: ty.Set[str] = set() + features: set[str] = set() # Choose a default CPU model when cpu_mode is not specified if cpu.mode is None: caps.host.cpu.model = libvirt_utils.get_cpu_model_from_arch( @@ -13635,7 +13639,7 @@ class LibvirtDriver(driver.ComputeDriver): fs.target_dir = share.tag guest.add_device(fs) - def _get_sound_model_traits(self) -> ty.Dict[str, bool]: + def _get_sound_model_traits(self) -> dict[str, bool]: """Determine what sound models are supported. Not all traits generated by this function may be valid and the result diff --git a/nova/virt/libvirt/event.py b/nova/virt/libvirt/event.py index 56951dc11c..6fa27e5e0b 100644 --- a/nova/virt/libvirt/event.py +++ b/nova/virt/libvirt/event.py @@ -9,7 +9,6 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -import typing as ty from nova.virt import event @@ -27,7 +26,7 @@ class DeviceEvent(LibvirtEvent): def __init__(self, uuid: str, dev: str, - timestamp: ty.Optional[float] = None): + timestamp: float | None = None): super().__init__(uuid, timestamp) self.dev = dev diff --git a/nova/virt/libvirt/guest.py b/nova/virt/libvirt/guest.py index cac2710a29..eef6c21a01 100644 --- a/nova/virt/libvirt/guest.py +++ b/nova/virt/libvirt/guest.py @@ -28,7 +28,6 @@ then used by all the other libvirt related classes """ import time -import typing as ty from lxml import etree from oslo_log import log as logging @@ -236,7 +235,7 @@ class Guest: self, cfg: vconfig.LibvirtConfigGuestDevice, from_persistent_config: bool = False - ) -> ty.Optional[vconfig.LibvirtConfigGuestDevice]: + ) -> vconfig.LibvirtConfigGuestDevice | None: """Lookup a full LibvirtConfigGuestDevice with LibvirtConfigGuesDevice generated by nova.virt.libvirt.vif.get_config. @@ -366,7 +365,7 @@ class Guest: self, device: str, from_persistent_config: bool = False - ) -> ty.Optional[vconfig.LibvirtConfigGuestDisk]: + ) -> vconfig.LibvirtConfigGuestDisk | None: """Returns the disk mounted at device :param device: the name of either the source or the target device @@ -416,7 +415,7 @@ class Guest: self, devtype: vconfig.LibvirtConfigGuestDevice = None, from_persistent_config: bool = False - ) -> ty.List[vconfig.LibvirtConfigGuestDevice]: + ) -> list[vconfig.LibvirtConfigGuestDevice]: """Returns all devices for a guest :param devtype: a LibvirtConfigGuestDevice subclass class diff --git a/nova/virt/libvirt/host.py b/nova/virt/libvirt/host.py index 9d918aacf8..17aced16fa 100644 --- a/nova/virt/libvirt/host.py +++ b/nova/virt/libvirt/host.py @@ -27,6 +27,8 @@ the raw libvirt API. These APIs are then used by all the other libvirt related classes """ +from collections.abc import Callable +from collections.abc import Mapping from collections import defaultdict import fnmatch import glob @@ -128,8 +130,9 @@ class Host(object): self._read_only = read_only self._initial_connection = True self._conn_event_handler = conn_event_handler - self._conn_event_handler_queue: queue.Queue[ty.Callable] = ( - queue.Queue()) + self._conn_event_handler_queue: queue.Queue[ + Callable[[], None] + ] = queue.Queue() self._lifecycle_event_handler = lifecycle_event_handler self._caps = None self._domain_caps = None @@ -138,7 +141,9 @@ class Host(object): self._wrapped_conn = None self._wrapped_conn_lock = threading.Lock() - self._event_queue: ty.Optional[queue.Queue[ty.Callable]] = None + self._event_queue: queue.Queue[ + virtevent.InstanceEvent | Mapping[str, ty.Any] + ] | None = None self._events_delayed = {} # Note(toabctl): During a reboot of a domain, STOPPED and @@ -151,19 +156,19 @@ class Host(object): self._libvirt_proxy_classes = self._get_libvirt_proxy_classes(libvirt) self._libvirt_proxy = self._wrap_libvirt_proxy(libvirt) - self._loaders: ty.Optional[ty.List[dict]] = None + self._loaders: list[dict] | None = None # A number of features are conditional on support in the hardware, # kernel, QEMU, and/or libvirt. These are determined on demand and # memoized by various properties below - self._supports_amd_sev: ty.Optional[bool] = None - self._supports_amd_sev_es: ty.Optional[bool] = None - self._max_sev_guests: ty.Optional[int] = None - self._max_sev_es_guests: ty.Optional[int] = None - self._supports_uefi: ty.Optional[bool] = None - self._supports_secure_boot: ty.Optional[bool] = None + self._supports_amd_sev: bool | None = None + self._supports_amd_sev_es: bool | None = None + self._max_sev_guests: int | None = None + self._max_sev_es_guests: int | None = None + self._supports_uefi: bool | None = None + self._supports_secure_boot: bool | None = None - self._has_hyperthreading: ty.Optional[bool] = None + self._has_hyperthreading: bool | None = None @staticmethod def _get_libvirt_proxy_classes(libvirt_module): @@ -396,9 +401,7 @@ class Host(object): return while not self._event_queue.empty(): try: - event_type = ty.Union[ - virtevent.InstanceEvent, ty.Mapping[str, ty.Any]] - event: event_type = self._event_queue.get(block=False) + event: virtevent.InstanceEvent | Mapping[str, ty.Any] = self._event_queue.get(block=False) # noqa: E501 if issubclass(type(event), virtevent.InstanceEvent): # call possibly with delay self._event_emit_delayed(event) @@ -890,7 +893,7 @@ class Host(object): if self._domain_caps: return self._domain_caps - domain_caps: ty.Dict = defaultdict(dict) + domain_caps: dict = defaultdict(dict) caps = self.get_capabilities() virt_type = CONF.libvirt.virt_type @@ -1302,8 +1305,8 @@ class Host(object): def _get_pcinet_info( self, dev: 'libvirt.virNodeDevice', - net_devs: ty.List['libvirt.virNodeDevice'] - ) -> ty.Optional[ty.List[str]]: + net_devs: list['libvirt.virNodeDevice'] + ) -> list[str] | None: """Returns a dict of NET device.""" net_dev = {dev.parent(): dev for dev in net_devs}.get(dev.name(), None) if net_dev is None: @@ -1317,8 +1320,8 @@ class Host(object): self, vf_device: 'libvirt.virNodeDevice', parent_pf_name: str, - candidate_devs: ty.List['libvirt.virNodeDevice'] - ) -> ty.Optional[vconfig.LibvirtConfigNodeDeviceVpdCap]: + candidate_devs: list['libvirt.virNodeDevice'] + ) -> vconfig.LibvirtConfigNodeDeviceVpdCap | None: """Returns PCI VPD info of a parent device of a PCI VF. :param vf_device: a VF device object to use for lookup. @@ -1341,7 +1344,7 @@ class Host(object): @staticmethod def _get_vpd_card_serial_number( dev: 'libvirt.virNodeDevice', - ) -> ty.Optional[ty.List[str]]: + ) -> list[str] | None: """Returns a card serial number stored in PCI VPD (if present).""" xmlstr = dev.XMLDesc(0) cfgdev = vconfig.LibvirtConfigNodeDevice() @@ -1369,19 +1372,19 @@ class Host(object): self, devname: str, dev: 'libvirt.virNodeDevice', - net_devs: ty.List['libvirt.virNodeDevice'], - vdpa_devs: ty.List['libvirt.virNodeDevice'], - pci_devs: ty.List['libvirt.virNodeDevice'], - ) -> ty.Dict[str, ty.Union[str, dict]]: + net_devs: list['libvirt.virNodeDevice'], + vdpa_devs: list['libvirt.virNodeDevice'], + pci_devs: list['libvirt.virNodeDevice'], + ) -> dict[str, str | dict]: """Returns a dict of PCI device.""" def _get_device_type( cfgdev: vconfig.LibvirtConfigNodeDevice, pci_address: str, device: 'libvirt.virNodeDevice', - net_devs: ty.List['libvirt.virNodeDevice'], - vdpa_devs: ty.List['libvirt.virNodeDevice'], - ) -> ty.Dict[str, str]: + net_devs: list['libvirt.virNodeDevice'], + vdpa_devs: list['libvirt.virNodeDevice'], + ) -> dict[str, str]: """Get a PCI device's device type. An assignable PCI device can be a normal PCI device, @@ -1437,8 +1440,8 @@ class Host(object): def _get_vpd_details( device_dict: dict, device: 'libvirt.virNodeDevice', - pci_devs: ty.List['libvirt.virNodeDevice'] - ) -> ty.Dict[str, ty.Any]: + pci_devs: list['libvirt.virNodeDevice'] + ) -> dict[str, ty.Any]: """Get information from PCI VPD (if present). PCI/PCIe devices may include the optional VPD capability. It may @@ -1451,7 +1454,7 @@ class Host(object): the VPD capability or not may be controlled via a vendor-specific firmware setting. """ - vpd_info: ty.Dict[str, ty.Any] = {} + vpd_info: dict[str, ty.Any] = {} # At the time of writing only the serial number had a clear # use-case. However, the set of fields may be extended. card_serial_number = self._get_vpd_card_serial_number(device) @@ -1481,9 +1484,9 @@ class Host(object): def _get_sriov_netdev_details( device_dict: dict, device: 'libvirt.virNodeDevice', - ) -> ty.Dict[str, ty.Dict[str, ty.Any]]: + ) -> dict[str, dict[str, ty.Any]]: """Get SR-IOV related information""" - sriov_info: ty.Dict[str, ty.Any] = {} + sriov_info: dict[str, ty.Any] = {} if device_dict.get('dev_type') != fields.PciDeviceType.SRIOV_VF: return sriov_info @@ -1512,16 +1515,16 @@ class Host(object): def _get_device_capabilities( device_dict: dict, device: 'libvirt.virNodeDevice', - pci_devs: ty.List['libvirt.virNodeDevice'], - net_devs: ty.List['libvirt.virNodeDevice'] - ) -> ty.Dict[str, ty.Any]: + pci_devs: list['libvirt.virNodeDevice'], + net_devs: list['libvirt.virNodeDevice'] + ) -> dict[str, ty.Any]: """Get PCI VF device's additional capabilities. If a PCI device is a virtual function, this function reads the PCI parent's network capabilities (must be always a NIC device) and appends this information to the device's dictionary. """ - caps: ty.Dict[str, ty.Any] = {} + caps: dict[str, ty.Any] = {} if device_dict.get('dev_type') == fields.PciDeviceType.SRIOV_VF: pcinet_info = self._get_pcinet_info(device, net_devs) @@ -1612,28 +1615,28 @@ class Host(object): nodedev = self.get_vdpa_nodedev_by_address(pci_address) return nodedev.vdpa_capability.dev_path - def list_pci_devices(self, flags: int = 0) -> ty.List[str]: + def list_pci_devices(self, flags: int = 0) -> list[str]: """Lookup pci devices. :returns: a list of strings, names of the virNodeDevice instances """ return self._list_devices("pci", flags=flags) - def list_mdev_capable_devices(self, flags: int = 0) -> ty.List[str]: + def list_mdev_capable_devices(self, flags: int = 0) -> list[str]: """Lookup devices supporting mdev capabilities. :returns: a list of strings, names of the virNodeDevice instances """ return self._list_devices("mdev_types", flags=flags) - def list_mediated_devices(self, flags: int = 0) -> ty.List[str]: + def list_mediated_devices(self, flags: int = 0) -> list[str]: """Lookup mediated devices. :returns: a list of strings, names of the virNodeDevice instances """ return self._list_devices("mdev", flags=flags) - def _list_devices(self, cap, flags: int = 0) -> ty.List[str]: + def _list_devices(self, cap, flags: int = 0) -> list[str]: """Lookup devices. :returns: a list of strings, names of the virNodeDevice instances @@ -1652,7 +1655,7 @@ class Host(object): def list_all_devices( self, flags: int = 0, - ) -> ty.List['libvirt.virNodeDevice']: + ) -> list['libvirt.virNodeDevice']: """Lookup devices. :param flags: a bitmask of flags to filter the returned devices. @@ -1891,7 +1894,7 @@ class Host(object): return False @property - def supports_vtpm(self) -> ty.Optional[bool]: + def supports_vtpm(self) -> bool | None: # we only check the host architecture and the first machine type # because vtpm support is independent from cpu architecture arch = self.get_capabilities().host.cpu.arch @@ -1906,7 +1909,7 @@ class Host(object): return False @property - def tpm_versions(self) -> ty.Optional[ty.List[str]]: + def tpm_versions(self) -> list[str] | None: # we only check the host architecture and the first machine type # because vtpm support is independent from cpu architecture arch = self.get_capabilities().host.cpu.arch @@ -1924,7 +1927,7 @@ class Host(object): return [] @property - def tpm_models(self) -> ty.Optional[ty.List[str]]: + def tpm_models(self) -> list[str] | None: # we only check the host architecture and the first machine type # because vtpm support is independent from cpu architecture arch = self.get_capabilities().host.cpu.arch @@ -2027,7 +2030,7 @@ class Host(object): return self._supports_amd_sev_es @property - def max_sev_guests(self) -> ty.Optional[int]: + def max_sev_guests(self) -> int | None: """Determine maximum number of guests with AMD SEV. """ if not self.supports_amd_sev: @@ -2035,7 +2038,7 @@ class Host(object): return self._max_sev_guests @property - def max_sev_es_guests(self) -> ty.Optional[int]: + def max_sev_es_guests(self) -> int | None: """Determine maximum number of guests with AMD SEV-ES. """ if not self.supports_amd_sev: @@ -2074,7 +2077,7 @@ class Host(object): return self.has_min_version(lv_ver=(7, 9, 0)) @property - def loaders(self) -> ty.List[dict]: + def loaders(self) -> list[dict]: """Retrieve details of loader configuration for the host. Inspect the firmware metadata files provided by QEMU [1] to retrieve @@ -2100,7 +2103,7 @@ class Host(object): arch: str, machine: str, has_secure_boot: bool, - ) -> ty.Tuple[str, str, bool]: + ) -> tuple[str, str, bool]: """Get loader for the specified architecture and machine type. :returns: A the bootloader executable path and the NVRAM diff --git a/nova/virt/libvirt/machine_type_utils.py b/nova/virt/libvirt/machine_type_utils.py index a359498dd5..fba8fdf789 100644 --- a/nova/virt/libvirt/machine_type_utils.py +++ b/nova/virt/libvirt/machine_type_utils.py @@ -12,7 +12,6 @@ import itertools import re -import typing as ty from nova.compute import vm_states from nova import context as nova_context @@ -49,7 +48,7 @@ ALLOWED_UPDATE_VM_STATES = [ def get_machine_type( context: 'nova_context.RequestContext', instance_uuid: str, -) -> ty.Optional[str]: +) -> str | None: """Get the registered machine type of an instance :param context: Request context. @@ -137,7 +136,7 @@ def update_machine_type( instance_uuid: str, machine_type: str, force: bool = False, -) -> ty.Tuple[str, str]: +) -> tuple[str, str]: """Set or update the stored machine type of an instance :param instance_uuid: Instance UUID to update. @@ -183,7 +182,7 @@ def update_machine_type( def _get_instances_without_mtype( context: 'nova_context.RequestContext', -) -> ty.List[objects.instance.Instance]: +) -> list[objects.instance.Instance]: """Fetch a list of instance UUIDs from the DB without hw_machine_type set :param meta: 'sqlalchemy.MetaData' pointing to a given cell DB @@ -201,8 +200,8 @@ def _get_instances_without_mtype( def get_instances_without_type( context: 'nova_context.RequestContext', - cell_uuid: ty.Optional[str] = None, -) -> ty.List[objects.instance.Instance]: + cell_uuid: str | None = None, +) -> list[objects.instance.Instance]: """Find instances without hw_machine_type set, optionally within a cell. :param context: Request context diff --git a/nova/virt/libvirt/utils.py b/nova/virt/libvirt/utils.py index 2b5b1b34c7..7ac6cdc1f9 100644 --- a/nova/virt/libvirt/utils.py +++ b/nova/virt/libvirt/utils.py @@ -18,10 +18,12 @@ # License for the specific language governing permissions and limitations # under the License. +from collections.abc import Callable import grp import os import pwd import re +import subprocess import tempfile import typing as ty import uuid @@ -105,7 +107,7 @@ CPU_TRAITS_MAPPING = { } -def make_reverse_cpu_traits_mapping() -> ty.Dict[str, str]: +def make_reverse_cpu_traits_mapping() -> dict[str, str]: traits_cpu_mapping = dict() for k, v in CPU_TRAITS_MAPPING.items(): if isinstance(v, tuple): @@ -131,9 +133,9 @@ class EncryptionOptions(ty.TypedDict): def create_image( path: str, disk_format: str, - disk_size: ty.Optional[ty.Union[str, int]], - backing_file: ty.Optional[str] = None, - encryption: ty.Optional[EncryptionOptions] = None, + disk_size: str | int | None, + backing_file: str | None = None, + encryption: EncryptionOptions | None = None, safe: bool = False, ) -> None: """Disk image creation with qemu-img @@ -262,7 +264,7 @@ def create_image( def create_ploop_image( - disk_format: str, path: str, size: ty.Union[int, str], fs_type: str, + disk_format: str, path: str, size: int | str, fs_type: str, ) -> None: """Create ploop image @@ -284,7 +286,7 @@ def create_ploop_image( nova.privsep.libvirt.ploop_init(size, disk_format, fs_type, disk_path) -def get_disk_size(path: str, format: ty.Optional[str] = None) -> int: +def get_disk_size(path: str, format: str | None = None) -> int: """Get the (virtual) size of a disk image :param path: Path to the disk image @@ -297,8 +299,8 @@ def get_disk_size(path: str, format: ty.Optional[str] = None) -> int: def get_disk_backing_file( - path: str, basename: bool = True, format: ty.Optional[str] = None, -) -> ty.Optional[str]: + path: str, basename: bool = True, format: str | None = None, +) -> str | None: """Get the backing file of a disk image :param path: Path to the disk image @@ -314,10 +316,10 @@ def get_disk_backing_file( def copy_image( src: str, dest: str, - host: ty.Optional[str] = None, + host: str | None = None, receive: bool = False, - on_execute: ty.Optional[ty.Callable] = None, - on_completion: ty.Optional[ty.Callable] = None, + on_execute: Callable[[subprocess.Popen], None] | None = None, + on_completion: Callable[[subprocess.Popen], None] | None = None, compression: bool = True, ) -> None: """Copy a disk image to an existing directory @@ -352,7 +354,7 @@ def copy_image( def chown_for_id_maps( - path: str, id_maps: ty.List[vconfig.LibvirtConfigGuestIDMap], + path: str, id_maps: list[vconfig.LibvirtConfigGuestIDMap], ) -> None: """Change ownership of file or directory for an id mapped environment @@ -411,7 +413,7 @@ def file_open(*args, **kwargs): return open(*args, **kwargs) -def find_disk(guest: libvirt_guest.Guest) -> ty.Tuple[str, ty.Optional[str]]: +def find_disk(guest: libvirt_guest.Guest) -> tuple[str, str | None]: """Find root device path for instance May be file or device @@ -448,7 +450,7 @@ def find_disk(guest: libvirt_guest.Guest) -> ty.Tuple[str, ty.Optional[str]]: return disk_path, disk_format -def get_disk_type_from_path(path: str) -> ty.Optional[str]: +def get_disk_type_from_path(path: str) -> str | None: """Retrieve disk type (raw, qcow2, lvm, ploop) for given file.""" if path.startswith('/dev'): return 'lvm' @@ -462,7 +464,7 @@ def get_disk_type_from_path(path: str) -> ty.Optional[str]: return None -def get_fs_info(path: str) -> ty.Dict[str, int]: +def get_fs_info(path: str) -> dict[str, int]: """Get free/used/total space info for a filesystem :param path: Any dirent on the filesystem @@ -483,7 +485,7 @@ def fetch_image( context: nova_context.RequestContext, target: str, image_id: str, - trusted_certs: ty.Optional['objects.TrustedCerts'] = None, + trusted_certs: 'objects.TrustedCerts | None' = None, ) -> None: """Grab image. @@ -499,7 +501,7 @@ def fetch_raw_image( context: nova_context.RequestContext, target: str, image_id: str, - trusted_certs: ty.Optional['objects.TrustedCerts'] = None, + trusted_certs: 'objects.TrustedCerts | None' = None, ) -> None: """Grab initrd or kernel image. @@ -533,7 +535,7 @@ def get_instance_path( def get_instance_path_at_destination( instance: 'objects.Instance', - migrate_data: ty.Optional['objects.LibvirtLiveMigrateData'] = None, + migrate_data: 'objects.LibvirtLiveMigrateData | None' = None, ) -> str: """Get the instance path on destination node while live migration. @@ -581,7 +583,7 @@ def get_arch(image_meta: 'objects.ImageMeta') -> str: return obj_fields.Architecture.from_host() -def is_mounted(mount_path: str, source: ty.Optional[str] = None) -> bool: +def is_mounted(mount_path: str, source: str | None = None) -> bool: """Check if the given source is mounted at given destination point.""" if not os.path.ismount(mount_path): return False @@ -598,12 +600,12 @@ def is_valid_hostname(hostname: str) -> bool: return bool(re.match(r"^[\w\-\.:]+$", hostname)) -def version_to_string(version: ty.Tuple[int, int, int]) -> str: +def version_to_string(version: tuple[int, int, int]) -> str: """Returns string version based on tuple""" return '.'.join([str(x) for x in version]) -def cpu_features_to_traits(features: ty.Set[str]) -> ty.Dict[str, bool]: +def cpu_features_to_traits(features: set[str]) -> dict[str, bool]: """Returns this driver's CPU traits dict where keys are trait names from CPU_TRAITS_MAPPING, values are boolean indicates whether the trait should be set in the provider tree. @@ -636,7 +638,7 @@ def get_cpu_model_from_arch(arch: str) -> str: return mode -def get_machine_type(image_meta: 'objects.ImageMeta') -> ty.Optional[str]: +def get_machine_type(image_meta: 'objects.ImageMeta') -> str | None: """The guest machine type can be set as an image metadata property, or otherwise based on architecture-specific defaults. If no defaults are found then None will be returned. This will ultimately lead to QEMU using @@ -649,7 +651,7 @@ def get_machine_type(image_meta: 'objects.ImageMeta') -> ty.Optional[str]: return get_default_machine_type(get_arch(image_meta)) -def get_default_machine_type(arch: str) -> ty.Optional[str]: +def get_default_machine_type(arch: str) -> str | None: # NOTE(lyarwood): Values defined in [libvirt]/hw_machine_type take # precedence here if available for the provided arch. for mapping in CONF.libvirt.hw_machine_type or {}: @@ -694,7 +696,7 @@ def mdev_name2uuid(mdev_name: str) -> str: return str(uuid.UUID(mdev_uuid)) -def mdev_uuid2name(mdev_uuid: str, parent: ty.Optional[str] = None) -> str: +def mdev_uuid2name(mdev_uuid: str, parent: str | None = None) -> str: """Convert an mdev uuid (of the form 8-4-4-4-12) and optionally its parent device to a name (of the form mdev_[_]). @@ -708,7 +710,7 @@ def mdev_uuid2name(mdev_uuid: str, parent: ty.Optional[str] = None) -> str: return name -def get_flags_by_flavor_specs(flavor: 'objects.Flavor') -> ty.Set[str]: +def get_flags_by_flavor_specs(flavor: 'objects.Flavor') -> set[str]: req_spec = objects.RequestSpec(flavor=flavor) resource_request = scheduler_utils.ResourceRequest.from_request_spec( req_spec) @@ -725,8 +727,8 @@ def save_and_migrate_vtpm_dir( inst_base_resize: str, inst_base: str, dest: str, - on_execute: ty.Callable, - on_completion: ty.Callable, + on_execute: Callable[[subprocess.Popen], None] | None = None, + on_completion: Callable[[subprocess.Popen], None] | None = None, ) -> None: """Save vTPM data to instance directory and migrate to the destination. diff --git a/nova/virt/libvirt/vif.py b/nova/virt/libvirt/vif.py index cac808efff..2e5cfd7c80 100644 --- a/nova/virt/libvirt/vif.py +++ b/nova/virt/libvirt/vif.py @@ -19,7 +19,6 @@ """VIF drivers for libvirt.""" import os -import typing as ty import os_vif from os_vif import exception as osv_exception @@ -509,7 +508,7 @@ class LibvirtGenericVIFDriver(object): raise exception.InternalError( _('Unsupported VIF port profile type %s') % profile_name) - def _get_vdpa_dev_path(self, pci_address: ty.Text) -> ty.Text: + def _get_vdpa_dev_path(self, pci_address: str) -> str: if self.host is not None: return self.host.get_vdpa_device_path(pci_address) # TODO(sean-k-mooney) this should never be raised remove when host diff --git a/tox.ini b/tox.ini index c7e59ae163..4536aaf8ed 100644 --- a/tox.ini +++ b/tox.ini @@ -341,7 +341,10 @@ exclude = .venv,.git,.tox,dist,*lib/python*,*egg,build,releasenotes max-complexity = 40 [hacking] -import_exceptions = typing,nova.i18n +import_exceptions = + collections.abc + nova.i18n + typing [flake8:local-plugins] extension =