typing: Replace objects from typing with literals

We also replace the use of typing.Union and add missing parameters and
returns types for Callable types.

Change-Id: I75ed4d1cc4d84515910a5bd315f8626135258148
Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
This commit is contained in:
Stephen Finucane
2025-06-04 11:41:40 +01:00
parent c870873f7f
commit e785ab52dc
42 changed files with 476 additions and 472 deletions
+1 -2
View File
@@ -18,7 +18,6 @@ Provides a single directive that can be used to list all extra specs validators
and, thus, document all extra specs that nova recognizes and supports.
"""
import typing as ty
from docutils import nodes
from docutils.parsers import rst
@@ -90,7 +89,7 @@ def _indent(text, count=1):
def _format_validator_group_help(
validators: ty.Dict[str, base.ExtraSpecValidator],
validators: dict[str, base.ExtraSpecValidator],
summary: bool,
):
"""Generate reStructuredText snippets for a group of validators."""
+9 -10
View File
@@ -15,7 +15,6 @@
# under the License.
import functools
import typing as ty
import microversion_parse
from oslo_log import log as logging
@@ -228,10 +227,10 @@ class WSGICodes:
"""
def __init__(self) -> None:
self._codes: list[tuple[int, ty.Optional[str], ty.Optional[str]]] = []
self._codes: list[tuple[int, str | None, str | None]] = []
def add_code(
self, code: tuple[int, ty.Optional[str], ty.Optional[str]]
self, code: tuple[int, str | None, str | None]
) -> None:
self._codes.append(code)
@@ -251,8 +250,8 @@ class WSGICodes:
def response(
code: int,
min_version: ty.Optional[str] = None,
max_version: ty.Optional[str] = None,
min_version: str | None = None,
max_version: str | None = None,
):
"""Attaches response code to a method.
@@ -697,8 +696,8 @@ def removed(version: str, reason: str):
def api_version(
min_version: ty.Optional[str] = None,
max_version: ty.Optional[str] = None,
min_version: str | None = None,
max_version: str | None = None,
):
"""Mark an API as supporting lower and upper version bounds.
@@ -737,9 +736,9 @@ def api_version(
def expected_errors(
errors: ty.Union[int, tuple[int, ...]],
min_version: ty.Optional[str] = None,
max_version: ty.Optional[str] = None,
errors: int | tuple[int, ...],
min_version: str | None = None,
max_version: str | None = None,
):
"""Decorator for v2.1 API methods which specifies expected exceptions.
+10 -12
View File
@@ -58,8 +58,8 @@ class Schemas:
def add_schema(
self,
schema: tuple[dict[str, object]],
min_version: ty.Optional[str],
max_version: ty.Optional[str],
min_version: str | None,
max_version: str | None,
) -> None:
# we'd like to use bisect.insort but that doesn't accept a 'key' arg
# until Python 3.10, so we need to sort after insertion instead :(
@@ -76,9 +76,7 @@ class Schemas:
def validate_schemas(self) -> None:
"""Ensure there are no overlapping schemas."""
prev_max_version: ty.Optional[
api_version_request.APIVersionRequest
] = None
prev_max_version: api_version_request.APIVersionRequest | None = None
for schema, min_version, max_version in self._schemas:
if prev_max_version:
@@ -91,7 +89,7 @@ class Schemas:
prev_max_version = max_version
def __call__(self, req: wsgi.Request) -> ty.Optional[dict[str, object]]:
def __call__(self, req: wsgi.Request) -> dict[str, object] | None:
ver = req.api_version_request
for schema, min_version, max_version in self._schemas:
@@ -187,9 +185,9 @@ def _schema_validation_helper(
# response headers. As things stand, we're going to need five separate
# decorators.
def schema(
request_body_schema: ty.Dict[str, ty.Any],
min_version: ty.Optional[str] = None,
max_version: ty.Optional[str] = None,
request_body_schema: dict[str, ty.Any],
min_version: str | None = None,
max_version: str | None = None,
):
"""Register a schema to validate request body.
@@ -229,9 +227,9 @@ def schema(
def response_body_schema(
response_body_schema: ty.Dict[str, ty.Any],
min_version: ty.Optional[str] = None,
max_version: ty.Optional[str] = None,
response_body_schema: dict[str, ty.Any],
min_version: str | None = None,
max_version: str | None = None,
):
"""Register a schema to validate response body.
+2 -2
View File
@@ -25,9 +25,9 @@ from nova import exception
class ExtraSpecValidator:
name: str
description: str
value: ty.Dict[str, ty.Any]
value: dict[str, ty.Any]
deprecated: bool = False
parameters: ty.List[ty.Dict[str, ty.Any]] = dataclasses.field(
parameters: list[dict[str, ty.Any]] = dataclasses.field(
default_factory=list
)
@@ -15,7 +15,6 @@
"""Validators for all extra specs known by nova."""
import re
import typing as ty
from oslo_log import log as logging
from stevedore import extension
@@ -25,8 +24,8 @@ from nova import exception
LOG = logging.getLogger(__name__)
VALIDATORS: ty.Dict[str, base.ExtraSpecValidator] = {}
NAMESPACES: ty.Set[str] = set()
VALIDATORS: dict[str, base.ExtraSpecValidator] = {}
NAMESPACES: set[str] = set()
def validate(name: str, value: str):
+14 -9
View File
@@ -758,8 +758,13 @@ class CellV2Commands(object):
return CONF.database.connection
return database_connection
def _non_unique_transport_url_database_connection_checker(self, ctxt,
cell_mapping, transport_url, database_connection):
def _non_unique_transport_url_database_connection_checker(
self,
ctxt: context.RequestContext,
cell_mapping: 'objects.CellMapping | None',
transport_url: str | None,
database_connection: str | None,
) -> bool:
for cell in objects.CellMappingList.get_all(ctxt):
if cell_mapping and cell.uuid == cell_mapping.uuid:
# If we're looking for a specific cell, then don't check
@@ -1585,9 +1590,9 @@ class PlacementCommands(object):
@staticmethod
def _get_resource_request_from_ports(
ctxt: context.RequestContext,
ports: ty.List[ty.Dict[str, ty.Any]]
) -> ty.Tuple[
ty.Dict[str, ty.List['objects.RequestGroup']],
ports: list[dict[str, ty.Any]]
) -> tuple[
dict[str, list['objects.RequestGroup']],
'objects.RequestLevelParams']:
"""Collect RequestGroups and RequestLevelParams for all ports
@@ -1634,10 +1639,10 @@ class PlacementCommands(object):
def _get_port_binding_profile_allocation(
ctxt: context.RequestContext,
neutron: neutron_api.ClientWrapper,
port: ty.Dict[str, ty.Any],
request_groups: ty.List['objects.RequestGroup'],
resource_provider_mapping: ty.Dict[str, ty.List[str]],
) -> ty.Dict[str, str]:
port: dict[str, ty.Any],
request_groups: list['objects.RequestGroup'],
resource_provider_mapping: dict[str, list[str]],
) -> dict[str, str]:
"""Generate the value of the allocation key of the port binding profile
based on the provider mapping returned from placement
+3 -2
View File
@@ -20,6 +20,7 @@
networking and storage of VMs, and compute hosts on which they run)."""
import collections
from collections.abc import Mapping
import functools
import re
import typing as ty
@@ -1629,7 +1630,7 @@ class API:
def _update_ephemeral_encryption_bdms(
self,
flavor: 'objects.Flavor',
image_meta_dict: ty.Dict[str, ty.Any],
image_meta_dict: dict[str, ty.Any],
block_device_mapping: 'objects.BlockDeviceMappingList',
) -> None:
"""Update local BlockDeviceMappings when ephemeral encryption requested
@@ -5030,7 +5031,7 @@ class API:
self,
context: nova_context.RequestContext,
instance: objects.Instance,
volume: ty.Mapping[str, ty.Any],
volume: Mapping[str, ty.Any],
):
"""Avoid duplicate volume attachments.
+13 -11
View File
@@ -26,6 +26,7 @@ terminating it.
import base64
import binascii
from collections.abc import Callable, Iterator
import contextlib
import copy
import functools
@@ -35,7 +36,6 @@ import sys
import threading
import time
import traceback
import typing as ty
from cinderclient import exceptions as cinder_exception
from cursive import exception as cursive_exception
@@ -263,12 +263,12 @@ class ThreadingEventWithResult(threading.Event):
# Each collection of events is a dict of eventlet Events keyed by a tuple of
# event name and associated tag
_InstanceEvents = ty.Dict[ty.Tuple[str, str], ThreadingEventWithResult]
_InstanceEvents = dict[tuple[str, str], ThreadingEventWithResult]
class InstanceEvents(object):
def __init__(self):
self._events: ty.Optional[ty.Dict[str, _InstanceEvents]] = {}
self._events: dict[str, _InstanceEvents] | None = {}
@staticmethod
def _lock_name(instance) -> str:
@@ -483,7 +483,7 @@ class ComputeVirtAPI(virtapi.VirtAPI):
def _wait_for_instance_events(
instance: 'objects.Instance',
events: dict,
error_callback: ty.Callable,
error_callback: Callable[[str, 'objects.Instance'], bool],
timeout: int,
) -> None:
deadline = time.monotonic() + timeout
@@ -723,7 +723,7 @@ class ComputeManager(manager.Manager):
self.host, self.driver, reportclient=self.reportclient)
@contextlib.contextmanager
def syncs_in_progress(self) -> ty.Iterator[set[str]]:
def syncs_in_progress(self) -> Iterator[set[str]]:
with self._syncs_in_progress_lock:
yield self._syncs_in_progress
@@ -3349,7 +3349,7 @@ class ComputeManager(manager.Manager):
raise original_exception
def _get_multiattach_volume_lock_names_bdms(
self, bdms: objects.BlockDeviceMappingList) -> ty.List[str]:
self, bdms: objects.BlockDeviceMappingList) -> list[str]:
"""Get the lock names for multiattach volumes.
:param bdms: BlockDeviceMappingList object
@@ -8639,7 +8639,7 @@ class ComputeManager(manager.Manager):
context: nova.context.RequestContext,
instance: 'objects.Instance',
port_id: str,
port_allocation: ty.Dict[str, ty.Dict[str, ty.Dict[str, int]]],
port_allocation: dict[str, dict[str, dict[str, int]]],
) -> None:
if not port_allocation:
@@ -8691,7 +8691,7 @@ class ComputeManager(manager.Manager):
context: nova.context.RequestContext,
instance: 'objects.Instance',
pci_reqs: 'objects.InstancePCIRequests',
) -> ty.Optional['objects.PciDevice']:
) -> 'objects.PciDevice | None':
"""Claim PCI devices if there are PCI requests
:param context: nova.context.RequestContext
@@ -8728,10 +8728,12 @@ class ComputeManager(manager.Manager):
context: nova.context.RequestContext,
instance: 'objects.Instance',
pci_reqs: 'objects.InstancePCIRequests',
request_groups: ty.List['objects.RequestGroup'],
request_groups: list['objects.RequestGroup'],
request_level_params: 'objects.RequestLevelParams',
) -> ty.Tuple[ty.Optional[ty.Dict[str, ty.List[str]]],
ty.Optional[ty.Dict[str, ty.Dict[str, ty.Dict[str, int]]]]]:
) -> tuple[
dict[str, list[str]] | None,
dict[str, dict[str, dict[str, int]]] | None
]:
"""Allocate resources for the request in placement
:param context: nova.context.RequestContext
+25 -25
View File
@@ -11,9 +11,9 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import copy
import typing as ty
import os_resource_classes
import os_traits
@@ -48,7 +48,7 @@ def _is_placement_tracking_enabled() -> bool:
return CONF.pci.report_in_placement
def _normalize_traits(traits: ty.List[str]) -> ty.List[str]:
def _normalize_traits(traits: list[str]) -> list[str]:
"""Make the trait names acceptable for placement.
It keeps the already valid standard or custom traits but normalizes trait
@@ -66,7 +66,7 @@ def _normalize_traits(traits: ty.List[str]) -> ty.List[str]:
return list(standard_traits) + custom_traits
def get_traits(traits_str: str) -> ty.Set[str]:
def get_traits(traits_str: str) -> set[str]:
"""Return a normalized set of placement standard and custom traits from
a string of comma separated trait names.
"""
@@ -77,8 +77,8 @@ def get_traits(traits_str: str) -> ty.Set[str]:
def _get_traits_for_dev(
dev_spec_tags: ty.Dict[str, str],
) -> ty.Set[str]:
dev_spec_tags: dict[str, str],
) -> set[str]:
return get_traits(dev_spec_tags.get("traits", "")) | {
os_traits.COMPUTE_MANAGED_PCI_DEVICE
}
@@ -98,7 +98,7 @@ def _normalize_resource_class(rc: str) -> str:
def get_resource_class(
requested_name: ty.Optional[str], vendor_id: str, product_id: str
requested_name: str | None, vendor_id: str, product_id: str
) -> str:
"""Return the normalized resource class name based on what is requested
or if nothing is requested then generated from the vendor_id and product_id
@@ -112,7 +112,7 @@ def get_resource_class(
def _get_rc_for_dev(
dev: pci_device.PciDevice,
dev_spec_tags: ty.Dict[str, str],
dev_spec_tags: dict[str, str],
) -> str:
"""Return the resource class to represent the device.
@@ -132,9 +132,9 @@ class PciResourceProvider:
def __init__(self, name: str) -> None:
self.name = name
self.parent_dev = None
self.children_devs: ty.List[pci_device.PciDevice] = []
self.resource_class: ty.Optional[str] = None
self.traits: ty.Optional[ty.Set[str]] = None
self.children_devs: list[pci_device.PciDevice] = []
self.resource_class: str | None = None
self.traits: set[str] | None = None
self.is_otu = False
# This is an adjustment for the total inventory based on normal device
# due to possibility of devices held in the tracker even though they
@@ -144,7 +144,7 @@ class PciResourceProvider:
self.adjustment = 0
@property
def devs(self) -> ty.List[pci_device.PciDevice]:
def devs(self) -> list[pci_device.PciDevice]:
return [self.parent_dev] if self.parent_dev else self.children_devs
@property
@@ -155,7 +155,7 @@ class PciResourceProvider:
def to_be_deleted(self):
return self.total == 0
def add_child(self, dev, dev_spec_tags: ty.Dict[str, str]) -> None:
def add_child(self, dev, dev_spec_tags: dict[str, str]) -> None:
if self.parent_dev:
raise exception.PlacementPciDependentDeviceException(
parent_dev=dev.address,
@@ -192,7 +192,7 @@ class PciResourceProvider:
self.resource_class = rc
self.traits = traits
def add_parent(self, dev, dev_spec_tags: ty.Dict[str, str]) -> None:
def add_parent(self, dev, dev_spec_tags: dict[str, str]) -> None:
if self.parent_dev or self.children_devs:
raise exception.PlacementPciDependentDeviceException(
parent_dev=dev.address,
@@ -222,7 +222,7 @@ class PciResourceProvider:
# Nothing to do here. The update_provider_tree we handle full RP
pass
def _get_allocations(self) -> ty.Mapping[str, int]:
def _get_allocations(self) -> collections.Counter[str]:
"""Return a dict of used resources keyed by consumer UUID.
Note that:
@@ -279,7 +279,7 @@ class PciResourceProvider:
def _adjust_for_removals_and_held_devices(
self,
provider_tree: provider_tree.ProviderTree,
rp_rc_usage: ty.Dict[str, ty.Dict[str, int]],
rp_rc_usage: dict[str, dict[str, int]],
) -> None:
rp_uuid = provider_tree.data(self.name).uuid
@@ -323,7 +323,7 @@ class PciResourceProvider:
self,
provider_tree: provider_tree.ProviderTree,
parent_rp_name: str,
rp_rc_usage: ty.Dict[str, ty.Dict[str, int]],
rp_rc_usage: dict[str, dict[str, int]],
) -> None:
if not provider_tree.exists(self.name):
@@ -362,7 +362,7 @@ class PciResourceProvider:
self,
allocations: dict,
provider_tree: provider_tree.ProviderTree,
same_host_instances: ty.List[str],
same_host_instances: list[str],
) -> bool:
updated = False
@@ -452,9 +452,9 @@ class PlacementView:
def __init__(
self,
hypervisor_hostname: str,
instances_under_same_host_resize: ty.List[str],
instances_under_same_host_resize: list[str],
) -> None:
self.rps: ty.Dict[str, PciResourceProvider] = {}
self.rps: dict[str, PciResourceProvider] = {}
self.root_rp_name = hypervisor_hostname
self.same_host_instances = instances_under_same_host_resize
@@ -478,7 +478,7 @@ class PlacementView:
return self._get_rp_name_for_address(dev.parent_addr)
def _add_dev(
self, dev: pci_device.PciDevice, dev_spec_tags: ty.Dict[str, str]
self, dev: pci_device.PciDevice, dev_spec_tags: dict[str, str]
) -> None:
if dev_spec_tags.get("physical_network"):
# NOTE(gibi): We ignore devices that has physnet configured as
@@ -533,7 +533,7 @@ class PlacementView:
def process_dev(
self,
dev: pci_device.PciDevice,
dev_spec: ty.Optional[devspec.PciDeviceSpec],
dev_spec: devspec.PciDeviceSpec | None,
) -> None:
# NOTE(gibi): We never observer dev.status DELETED as when that is set
# the device is also removed from the PCI tracker. So we can ignore
@@ -610,12 +610,12 @@ class PlacementView:
@staticmethod
def get_usage_per_rc_and_rp(
allocations
) -> ty.Dict[str, ty.Dict[str, int]]:
) -> dict[str, dict[str, int]]:
"""Returns a dict keyed by RP uuid and the value is a dict of
resource class: usage pairs telling how much total usage the given RP
has from the given resource class across all the allocations.
"""
rp_rc_usage: ty.Dict[str, ty.Dict[str, int]] = (
rp_rc_usage: dict[str, dict[str, int]] = (
collections.defaultdict(lambda: collections.defaultdict(int)))
for consumer in allocations.values():
for rp_uuid, alloc in consumer["allocations"].items():
@@ -672,7 +672,7 @@ class PlacementView:
return updated
def ensure_no_dev_spec_with_devname(dev_specs: ty.List[devspec.PciDeviceSpec]):
def ensure_no_dev_spec_with_devname(dev_specs: list[devspec.PciDeviceSpec]):
for dev_spec in dev_specs:
if dev_spec.dev_spec_conf.get("devname"):
msg = _(
@@ -709,7 +709,7 @@ def update_provider_tree_for_pci(
nodename: str,
pci_tracker: pci_manager.PciDevTracker,
allocations: dict,
instances_under_same_host_resize: ty.List[str],
instances_under_same_host_resize: list[str],
) -> bool:
"""Based on the PciDevice objects in the pci_tracker it calculates what
inventories and allocations needs to exist in placement and create the
+2 -3
View File
@@ -21,7 +21,6 @@ import copy
import functools
import sys
import threading
import typing as ty
from keystoneauth1 import exceptions as ks_exc
from oslo_config import cfg
@@ -979,8 +978,8 @@ class ComputeTaskManager:
flavor: 'objects.Flavor',
request_spec: 'objects.RequestSpec',
orig_num_req: int,
project_id: ty.Optional[str] = None,
user_id: ty.Optional[str] = None
project_id: str | None = None,
user_id: str | None = None
) -> None:
# A quota "recheck" is a quota check that is performed *after* quota
# limited resources are consumed. It is meant to address race
+7 -8
View File
@@ -24,7 +24,6 @@ import binascii
import hashlib
import io
import os
import typing as ty
from castellan.common import exception as castellan_exception
from castellan.common.objects import passphrase
@@ -82,7 +81,7 @@ def generate_fingerprint(public_key: str) -> str:
reason=_('failed to generate fingerprint'))
def generate_x509_fingerprint(pem_key: ty.Union[bytes, str]) -> str:
def generate_x509_fingerprint(pem_key: bytes | str) -> str:
try:
if isinstance(pem_key, str):
pem_key = pem_key.encode('utf-8')
@@ -98,7 +97,7 @@ def generate_x509_fingerprint(pem_key: ty.Union[bytes, str]) -> str:
'Error message: %s') % ex)
def generate_key_pair(bits: int = 2048) -> ty.Tuple[str, str, str]:
def generate_key_pair(bits: int = 2048) -> tuple[str, str, str]:
key = paramiko.RSAKey.generate(bits)
keyout = io.StringIO()
key.write_private_key(keyout)
@@ -108,7 +107,7 @@ def generate_key_pair(bits: int = 2048) -> ty.Tuple[str, str, str]:
return (private_key, public_key, fingerprint)
def ssh_encrypt_text(ssh_public_key: str, text: ty.Union[str, bytes]) -> bytes:
def ssh_encrypt_text(ssh_public_key: str, text: str | bytes) -> bytes:
"""Encrypt text with an ssh public key.
If text is a Unicode string, encode it to UTF-8.
@@ -127,7 +126,7 @@ def ssh_encrypt_text(ssh_public_key: str, text: ty.Union[str, bytes]) -> bytes:
def generate_winrm_x509_cert(
user_id: str,
bits: int = 2048
) -> ty.Tuple[str, str, str]:
) -> tuple[str, str, str]:
"""Generate a cert for passwordless auth for user in project."""
subject = '/CN=%s' % user_id
upn = '%s@localhost' % user_id
@@ -171,7 +170,7 @@ def _create_x509_openssl_config(conffile: str, upn: str):
def ensure_vtpm_secret(
context: nova_context.RequestContext,
instance: 'objects.Instance',
) -> ty.Tuple[str, bytes]:
) -> tuple[str, bytes]:
"""Communicates with the key manager service to retrieve or create a secret
for an instance's emulated TPM.
@@ -280,7 +279,7 @@ def create_encryption_secret(
context: nova_context.RequestContext,
instance: 'objects.Instance',
driver_bdm: 'driver_block_device.DriverBlockDevice',
for_detail: ty.Optional[str] = None,
for_detail: str | None = None,
):
# Use oslo.serialization to encode some random data as passphrase
secret = oslo_base64.encode_as_text(
@@ -301,7 +300,7 @@ def create_encryption_secret(
def get_encryption_secret(
context: nova_context.RequestContext,
secret_uuid: str,
) -> ty.Optional[str]:
) -> str | None:
key_mgr = _get_key_manager()
try:
key = key_mgr.get(context, secret_uuid)
+6 -6
View File
@@ -81,7 +81,7 @@ LEGACY_LIMITS = {
def get_in_use(
context: 'nova.context.RequestContext', project_id: str
) -> ty.Dict[str, int]:
) -> dict[str, int]:
"""Returns in use counts for each resource, for given project.
This sounds simple but many resources can't be counted per project,
@@ -107,8 +107,8 @@ def get_in_use(
def always_zero_usage(
project_id: str, resource_names: ty.List[str]
) -> ty.Dict[str, int]:
project_id: str, resource_names: list[str]
) -> dict[str, int]:
"""Called by oslo_limit's enforcer"""
# Return usage of 0 for API limits. Values in API requests will be used as
# the deltas.
@@ -196,8 +196,8 @@ def enforce_db_limit(
def _convert_keys_to_legacy_name(
new_dict: ty.Dict[str, int]
) -> ty.Dict[str, int]:
new_dict: dict[str, int]
) -> dict[str, int]:
legacy = {}
for new_name, old_name in LEGACY_LIMITS.items():
# defensive in case oslo or keystone doesn't give us an answer
@@ -205,7 +205,7 @@ def _convert_keys_to_legacy_name(
return legacy
def get_legacy_default_limits() -> ty.Dict[str, int]:
def get_legacy_default_limits() -> dict[str, int]:
# TODO(johngarbutt): need oslo.limit API for this, it should do caching
enforcer = limit.Enforcer(lambda: None)
new_limits = enforcer.get_registered_limits(LEGACY_LIMITS.keys())
+6 -8
View File
@@ -12,8 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import typing as ty
import os_resource_classes as orc
from oslo_limit import exception as limit_exceptions
from oslo_limit import limit
@@ -42,7 +40,7 @@ LEGACY_LIMITS = {
def _get_placement_usages(
context: 'nova.context.RequestContext', project_id: str
) -> ty.Dict[str, int]:
) -> dict[str, int]:
return report.report_client_singleton().get_usages_counts_for_limits(
context, project_id)
@@ -50,8 +48,8 @@ def _get_placement_usages(
def _get_usage(
context: 'nova.context.RequestContext',
project_id: str,
resource_names: ty.List[str],
) -> ty.Dict[str, int]:
resource_names: list[str],
) -> dict[str, int]:
"""Called by oslo_limit's enforcer"""
if not limit_utils.use_unified_limits():
raise NotImplementedError("Unified limits support is disabled")
@@ -120,7 +118,7 @@ def _get_usage(
def _get_deltas_by_flavor(
flavor: 'objects.Flavor', is_bfv: bool, count: int
) -> ty.Dict[str, int]:
) -> dict[str, int]:
if flavor is None:
raise ValueError("flavor")
if count < 0:
@@ -156,8 +154,8 @@ def enforce_num_instances_and_flavor(
is_bfvm: bool,
min_count: int,
max_count: int,
enforcer: ty.Optional[limit.Enforcer] = None,
delta_updates: ty.Optional[ty.Dict[str, int]] = None,
enforcer: limit.Enforcer | None = None,
delta_updates: dict[str, int] | None = None,
) -> int:
"""Return max instances possible, else raise TooManyInstances exception."""
if not limit_utils.use_unified_limits():
+11 -11
View File
@@ -645,14 +645,14 @@ class API:
# it is a dict of network dicts as returned by the neutron client keyed
# by network UUID
networks: ty.Dict[str, ty.Dict] = {}
networks: dict[str, dict] = {}
for port_id in ports:
# A port_id is optional in the NetworkRequest object so check here
# in case the caller forgot to filter the list.
if port_id is None:
continue
port_req_body: ty.Dict[str, ty.Any] = {
port_req_body: dict[str, ty.Any] = {
'port': {
constants.BINDING_HOST_ID: None,
}
@@ -1120,8 +1120,8 @@ class API:
self,
context: nova_context.RequestContext,
port_id: str,
resource_provider_mapping: ty.Dict[str, ty.List[str]],
) -> ty.Union[None, str, ty.Dict[str, str]]:
resource_provider_mapping: dict[str, list[str]],
) -> None | str | dict[str, str]:
"""Calculate the value of the allocation key of the binding:profile
based on the allocated resources.
@@ -1579,7 +1579,7 @@ class API:
client = get_client(context, admin=True)
bindings_by_port_id: ty.Dict[str, ty.Any] = {}
bindings_by_port_id: dict[str, ty.Any] = {}
for vif in network_info:
# Now bind each port to the destination host and keep track of each
# port that is bound to the resulting binding so we can rollback in
@@ -1650,7 +1650,7 @@ class API:
need to do the necessary plumbing in order to set a VF up for packet
forwarding.
"""
vf_profile: ty.Dict[str, ty.Union[str, int]] = {}
vf_profile: dict[str, str | int] = {}
pf_mac = pci_dev.sriov_cap.get('pf_mac_address')
vf_num = pci_dev.sriov_cap.get('vf_num')
@@ -1919,7 +1919,7 @@ class API:
# only
neutron_admin = get_client(context, admin=True)
neutron = get_client(context)
port_allocation: ty.Dict = {}
port_allocation: dict = {}
try:
# NOTE(gibi): we need to read the port resource information from
# neutron here as we might delete the port below
@@ -2652,8 +2652,8 @@ class API:
self,
context: nova_context.RequestContext,
instance_uuid: str
) -> ty.Tuple[
ty.List['objects.RequestGroup'], 'objects.RequestLevelParams']:
) -> tuple[
list['objects.RequestGroup'], 'objects.RequestLevelParams']:
"""Collect resource requests from the ports associated to the instance
:param context: nova request context
@@ -3945,7 +3945,7 @@ class API:
self,
context: nova.context.RequestContext,
network_id: str,
) -> ty.List[str]:
) -> list[str]:
"""Query the segmentation ids for the given network.
:param context: The request context.
@@ -3976,7 +3976,7 @@ class API:
self,
context: nova.context.RequestContext,
subnet_id: str,
) -> ty.Optional[str]:
) -> str | None:
"""Query the segmentation id for the given subnet.
:param context: The request context.
+3 -4
View File
@@ -13,7 +13,6 @@
# under the License.
import contextlib
import typing as ty
from oslo_config import cfg
from oslo_db import exception as db_exc
@@ -1282,9 +1281,9 @@ class Instance(base.NovaPersistentObject, base.NovaObject,
def get_pci_devices(
self,
source: ty.Optional[int] = None,
request_id: ty.Optional[str] = None,
) -> ty.List["objects.PciDevice"]:
source: int | None = None,
request_id: str | None = None,
) -> list["objects.PciDevice"]:
"""Return the PCI devices allocated to the instance
:param source: Filter by source. It can be
+2 -2
View File
@@ -489,7 +489,7 @@ class RequestSpec(base.NovaObject):
return filt_props
@staticmethod
def _rc_from_request(spec: ty.Dict[str, ty.Any]) -> str:
def _rc_from_request(spec: dict[str, ty.Any]) -> str:
return pci_placement_translator.get_resource_class(
spec.get("resource_class"),
spec.get("vendor_id"),
@@ -497,7 +497,7 @@ class RequestSpec(base.NovaObject):
)
@staticmethod
def _traits_from_request(spec: ty.Dict[str, ty.Any]) -> ty.Set[str]:
def _traits_from_request(spec: dict[str, ty.Any]) -> set[str]:
return pci_placement_translator.get_traits(spec.get("traits", ""))
def generate_request_groups_from_pci_requests(self):
+10 -10
View File
@@ -38,7 +38,7 @@ REGEX_ANY = '.*'
LOG = logging.getLogger(__name__)
CONF = nova.conf.CONF
PCISpecAddressType = ty.Union[ty.Dict[str, str], str]
PCISpecAddressType = dict[str, str] | str
class PciAddressSpec(metaclass=abc.ABCMeta):
@@ -242,7 +242,7 @@ class WhitelistPciAddress(object):
else:
self.pci_address_spec = PhysicalPciAddress(pci_addr)
def match(self, pci_addr: str, pci_phys_addr: ty.Optional[str]) -> bool:
def match(self, pci_addr: str, pci_phys_addr: str | None) -> bool:
"""Match a device to this PciAddress.
Assume this is called with a ``pci_addr`` and ``pci_phys_addr``
@@ -269,7 +269,7 @@ class WhitelistPciAddress(object):
class PciDeviceSpec(PciAddressSpec):
def __init__(self, dev_spec: ty.Dict[str, str]) -> None:
def __init__(self, dev_spec: dict[str, str]) -> None:
# stored for better error reporting
self.dev_spec_conf = copy.deepcopy(dev_spec)
# the non tag fields (i.e. address, devname) will be removed by
@@ -277,7 +277,7 @@ class PciDeviceSpec(PciAddressSpec):
self.tags = dev_spec
self._init_dev_details()
def _address_obj(self) -> ty.Optional[WhitelistPciAddress]:
def _address_obj(self) -> WhitelistPciAddress | None:
address_obj = None
if self.dev_name:
address_str, pf = utils.get_function_by_ifname(self.dev_name)
@@ -295,7 +295,7 @@ class PciDeviceSpec(PciAddressSpec):
self.vendor_id = self.tags.pop("vendor_id", ANY)
self.product_id = self.tags.pop("product_id", ANY)
self.dev_name = self.tags.pop("devname", None)
self.address: ty.Optional[WhitelistPciAddress] = None
self.address: WhitelistPciAddress | None = None
# Note(moshele): The address attribute can be a string or a dict.
# For glob syntax or specific pci it is a string and for regex syntax
# it is a dict. The WhitelistPciAddress class handles both types.
@@ -371,7 +371,7 @@ class PciDeviceSpec(PciAddressSpec):
'pf_addr': pf_addr})
def _ensure_remote_managed_dev_vpd_serial(
self, dev_dict: ty.Dict[str, ty.Any]) -> bool:
self, dev_dict: dict[str, ty.Any]) -> bool:
"""Ensure the presence of a serial number field in PCI VPD.
A card serial number extracted from PCI VPD is required to allow a
@@ -388,8 +388,8 @@ class PciDeviceSpec(PciAddressSpec):
# an empty string which is not useful for device identification.
return bool(card_sn)
def match(self, dev_dict: ty.Dict[str, ty.Any]) -> bool:
address_obj: ty.Optional[WhitelistPciAddress] = self._address_obj()
def match(self, dev_dict: dict[str, ty.Any]) -> bool:
address_obj: WhitelistPciAddress | None = self._address_obj()
if not address_obj:
return False
@@ -412,7 +412,7 @@ class PciDeviceSpec(PciAddressSpec):
}
return self.match(dev_dict)
def get_tags(self) -> ty.Dict[str, str]:
def get_tags(self) -> dict[str, str]:
return self.tags
def _normalize_device_spec_tag(self, tag):
@@ -426,7 +426,7 @@ class PciDeviceSpec(PciAddressSpec):
reason=f"Cannot parse tag '{tag}': " + str(e)
)
def enhanced_pci_device_with_spec_tags(self, dev: ty.Dict[str, ty.Any]):
def enhanced_pci_device_with_spec_tags(self, dev: dict[str, ty.Any]):
spec_tags = ["managed", "live_migratable"]
for tag in spec_tags:
tag_value = self.tags.get(tag)
+13 -11
View File
@@ -32,8 +32,8 @@ from nova.pci import whitelist
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
MappingType = ty.Dict[str, ty.List['objects.PciDevice']]
PCIInvType = ty.DefaultDict[str, ty.List['objects.PciDevice']]
MappingType = dict[str, list['objects.PciDevice']]
PCIInvType = collections.defaultdict[str, list['objects.PciDevice']]
class PciDevTracker(object):
@@ -68,8 +68,8 @@ class PciDevTracker(object):
:param compute_node: The object.ComputeNode whose PCI devices we're
tracking.
"""
self.stale: ty.Dict[str, objects.PciDevice] = {}
self.to_be_removed_when_freed: ty.Dict[str, objects.PciDevice] = {}
self.stale: dict[str, objects.PciDevice] = {}
self.to_be_removed_when_freed: dict[str, objects.PciDevice] = {}
self.node_id: str = compute_node.id
self.dev_filter = whitelist.Whitelist(CONF.pci.device_spec)
numa_topology = compute_node.numa_topology
@@ -177,7 +177,7 @@ class PciDevTracker(object):
self._set_hvdevs(devices)
@staticmethod
def _build_device_tree(all_devs: ty.List['objects.PciDevice']) -> None:
def _build_device_tree(all_devs: list['objects.PciDevice']) -> None:
"""Build a tree of devices that represents parent-child relationships.
We need to have the relationships set up so that we can easily make
@@ -214,7 +214,7 @@ class PciDevTracker(object):
if dev.parent_device:
parents[dev.parent_addr].child_devices.append(dev)
def _set_hvdevs(self, devices: ty.List[ty.Dict[str, ty.Any]]) -> None:
def _set_hvdevs(self, devices: list[dict[str, ty.Any]]) -> None:
exist_addrs = set([dev.address for dev in self.pci_devs])
new_addrs = set([dev['address'] for dev in devices])
@@ -273,7 +273,7 @@ class PciDevTracker(object):
self.stats.remove_device(existed)
else:
# Update tracked devices.
new_value: ty.Dict[str, ty.Any]
new_value: dict[str, ty.Any]
new_value = next((dev for dev in devices if
dev['address'] == existed.address))
new_value['compute_node_id'] = self.node_id
@@ -312,7 +312,7 @@ class PciDevTracker(object):
context: ctx.RequestContext,
pci_requests: 'objects.InstancePCIRequests',
instance_numa_topology: 'objects.InstanceNUMATopology',
) -> ty.List['objects.PciDevice']:
) -> list['objects.PciDevice']:
instance_cells = None
if instance_numa_topology:
instance_cells = instance_numa_topology.cells
@@ -337,7 +337,7 @@ class PciDevTracker(object):
context: ctx.RequestContext,
pci_requests: 'objects.InstancePCIRequests',
instance_numa_topology: 'objects.InstanceNUMATopology',
) -> ty.List['objects.PciDevice']:
) -> list['objects.PciDevice']:
devs = []
@@ -350,7 +350,7 @@ class PciDevTracker(object):
return devs
def _allocate_instance(
self, instance: 'objects.Instance', devs: ty.List['objects.PciDevice'],
self, instance: 'objects.Instance', devs: list['objects.PciDevice'],
) -> None:
for dev in devs:
dev.allocate(instance)
@@ -398,7 +398,9 @@ class PciDevTracker(object):
pci_mapping.pop(instance_uuid, None)
def _free_device(
self, dev: 'objects.PciDevice', instance: 'objects.Instance' = None,
self,
dev: 'objects.PciDevice',
instance: 'objects.Instance | None' = None,
) -> None:
freed_devs = dev.free(instance)
stale = self.stale.pop(dev.address, None)
+12 -12
View File
@@ -24,7 +24,7 @@
| "numa_policy": "legacy"
| }'
Aliases with the same name, device_type and numa_policy are ORed::
Aliases with the same name, device_type and numa_policy are ORed::
| [pci]
| alias = '{
@@ -34,11 +34,11 @@
| "device_type": "type-PCI",
| }'
These two aliases define a device request meaning: vendor_id is "8086" and
product_id is "0442" or "0443".
"""
These two aliases define a device request meaning: vendor_id is "8086" and
product_id is "0442" or "0443".
"""
import functools
import typing as ty
import jsonschema
from oslo_log import log as logging
@@ -55,7 +55,7 @@ from nova.objects import fields as obj_fields
from nova.pci import utils
from oslo_utils import strutils
Alias = ty.Dict[str, ty.Tuple[str, ty.List[ty.Dict[str, str]]]]
Alias = dict[str, tuple[str, list[dict[str, str]]]]
PCI_NET_TAG = 'physical_network'
PCI_TRUSTED_TAG = 'trusted'
@@ -235,12 +235,12 @@ def get_alias_from_config() -> Alias:
def _translate_alias_to_requests(
alias_spec: str, affinity_policy: ty.Optional[str] = None,
) -> ty.List['objects.InstancePCIRequest']:
alias_spec: str, affinity_policy: str | None = None,
) -> list['objects.InstancePCIRequest']:
"""Generate complete pci requests from pci aliases in extra_spec."""
pci_aliases = get_alias_from_config()
pci_requests: ty.List[objects.InstancePCIRequest] = []
pci_requests: list[objects.InstancePCIRequest] = []
for name, count in [spec.split(':') for spec in alias_spec.split(',')]:
name = name.strip()
if name not in pci_aliases:
@@ -267,7 +267,7 @@ def get_instance_pci_request_from_vif(
context: ctx.RequestContext,
instance: 'objects.Instance',
vif: network_model.VIF,
) -> ty.Optional['objects.InstancePCIRequest']:
) -> 'objects.InstancePCIRequest | None':
"""Given an Instance, return the PCI request associated
to the PCI device related to the given VIF (if any) on the
compute node the instance is currently running.
@@ -322,7 +322,7 @@ def get_instance_pci_request_from_vif(
def get_pci_requests_from_flavor(
flavor: 'objects.Flavor', affinity_policy: ty.Optional[str] = None,
flavor: 'objects.Flavor', affinity_policy: str | None = None,
) -> 'objects.InstancePCIRequests':
"""Validate and return PCI requests.
@@ -369,7 +369,7 @@ def get_pci_requests_from_flavor(
:raises: exception.PciInvalidAlias if the configuration contains invalid
aliases.
"""
pci_requests: ty.List[objects.InstancePCIRequest] = []
pci_requests: list[objects.InstancePCIRequest] = []
if ('extra_specs' in flavor and
'pci_passthrough:alias' in flavor['extra_specs']):
pci_requests = _translate_alias_to_requests(
+53 -51
View File
@@ -13,7 +13,9 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import collections.abc
import copy
import typing as ty
@@ -36,7 +38,7 @@ LOG = logging.getLogger(__name__)
# TODO(stephenfin): We might want to use TypedDict here. Refer to
# https://mypy.readthedocs.io/en/latest/kinds_of_types.html#typeddict for
# more information.
Pool = ty.Dict[str, ty.Any]
Pool = dict[str, ty.Any]
class PciDeviceStats(object):
@@ -81,8 +83,8 @@ class PciDeviceStats(object):
def __init__(
self,
numa_topology: 'objects.NUMATopology',
stats: 'objects.PciDevicePoolList' = None,
dev_filter: ty.Optional[whitelist.Whitelist] = None,
stats: 'objects.PciDevicePoolList | None' = None,
dev_filter: whitelist.Whitelist | None = None,
) -> None:
self.numa_topology = numa_topology
self.pools = (
@@ -93,12 +95,12 @@ class PciDeviceStats(object):
CONF.pci.device_spec)
def _equal_properties(
self, dev: Pool, entry: Pool, matching_keys: ty.List[str],
self, dev: Pool, entry: Pool, matching_keys: list[str],
) -> bool:
return all(dev.get(prop) == entry.get(prop)
for prop in matching_keys)
def _find_pool(self, dev_pool: Pool) -> ty.Optional[Pool]:
def _find_pool(self, dev_pool: Pool) -> Pool | None:
"""Return the first pool that matches dev."""
for pool in self.pools:
pool_keys = pool.copy()
@@ -137,7 +139,7 @@ class PciDeviceStats(object):
def _create_pool_keys_from_dev(
self, dev: 'objects.PciDevice',
) -> ty.Optional[Pool]:
) -> Pool | None:
"""Create a stats pool dict that this dev is supposed to be part of
Note that this pool dict contains the stats pool's keys and their
@@ -181,7 +183,7 @@ class PciDeviceStats(object):
def _get_pool_with_device_type_mismatch(
self, dev: 'objects.PciDevice',
) -> ty.Optional[ty.Tuple[Pool, 'objects.PciDevice']]:
) -> tuple[Pool, 'objects.PciDevice'] | None:
"""Check for device type mismatch in the pools for a given device.
Return (pool, device) if device type does not match or a single None
@@ -223,7 +225,7 @@ class PciDeviceStats(object):
@staticmethod
def _decrease_pool_count(
pool_list: ty.List[Pool], pool: Pool, count: int = 1,
pool_list: list[Pool], pool: Pool, count: int = 1,
) -> int:
"""Decrement pool's size by count.
@@ -248,15 +250,15 @@ class PciDeviceStats(object):
pool['devices'].remove(dev)
self._decrease_pool_count(self.pools, pool)
def get_free_devs(self) -> ty.List['objects.PciDevice']:
free_devs: ty.List[objects.PciDevice] = []
def get_free_devs(self) -> list['objects.PciDevice']:
free_devs: list[objects.PciDevice] = []
for pool in self.pools:
free_devs.extend(pool['devices'])
return free_devs
def _allocate_devs(
self, pool: Pool, num: int, request_id: str
) -> ty.List["objects.PciDevice"]:
) -> list["objects.PciDevice"]:
alloc_devices = []
for _ in range(num):
pci_dev = pool['devices'].pop()
@@ -268,10 +270,10 @@ class PciDeviceStats(object):
def consume_requests(
self,
pci_requests: 'objects.InstancePCIRequests',
numa_cells: ty.Optional[ty.List['objects.InstanceNUMACell']] = None,
) -> ty.Optional[ty.List['objects.PciDevice']]:
numa_cells: list['objects.InstanceNUMACell'] | None = None,
) -> list['objects.PciDevice'] | None:
alloc_devices: ty.List[objects.PciDevice] = []
alloc_devices: list[objects.PciDevice] = []
for request in pci_requests:
count = request.count
@@ -360,8 +362,8 @@ class PciDeviceStats(object):
return
def _filter_pools_for_spec(
self, pools: ty.List[Pool], request: 'objects.InstancePCIRequest',
) -> ty.List[Pool]:
self, pools: list[Pool], request: 'objects.InstancePCIRequest',
) -> list[Pool]:
"""Filter out pools that don't match the request's device spec.
Exclude pools that do not match the specified ``vendor_id``,
@@ -390,10 +392,10 @@ class PciDeviceStats(object):
def _filter_pools_for_numa_cells(
self,
pools: ty.List[Pool],
pools: list[Pool],
request: 'objects.InstancePCIRequest',
numa_cells: ty.Optional[ty.List['objects.InstanceNUMACell']],
) -> ty.List[Pool]:
numa_cells: list['objects.InstanceNUMACell'] | None,
) -> list[Pool]:
"""Filter out pools with the wrong NUMA affinity, if required.
Exclude pools that do not have *suitable* PCI NUMA affinity.
@@ -473,9 +475,9 @@ class PciDeviceStats(object):
def _filter_pools_for_socket_affinity(
self,
pools: ty.List[Pool],
numa_cells: ty.List['objects.InstanceNUMACell'],
) -> ty.List[Pool]:
pools: list[Pool],
numa_cells: list['objects.InstanceNUMACell'],
) -> list[Pool]:
host_cells = self.numa_topology.cells
# bail early if we don't have socket information for all host_cells.
# This could happen if we're running on an weird older system with
@@ -509,8 +511,8 @@ class PciDeviceStats(object):
]
def _filter_pools_for_unrequested_pfs(
self, pools: ty.List[Pool], request: 'objects.InstancePCIRequest',
) -> ty.List[Pool]:
self, pools: list[Pool], request: 'objects.InstancePCIRequest',
) -> list[Pool]:
"""Filter out pools with PFs, unless these are required.
This is necessary in cases where PFs and VFs have the same product_id
@@ -534,9 +536,9 @@ class PciDeviceStats(object):
def _filter_pools_for_unrequested_vdpa_devices(
self,
pools: ty.List[Pool],
pools: list[Pool],
request: 'objects.InstancePCIRequest',
) -> ty.List[Pool]:
) -> list[Pool]:
"""Filter out pools with VDPA devices, unless these are required.
This is necessary as vdpa devices require special handling and
@@ -559,8 +561,8 @@ class PciDeviceStats(object):
return pools
def _filter_pools_for_unrequested_remote_managed_devices(
self, pools: ty.List[Pool], request: 'objects.InstancePCIRequest',
) -> ty.List[Pool]:
self, pools: list[Pool], request: 'objects.InstancePCIRequest',
) -> list[Pool]:
"""Filter out pools with remote_managed devices, unless requested.
Remote-managed devices are not usable for legacy SR-IOV or hardware
@@ -581,10 +583,10 @@ class PciDeviceStats(object):
def _filter_pools_based_on_placement_allocation(
self,
pools: ty.List[Pool],
pools: list[Pool],
request: 'objects.InstancePCIRequest',
rp_uuids: ty.List[str],
) -> ty.List[Pool]:
rp_uuids: list[str],
) -> list[Pool]:
if not rp_uuids:
# If there is no placement allocation then we don't need to filter
# by it. This could happen if the instance only has neutron port
@@ -620,8 +622,8 @@ class PciDeviceStats(object):
return matching_pools
def _filter_pools_for_live_migratable_devices(
self, pools: ty.List[Pool], request: 'objects.InstancePCIRequest',
) -> ty.List[Pool]:
self, pools: list[Pool], request: 'objects.InstancePCIRequest',
) -> list[Pool]:
"""Filter out pools with non live_migratable devices.
:param pools: A list of PCI device pool dicts
@@ -653,11 +655,11 @@ class PciDeviceStats(object):
def _filter_pools(
self,
pools: ty.List[Pool],
pools: list[Pool],
request: 'objects.InstancePCIRequest',
numa_cells: ty.Optional[ty.List['objects.InstanceNUMACell']],
rp_uuids: ty.List[str],
) -> ty.Optional[ty.List[Pool]]:
numa_cells: list['objects.InstanceNUMACell'] | None,
rp_uuids: list[str],
) -> list[Pool] | None:
"""Determine if an individual PCI request can be met.
Filter pools, which are collections of devices with similar traits, to
@@ -789,9 +791,9 @@ class PciDeviceStats(object):
def support_requests(
self,
requests: ty.List['objects.InstancePCIRequest'],
provider_mapping: ty.Optional[ty.Dict[str, ty.List[str]]],
numa_cells: ty.Optional[ty.List['objects.InstanceNUMACell']] = None,
requests: list['objects.InstancePCIRequest'],
provider_mapping: dict[str, list[str]] | None,
numa_cells: list['objects.InstanceNUMACell'] | None = None,
) -> bool:
"""Determine if the PCI requests can be met.
@@ -833,10 +835,10 @@ class PciDeviceStats(object):
def _apply_request(
self,
pools: ty.List[Pool],
pools: list[Pool],
request: 'objects.InstancePCIRequest',
rp_uuids: ty.List[str],
numa_cells: ty.Optional[ty.List['objects.InstanceNUMACell']] = None,
rp_uuids: list[str],
numa_cells: list['objects.InstanceNUMACell'] | None = None,
) -> bool:
"""Apply an individual PCI request.
@@ -895,9 +897,9 @@ class PciDeviceStats(object):
def _get_rp_uuids_for_request(
self,
provider_mapping: ty.Optional[ty.Dict[str, ty.List[str]]],
provider_mapping: dict[str, list[str]] | None,
request: 'objects.InstancePCIRequest'
) -> ty.List[str]:
) -> list[str]:
"""Return the list of RP uuids that are fulfilling the request.
An RP will be in the list as many times as many devices needs to
@@ -937,9 +939,9 @@ class PciDeviceStats(object):
def apply_requests(
self,
requests: ty.List['objects.InstancePCIRequest'],
provider_mapping: ty.Optional[ty.Dict[str, ty.List[str]]],
numa_cells: ty.Optional[ty.List['objects.InstanceNUMACell']] = None,
requests: list['objects.InstancePCIRequest'],
provider_mapping: dict[str, list[str]] | None,
numa_cells: list['objects.InstanceNUMACell'] | None = None,
) -> None:
"""Apply PCI requests to the PCI stats.
@@ -970,8 +972,8 @@ class PciDeviceStats(object):
if not self._apply_request(self.pools, r, rp_uuids, numa_cells):
raise exception.PciDeviceRequestFailed(requests=requests)
def __iter__(self) -> ty.Iterator[Pool]:
pools: ty.List[Pool] = []
def __iter__(self) -> collections.abc.Iterator[Pool]:
pools: list[Pool] = []
for pool in self.pools:
pool = copy.deepcopy(pool)
# 'devices' shouldn't be part of stats
@@ -1045,7 +1047,7 @@ class PciDeviceStats(object):
pool['rp_uuid'] = next(iter(pool_rps))
@staticmethod
def _assert_one_pool_per_rp_uuid(pools: ty.List[Pool]) -> bool:
def _assert_one_pool_per_rp_uuid(pools: list[Pool]) -> bool:
"""Asserts that each pool has a unique rp_uuid if any
:param pools: A list of Pool objects.
+6 -6
View File
@@ -38,7 +38,7 @@ _SRIOV_TOTALVFS = "sriov_totalvfs"
def pci_device_prop_match(
pci_dev: 'stats.Pool', specs: ty.List[ty.Dict[str, str]],
pci_dev: 'stats.Pool', specs: list[dict[str, str]],
) -> bool:
"""Check if the pci_dev meet spec requirement
@@ -53,7 +53,7 @@ def pci_device_prop_match(
"""
def _matching_devices(spec: ty.Dict[str, str]) -> bool:
def _matching_devices(spec: dict[str, str]) -> bool:
for k, v in spec.items():
pci_dev_v = pci_dev.get(k)
if isinstance(v, list) and isinstance(pci_dev_v, list):
@@ -87,7 +87,7 @@ def parse_address(address: str) -> ty.Sequence[str]:
return m.groups()
def get_pci_address_fields(pci_addr: str) -> ty.Tuple[str, str, str, str]:
def get_pci_address_fields(pci_addr: str) -> tuple[str, str, str, str]:
"""Parse a fully-specified PCI device address.
Does not validate that the components are valid hex or wildcard values.
@@ -111,7 +111,7 @@ def get_pci_address(domain: str, bus: str, slot: str, func: str) -> str:
return '%s:%s:%s.%s' % (domain, bus, slot, func)
def get_function_by_ifname(ifname: str) -> ty.Tuple[ty.Optional[str], bool]:
def get_function_by_ifname(ifname: str) -> tuple[str | None, bool]:
"""Given the device name, returns the PCI address of a device
and returns True if the address is in a physical function.
"""
@@ -246,14 +246,14 @@ def get_vf_product_id_by_pf_addr(pci_addr: str) -> str:
return vf_product_id
def get_pci_ids_by_pci_addr(pci_addr: str) -> ty.Tuple[str, ...]:
def get_pci_ids_by_pci_addr(pci_addr: str) -> tuple[str, ...]:
"""Get the product ID and vendor ID for a given PCI device.
:param pci_addr: A string of the form "<domain>:<bus>:<slot>.<function>".
:return: A list containing a vendor and product ids.
"""
id_prefix = f"/sys/bus/pci/devices/{pci_addr}"
ids: ty.List[str] = []
ids: list[str] = []
for id_name in ("vendor", "product"):
try:
with open(os.path.join(id_prefix, id_name)) as f:
+5 -5
View File
@@ -33,7 +33,7 @@ class Whitelist(object):
assignable.
"""
def __init__(self, whitelist_spec: ty.Optional[str] = None) -> None:
def __init__(self, whitelist_spec: str | None = None) -> None:
"""White list constructor
For example, the following json string specifies that devices whose
@@ -55,7 +55,7 @@ class Whitelist(object):
@staticmethod
def _parse_white_list_from_config(
whitelists: str,
) -> ty.List[devspec.PciDeviceSpec]:
) -> list[devspec.PciDeviceSpec]:
"""Parse and validate the pci whitelist from the nova config."""
specs = []
for jsonspec in whitelists:
@@ -83,8 +83,8 @@ class Whitelist(object):
return specs
def device_assignable(
self, dev: ty.Dict[str, ty.Any]
) -> ty.Optional[devspec.PciDeviceSpec]:
self, dev: dict[str, ty.Any]
) -> devspec.PciDeviceSpec | None:
"""Check if a device is part of pci device_spec (whitelist) and so
can be assigned to a guest.
If yes return the spec, else return None
@@ -99,7 +99,7 @@ class Whitelist(object):
def get_devspec(
self, pci_dev: 'objects.PciDevice',
) -> ty.Optional[devspec.PciDeviceSpec]:
) -> devspec.PciDeviceSpec | None:
for spec in self.specs:
if spec.match_pci_obj(pci_dev):
return spec
+4 -4
View File
@@ -54,12 +54,12 @@ def convert_image(source, dest, in_format, out_format, instances_path,
def unprivileged_convert_image(
source: str,
dest: str,
in_format: ty.Optional[str],
in_format: str | None,
out_format: str,
instances_path: str,
compress: bool,
src_encryption: ty.Optional[EncryptionOptions] = None,
dest_encryption: ty.Optional[EncryptionOptions] = None,
src_encryption: EncryptionOptions | None = None,
dest_encryption: EncryptionOptions | None = None,
) -> None:
"""Disk image conversion with qemu-img
@@ -126,7 +126,7 @@ def unprivileged_convert_image(
src_secret_file = None
dest_secret_file = None
encryption_opts: ty.List[str] = []
encryption_opts: list[str] = []
with contextlib.ExitStack() as stack:
if src_encryption:
src_secret_file = stack.enter_context(
+9 -9
View File
@@ -14,12 +14,12 @@
# under the License.
import collections
import collections.abc
import contextlib
import copy
import functools
import random
import time
import typing as ty
from keystoneauth1 import exceptions as ks_exc
import os_resource_classes as orc
@@ -233,7 +233,7 @@ class SchedulerReportClient(object):
# provider and inventory information
self._provider_tree: provider_tree.ProviderTree = None
# Track the last time we updated providers' aggregates and traits
self._association_refresh_time: ty.Dict[str, float] = {}
self._association_refresh_time: dict[str, float] = {}
self._client = self._create_client()
# NOTE(danms): Keep track of how naggy we've been
self._warn_count = 0
@@ -1064,8 +1064,8 @@ class SchedulerReportClient(object):
self,
context: nova_context.RequestContext,
rp_uuid: str,
traits: ty.Iterable[str],
generation: ty.Optional[int] = None
traits: collections.abc.Iterable[str],
generation: int | None = None
):
"""Replace a provider's traits with those specified.
@@ -1699,7 +1699,7 @@ class SchedulerReportClient(object):
self,
context: nova_context.RequestContext,
consumer_uuid: str,
resources: ty.Dict[str, ty.Dict[str, ty.Dict[str, int]]],
resources: dict[str, dict[str, dict[str, int]]],
) -> None:
"""Adds certain resources to the current allocation of the
consumer.
@@ -1750,7 +1750,7 @@ class SchedulerReportClient(object):
self,
context: nova_context.RequestContext,
consumer_uuid: str,
resources: ty.Dict[str, ty.Dict[str, ty.Dict[str, int]]],
resources: dict[str, dict[str, dict[str, int]]],
) -> bool:
current_allocs = self.get_allocs_for_consumer(context, consumer_uuid)
@@ -1790,7 +1790,7 @@ class SchedulerReportClient(object):
self,
context: nova_context.RequestContext,
consumer_uuid: str,
resources: ty.Dict[str, ty.Dict[str, ty.Dict[str, int]]]
resources: dict[str, dict[str, dict[str, int]]]
) -> None:
"""Removes certain resources from the current allocation of the
consumer.
@@ -1840,7 +1840,7 @@ class SchedulerReportClient(object):
self,
context: nova_context.RequestContext,
consumer_uuid: str,
resources: ty.Dict[str, ty.Dict[str, ty.Dict[str, int]]]
resources: dict[str, dict[str, dict[str, int]]]
) -> bool:
if not resources:
# Nothing to remove so do not query or update allocation in
@@ -2613,7 +2613,7 @@ class SchedulerReportClient(object):
pcpus = usages['usages'].get(orc.PCPU, 0)
return vcpus + pcpus
total_counts: ty.Dict[str, ty.Dict[str, int]] = {'project': {}}
total_counts: dict[str, dict[str, int]] = {'project': {}}
# First query counts across all users of a project
LOG.debug('Getting usages for project_id %s from placement',
project_id)
+11 -10
View File
@@ -17,7 +17,6 @@
import collections
import re
import sys
import typing as ty
from urllib import parse
import os_resource_classes as orc
@@ -63,14 +62,14 @@ class ResourceRequest(object):
Do not call this directly, use the existing static factory methods
from_*()
"""
self._rg_by_id: ty.Dict[str, objects.RequestGroup] = {}
self._group_policy: ty.Optional[str] = None
self._rg_by_id: dict[str, objects.RequestGroup] = {}
self._group_policy: str | None = None
# Default to the configured limit but _limit can be
# set to None to indicate "no limit".
self._limit = CONF.scheduler.max_placement_results
self._root_required: ty.Set[str] = set()
self._root_forbidden: ty.Set[str] = set()
self._same_subtree: ty.List[ty.List[str]] = []
self._root_required: set[str] = set()
self._root_forbidden: set[str] = set()
self._same_subtree: list[list[str]] = []
self.suffixed_groups_from_flavor = 0
# TODO(stephenfin): Remove this parameter once we drop support for
# 'vcpu_pin_set'
@@ -205,7 +204,7 @@ class ResourceRequest(object):
@classmethod
def from_request_groups(
cls,
request_groups: ty.List['objects.RequestGroup'],
request_groups: list['objects.RequestGroup'],
request_level_params: 'objects.RequestLevelParams',
group_policy: str,
) -> 'ResourceRequest':
@@ -351,7 +350,8 @@ class ResourceRequest(object):
if not vpmem_labels:
# No vpmems required
return
amount_by_rc: ty.DefaultDict[str, int] = collections.defaultdict(int)
amount_by_rc: collections.defaultdict[
str, int] = collections.defaultdict(int)
for vpmem_label in vpmem_labels:
resource_class = orc.normalize_name(
"PMEM_NAMESPACE_" + vpmem_label)
@@ -535,7 +535,8 @@ class ResourceRequest(object):
:return: A dict of the form {resource_class: amount}
"""
ret: ty.DefaultDict[str, int] = collections.defaultdict(lambda: 0)
ret: collections.defaultdict[
str, int] = collections.defaultdict(lambda: 0)
for rg in self._rg_by_id.values():
for resource_class, amount in rg.resources.items():
ret[resource_class] += amount
@@ -579,7 +580,7 @@ class ResourceRequest(object):
@property
def all_required_traits(self):
traits: ty.Set[str] = set()
traits: set[str] = set()
for rr in self._rg_by_id.values():
traits = traits.union(rr.required_traits)
return traits
+7 -8
View File
@@ -16,7 +16,6 @@ Handles all requests relating to shares + manila.
from dataclasses import dataclass
import functools
from typing import Optional
from openstack import exceptions as sdk_exc
from oslo_log import log as logging
@@ -53,18 +52,18 @@ def _manilaclient(context, admin=False):
class Share():
id: str
size: int
availability_zone: Optional[str]
availability_zone: str | None
created_at: str
status: str
name: Optional[str]
description: Optional[str]
name: str | None
description: str | None
project_id: str
snapshot_id: Optional[str]
share_network_id: Optional[str]
snapshot_id: str | None
share_network_id: str | None
share_proto: str
export_location: str
metadata: dict
share_type: Optional[str]
share_type: str | None
is_public: bool
@classmethod
@@ -95,7 +94,7 @@ class Access():
state: str
access_type: str
access_to: str
access_key: Optional[str]
access_key: str | None
@classmethod
def from_manila_access(cls, manila_access):
+1 -2
View File
@@ -17,7 +17,6 @@ import os
import sys
import textwrap
import time
import typing as ty
from unittest import mock
import fixtures
@@ -1126,7 +1125,7 @@ class NodeDevice(object):
def name(self) -> str:
return self._name
def listCaps(self) -> ty.List[str]:
def listCaps(self) -> list[str]:
return [self.name().split('_')[0]]
+1 -3
View File
@@ -44,8 +44,6 @@ from nova.tests import fixtures as nova_fixtures
from nova.tests.functional.api import client as api_client
from nova.tests.functional import fixtures as func_fixtures
from nova import utils
import typing as ty
CONF = nova.conf.CONF
LOG = logging.getLogger(__name__)
@@ -728,7 +726,7 @@ class InstanceHelperMixin:
{'createImage': {'name': snapshot_name}}
)
def _attach_volumes(self, server, vol_ids: ty.List[str]):
def _attach_volumes(self, server, vol_ids: list[str]):
# attach volumes to server
# these attachments are done by nova api, that means
# nova know about these attachments and so they are valid ones.
@@ -15,7 +15,6 @@
import copy
import pprint
import typing as ty
from unittest import mock
from urllib import parse as urlparse
@@ -87,7 +86,7 @@ class PciPlacementHealingFixture(fixtures.Fixture):
)
)
def last_healing(self, hostname: str) -> ty.Optional[ty.Tuple[dict, dict]]:
def last_healing(self, hostname: str) -> tuple[dict, dict] | None:
for h, updated, before, after in self.calls:
if h == hostname and updated:
return before, after
+3 -2
View File
@@ -20,6 +20,7 @@ Driver base-classes:
types that support that contract
"""
from collections.abc import Mapping
import dataclasses
import itertools
import sys
@@ -190,8 +191,8 @@ def block_device_info_get_mapping(block_device_info):
def block_device_info_get_encrypted_disks(
block_device_info: ty.Mapping[str, ty.Any],
) -> ty.List['nova.virt.block_device.DriverBlockDevice']:
block_device_info: Mapping[str, ty.Any],
) -> list['nova.virt.block_device.DriverBlockDevice']:
block_device_info = block_device_info or {}
# swap is a single device, not a list
+48 -48
View File
@@ -106,7 +106,7 @@ def get_cpu_shared_set():
return shared_ids
def parse_cpu_spec(spec: str) -> ty.Set[int]:
def parse_cpu_spec(spec: str) -> set[int]:
"""Parse a CPU set specification.
Each element in the list is either a single CPU number, a range of
@@ -117,8 +117,8 @@ def parse_cpu_spec(spec: str) -> ty.Set[int]:
:returns: a set of CPU indexes
"""
cpuset_ids: ty.Set[int] = set()
cpuset_reject_ids: ty.Set[int] = set()
cpuset_ids: set[int] = set()
cpuset_reject_ids: set[int] = set()
for rule in spec.split(','):
rule = rule.strip()
# Handle multi ','
@@ -169,7 +169,7 @@ def parse_cpu_spec(spec: str) -> ty.Set[int]:
def format_cpu_spec(
cpuset: ty.Set[int],
cpuset: set[int],
allow_ranges: bool = True,
) -> str:
"""Format a libvirt CPU range specification.
@@ -189,7 +189,7 @@ def format_cpu_spec(
# trying to do range negations to minimize the overall
# spec string length
if allow_ranges:
ranges: ty.List[ty.List[int]] = []
ranges: list[list[int]] = []
previndex = None
for cpuindex in sorted(cpuset):
if previndex is None or previndex != (cpuindex - 1):
@@ -535,7 +535,7 @@ def _sort_possible_cpu_topologies(possible, wanttopology):
# We don't use python's sort(), since we want to
# preserve the sorting done when populating the
# 'possible' list originally
scores: ty.Dict[int, ty.List['objects.VirtCPUTopology']] = (
scores: dict[int, list['objects.VirtCPUTopology']] = (
collections.defaultdict(list)
)
for topology in possible:
@@ -683,7 +683,7 @@ def _pack_instance_onto_cores(host_cell, instance_cell,
# We build up a data structure that answers the question: 'Given the
# number of threads I want to pack, give me a list of all the available
# sibling sets (or groups thereof) that can accommodate it'
sibling_sets: ty.Dict[int, ty.List[ty.Set[int]]] = (
sibling_sets: dict[int, list[set[int]]] = (
collections.defaultdict(list)
)
for sib in host_cell.free_siblings:
@@ -922,9 +922,9 @@ def _pack_instance_onto_cores(host_cell, instance_cell,
def _numa_fit_instance_cell(
host_cell: 'objects.NUMACell',
instance_cell: 'objects.InstanceNUMACell',
limits: ty.Optional['objects.NUMATopologyLimits'] = None,
limits: 'objects.NUMATopologyLimits | None' = None,
cpuset_reserved: int = 0,
) -> ty.Optional['objects.InstanceNUMACell']:
) -> 'objects.InstanceNUMACell | None':
"""Ensure an instance cell can fit onto a host cell
Ensure an instance cell can fit onto a host cell and, if so, return
@@ -1114,7 +1114,7 @@ def _get_flavor_image_meta(
image_meta: 'objects.ImageMeta',
default: ty.Any = None,
prefix: str = 'hw',
) -> ty.Tuple[ty.Any, ty.Any]:
) -> tuple[ty.Any, ty.Any]:
"""Extract both flavor- and image-based variants of metadata."""
flavor_key = ':'.join([prefix, key])
image_key = '_'.join([prefix, key])
@@ -1160,8 +1160,8 @@ def _get_unique_flavor_image_meta(
def get_mem_encryption_constraint(
flavor: 'objects.Flavor',
image_meta: 'objects.ImageMeta',
machine_type: ty.Optional[str] = None,
) -> ty.Optional[MemEncryptionConfig]:
machine_type: str | None = None,
) -> MemEncryptionConfig | None:
"""Return memory encryption context requested either via flavor extra specs
or image properties (or both).
@@ -1374,7 +1374,7 @@ def _check_mem_encryption_machine_type(image_meta, machine_type=None):
def _get_numa_pagesize_constraint(
flavor: 'objects.Flavor',
image_meta: 'objects.ImageMeta',
) -> ty.Optional[int]:
) -> int | None:
"""Return the requested memory page size
:param flavor: a Flavor object to read extra specs from
@@ -1442,7 +1442,7 @@ def _get_constraint_mappings_from_flavor(flavor, key, func):
def get_locked_memory_constraint(
flavor: 'objects.Flavor',
image_meta: 'objects.ImageMeta',
) -> ty.Optional[bool]:
) -> bool | None:
"""Validate and return the requested locked memory.
:param flavor: ``nova.objects.Flavor`` instance
@@ -1484,7 +1484,7 @@ def get_locked_memory_constraint(
def _get_numa_cpu_constraint(
flavor: 'objects.Flavor',
image_meta: 'objects.ImageMeta',
) -> ty.Optional[ty.List[ty.Set[int]]]:
) -> list[set[int]] | None:
"""Validate and return the requested guest NUMA-guest CPU mapping.
Extract the user-provided mapping of guest CPUs to guest NUMA nodes. For
@@ -1516,7 +1516,7 @@ def _get_numa_cpu_constraint(
def _get_numa_mem_constraint(
flavor: 'objects.Flavor',
image_meta: 'objects.ImageMeta',
) -> ty.Optional[ty.List[int]]:
) -> list[int] | None:
"""Validate and return the requested guest NUMA-guest memory mapping.
Extract the user-provided mapping of guest memory to guest NUMA nodes. For
@@ -1548,7 +1548,7 @@ def _get_numa_mem_constraint(
def _get_numa_node_count_constraint(
flavor: 'objects.Flavor',
image_meta: 'objects.ImageMeta',
) -> ty.Optional[int]:
) -> int | None:
"""Validate and return the requested NUMA nodes.
:param flavor: ``nova.objects.Flavor`` instance
@@ -1577,7 +1577,7 @@ def _get_numa_node_count_constraint(
def get_cpu_policy_constraint(
flavor: 'objects.Flavor',
image_meta: 'objects.ImageMeta',
) -> ty.Optional[str]:
) -> str | None:
"""Validate and return the requested CPU policy.
:param flavor: ``nova.objects.Flavor`` instance
@@ -1628,7 +1628,7 @@ def get_cpu_policy_constraint(
def get_cpu_thread_policy_constraint(
flavor: 'objects.Flavor',
image_meta: 'objects.ImageMeta',
) -> ty.Optional[str]:
) -> str | None:
"""Validate and return the requested CPU thread policy.
:param flavor: ``nova.objects.Flavor`` instance
@@ -1669,8 +1669,8 @@ def get_cpu_thread_policy_constraint(
def _get_numa_topology_auto(
nodes: int,
flavor: 'objects.Flavor',
vcpus: ty.Set[int],
pcpus: ty.Set[int],
vcpus: set[int],
pcpus: set[int],
) -> 'objects.InstanceNUMATopology':
"""Generate a NUMA topology automatically based on CPUs and memory.
@@ -1702,10 +1702,10 @@ def _get_numa_topology_auto(
def _get_numa_topology_manual(
nodes: int,
flavor: 'objects.Flavor',
vcpus: ty.Set[int],
pcpus: ty.Set[int],
cpu_list: ty.List[ty.Set[int]],
mem_list: ty.List[int],
vcpus: set[int],
pcpus: set[int],
cpu_list: list[set[int]],
mem_list: list[int],
) -> 'objects.InstanceNUMATopology':
"""Generate a NUMA topology based on user-provided NUMA topology hints.
@@ -1763,7 +1763,7 @@ def is_realtime_enabled(flavor):
def _get_vcpu_pcpu_resources(
flavor: 'objects.Flavor',
) -> ty.Tuple[int, int]:
) -> tuple[int, int]:
requested_vcpu = 0
requested_pcpu = 0
@@ -1787,7 +1787,7 @@ def _get_vcpu_pcpu_resources(
def _get_hyperthreading_trait(
flavor: 'objects.Flavor',
image_meta: 'objects.ImageMeta',
) -> ty.Optional[str]:
) -> str | None:
for key, val in flavor.get('extra_specs', {}).items():
if re.match('trait([1-9][0-9]*)?:%s' % os_traits.HW_CPU_HYPERTHREADING,
key):
@@ -1803,7 +1803,7 @@ def _get_hyperthreading_trait(
# NOTE(stephenfin): This must be public as it's used elsewhere
def get_dedicated_cpu_constraint(
flavor: 'objects.Flavor',
) -> ty.Optional[ty.Set[int]]:
) -> set[int] | None:
"""Validate and return the requested dedicated CPU mask.
:param flavor: ``nova.objects.Flavor`` instance
@@ -1835,7 +1835,7 @@ def get_dedicated_cpu_constraint(
def get_realtime_cpu_constraint(
flavor: 'objects.Flavor',
image_meta: 'objects.ImageMeta',
) -> ty.Optional[ty.Set[int]]:
) -> set[int] | None:
"""Validate and return the requested realtime CPU mask.
:param flavor: ``nova.objects.Flavor`` instance
@@ -1881,7 +1881,7 @@ def get_realtime_cpu_constraint(
# NOTE(stephenfin): This must be public as it's used elsewhere
def get_emulator_thread_policy_constraint(
flavor: 'objects.Flavor',
) -> ty.Optional[str]:
) -> str | None:
"""Validate and return the requested emulator threads policy.
:param flavor: ``nova.objects.Flavor`` instance
@@ -1931,7 +1931,7 @@ def get_pci_numa_policy_constraint(
def get_pmu_constraint(
flavor: 'objects.Flavor',
image_meta: 'objects.ImageMeta',
) -> ty.Optional[bool]:
) -> bool | None:
"""Validate and return the requested vPMU configuration.
This one's a little different since we don't return False in the default
@@ -2085,7 +2085,7 @@ def get_packed_virtqueue_constraint(
def get_vtpm_constraint(
flavor: 'objects.Flavor',
image_meta: 'objects.ImageMeta',
) -> ty.Optional[VTPMConfig]:
) -> VTPMConfig | None:
"""Validate and return the requested vTPM configuration.
:param flavor: ``nova.objects.Flavor`` instance
@@ -2125,7 +2125,7 @@ def get_vtpm_constraint(
def get_tpm_secret_security_constraint(
flavor: 'objects.Flavor',
) -> ty.Optional[str]:
) -> str | None:
# NOTE(melwitt): An image property for TPM secret security is intentionally
# not provided because server rebuild is blocked in the API. If a user were
# to create a server with a given TPM secret security policy via an image
@@ -2140,7 +2140,7 @@ def get_tpm_secret_security_constraint(
def get_secure_boot_constraint(
flavor: 'objects.Flavor',
image_meta: 'objects.ImageMeta',
) -> ty.Optional[str]:
) -> str | None:
"""Validate and return the requested secure boot policy.
:param flavor: ``nova.objects.Flavor`` instance
@@ -2406,7 +2406,7 @@ def numa_get_constraints(flavor, image_meta):
def _numa_cells_support_network_metadata(
host_topology: 'objects.NUMATopology',
chosen_host_cells: ty.List['objects.NUMACell'],
chosen_host_cells: list['objects.NUMACell'],
network_metadata: 'objects.NetworkMetadata',
) -> bool:
"""Determine whether the cells can accept the network requests.
@@ -2424,7 +2424,7 @@ def _numa_cells_support_network_metadata(
if not network_metadata:
return True
required_physnets: ty.Set[str] = set()
required_physnets: set[str] = set()
if 'physnets' in network_metadata:
# use set() to avoid modifying the original data structure
required_physnets = set(network_metadata.physnets)
@@ -2509,10 +2509,10 @@ def _numa_cells_support_network_metadata(
def numa_fit_instance_to_host(
host_topology: 'objects.NUMATopology',
instance_topology: 'objects.InstanceNUMATopology',
provider_mapping: ty.Optional[ty.Dict[str, ty.List[str]]],
limits: ty.Optional['objects.NUMATopologyLimits'] = None,
pci_requests: ty.Optional['objects.InstancePCIRequests'] = None,
pci_stats: ty.Optional[stats.PciDeviceStats] = None,
provider_mapping: dict[str, list[str]] | None,
limits: 'objects.NUMATopologyLimits | None' = None,
pci_requests: 'objects.InstancePCIRequests | None' = None,
pci_stats: stats.PciDeviceStats | None = None,
):
"""Fit the instance topology onto the host topology.
@@ -2602,7 +2602,7 @@ def numa_fit_instance_to_host(
if pci_stats:
# Create dict with numa cell id as key
# and total number of free pci devices as value.
total_pci_in_cell: ty.Dict[int, int] = {}
total_pci_in_cell: dict[int, int] = {}
for pool in pci_stats.pools:
if pool['numa_node'] in list(total_pci_in_cell):
total_pci_in_cell[pool['numa_node']] += pool['count']
@@ -2630,8 +2630,8 @@ def numa_fit_instance_to_host(
fit_cache = set()
for host_cell_perm in itertools.permutations(
host_cells, len(instance_topology)):
chosen_instance_cells: ty.List['objects.InstanceNUMACell'] = []
chosen_host_cells: ty.List['objects.NUMACell'] = []
chosen_instance_cells: list['objects.InstanceNUMACell'] = []
chosen_host_cells: list['objects.NUMACell'] = []
for host_cell, instance_cell in zip(
host_cell_perm, instance_topology.cells):
@@ -2714,7 +2714,7 @@ def numa_get_reserved_huge_pages():
return {}
try:
bucket: ty.Dict[int, ty.Dict[int, int]] = collections.defaultdict(dict)
bucket: dict[int, dict[int, int]] = collections.defaultdict(dict)
for cfg in CONF.reserved_huge_pages:
try:
pagesize = int(cfg['size'])
@@ -2864,7 +2864,7 @@ def get_vpmems(flavor):
def get_maxphysaddr_mode(
flavor: 'objects.Flavor',
image_meta: 'objects.ImageMeta',
) -> ty.Optional[str]:
) -> str | None:
"""Return maxphysaddr mode.
:param flavor: a flavor object to read extra specs from
@@ -2944,7 +2944,7 @@ def get_ephemeral_encryption_constraint(
def get_ephemeral_encryption_format(
flavor: 'objects.Flavor',
image_meta: 'objects.ImageMeta',
) -> ty.Optional[str]:
) -> str | None:
"""Get the ephemeral encryption format.
:param flavor: an objects.Flavor object
@@ -2995,7 +2995,7 @@ def check_shares_supported(context, instance):
def get_sound_model(
flavor: 'objects.Flavor',
image_meta: 'objects.ImageMeta',
) -> ty.Optional[str]:
) -> str | None:
"""Get the sound device model, if any.
:param flavor: ``nova.objects.Flavor`` instance
@@ -3019,7 +3019,7 @@ def get_sound_model(
def get_usb_model(
flavor: 'objects.Flavor',
image_meta: 'objects.ImageMeta',
) -> ty.Optional[str]:
) -> str | None:
"""Get the USB controller model, if any.
:param flavor: ``nova.objects.Flavor`` instance
+1 -2
View File
@@ -24,7 +24,6 @@ helpers for populating up config object instances.
"""
import time
import typing as ty
from collections import OrderedDict
from lxml import etree
@@ -104,7 +103,7 @@ class LibvirtConfigObject(object):
return xml_str
@classmethod
def parse_on_off_str(self, value: ty.Optional[str]) -> bool:
def parse_on_off_str(self, value: str | None) -> bool:
if value is not None and value not in ('on', 'off'):
msg = _(
"Element should contain either 'on' or 'off'; "
+3 -4
View File
@@ -11,7 +11,6 @@
# under the License.
from dataclasses import dataclass
import typing as ty
from oslo_log import log as logging
@@ -60,7 +59,7 @@ class Core:
return str(self.ident)
@property
def governor(self) -> ty.Optional[str]:
def governor(self) -> str | None:
try:
return core.get_governor(self.ident)
# NOTE(sbauza): cpufreq/scaling_governor is not enabled for some OS
@@ -91,7 +90,7 @@ class API(object):
"""
return Core(i)
def power_up(self, cpus: ty.Set[int]) -> None:
def power_up(self, cpus: set[int]) -> None:
if not CONF.libvirt.cpu_power_management:
return
cpu_dedicated_set = hardware.get_cpu_dedicated_set_nozero() or set()
@@ -123,7 +122,7 @@ class API(object):
pcpus = pcpus.union(pins)
self.power_up(pcpus)
def _power_down(self, cpus: ty.Set[int]) -> None:
def _power_down(self, cpus: set[int]) -> None:
if not CONF.libvirt.cpu_power_management:
return
cpu_dedicated_set = hardware.get_cpu_dedicated_set_nozero() or set()
+1 -2
View File
@@ -11,7 +11,6 @@
# under the License.
import os
import typing as ty
from oslo_log import log as logging
@@ -27,7 +26,7 @@ AVAILABLE_PATH = '/sys/devices/system/cpu/present'
CPU_PATH_TEMPLATE = '/sys/devices/system/cpu/cpu%(core)s'
def get_available_cores() -> ty.Set[int]:
def get_available_cores() -> set[int]:
cores = filesystem.read_sys(AVAILABLE_PATH)
return hardware.parse_cpu_spec(cores) if cores else set()
+77 -73
View File
@@ -26,6 +26,7 @@ Supports KVM, LXC, QEMU, and Parallels.
import binascii
import collections
from collections.abc import Callable
from collections import deque
import contextlib
import copy
@@ -260,6 +261,12 @@ REGISTER_IMAGE_PROPERTY_DEFAULTS = [
'hw_vif_model',
]
# Type Aliases
DetachableDevice: ty.TypeAlias = (
vconfig.LibvirtConfigGuestDisk | vconfig.LibvirtConfigGuestInterface
)
class AsyncDeviceEventsHandler:
"""A synchornization point between libvirt events an clients waiting for
@@ -278,13 +285,13 @@ class AsyncDeviceEventsHandler:
self,
instance_uuid: str,
device_name: str,
event_types: ty.Set[ty.Type[libvirtevent.DeviceEvent]]
event_types: set[type[libvirtevent.DeviceEvent]]
):
self.instance_uuid = instance_uuid
self.device_name = device_name
self.event_types = event_types
self.threading_event = threading.Event()
self.result: ty.Optional[libvirtevent.DeviceEvent] = None
self.result: libvirtevent.DeviceEvent | None = None
def matches(self, event: libvirtevent.DeviceEvent) -> bool:
"""Returns true if the event is one of the expected event types
@@ -306,13 +313,13 @@ class AsyncDeviceEventsHandler:
self._lock = threading.Lock()
# Ongoing device operations in libvirt where we wait for the events
# about success or failure.
self._waiters: ty.Set[AsyncDeviceEventsHandler.Waiter] = set()
self._waiters: set[AsyncDeviceEventsHandler.Waiter] = set()
def create_waiter(
self,
instance_uuid: str,
device_name: str,
event_types: ty.Set[ty.Type[libvirtevent.DeviceEvent]]
event_types: set[type[libvirtevent.DeviceEvent]]
) -> 'AsyncDeviceEventsHandler.Waiter':
"""Returns an opaque token the caller can use in wait() to
wait for the libvirt event
@@ -344,7 +351,7 @@ class AsyncDeviceEventsHandler:
def wait(
self, token: 'AsyncDeviceEventsHandler.Waiter', timeout: float,
) -> ty.Optional[libvirtevent.DeviceEvent]:
) -> libvirtevent.DeviceEvent | None:
"""Blocks waiting for the libvirt event represented by the opaque token
:param token: A token created by calling create_waiter()
@@ -398,6 +405,13 @@ class AsyncDeviceEventsHandler:
instance_waiters)
class GetDeviceConfFunction(ty.Protocol):
def __call__(
self, from_persistent_config: bool = ..., **kwargs,
) -> vconfig.LibvirtConfigGuestDevice:
...
class LibvirtDriver(driver.ComputeDriver):
def __init__(self, virtapi, read_only=False):
if libvirt is None:
@@ -467,7 +481,7 @@ class LibvirtDriver(driver.ComputeDriver):
self.vif_driver = libvirt_vif.LibvirtGenericVIFDriver(self._host)
# NOTE(lyarwood): Volume drivers are loaded on-demand
self.volume_drivers: ty.Dict[str, volume.LibvirtBaseVolumeDriver] = {}
self.volume_drivers: dict[str, volume.LibvirtBaseVolumeDriver] = {}
self._disk_cachemode = None
self.image_cache_manager = imagecache.ImageCacheManager()
@@ -535,7 +549,7 @@ class LibvirtDriver(driver.ComputeDriver):
# This dict is for knowing which mdev class is supported by a specific
# PCI device like we do (the key being the PCI address and the value
# the mdev class)
self.mdev_class_mapping: ty.Dict[str, str] = (
self.mdev_class_mapping: dict[str, str] = (
collections.defaultdict(lambda: orc.VGPU)
)
# This set is for knowing all the mdev classes the operator provides
@@ -577,10 +591,10 @@ class LibvirtDriver(driver.ComputeDriver):
return {}, {}
# vpmem keyed by name {name: objects.LibvirtVPMEMDevice,...}
vpmems_by_name: ty.Dict[str, 'objects.LibvirtVPMEMDevice'] = {}
vpmems_by_name: dict[str, 'objects.LibvirtVPMEMDevice'] = {}
# vpmem list keyed by resource class
# {'RC_0': [objects.LibvirtVPMEMDevice, ...], 'RC_1': [...]}
vpmems_by_rc: ty.Dict[str, ty.List['objects.LibvirtVPMEMDevice']] = (
vpmems_by_rc: dict[str, list['objects.LibvirtVPMEMDevice']] = (
collections.defaultdict(list)
)
@@ -1022,9 +1036,9 @@ class LibvirtDriver(driver.ComputeDriver):
self,
instance: 'objects.Instance',
image_property: str,
disk_info: ty.Optional[ty.Dict[str, ty.Any]],
guest_config: ty.Optional[vconfig.LibvirtConfigGuest],
) -> ty.Optional[str]:
disk_info: dict[str, ty.Any] | None,
guest_config: vconfig.LibvirtConfigGuest | None,
) -> str | None:
if image_property == 'hw_machine_type':
return libvirt_utils.get_machine_type(instance.image_meta)
@@ -1998,7 +2012,7 @@ class LibvirtDriver(driver.ComputeDriver):
LOG.debug(e, instance=instance)
def _get_volume_driver(
self, connection_info: ty.Dict[str, ty.Any]
self, connection_info: dict[str, ty.Any]
) -> 'volume.LibvirtBaseVolumeDriver':
"""Fetch the nova.virt.libvirt.volume driver
@@ -2493,9 +2507,7 @@ class LibvirtDriver(driver.ComputeDriver):
self,
guest: libvirt_guest.Guest,
instance_uuid: str,
# to properly typehint this param we would need typing.Protocol but
# that is only available since python 3.8
get_device_conf_func: ty.Callable,
get_device_conf_func: GetDeviceConfFunction,
device_name: str,
) -> None:
"""Detaches a device from the guest
@@ -2579,12 +2591,10 @@ class LibvirtDriver(driver.ComputeDriver):
self,
guest: libvirt_guest.Guest,
instance_uuid: str,
persistent_dev: ty.Union[
vconfig.LibvirtConfigGuestDisk,
vconfig.LibvirtConfigGuestInterface],
get_device_conf_func,
persistent_dev: DetachableDevice,
get_device_conf_func: GetDeviceConfFunction,
device_name: str,
):
) -> None:
LOG.debug(
'Attempting to detach device %s from instance %s from '
'the persistent domain config.', device_name, instance_uuid)
@@ -2613,12 +2623,10 @@ class LibvirtDriver(driver.ComputeDriver):
self,
guest: libvirt_guest.Guest,
instance_uuid: str,
live_dev: ty.Union[
vconfig.LibvirtConfigGuestDisk,
vconfig.LibvirtConfigGuestInterface],
get_device_conf_func,
live_dev: DetachableDevice,
get_device_conf_func: GetDeviceConfFunction,
device_name: str,
):
) -> None:
max_attempts = CONF.libvirt.device_detach_attempts
for attempt in range(max_attempts):
LOG.debug(
@@ -2656,9 +2664,7 @@ class LibvirtDriver(driver.ComputeDriver):
def _detach_from_live_and_wait_for_event(
self,
dev: ty.Union[
vconfig.LibvirtConfigGuestDisk,
vconfig.LibvirtConfigGuestInterface],
dev: DetachableDevice,
guest: libvirt_guest.Guest,
instance_uuid: str,
device_name: str,
@@ -2738,9 +2744,7 @@ class LibvirtDriver(driver.ComputeDriver):
@staticmethod
def _detach_sync(
dev: ty.Union[
vconfig.LibvirtConfigGuestDisk,
vconfig.LibvirtConfigGuestInterface],
dev: DetachableDevice,
guest: libvirt_guest.Guest,
instance_uuid: str,
device_name: str,
@@ -4690,8 +4694,8 @@ class LibvirtDriver(driver.ComputeDriver):
self,
context: nova_context.RequestContext,
instance: 'objects.Instance',
block_device_info: ty.Dict[str, ty.Any],
) -> ty.Optional[ty.Dict[str, ty.Any]]:
block_device_info: dict[str, ty.Any],
) -> dict[str, ty.Any] | None:
"""Add ephemeral encryption attributes to driver BDMs before use."""
encrypted_bdms = driver.block_device_info_get_encrypted_disks(
block_device_info)
@@ -6261,7 +6265,7 @@ class LibvirtDriver(driver.ComputeDriver):
return idmaps
def _get_guest_idmaps(self):
id_maps: ty.List[vconfig.LibvirtConfigGuestIDMap] = []
id_maps: list[vconfig.LibvirtConfigGuestIDMap] = []
if CONF.libvirt.virt_type == 'lxc' and CONF.libvirt.uid_maps:
uid_maps = self._create_idmaps(vconfig.LibvirtConfigGuestUIDMap,
CONF.libvirt.uid_maps)
@@ -6754,7 +6758,7 @@ class LibvirtDriver(driver.ComputeDriver):
def _get_video_type(
self,
image_meta: objects.ImageMeta,
) -> ty.Optional[str]:
) -> str | None:
# NOTE(ldbragst): The following logic returns the video type
# depending on supported defaults given the architecture,
# virtualization type, and features. The video type can
@@ -7186,7 +7190,7 @@ class LibvirtDriver(driver.ComputeDriver):
instance: 'objects.Instance',
inst_path: str,
image_meta: 'objects.ImageMeta',
disk_info: ty.Dict[str, ty.Any],
disk_info: dict[str, ty.Any],
):
if rescue:
self._set_guest_for_rescue(
@@ -7865,7 +7869,7 @@ class LibvirtDriver(driver.ComputeDriver):
self,
guest: vconfig.LibvirtConfigGuest,
image_meta: objects.ImageMeta,
) -> ty.Tuple[ty.Optional[str], ty.Optional[str]]:
) -> tuple[str | None, str | None]:
pointer_bus = image_meta.properties.get('hw_input_bus')
pointer_model = image_meta.properties.get('hw_pointer_model')
@@ -7959,7 +7963,7 @@ class LibvirtDriver(driver.ComputeDriver):
guest: vconfig.LibvirtConfigGuest,
image_meta: 'objects.ImageMeta',
flavor: 'objects.Flavor',
) -> ty.Optional[str]:
) -> str | None:
model = flavor.extra_specs.get(
'hw:viommu_model') or image_meta.properties.get(
'hw_viommu_model')
@@ -8170,7 +8174,7 @@ class LibvirtDriver(driver.ComputeDriver):
self,
context: nova_context.RequestContext,
instance: 'objects.Instance',
) -> ty.Tuple[ty.Any, ty.Optional[str]]:
) -> tuple[ty.Any, str | None]:
"""Get or create a libvirt vTPM secret.
For 'host' TPM secret security, this will look for a local libvirt
@@ -8210,7 +8214,7 @@ class LibvirtDriver(driver.ComputeDriver):
instance: 'objects.Instance',
power_on: bool = True,
pause: bool = False,
post_xml_callback: ty.Optional[ty.Callable] = None,
post_xml_callback: Callable[[], None] | None = None,
) -> libvirt_guest.Guest:
"""Create a Guest from XML.
@@ -8266,11 +8270,11 @@ class LibvirtDriver(driver.ComputeDriver):
xml: str,
instance: 'objects.Instance',
network_info: network_model.NetworkInfo,
block_device_info: ty.Optional[ty.Dict[str, ty.Any]],
block_device_info: dict[str, ty.Any] | None,
power_on: bool = True,
vifs_already_plugged: bool = False,
post_xml_callback: ty.Optional[ty.Callable] = None,
external_events: ty.Optional[ty.List[ty.Tuple[str, str]]] = None,
post_xml_callback: Callable[[], None] | None = None,
external_events: list[tuple[str, str]] | None = None,
cleanup_instance_dir: bool = False,
cleanup_instance_disks: bool = False,
) -> libvirt_guest.Guest:
@@ -8519,7 +8523,7 @@ class LibvirtDriver(driver.ComputeDriver):
@staticmethod
def _get_pci_id_from_libvirt_name(
libvirt_address: str
) -> ty.Optional[str]:
) -> str | None:
"""Returns a PCI ID from a libvirt pci address name.
:param libvirt_address: the libvirt PCI device name,
@@ -8586,7 +8590,7 @@ class LibvirtDriver(driver.ComputeDriver):
mdev device handles for that GPU
"""
counts_per_parent: ty.Dict[str, int] = collections.defaultdict(int)
counts_per_parent: dict[str, int] = collections.defaultdict(int)
mediated_devices = self._get_mediated_devices(types=enabled_mdev_types)
for mdev in mediated_devices:
parent_vgpu_type = self._get_vgpu_type_per_pgpu(mdev['parent'])
@@ -8608,7 +8612,7 @@ class LibvirtDriver(driver.ComputeDriver):
"""
mdev_capable_devices = self._get_mdev_capable_devices(
types=enabled_mdev_types)
counts_per_dev: ty.Dict[str, int] = collections.defaultdict(int)
counts_per_dev: dict[str, int] = collections.defaultdict(int)
for dev in mdev_capable_devices:
# dev_id is the libvirt name for the PCI device,
# eg. pci_0000_84_00_0 which matches a PCI address of 0000:84:00.0
@@ -8647,7 +8651,7 @@ class LibvirtDriver(driver.ComputeDriver):
return {}
inventories = {}
# counting how many mdevs we are currently supporting per type
type_limit_mapping: ty.Dict[str, int] = collections.defaultdict(int)
type_limit_mapping: dict[str, int] = collections.defaultdict(int)
count_per_parent = self._count_mediated_devices(enabled_mdev_types)
for dev_name, count in count_per_parent.items():
mdev_type = self._get_vgpu_type_per_pgpu(dev_name)
@@ -9286,7 +9290,7 @@ class LibvirtDriver(driver.ComputeDriver):
return cell.get(page_size, 0)
def _get_physnet_numa_affinity():
affinities: ty.Dict[int, ty.Set[str]] = {
affinities: dict[int, set[str]] = {
cell.id: set() for cell in topology.cells
}
for physnet in CONF.neutron.physnets:
@@ -9504,7 +9508,7 @@ class LibvirtDriver(driver.ComputeDriver):
# otherwise.
inv = provider_tree.data(nodename).inventory
ratios = self._get_allocation_ratios(inv)
resources: ty.Dict[str, ty.Set['objects.Resource']] = (
resources: dict[str, set['objects.Resource']] = (
collections.defaultdict(set)
)
@@ -9862,11 +9866,11 @@ class LibvirtDriver(driver.ComputeDriver):
return inventories
@property
def static_traits(self) -> ty.Dict[str, bool]:
def static_traits(self) -> dict[str, bool]:
if self._static_traits is not None:
return self._static_traits
traits: ty.Dict[str, bool] = {}
traits: dict[str, bool] = {}
traits.update(self._get_cpu_traits())
traits.update(self._get_packed_virtqueue_traits())
traits.update(self._get_storage_bus_traits())
@@ -10035,7 +10039,7 @@ class LibvirtDriver(driver.ComputeDriver):
:return: dict, keyed by PGPU device ID, to count of VGPUs on that
device
"""
vgpu_count_per_pgpu: ty.Dict[str, int] = collections.defaultdict(int)
vgpu_count_per_pgpu: dict[str, int] = collections.defaultdict(int)
for mdev_uuid in mdev_uuids:
# libvirt name is like mdev_00ead764_fdc0_46b6_8db9_2963f5c815b4
dev_name = libvirt_utils.mdev_uuid2name(mdev_uuid)
@@ -10589,7 +10593,7 @@ class LibvirtDriver(driver.ComputeDriver):
raise exception.MigrationPreCheckError(reason)
dst_mdevs = self._allocate_mdevs(allocs)
dst_mdev_types = self._get_mdev_types_from_uuids(dst_mdevs)
target_mdevs: ty.Dict[str, str] = {}
target_mdevs: dict[str, str] = {}
for src_mdev, src_type in src_mdev_types.items():
for dst_mdev, dst_type in dst_mdev_types.items():
# we want to associate by 1:1 between dst and src mdevs
@@ -11373,7 +11377,7 @@ class LibvirtDriver(driver.ComputeDriver):
migrate_data, future,
disk_paths):
on_migration_failure: ty.Deque[str] = deque()
on_migration_failure: deque[str] = deque()
data_gb = self._live_migration_data_gb(instance, disk_paths)
downtime_steps = list(libvirt_migrate.downtime_steps(data_gb))
migration = migrate_data.migration
@@ -12645,8 +12649,8 @@ class LibvirtDriver(driver.ComputeDriver):
network_info: network_model.NetworkInfo,
image_meta: 'objects.ImageMeta',
resize_instance: bool,
allocations: ty.Dict[str, ty.Any],
block_device_info: ty.Optional[ty.Dict[str, ty.Any]] = None,
allocations: dict[str, ty.Any],
block_device_info: dict[str, ty.Any] | None = None,
power_on: bool = True,
) -> None:
"""Complete the migration process on the destination host."""
@@ -12791,7 +12795,7 @@ class LibvirtDriver(driver.ComputeDriver):
instance: 'objects.Instance',
network_info: network_model.NetworkInfo,
migration: 'objects.Migration',
block_device_info: ty.Optional[ty.Dict[str, ty.Any]] = None,
block_device_info: dict[str, ty.Any] | None = None,
power_on: bool = True,
) -> None:
"""Finish the second half of reverting a resize on the source host."""
@@ -12849,7 +12853,7 @@ class LibvirtDriver(driver.ComputeDriver):
@staticmethod
def _get_io_devices(xml_doc):
"""get the list of io devices from the xml document."""
result: ty.Dict[str, ty.List[str]] = {"volumes": [], "ifaces": []}
result: dict[str, list[str]] = {"volumes": [], "ifaces": []}
try:
doc = etree.fromstring(xml_doc)
except Exception:
@@ -13320,7 +13324,7 @@ class LibvirtDriver(driver.ComputeDriver):
nova.privsep.fs.FS_FORMAT_EXT4,
nova.privsep.fs.FS_FORMAT_XFS]
def _get_tpm_traits(self) -> ty.Dict[str, bool]:
def _get_tpm_traits(self) -> dict[str, bool]:
# Assert or deassert TPM support traits
if not CONF.libvirt.swtpm_enabled:
return {
@@ -13374,7 +13378,7 @@ class LibvirtDriver(driver.ComputeDriver):
return tr
def _get_vif_model_traits(self) -> ty.Dict[str, bool]:
def _get_vif_model_traits(self) -> dict[str, bool]:
"""Get vif model traits based on the currently enabled virt_type.
Not all traits generated by this function may be valid and the result
@@ -13403,14 +13407,14 @@ class LibvirtDriver(driver.ComputeDriver):
in supported_models for model in all_models
}
def _get_iommu_model_traits(self) -> ty.Dict[str, bool]:
def _get_iommu_model_traits(self) -> dict[str, bool]:
"""Get iommu model traits based on the currently enabled virt_type.
Not all traits generated by this function may be valid and the result
should be validated.
:return: A dict of trait names mapped to boolean values.
"""
dom_caps = self._host.get_domain_capabilities()
supported_models: ty.Set[str] = {fields.VIOMMUModel.AUTO}
supported_models: set[str] = {fields.VIOMMUModel.AUTO}
# our min version of qemu/libvirt support q35 and virt machine types.
# They also support the smmuv3 and intel iommu modeles so if the qemu
# binary is available we can report the trait.
@@ -13427,7 +13431,7 @@ class LibvirtDriver(driver.ComputeDriver):
in supported_models for model in fields.VIOMMUModel.ALL
}
def _get_storage_bus_traits(self) -> ty.Dict[str, bool]:
def _get_storage_bus_traits(self) -> dict[str, bool]:
"""Get storage bus traits based on the currently enabled virt_type.
For QEMU and KVM this function uses the information returned by the
@@ -13445,7 +13449,7 @@ class LibvirtDriver(driver.ComputeDriver):
if CONF.libvirt.virt_type in ('qemu', 'kvm'):
dom_caps = self._host.get_domain_capabilities()
supported_buses: ty.Set[str] = set()
supported_buses: set[str] = set()
for arch_type in dom_caps:
for machine_type in dom_caps[arch_type]:
supported_buses.update(
@@ -13462,7 +13466,7 @@ class LibvirtDriver(driver.ComputeDriver):
supported_buses for bus in all_buses
}
def _get_video_model_traits(self) -> ty.Dict[str, bool]:
def _get_video_model_traits(self) -> dict[str, bool]:
"""Get video model traits from libvirt.
Not all traits generated by this function may be valid and the result
@@ -13473,7 +13477,7 @@ class LibvirtDriver(driver.ComputeDriver):
all_models = fields.VideoModel.ALL
dom_caps = self._host.get_domain_capabilities()
supported_models: ty.Set[str] = set()
supported_models: set[str] = set()
for arch_type in dom_caps:
for machine_type in dom_caps[arch_type]:
supported_models.update(
@@ -13486,7 +13490,7 @@ class LibvirtDriver(driver.ComputeDriver):
in supported_models for model in all_models
}
def _get_packed_virtqueue_traits(self) -> ty.Dict[str, bool]:
def _get_packed_virtqueue_traits(self) -> dict[str, bool]:
"""Get Virtio Packed Ring traits to be set on the host's
resource provider.
@@ -13494,7 +13498,7 @@ class LibvirtDriver(driver.ComputeDriver):
"""
return {ot.COMPUTE_NET_VIRTIO_PACKED: True}
def _get_cpu_traits(self) -> ty.Dict[str, bool]:
def _get_cpu_traits(self) -> dict[str, bool]:
"""Get CPU-related traits to be set and unset on the host's resource
provider.
@@ -13507,7 +13511,7 @@ class LibvirtDriver(driver.ComputeDriver):
return traits
def _get_cpu_feature_traits(self) -> ty.Dict[str, bool]:
def _get_cpu_feature_traits(self) -> dict[str, bool]:
"""Get CPU traits of VMs based on guest CPU model config.
1. If mode is 'host-model' or 'host-passthrough', use host's
@@ -13532,7 +13536,7 @@ class LibvirtDriver(driver.ComputeDriver):
caps = deepcopy(self._host.get_capabilities())
if cpu.mode in ('host-model', 'host-passthrough'):
# Account for features in cpu_model_extra_flags conf
host_features: ty.Set[str] = {
host_features: set[str] = {
f.name for f in caps.host.cpu.features | cpu.features
}
return libvirt_utils.cpu_features_to_traits(host_features)
@@ -13547,7 +13551,7 @@ class LibvirtDriver(driver.ComputeDriver):
feature_names = [f.name for f in cpu.features]
return feature_names
features: ty.Set[str] = set()
features: set[str] = set()
# Choose a default CPU model when cpu_mode is not specified
if cpu.mode is None:
caps.host.cpu.model = libvirt_utils.get_cpu_model_from_arch(
@@ -13635,7 +13639,7 @@ class LibvirtDriver(driver.ComputeDriver):
fs.target_dir = share.tag
guest.add_device(fs)
def _get_sound_model_traits(self) -> ty.Dict[str, bool]:
def _get_sound_model_traits(self) -> dict[str, bool]:
"""Determine what sound models are supported.
Not all traits generated by this function may be valid and the result
+1 -2
View File
@@ -9,7 +9,6 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import typing as ty
from nova.virt import event
@@ -27,7 +26,7 @@ class DeviceEvent(LibvirtEvent):
def __init__(self,
uuid: str,
dev: str,
timestamp: ty.Optional[float] = None):
timestamp: float | None = None):
super().__init__(uuid, timestamp)
self.dev = dev
+3 -4
View File
@@ -28,7 +28,6 @@ then used by all the other libvirt related classes
"""
import time
import typing as ty
from lxml import etree
from oslo_log import log as logging
@@ -236,7 +235,7 @@ class Guest:
self,
cfg: vconfig.LibvirtConfigGuestDevice,
from_persistent_config: bool = False
) -> ty.Optional[vconfig.LibvirtConfigGuestDevice]:
) -> vconfig.LibvirtConfigGuestDevice | None:
"""Lookup a full LibvirtConfigGuestDevice with
LibvirtConfigGuesDevice generated
by nova.virt.libvirt.vif.get_config.
@@ -366,7 +365,7 @@ class Guest:
self,
device: str,
from_persistent_config: bool = False
) -> ty.Optional[vconfig.LibvirtConfigGuestDisk]:
) -> vconfig.LibvirtConfigGuestDisk | None:
"""Returns the disk mounted at device
:param device: the name of either the source or the target device
@@ -416,7 +415,7 @@ class Guest:
self,
devtype: vconfig.LibvirtConfigGuestDevice = None,
from_persistent_config: bool = False
) -> ty.List[vconfig.LibvirtConfigGuestDevice]:
) -> list[vconfig.LibvirtConfigGuestDevice]:
"""Returns all devices for a guest
:param devtype: a LibvirtConfigGuestDevice subclass class
+51 -48
View File
@@ -27,6 +27,8 @@ the raw libvirt API. These APIs are then used by all
the other libvirt related classes
"""
from collections.abc import Callable
from collections.abc import Mapping
from collections import defaultdict
import fnmatch
import glob
@@ -128,8 +130,9 @@ class Host(object):
self._read_only = read_only
self._initial_connection = True
self._conn_event_handler = conn_event_handler
self._conn_event_handler_queue: queue.Queue[ty.Callable] = (
queue.Queue())
self._conn_event_handler_queue: queue.Queue[
Callable[[], None]
] = queue.Queue()
self._lifecycle_event_handler = lifecycle_event_handler
self._caps = None
self._domain_caps = None
@@ -138,7 +141,9 @@ class Host(object):
self._wrapped_conn = None
self._wrapped_conn_lock = threading.Lock()
self._event_queue: ty.Optional[queue.Queue[ty.Callable]] = None
self._event_queue: queue.Queue[
virtevent.InstanceEvent | Mapping[str, ty.Any]
] | None = None
self._events_delayed = {}
# Note(toabctl): During a reboot of a domain, STOPPED and
@@ -151,19 +156,19 @@ class Host(object):
self._libvirt_proxy_classes = self._get_libvirt_proxy_classes(libvirt)
self._libvirt_proxy = self._wrap_libvirt_proxy(libvirt)
self._loaders: ty.Optional[ty.List[dict]] = None
self._loaders: list[dict] | None = None
# A number of features are conditional on support in the hardware,
# kernel, QEMU, and/or libvirt. These are determined on demand and
# memoized by various properties below
self._supports_amd_sev: ty.Optional[bool] = None
self._supports_amd_sev_es: ty.Optional[bool] = None
self._max_sev_guests: ty.Optional[int] = None
self._max_sev_es_guests: ty.Optional[int] = None
self._supports_uefi: ty.Optional[bool] = None
self._supports_secure_boot: ty.Optional[bool] = None
self._supports_amd_sev: bool | None = None
self._supports_amd_sev_es: bool | None = None
self._max_sev_guests: int | None = None
self._max_sev_es_guests: int | None = None
self._supports_uefi: bool | None = None
self._supports_secure_boot: bool | None = None
self._has_hyperthreading: ty.Optional[bool] = None
self._has_hyperthreading: bool | None = None
@staticmethod
def _get_libvirt_proxy_classes(libvirt_module):
@@ -396,9 +401,7 @@ class Host(object):
return
while not self._event_queue.empty():
try:
event_type = ty.Union[
virtevent.InstanceEvent, ty.Mapping[str, ty.Any]]
event: event_type = self._event_queue.get(block=False)
event: virtevent.InstanceEvent | Mapping[str, ty.Any] = self._event_queue.get(block=False) # noqa: E501
if issubclass(type(event), virtevent.InstanceEvent):
# call possibly with delay
self._event_emit_delayed(event)
@@ -890,7 +893,7 @@ class Host(object):
if self._domain_caps:
return self._domain_caps
domain_caps: ty.Dict = defaultdict(dict)
domain_caps: dict = defaultdict(dict)
caps = self.get_capabilities()
virt_type = CONF.libvirt.virt_type
@@ -1302,8 +1305,8 @@ class Host(object):
def _get_pcinet_info(
self,
dev: 'libvirt.virNodeDevice',
net_devs: ty.List['libvirt.virNodeDevice']
) -> ty.Optional[ty.List[str]]:
net_devs: list['libvirt.virNodeDevice']
) -> list[str] | None:
"""Returns a dict of NET device."""
net_dev = {dev.parent(): dev for dev in net_devs}.get(dev.name(), None)
if net_dev is None:
@@ -1317,8 +1320,8 @@ class Host(object):
self,
vf_device: 'libvirt.virNodeDevice',
parent_pf_name: str,
candidate_devs: ty.List['libvirt.virNodeDevice']
) -> ty.Optional[vconfig.LibvirtConfigNodeDeviceVpdCap]:
candidate_devs: list['libvirt.virNodeDevice']
) -> vconfig.LibvirtConfigNodeDeviceVpdCap | None:
"""Returns PCI VPD info of a parent device of a PCI VF.
:param vf_device: a VF device object to use for lookup.
@@ -1341,7 +1344,7 @@ class Host(object):
@staticmethod
def _get_vpd_card_serial_number(
dev: 'libvirt.virNodeDevice',
) -> ty.Optional[ty.List[str]]:
) -> list[str] | None:
"""Returns a card serial number stored in PCI VPD (if present)."""
xmlstr = dev.XMLDesc(0)
cfgdev = vconfig.LibvirtConfigNodeDevice()
@@ -1369,19 +1372,19 @@ class Host(object):
self,
devname: str,
dev: 'libvirt.virNodeDevice',
net_devs: ty.List['libvirt.virNodeDevice'],
vdpa_devs: ty.List['libvirt.virNodeDevice'],
pci_devs: ty.List['libvirt.virNodeDevice'],
) -> ty.Dict[str, ty.Union[str, dict]]:
net_devs: list['libvirt.virNodeDevice'],
vdpa_devs: list['libvirt.virNodeDevice'],
pci_devs: list['libvirt.virNodeDevice'],
) -> dict[str, str | dict]:
"""Returns a dict of PCI device."""
def _get_device_type(
cfgdev: vconfig.LibvirtConfigNodeDevice,
pci_address: str,
device: 'libvirt.virNodeDevice',
net_devs: ty.List['libvirt.virNodeDevice'],
vdpa_devs: ty.List['libvirt.virNodeDevice'],
) -> ty.Dict[str, str]:
net_devs: list['libvirt.virNodeDevice'],
vdpa_devs: list['libvirt.virNodeDevice'],
) -> dict[str, str]:
"""Get a PCI device's device type.
An assignable PCI device can be a normal PCI device,
@@ -1437,8 +1440,8 @@ class Host(object):
def _get_vpd_details(
device_dict: dict,
device: 'libvirt.virNodeDevice',
pci_devs: ty.List['libvirt.virNodeDevice']
) -> ty.Dict[str, ty.Any]:
pci_devs: list['libvirt.virNodeDevice']
) -> dict[str, ty.Any]:
"""Get information from PCI VPD (if present).
PCI/PCIe devices may include the optional VPD capability. It may
@@ -1451,7 +1454,7 @@ class Host(object):
the VPD capability or not may be controlled via a vendor-specific
firmware setting.
"""
vpd_info: ty.Dict[str, ty.Any] = {}
vpd_info: dict[str, ty.Any] = {}
# At the time of writing only the serial number had a clear
# use-case. However, the set of fields may be extended.
card_serial_number = self._get_vpd_card_serial_number(device)
@@ -1481,9 +1484,9 @@ class Host(object):
def _get_sriov_netdev_details(
device_dict: dict,
device: 'libvirt.virNodeDevice',
) -> ty.Dict[str, ty.Dict[str, ty.Any]]:
) -> dict[str, dict[str, ty.Any]]:
"""Get SR-IOV related information"""
sriov_info: ty.Dict[str, ty.Any] = {}
sriov_info: dict[str, ty.Any] = {}
if device_dict.get('dev_type') != fields.PciDeviceType.SRIOV_VF:
return sriov_info
@@ -1512,16 +1515,16 @@ class Host(object):
def _get_device_capabilities(
device_dict: dict,
device: 'libvirt.virNodeDevice',
pci_devs: ty.List['libvirt.virNodeDevice'],
net_devs: ty.List['libvirt.virNodeDevice']
) -> ty.Dict[str, ty.Any]:
pci_devs: list['libvirt.virNodeDevice'],
net_devs: list['libvirt.virNodeDevice']
) -> dict[str, ty.Any]:
"""Get PCI VF device's additional capabilities.
If a PCI device is a virtual function, this function reads the PCI
parent's network capabilities (must be always a NIC device) and
appends this information to the device's dictionary.
"""
caps: ty.Dict[str, ty.Any] = {}
caps: dict[str, ty.Any] = {}
if device_dict.get('dev_type') == fields.PciDeviceType.SRIOV_VF:
pcinet_info = self._get_pcinet_info(device, net_devs)
@@ -1612,28 +1615,28 @@ class Host(object):
nodedev = self.get_vdpa_nodedev_by_address(pci_address)
return nodedev.vdpa_capability.dev_path
def list_pci_devices(self, flags: int = 0) -> ty.List[str]:
def list_pci_devices(self, flags: int = 0) -> list[str]:
"""Lookup pci devices.
:returns: a list of strings, names of the virNodeDevice instances
"""
return self._list_devices("pci", flags=flags)
def list_mdev_capable_devices(self, flags: int = 0) -> ty.List[str]:
def list_mdev_capable_devices(self, flags: int = 0) -> list[str]:
"""Lookup devices supporting mdev capabilities.
:returns: a list of strings, names of the virNodeDevice instances
"""
return self._list_devices("mdev_types", flags=flags)
def list_mediated_devices(self, flags: int = 0) -> ty.List[str]:
def list_mediated_devices(self, flags: int = 0) -> list[str]:
"""Lookup mediated devices.
:returns: a list of strings, names of the virNodeDevice instances
"""
return self._list_devices("mdev", flags=flags)
def _list_devices(self, cap, flags: int = 0) -> ty.List[str]:
def _list_devices(self, cap, flags: int = 0) -> list[str]:
"""Lookup devices.
:returns: a list of strings, names of the virNodeDevice instances
@@ -1652,7 +1655,7 @@ class Host(object):
def list_all_devices(
self, flags: int = 0,
) -> ty.List['libvirt.virNodeDevice']:
) -> list['libvirt.virNodeDevice']:
"""Lookup devices.
:param flags: a bitmask of flags to filter the returned devices.
@@ -1891,7 +1894,7 @@ class Host(object):
return False
@property
def supports_vtpm(self) -> ty.Optional[bool]:
def supports_vtpm(self) -> bool | None:
# we only check the host architecture and the first machine type
# because vtpm support is independent from cpu architecture
arch = self.get_capabilities().host.cpu.arch
@@ -1906,7 +1909,7 @@ class Host(object):
return False
@property
def tpm_versions(self) -> ty.Optional[ty.List[str]]:
def tpm_versions(self) -> list[str] | None:
# we only check the host architecture and the first machine type
# because vtpm support is independent from cpu architecture
arch = self.get_capabilities().host.cpu.arch
@@ -1924,7 +1927,7 @@ class Host(object):
return []
@property
def tpm_models(self) -> ty.Optional[ty.List[str]]:
def tpm_models(self) -> list[str] | None:
# we only check the host architecture and the first machine type
# because vtpm support is independent from cpu architecture
arch = self.get_capabilities().host.cpu.arch
@@ -2027,7 +2030,7 @@ class Host(object):
return self._supports_amd_sev_es
@property
def max_sev_guests(self) -> ty.Optional[int]:
def max_sev_guests(self) -> int | None:
"""Determine maximum number of guests with AMD SEV.
"""
if not self.supports_amd_sev:
@@ -2035,7 +2038,7 @@ class Host(object):
return self._max_sev_guests
@property
def max_sev_es_guests(self) -> ty.Optional[int]:
def max_sev_es_guests(self) -> int | None:
"""Determine maximum number of guests with AMD SEV-ES.
"""
if not self.supports_amd_sev:
@@ -2074,7 +2077,7 @@ class Host(object):
return self.has_min_version(lv_ver=(7, 9, 0))
@property
def loaders(self) -> ty.List[dict]:
def loaders(self) -> list[dict]:
"""Retrieve details of loader configuration for the host.
Inspect the firmware metadata files provided by QEMU [1] to retrieve
@@ -2100,7 +2103,7 @@ class Host(object):
arch: str,
machine: str,
has_secure_boot: bool,
) -> ty.Tuple[str, str, bool]:
) -> tuple[str, str, bool]:
"""Get loader for the specified architecture and machine type.
:returns: A the bootloader executable path and the NVRAM
+5 -6
View File
@@ -12,7 +12,6 @@
import itertools
import re
import typing as ty
from nova.compute import vm_states
from nova import context as nova_context
@@ -49,7 +48,7 @@ ALLOWED_UPDATE_VM_STATES = [
def get_machine_type(
context: 'nova_context.RequestContext',
instance_uuid: str,
) -> ty.Optional[str]:
) -> str | None:
"""Get the registered machine type of an instance
:param context: Request context.
@@ -137,7 +136,7 @@ def update_machine_type(
instance_uuid: str,
machine_type: str,
force: bool = False,
) -> ty.Tuple[str, str]:
) -> tuple[str, str]:
"""Set or update the stored machine type of an instance
:param instance_uuid: Instance UUID to update.
@@ -183,7 +182,7 @@ def update_machine_type(
def _get_instances_without_mtype(
context: 'nova_context.RequestContext',
) -> ty.List[objects.instance.Instance]:
) -> list[objects.instance.Instance]:
"""Fetch a list of instance UUIDs from the DB without hw_machine_type set
:param meta: 'sqlalchemy.MetaData' pointing to a given cell DB
@@ -201,8 +200,8 @@ def _get_instances_without_mtype(
def get_instances_without_type(
context: 'nova_context.RequestContext',
cell_uuid: ty.Optional[str] = None,
) -> ty.List[objects.instance.Instance]:
cell_uuid: str | None = None,
) -> list[objects.instance.Instance]:
"""Find instances without hw_machine_type set, optionally within a cell.
:param context: Request context
+29 -27
View File
@@ -18,10 +18,12 @@
# License for the specific language governing permissions and limitations
# under the License.
from collections.abc import Callable
import grp
import os
import pwd
import re
import subprocess
import tempfile
import typing as ty
import uuid
@@ -105,7 +107,7 @@ CPU_TRAITS_MAPPING = {
}
def make_reverse_cpu_traits_mapping() -> ty.Dict[str, str]:
def make_reverse_cpu_traits_mapping() -> dict[str, str]:
traits_cpu_mapping = dict()
for k, v in CPU_TRAITS_MAPPING.items():
if isinstance(v, tuple):
@@ -131,9 +133,9 @@ class EncryptionOptions(ty.TypedDict):
def create_image(
path: str,
disk_format: str,
disk_size: ty.Optional[ty.Union[str, int]],
backing_file: ty.Optional[str] = None,
encryption: ty.Optional[EncryptionOptions] = None,
disk_size: str | int | None,
backing_file: str | None = None,
encryption: EncryptionOptions | None = None,
safe: bool = False,
) -> None:
"""Disk image creation with qemu-img
@@ -262,7 +264,7 @@ def create_image(
def create_ploop_image(
disk_format: str, path: str, size: ty.Union[int, str], fs_type: str,
disk_format: str, path: str, size: int | str, fs_type: str,
) -> None:
"""Create ploop image
@@ -284,7 +286,7 @@ def create_ploop_image(
nova.privsep.libvirt.ploop_init(size, disk_format, fs_type, disk_path)
def get_disk_size(path: str, format: ty.Optional[str] = None) -> int:
def get_disk_size(path: str, format: str | None = None) -> int:
"""Get the (virtual) size of a disk image
:param path: Path to the disk image
@@ -297,8 +299,8 @@ def get_disk_size(path: str, format: ty.Optional[str] = None) -> int:
def get_disk_backing_file(
path: str, basename: bool = True, format: ty.Optional[str] = None,
) -> ty.Optional[str]:
path: str, basename: bool = True, format: str | None = None,
) -> str | None:
"""Get the backing file of a disk image
:param path: Path to the disk image
@@ -314,10 +316,10 @@ def get_disk_backing_file(
def copy_image(
src: str,
dest: str,
host: ty.Optional[str] = None,
host: str | None = None,
receive: bool = False,
on_execute: ty.Optional[ty.Callable] = None,
on_completion: ty.Optional[ty.Callable] = None,
on_execute: Callable[[subprocess.Popen], None] | None = None,
on_completion: Callable[[subprocess.Popen], None] | None = None,
compression: bool = True,
) -> None:
"""Copy a disk image to an existing directory
@@ -352,7 +354,7 @@ def copy_image(
def chown_for_id_maps(
path: str, id_maps: ty.List[vconfig.LibvirtConfigGuestIDMap],
path: str, id_maps: list[vconfig.LibvirtConfigGuestIDMap],
) -> None:
"""Change ownership of file or directory for an id mapped
environment
@@ -411,7 +413,7 @@ def file_open(*args, **kwargs):
return open(*args, **kwargs)
def find_disk(guest: libvirt_guest.Guest) -> ty.Tuple[str, ty.Optional[str]]:
def find_disk(guest: libvirt_guest.Guest) -> tuple[str, str | None]:
"""Find root device path for instance
May be file or device
@@ -448,7 +450,7 @@ def find_disk(guest: libvirt_guest.Guest) -> ty.Tuple[str, ty.Optional[str]]:
return disk_path, disk_format
def get_disk_type_from_path(path: str) -> ty.Optional[str]:
def get_disk_type_from_path(path: str) -> str | None:
"""Retrieve disk type (raw, qcow2, lvm, ploop) for given file."""
if path.startswith('/dev'):
return 'lvm'
@@ -462,7 +464,7 @@ def get_disk_type_from_path(path: str) -> ty.Optional[str]:
return None
def get_fs_info(path: str) -> ty.Dict[str, int]:
def get_fs_info(path: str) -> dict[str, int]:
"""Get free/used/total space info for a filesystem
:param path: Any dirent on the filesystem
@@ -483,7 +485,7 @@ def fetch_image(
context: nova_context.RequestContext,
target: str,
image_id: str,
trusted_certs: ty.Optional['objects.TrustedCerts'] = None,
trusted_certs: 'objects.TrustedCerts | None' = None,
) -> None:
"""Grab image.
@@ -499,7 +501,7 @@ def fetch_raw_image(
context: nova_context.RequestContext,
target: str,
image_id: str,
trusted_certs: ty.Optional['objects.TrustedCerts'] = None,
trusted_certs: 'objects.TrustedCerts | None' = None,
) -> None:
"""Grab initrd or kernel image.
@@ -533,7 +535,7 @@ def get_instance_path(
def get_instance_path_at_destination(
instance: 'objects.Instance',
migrate_data: ty.Optional['objects.LibvirtLiveMigrateData'] = None,
migrate_data: 'objects.LibvirtLiveMigrateData | None' = None,
) -> str:
"""Get the instance path on destination node while live migration.
@@ -581,7 +583,7 @@ def get_arch(image_meta: 'objects.ImageMeta') -> str:
return obj_fields.Architecture.from_host()
def is_mounted(mount_path: str, source: ty.Optional[str] = None) -> bool:
def is_mounted(mount_path: str, source: str | None = None) -> bool:
"""Check if the given source is mounted at given destination point."""
if not os.path.ismount(mount_path):
return False
@@ -598,12 +600,12 @@ def is_valid_hostname(hostname: str) -> bool:
return bool(re.match(r"^[\w\-\.:]+$", hostname))
def version_to_string(version: ty.Tuple[int, int, int]) -> str:
def version_to_string(version: tuple[int, int, int]) -> str:
"""Returns string version based on tuple"""
return '.'.join([str(x) for x in version])
def cpu_features_to_traits(features: ty.Set[str]) -> ty.Dict[str, bool]:
def cpu_features_to_traits(features: set[str]) -> dict[str, bool]:
"""Returns this driver's CPU traits dict where keys are trait names from
CPU_TRAITS_MAPPING, values are boolean indicates whether the trait should
be set in the provider tree.
@@ -636,7 +638,7 @@ def get_cpu_model_from_arch(arch: str) -> str:
return mode
def get_machine_type(image_meta: 'objects.ImageMeta') -> ty.Optional[str]:
def get_machine_type(image_meta: 'objects.ImageMeta') -> str | None:
"""The guest machine type can be set as an image metadata property, or
otherwise based on architecture-specific defaults. If no defaults are
found then None will be returned. This will ultimately lead to QEMU using
@@ -649,7 +651,7 @@ def get_machine_type(image_meta: 'objects.ImageMeta') -> ty.Optional[str]:
return get_default_machine_type(get_arch(image_meta))
def get_default_machine_type(arch: str) -> ty.Optional[str]:
def get_default_machine_type(arch: str) -> str | None:
# NOTE(lyarwood): Values defined in [libvirt]/hw_machine_type take
# precedence here if available for the provided arch.
for mapping in CONF.libvirt.hw_machine_type or {}:
@@ -694,7 +696,7 @@ def mdev_name2uuid(mdev_name: str) -> str:
return str(uuid.UUID(mdev_uuid))
def mdev_uuid2name(mdev_uuid: str, parent: ty.Optional[str] = None) -> str:
def mdev_uuid2name(mdev_uuid: str, parent: str | None = None) -> str:
"""Convert an mdev uuid (of the form 8-4-4-4-12) and optionally its parent
device to a name (of the form mdev_<uuid_with_underscores>[_<pciid>]).
@@ -708,7 +710,7 @@ def mdev_uuid2name(mdev_uuid: str, parent: ty.Optional[str] = None) -> str:
return name
def get_flags_by_flavor_specs(flavor: 'objects.Flavor') -> ty.Set[str]:
def get_flags_by_flavor_specs(flavor: 'objects.Flavor') -> set[str]:
req_spec = objects.RequestSpec(flavor=flavor)
resource_request = scheduler_utils.ResourceRequest.from_request_spec(
req_spec)
@@ -725,8 +727,8 @@ def save_and_migrate_vtpm_dir(
inst_base_resize: str,
inst_base: str,
dest: str,
on_execute: ty.Callable,
on_completion: ty.Callable,
on_execute: Callable[[subprocess.Popen], None] | None = None,
on_completion: Callable[[subprocess.Popen], None] | None = None,
) -> None:
"""Save vTPM data to instance directory and migrate to the destination.
+1 -2
View File
@@ -19,7 +19,6 @@
"""VIF drivers for libvirt."""
import os
import typing as ty
import os_vif
from os_vif import exception as osv_exception
@@ -509,7 +508,7 @@ class LibvirtGenericVIFDriver(object):
raise exception.InternalError(
_('Unsupported VIF port profile type %s') % profile_name)
def _get_vdpa_dev_path(self, pci_address: ty.Text) -> ty.Text:
def _get_vdpa_dev_path(self, pci_address: str) -> str:
if self.host is not None:
return self.host.get_vdpa_device_path(pci_address)
# TODO(sean-k-mooney) this should never be raised remove when host
+4 -1
View File
@@ -341,7 +341,10 @@ exclude = .venv,.git,.tox,dist,*lib/python*,*egg,build,releasenotes
max-complexity = 40
[hacking]
import_exceptions = typing,nova.i18n
import_exceptions =
collections.abc
nova.i18n
typing
[flake8:local-plugins]
extension =