Merge "TPM: support instances with host secret security"

This commit is contained in:
Zuul
2025-11-19 17:45:06 +00:00
committed by Gerrit Code Review
10 changed files with 215 additions and 21 deletions
+5 -1
View File
@@ -1636,7 +1636,7 @@ Related options:
* ``swtpm_user`` must also be set.
"""),
cfg.ListOpt('supported_tpm_secret_security',
default=['user'],
default=['user', 'host'],
help="""
The list of TPM security policies supported by this compute host. If a value is
absent, it is not supported by this host, and any instance that requests it
@@ -1648,6 +1648,10 @@ Possible values are:
accessed by anyone else. The Libvirt secret is private and non-persistent.
The instance cannot be live-migrated or automatically resumed after host
reboot.
* ``host``: The Barbican secret is owned by the instance owner and cannot be
accessed by anyone else. The Libvirt secret is public and persistent. It
can be read by anyone with sufficient access on the host. The instance can
be live-migrated and automatically resumed after host reboot.
"""),
]
+3
View File
@@ -483,6 +483,9 @@ def tpm_secret_security_filter(
if security == 'user':
request_spec.root_required.add(
os_traits.COMPUTE_SECURITY_TPM_SECRET_SECURITY_USER)
elif security == 'host':
request_spec.root_required.add(
os_traits.COMPUTE_SECURITY_TPM_SECRET_SECURITY_HOST)
else:
# We can get here if the requested TPM secret security passed extra
# spec validation but is not otherwise supported in the code at this
+4
View File
@@ -198,6 +198,7 @@ VIR_SECRET_USAGE_TYPE_NONE = 0
VIR_SECRET_USAGE_TYPE_VOLUME = 1
VIR_SECRET_USAGE_TYPE_CEPH = 2
VIR_SECRET_USAGE_TYPE_ISCSI = 3
VIR_SECRET_USAGE_TYPE_VTPM = 5
# metadata types
VIR_DOMAIN_METADATA_DESCRIPTION = 0
@@ -1834,12 +1835,15 @@ class Secret(object):
def _parse_xml(self, xml):
tree = etree.fromstring(xml)
self._uuid = tree.find('./uuid').text
self._ephemeral = tree.get('ephemeral') == 'yes'
self._private = tree.get('private') == 'yes'
self._usage_id = None
usage = tree.find('./usage')
if usage is not None:
if usage.get('type') == 'volume':
self._usage_id = usage.find('volume').text
if usage.get('type') == 'vtpm':
self._usage_id = usage.find('name').text
def setValue(self, value, flags=0):
self._value = value
+7 -3
View File
@@ -516,12 +516,16 @@ class InstanceHelperMixin:
self.api.delete_server(server['id'])
self._wait_until_deleted(server)
def _reboot_server(self, server, hard=False, expected_state='ACTIVE'):
def _reboot_server(self, server, hard=False, expected_state='ACTIVE',
api=None):
"""Reboot a server."""
self.api.post_server_action(
api = api or self.api
api.post_server_action(
server['id'], {'reboot': {'type': 'HARD' if hard else 'SOFT'}},
)
self.notifier.wait_for_versioned_notifications('instance.reboot.end')
if expected_state != 'ERROR':
self.notifier.wait_for_versioned_notifications(
'instance.reboot.end')
return self._wait_for_state_change(server, expected_state)
def _show_server(self, server):
@@ -18,6 +18,7 @@ from unittest import mock
from castellan.common import exception as castellan_exc
from castellan.common.objects import passphrase
from castellan.key_manager import key_manager
import ddt
import fixtures
from oslo_log import log as logging
from oslo_utils import uuidutils
@@ -140,6 +141,7 @@ class FakeKeyManager(key_manager.KeyManager):
)
@ddt.ddt
class VTPMServersTest(base.ServersTestBase):
# NOTE: ADMIN_API is intentionally not set to True in order to catch key
@@ -197,6 +199,14 @@ class VTPMServersTest(base.ServersTestBase):
self.assertNotIn('vtpm_secret_uuid', instance.system_metadata)
self.assertEqual(0, len(self.key_mgr._passphrases))
def _assert_libvirt_has_secret(self, host, instance_uuid):
s = host.driver._host.find_secret('vtpm', instance_uuid)
self.assertIsNotNone(s)
ctx = nova_context.get_admin_context()
instance = objects.Instance.get_by_uuid(ctx, instance_uuid)
secret_uuid = instance.system_metadata['vtpm_secret_uuid']
self.assertEqual(secret_uuid, s.UUIDString())
def _assert_libvirt_had_secret(self, compute, secret_uuid):
# This assert is for ephemeral private libvirt secrets that we
# undefine immediately after guest creation. Examples include 'user'
@@ -206,6 +216,10 @@ class VTPMServersTest(base.ServersTestBase):
conn = compute.driver._host.get_connection()
self.assertIn(secret_uuid, conn._removed_secrets)
def _assert_libvirt_secret_missing(self, host, instance_uuid):
s = host.driver._host.find_secret('vtpm', instance_uuid)
self.assertIsNone(s)
def test_tpm_secret_security_user(self):
self.flags(supported_tpm_secret_security=['user'], group='libvirt')
host = self.start_compute(hostname='tpm-host')
@@ -251,6 +265,38 @@ class VTPMServersTest(base.ServersTestBase):
# ensure we deleted the key now that we no longer need it
self.assertEqual(0, len(self.key_mgr._passphrases))
def test_create_server_secret_security_host(self):
self.flags(supported_tpm_secret_security=['host'], group='libvirt')
compute = self.start_compute()
# ensure we are reporting the correct traits
traits = self._get_provider_traits(self.compute_rp_uuids[compute])
self.assertIn('COMPUTE_SECURITY_TPM_SECRET_SECURITY_HOST', traits)
# create a server with vTPM
server = self._create_server_with_vtpm(secret_security='host')
# ensure our instance's system_metadata field and key manager inventory
# is correct
self.assertInstanceHasSecret(server)
# ensure the libvirt secret is defined correctly
ctx = nova_context.get_admin_context()
instance = objects.Instance.get_by_uuid(ctx, server['id'])
conn = self.computes[compute].driver._host.get_connection()
secret = conn._secrets[instance.system_metadata['vtpm_secret_uuid']]
self.assertFalse(secret._ephemeral)
self.assertFalse(secret._private)
# now delete the server
self._delete_server(server)
# ensure we deleted the key and undefined the secret now that we no
# longer need it
self.assertEqual(0, len(self.key_mgr._passphrases))
self.assertNotIn(instance.system_metadata['vtpm_secret_uuid'],
conn._secrets)
def test_suspend_resume_server(self):
self.start_compute()
@@ -300,6 +346,24 @@ class VTPMServersTest(base.ServersTestBase):
# is still correct
self.assertInstanceHasSecret(server)
@ddt.data(None, 'user', 'host')
def test_hard_reboot_server_as_admin(self, secret_security):
"""Test hard rebooting a non-admin user's instance as admin.
This should only work for the 'host' TPM secret security policy.
"""
self.start_compute()
# create a server with vTPM
server = self._create_server_with_vtpm(secret_security=secret_security)
# Attempt to reboot the server as admin, should only work for 'host'.
if secret_security == 'host':
self._reboot_server(server, hard=True, api=self.admin_api)
else:
self._reboot_server(server, hard=True, expected_state='ERROR',
api=self.admin_api)
def _test_resize_revert_server__vtpm_to_vtpm(self, extra_specs=None):
"""Test behavior of revert when a vTPM is retained across a resize.
@@ -749,8 +749,13 @@ class TestRequestFilter(test.NoDBTestCase):
reqspec.root_required)
self.assertEqual(set(), reqspec.root_forbidden)
@ddt.data('flavor', 'image')
def test_tpm_secret_security_filter(self, source):
@ddt.data(
('flavor', 'user', ot.COMPUTE_SECURITY_TPM_SECRET_SECURITY_USER),
('flavor', 'host', ot.COMPUTE_SECURITY_TPM_SECRET_SECURITY_HOST),
('image', 'user', ot.COMPUTE_SECURITY_TPM_SECRET_SECURITY_USER),
('image', 'host', ot.COMPUTE_SECURITY_TPM_SECRET_SECURITY_HOST))
@ddt.unpack
def test_tpm_secret_security_filter(self, source, secret_security, trait):
# First ensure that tpm_secret_security_filter is included
self.assertIn(request_filter.tpm_secret_security_filter,
request_filter.ALL_REQUEST_FILTERS)
@@ -761,14 +766,14 @@ class TestRequestFilter(test.NoDBTestCase):
extra_specs={
'hw:tpm_model': 'tpm-tis',
'hw:tpm_version': '1.2',
'hw:tpm_secret_security': 'user',
'hw:tpm_secret_security': secret_security,
}),
image=objects.ImageMeta(properties=objects.ImageMetaProps()))
elif source == 'image':
reqspec = objects.RequestSpec(
flavor=objects.Flavor(
extra_specs={
'hw:tpm_secret_security': 'user',
'hw:tpm_secret_security': secret_security,
}),
image=objects.ImageMeta(
properties=objects.ImageMetaProps(hw_tpm_model='tpm-tis',
@@ -778,9 +783,7 @@ class TestRequestFilter(test.NoDBTestCase):
self.assertEqual(set(), reqspec.root_forbidden)
self.assertTrue(
request_filter.tpm_secret_security_filter(self.context, reqspec))
self.assertEqual(
{ot.COMPUTE_SECURITY_TPM_SECRET_SECURITY_USER},
reqspec.root_required)
self.assertEqual({trait}, reqspec.root_required)
self.assertEqual(set(), reqspec.root_forbidden)
def test_tpm_secret_security_filter_skip(self):
+50 -1
View File
@@ -20978,6 +20978,55 @@ class LibvirtConnTestCase(test.NoDBTestCase,
# ...and undefined it after, despite the error
drvr._host.create_secret.return_value.undefine.assert_called_once()
@mock.patch('nova.virt.libvirt.host.Host')
@mock.patch('nova.crypto.ensure_vtpm_secret')
def test_get_or_create_secret_for_vtpm_host_security_found(
self, mock_secret, mock_host):
"""Test that the key manager service API is not called.
If the secret can be found locally from libvirt with 'host' TPM secret
security, there should be no call to the key manager API.
"""
instance = objects.Instance(**self.test_instance)
instance.flavor.extra_specs = {'hw:tpm_secret_security': 'host'}
mock_host.return_value.find_secret.return_value = mock.sentinel.secret
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), True)
secret, security = drvr._get_or_create_secret_for_vtpm(self.context,
instance)
mock_secret.assert_not_called()
self.assertEqual(mock.sentinel.secret, secret)
self.assertEqual('host', security)
@mock.patch('nova.virt.libvirt.host.Host')
@mock.patch('nova.crypto.ensure_vtpm_secret')
def test_get_or_create_secret_for_vtpm_host_security_not_found(
self, mock_secret, mock_host):
"""Test that the key manager service API is called.
If the secret is not found locally from libvirt with 'host' TPM secret
security, there should be a call to the key manager API.
"""
instance = objects.Instance(**self.test_instance)
instance.flavor.extra_specs = {'hw:tpm_secret_security': 'host'}
mock_host.return_value.find_secret.return_value = None
mock_secret.return_value = (uuids.secret, mock.sentinel.passphrase)
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), True)
secret, security = drvr._get_or_create_secret_for_vtpm(self.context,
instance)
mock_secret.assert_called_once_with(self.context, instance)
# ensure_vtpm_secret() returns (secret_uuid, passphrase)
mock_host.return_value.create_secret.assert_called_once_with(
'vtpm', uuids.instance, password=mock.sentinel.passphrase,
uuid=uuids.secret, ephemeral=False, private=False)
self.assertEqual(
mock_host.return_value.create_secret.return_value, secret)
self.assertEqual('host', security)
@mock.patch('nova.virt.disk.api.clean_lxc_namespace')
@mock.patch('nova.virt.libvirt.driver.LibvirtDriver.get_info')
@mock.patch('nova.virt.disk.api.setup_container')
@@ -27807,7 +27856,7 @@ class LibvirtDriverTestCase(test.NoDBTestCase, TraitsComparisonMixin):
instance = objects.Instance(
uuid=uuids.instance, id=1,
ephemeral_key_uuid=uuids.ephemeral_key_uuid,
resources=None)
resources=None, flavor=objects.Flavor())
instance.system_metadata = {}
block_device_info = {'root_device_name': '/dev/vda',
'ephemerals': [],
+16
View File
@@ -21,6 +21,7 @@ import ddt
import eventlet
from eventlet import greenthread
from eventlet import tpool
from lxml import etree
from oslo_serialization import jsonutils
from oslo_utils.fixture import uuidsentinel as uuids
from oslo_utils import uuidutils
@@ -1051,6 +1052,21 @@ class HostTestCase(test.NoDBTestCase):
self.host.create_secret('iscsi', 'iscsivol', password="foo")
secret.setValue.assert_called_once_with("foo")
@mock.patch.object(fakelibvirt.virConnect, "secretDefineXML")
def test_create_secret_vtpm_ephemeral_private_default(self, mock_sec):
self.host.create_secret('vtpm', uuids.instance)
xml = etree.fromstring(mock_sec.call_args.args[0])
self.assertEqual('yes', xml.get('ephemeral'))
self.assertEqual('yes', xml.get('private'))
@mock.patch.object(fakelibvirt.virConnect, "secretDefineXML")
def test_create_secret_vtpm_not_ephemeral_private(self, mock_sec):
self.host.create_secret('vtpm', uuids.instance, ephemeral=False,
private=False)
xml = etree.fromstring(mock_sec.call_args.args[0])
self.assertEqual('no', xml.get('ephemeral'))
self.assertEqual('no', xml.get('private'))
@mock.patch('nova.virt.libvirt.host.Host.find_secret')
def test_delete_secret(self, mock_find_secret):
"""deleting secret."""
+44 -6
View File
@@ -1793,6 +1793,9 @@ class LibvirtDriver(driver.ComputeDriver):
pass
if cleanup_instance_disks:
if hardware.get_tpm_secret_security_constraint(
instance.flavor) == 'host':
self._host.delete_secret('vtpm', instance.uuid)
# Make sure that the instance directory files were successfully
# deleted before destroying the encryption secrets in the case of
# image backends that are not 'lvm' or 'rbd'. We don't want to
@@ -8172,6 +8175,43 @@ class LibvirtDriver(driver.ComputeDriver):
finally:
self._create_domain_cleanup_lxc(instance)
def _get_or_create_secret_for_vtpm(
self,
context: nova_context.RequestContext,
instance: 'objects.Instance',
) -> ty.Tuple[ty.Any, ty.Optional[str]]:
"""Get or create a libvirt vTPM secret.
For 'host' TPM secret security, this will look for a local libvirt
secret first and only call the key manager service API if it does not
find one.
For all others, it will call the key manager service API to get or
create a secret and then use it to create a libvirt secret.
"""
security = hardware.get_tpm_secret_security_constraint(
instance.flavor) or 'user'
libvirt_secret = None
kwargs = {}
if security == 'host':
# First try to look up the secret locally. If it's not found, we
# we will get None returned and we can still create it below.
libvirt_secret = self._host.find_secret('vtpm', instance.uuid)
# create_secret() already contains logic to default to the most
# secure ephemeral and private for TPM, so just specify if we
# don't want that.
kwargs = {'ephemeral': False, 'private': False}
if libvirt_secret is None:
secret_uuid, passphrase = crypto.ensure_vtpm_secret(context,
instance)
libvirt_secret = self._host.create_secret(
'vtpm', instance.uuid, password=passphrase, uuid=secret_uuid,
**kwargs)
return libvirt_secret, security
def _create_guest(
self,
context: nova_context.RequestContext,
@@ -8189,15 +8229,13 @@ class LibvirtDriver(driver.ComputeDriver):
:returns guest.Guest: Created guest.
"""
libvirt_secret = None
secret_security = None
# determine whether vTPM is in use and, if so, create the secret
if CONF.libvirt.swtpm_enabled and hardware.get_vtpm_constraint(
instance.flavor, instance.image_meta,
):
secret_uuid, passphrase = crypto.ensure_vtpm_secret(
context, instance)
libvirt_secret = self._host.create_secret(
'vtpm', instance.uuid, password=passphrase,
uuid=secret_uuid)
libvirt_secret, secret_security = (
self._get_or_create_secret_for_vtpm(context, instance))
try:
guest = libvirt_guest.Guest.create(xml, self._host)
@@ -8210,7 +8248,7 @@ class LibvirtDriver(driver.ComputeDriver):
return guest
finally:
if libvirt_secret is not None:
if libvirt_secret is not None and secret_security != 'host':
libvirt_secret.undefine()
def _neutron_failed_callback(self, event_name, instance):
+12 -3
View File
@@ -1079,6 +1079,8 @@ class Host(object):
usage_type_const = libvirt.VIR_SECRET_USAGE_TYPE_CEPH
elif usage_type == 'volume':
usage_type_const = libvirt.VIR_SECRET_USAGE_TYPE_VOLUME
elif usage_type == 'vtpm':
usage_type_const = libvirt.VIR_SECRET_USAGE_TYPE_VTPM
else:
msg = _("Invalid usage_type: %s")
raise exception.InternalError(msg % usage_type)
@@ -1090,7 +1092,8 @@ class Host(object):
if e.get_error_code() == libvirt.VIR_ERR_NO_SECRET:
return None
def create_secret(self, usage_type, usage_id, password=None, uuid=None):
def create_secret(self, usage_type, usage_id, password=None, uuid=None,
ephemeral=None, private=None):
"""Create a secret.
:param usage_type: one of 'iscsi', 'ceph', 'rbd', 'volume', 'vtpm'.
@@ -1102,8 +1105,14 @@ class Host(object):
libvirt
"""
secret_conf = vconfig.LibvirtConfigSecret()
secret_conf.ephemeral = usage_type == 'vtpm'
secret_conf.private = usage_type == 'vtpm'
if ephemeral is None:
secret_conf.ephemeral = usage_type == 'vtpm'
else:
secret_conf.ephemeral = ephemeral
if private is None:
secret_conf.private = usage_type == 'vtpm'
else:
secret_conf.private = private
secret_conf.usage_id = usage_id
secret_conf.uuid = uuid
if usage_type in ('rbd', 'ceph'):