Merge "TPM: support instances with user secret security"

This commit is contained in:
Zuul
2025-11-19 17:37:39 +00:00
committed by Gerrit Code Review
9 changed files with 246 additions and 4 deletions
+14
View File
@@ -1634,6 +1634,20 @@ ownership after being moved between nodes.
Related options:
* ``swtpm_user`` must also be set.
"""),
cfg.ListOpt('supported_tpm_secret_security',
default=['user'],
help="""
The list of TPM security policies supported by this compute host. If a value is
absent, it is not supported by this host, and any instance that requests it
will not be scheduled on this host.
Possible values are:
* ``user``: The Barbican secret is owned by the instance owner and cannot be
accessed by anyone else. The Libvirt secret is private and non-persistent.
The instance cannot be live-migrated or automatically resumed after host
reboot.
"""),
]
+30 -1
View File
@@ -465,6 +465,34 @@ def virtio_sound_filter(
return True
@trace_request_filter
def tpm_secret_security_filter(
ctxt: nova_context.RequestContext,
request_spec: 'objects.RequestSpec'
) -> bool:
has_vtpm = hardware.get_vtpm_constraint(
request_spec.flavor, request_spec.image) is not None
if not has_vtpm:
LOG.debug("tpm_secret_security_filter skipped")
return False
security = hardware.get_tpm_secret_security_constraint(
request_spec.flavor) or 'user'
if security == 'user':
request_spec.root_required.add(
os_traits.COMPUTE_SECURITY_TPM_SECRET_SECURITY_USER)
else:
# We can get here if the requested TPM secret security passed extra
# spec validation but is not otherwise supported in the code at this
# time.
msg = f"TPM secret security '{security}' is not supported."
LOG.warning(msg)
raise exception.RequestFilterFailed(reason=_(msg))
return True
ALL_REQUEST_FILTERS = [
require_tenant_aggregate,
map_az_to_placement_aggregate,
@@ -477,7 +505,8 @@ ALL_REQUEST_FILTERS = [
routed_networks_filter,
remote_managed_ports_filter,
ephemeral_encryption_filter,
virtio_sound_filter
virtio_sound_filter,
tpm_secret_security_filter,
]
+7 -1
View File
@@ -1910,6 +1910,12 @@ class Connection(object):
self._id_counter = 1 # libvirt reserves 0 for the hypervisor.
self._nodedevs = {}
self._secrets = {}
# NOTE(artom) The secret is undefined as soon as the guest has
# successfully started, but we still want to assert that is had been
# defined in this libvirt connection. We therefore keep a history of
# self._removed_secrets to allow functional tests to make those
# assertions.
self._removed_secrets = {}
self._event_callbacks = {}
self.fakeLibVersion = version
self.fakeVersion = hv_version
@@ -1931,7 +1937,7 @@ class Connection(object):
self._secrets[secret._uuid] = secret
def _remove_secret(self, secret):
del self._secrets[secret._uuid]
self._removed_secrets[secret._uuid] = self._secrets.pop(secret._uuid)
def _mark_running(self, dom):
self._running_vms[self._id_counter] = dom
+58 -2
View File
@@ -166,10 +166,14 @@ class VTPMServersTest(base.ServersTestBase):
self.key_mgr = crypto._get_key_manager()
def _create_server_with_vtpm(self):
def _create_server_with_vtpm(self, secret_security=None,
expected_state='ACTIVE'):
extra_specs = {'hw:tpm_model': 'tpm-tis', 'hw:tpm_version': '1.2'}
if secret_security:
extra_specs.update({'hw:tpm_secret_security': secret_security})
flavor_id = self._create_flavor(extra_spec=extra_specs)
server = self._create_server(flavor_id=flavor_id)
server = self._create_server(flavor_id=flavor_id,
expected_state=expected_state)
return server
@@ -185,6 +189,7 @@ class VTPMServersTest(base.ServersTestBase):
self.assertIn(
instance.system_metadata['vtpm_secret_uuid'],
self.key_mgr._passphrases)
return instance.system_metadata['vtpm_secret_uuid']
def assertInstanceHasNoSecret(self, server):
ctx = nova_context.get_admin_context()
@@ -192,6 +197,39 @@ class VTPMServersTest(base.ServersTestBase):
self.assertNotIn('vtpm_secret_uuid', instance.system_metadata)
self.assertEqual(0, len(self.key_mgr._passphrases))
def _assert_libvirt_had_secret(self, compute, secret_uuid):
# This assert is for ephemeral private libvirt secrets that we
# undefine immediately after guest creation. Examples include 'user'
# and 'deployment' TPM secret security modes and legacy servers.
# The LibvirtFixture tracks secrets that existed before they were
# removed, so we can assert this.
conn = compute.driver._host.get_connection()
self.assertIn(secret_uuid, conn._removed_secrets)
def test_tpm_secret_security_user(self):
self.flags(supported_tpm_secret_security=['user'], group='libvirt')
host = self.start_compute(hostname='tpm-host')
compute = self.computes['tpm-host']
# ensure we are reporting the correct traits
traits = self._get_provider_traits(self.compute_rp_uuids[host])
self.assertIn('COMPUTE_SECURITY_TPM_SECRET_SECURITY_USER', traits)
server = self._create_server_with_vtpm(secret_security='user')
# The server should have a secret in the key manager service.
secret_uuid = self.assertInstanceHasSecret(server)
# And it should have had a libvirt secret created and undefined.
self._assert_libvirt_had_secret(compute, secret_uuid)
def test_tpm_secret_security_user_negative(self):
self.flags(supported_tpm_secret_security=['deployment'],
group='libvirt')
self.start_compute(hostname='tpm-host')
self._create_server_with_vtpm(secret_security='user',
expected_state='ERROR')
def test_create_server(self):
compute = self.start_compute()
@@ -393,6 +431,24 @@ class VTPMServersTest(base.ServersTestBase):
# there is no going back now
self.assertInstanceHasNoSecret(server)
def test_create_server_secret_security_unsupported(self):
"""Test when a not supported TPM secret security mode is requested
We expect the create to fail for NoValidHost.
"""
# Start a compute host which supports no modes.
self.flags(supported_tpm_secret_security=[], group='libvirt')
self.start_compute('test_compute0')
# Try to create an instance on that host defaulting to 'user'.
server = self._create_server_with_vtpm(expected_state='ERROR')
# The create should have failed for NoValidHost.
event = self._wait_for_instance_action_event(
server, 'create', 'conductor_schedule_and_build_instances',
'Error')
self.assertIn('NoValidHost', event['traceback'])
def test_migrate_server(self):
"""Test cold migrate as a non-admin user.
@@ -13,6 +13,7 @@
import os_traits as ot
from unittest import mock
import ddt
from oslo_utils.fixture import uuidsentinel as uuids
from oslo_utils import timeutils
@@ -25,6 +26,7 @@ from nova import test
from nova.tests.unit import utils
@ddt.ddt
class TestRequestFilter(test.NoDBTestCase):
def setUp(self):
super(TestRequestFilter, self).setUp()
@@ -746,3 +748,68 @@ class TestRequestFilter(test.NoDBTestCase):
{ot.COMPUTE_SOUND_MODEL_VIRTIO},
reqspec.root_required)
self.assertEqual(set(), reqspec.root_forbidden)
@ddt.data('flavor', 'image')
def test_tpm_secret_security_filter(self, source):
# First ensure that tpm_secret_security_filter is included
self.assertIn(request_filter.tpm_secret_security_filter,
request_filter.ALL_REQUEST_FILTERS)
if source == 'flavor':
reqspec = objects.RequestSpec(
flavor=objects.Flavor(
extra_specs={
'hw:tpm_model': 'tpm-tis',
'hw:tpm_version': '1.2',
'hw:tpm_secret_security': 'user',
}),
image=objects.ImageMeta(properties=objects.ImageMetaProps()))
elif source == 'image':
reqspec = objects.RequestSpec(
flavor=objects.Flavor(
extra_specs={
'hw:tpm_secret_security': 'user',
}),
image=objects.ImageMeta(
properties=objects.ImageMetaProps(hw_tpm_model='tpm-tis',
hw_tpm_version='1.2')))
self.assertEqual(set(), reqspec.root_required)
self.assertEqual(set(), reqspec.root_forbidden)
self.assertTrue(
request_filter.tpm_secret_security_filter(self.context, reqspec))
self.assertEqual(
{ot.COMPUTE_SECURITY_TPM_SECRET_SECURITY_USER},
reqspec.root_required)
self.assertEqual(set(), reqspec.root_forbidden)
def test_tpm_secret_security_filter_skip(self):
reqspec = objects.RequestSpec(
flavor=objects.Flavor(extra_specs={}),
image=objects.ImageMeta(
properties=objects.ImageMetaProps()))
self.assertEqual(set(), reqspec.root_required)
self.assertEqual(set(), reqspec.root_forbidden)
self.assertFalse(
request_filter.tpm_secret_security_filter(self.context, reqspec))
def test_tpm_secret_security_filter_fail(self):
reqspec = objects.RequestSpec(
flavor=objects.Flavor(
extra_specs={
'hw:tpm_model': 'tpm-tis',
'hw:tpm_version': '1.2',
'hw:tpm_secret_security': 'bogus',
}),
image=objects.ImageMeta(
properties=objects.ImageMetaProps()))
self.assertEqual(set(), reqspec.root_required)
self.assertEqual(set(), reqspec.root_forbidden)
# Mock out get_tpm_secret_security_constraint() so we don't get caught
# by the Invalid check before we can test the filter fail.
with mock.patch(
'nova.virt.hardware.get_tpm_secret_security_constraint'):
self.assertRaises(
exception.RequestFilterFailed,
request_filter.tpm_secret_security_filter,
self.context, reqspec)
@@ -23503,6 +23503,30 @@ class TestUpdateProviderTree(test.NoDBTestCase):
'COMPUTE_SECURITY_TPM_2_0', 'COMPUTE_SECURITY_TPM_1_2'):
self.assertIn(trait, self.pt.data(self.cn_rp['uuid']).traits)
def test_update_provider_tree_with_tpm_secret_security_traits(self):
self.flags(swtpm_enabled=True, group='libvirt')
self.flags(
supported_tpm_secret_security=['user', 'host', 'deployment'],
group='libvirt')
self._test_update_provider_tree()
for trait in (
'COMPUTE_SECURITY_TPM_SECRET_SECURITY_USER',
'COMPUTE_SECURITY_TPM_SECRET_SECURITY_HOST',
'COMPUTE_SECURITY_TPM_SECRET_SECURITY_DEPLOYMENT'
):
self.assertIn(trait, self.pt.data(self.cn_rp['uuid']).traits)
def test_update_provider_tree_with_tpm_secret_security_traits_none(self):
self.flags(swtpm_enabled=True, group='libvirt')
self.flags(supported_tpm_secret_security=[], group='libvirt')
self._test_update_provider_tree()
for trait in (
'COMPUTE_SECURITY_TPM_SECRET_SECURITY_USER',
'COMPUTE_SECURITY_TPM_SECRET_SECURITY_HOST',
'COMPUTE_SECURITY_TPM_SECRET_SECURITY_DEPLOYMENT'
):
self.assertNotIn(trait, self.pt.data(self.cn_rp['uuid']).traits)
@mock.patch.object(
fakelibvirt.virConnect, '_domain_capability_devices', new=
fakelibvirt.virConnect._domain_capability_devices_with_tpm_supported
+22
View File
@@ -5931,6 +5931,28 @@ class VTPMConfigTest(test.NoDBTestCase):
expected, hw.get_vtpm_constraint(flavor, image_meta),
)
@ddt.unpack
@ddt.data(
# pass: no configuration
(None, None),
# pass: flavor-only
('user', 'user'),
('host', 'host'),
('deployment', 'deployment'),
)
def test_get_tpm_secret_security_constraint(self, flavor_security,
expected):
extra_specs = {}
if flavor_security:
extra_specs['hw:tpm_secret_security'] = flavor_security
flavor = objects.Flavor(
name='foo', vcpus=1, memory_mb=1024, extra_specs=extra_specs)
self.assertEqual(
expected, hw.get_tpm_secret_security_constraint(flavor))
@ddt.ddt
class SecureBootPolicyTest(test.NoDBTestCase):
+14
View File
@@ -2123,6 +2123,20 @@ def get_vtpm_constraint(
return VTPMConfig(version, model)
def get_tpm_secret_security_constraint(
flavor: 'objects.Flavor',
) -> ty.Optional[str]:
# NOTE(melwitt): An image property for TPM secret security is intentionally
# not provided because server rebuild is blocked in the API. If a user were
# to create a server with a given TPM secret security policy via an image
# property, that policy would become locked-in and unable to be changed.
# The user would not be able to change the image property because they
# would not be able to rebuild, and they would not be able to resize to a
# different TPM secret security policy because the image property and
# flavor extra spec would conflict.
return flavor.get('extra_specs', {}).get('hw:tpm_secret_security')
def get_secure_boot_constraint(
flavor: 'objects.Flavor',
image_meta: 'objects.ImageMeta',
+10
View File
@@ -13338,6 +13338,16 @@ class LibvirtDriver(driver.ComputeDriver):
ot.COMPUTE_SECURITY_TPM_1_2: '1.2' in tpm_versions,
})
if 'user' in CONF.libvirt.supported_tpm_secret_security:
tr.update({
ot.COMPUTE_SECURITY_TPM_SECRET_SECURITY_USER: True})
if 'host' in CONF.libvirt.supported_tpm_secret_security:
tr.update({
ot.COMPUTE_SECURITY_TPM_SECRET_SECURITY_HOST: True})
if 'deployment' in CONF.libvirt.supported_tpm_secret_security:
tr.update({
ot.COMPUTE_SECURITY_TPM_SECRET_SECURITY_DEPLOYMENT: True})
return tr
def _get_vif_model_traits(self) -> ty.Dict[str, bool]: