TPM: support instances with deployment secret security

This means authenticating as the Nova service user to Barbican,
so that the latter can make the secret owned by Nova. This requires
the [service_user] config section to be set.

An API block is also added to prevent resizes to change to or from
the ``deployment`` TPM secret security mode. This is because doing so
would require conversion of secret ownership to or from the user to the
Nova service user. The change is complicated and will be implemented
as a separate patch later in the series.

Resizing from ``deployment`` TPM secret security mode to ``deployment``
TPM secret security mode is allowed.

Related to blueprint vtpm-live-migration

Change-Id: I007f9993451d9197f53dee9a5fd29daa307ebe6b
Signed-off-by: melanie witt <melwittt@gmail.com>
This commit is contained in:
Artom Lifshitz
2025-02-17 18:22:58 -05:00
committed by melanie witt
parent 66bb1e77f2
commit 880019baaf
14 changed files with 518 additions and 29 deletions
+38
View File
@@ -87,6 +87,7 @@ from nova import servicegroup
from nova import utils
from nova.virt import hardware
from nova.volume import cinder
from nova import vtpm
LOG = logging.getLogger(__name__)
@@ -4197,6 +4198,39 @@ class API:
return node
@staticmethod
def _validate_vtpm_secret_security(current_flavor, new_flavor, image_meta):
"""Validate whether or not the requested resize is supported for vTPM.
This will reject requests that would require secret ownership
conversions for the time being. Secret ownership changes will be
complex, so we plan to add support for it as a separate follow-up
enhancement.
TODO(melwitt): Remove this when support for key manager service secret
ownership conversions is added.
"""
if not (hardware.get_vtpm_constraint(current_flavor, image_meta) and
hardware.get_vtpm_constraint(new_flavor, image_meta)):
# If either of the flavors has no vTPM at all, we don't need to
# validate anything because no secret ownership change would be
# involved.
return
from_security = vtpm.get_instance_tpm_secret_security(current_flavor)
to_security = vtpm.get_instance_tpm_secret_security(new_flavor)
if (from_security != to_security and
(from_security == 'deployment' or to_security == 'deployment')):
# Resizing to 'deployment' TPM secret security from any other
# mode or resizing to any other mode from 'deployment' TPM secret
# security would involve converting key manager service secret
# ownership from the user to the Nova service user or from the Nova
# service user to the user, and we don't support that yet.
msg = _("Resize between 'deployment' TPM secret security and "
"other TPM secret security modes is not supported.")
raise exception.OperationNotSupportedForVTPM(msg)
@block_shares_not_supported()
# TODO(stephenfin): This logic would be so much easier to grok if we
# finally split resize and cold migration into separate code paths
@@ -4356,6 +4390,10 @@ class API:
self._check_compute_service_for_mixed_instance(
request_spec.numa_topology, min_comp_ver)
if not same_flavor:
self._validate_vtpm_secret_security(current_flavor, new_flavor,
instance.image_meta)
instance.task_state = task_states.RESIZE_PREP
instance.progress = 0
instance.auto_disk_config = auto_disk_config or False
+5 -2
View File
@@ -67,7 +67,6 @@ from nova.compute import vm_states
from nova import conductor
import nova.conf
import nova.context
from nova import crypto
from nova import exception
from nova import exception_wrapper
from nova.i18n import _
@@ -99,6 +98,7 @@ import nova.virt.node
from nova.virt import storage_users
from nova.virt import virtapi
from nova.volume import cinder
from nova import vtpm
CONF = nova.conf.CONF
@@ -970,6 +970,9 @@ class ComputeManager(manager.Manager):
self.host, action=fields.NotificationAction.DELETE,
phase=fields.NotificationPhase.END, bdms=bdms)
def _complete_deletion_vtpm(self, context, instance):
vtpm.delete_secret(context, instance)
def _complete_deletion(self, context, instance):
self._update_resource_tracker(context, instance)
@@ -985,7 +988,7 @@ class ComputeManager(manager.Manager):
self._delete_scheduler_instance_info(context, instance.uuid)
# Delete the vTPM secret in the key manager service if needed.
crypto.delete_vtpm_secret(context, instance)
self._complete_deletion_vtpm(context, instance)
def _validate_pinning_configuration(self, instances):
if not self.driver.capabilities.get('supports_pcpus', False):
+4 -1
View File
@@ -1636,7 +1636,7 @@ Related options:
* ``swtpm_user`` must also be set.
"""),
cfg.ListOpt('supported_tpm_secret_security',
default=['user', 'host'],
default=['user', 'host', 'deployment'],
help="""
The list of TPM security policies supported by this compute host. If a value is
absent, it is not supported by this host, and any instance that requests it
@@ -1652,6 +1652,9 @@ Possible values are:
accessed by anyone else. The Libvirt secret is public and persistent. It
can be read by anyone with sufficient access on the host. The instance can
be live-migrated and automatically resumed after host reboot.
* ``deployment``: The Barbican secret is owned by the Nova service user. The
Libvirt secret is private and non-persistent. The instance can be
live-migrated and resumed automatically after host reboot.
"""),
cfg.BoolOpt(
'use_default_aio_mode_for_volumes',
+38
View File
@@ -28,13 +28,16 @@ from oslo_db.sqlalchemy import enginefacade
from oslo_log import log as logging
from oslo_utils import timeutils
import nova.conf
from nova import exception
from nova.i18n import _
from nova import objects
from nova import policy
from nova import service_auth
from nova import utils
LOG = logging.getLogger(__name__)
CONF = nova.conf.CONF
CELL_CACHE = {}
# NOTE(melwitt): Used for the scatter-gather utility to indicate we timed out
# waiting for a result from a cell.
@@ -47,6 +50,14 @@ CELLS = []
CELL_TIMEOUT = 60
def reset_globals():
global CELL_CACHE
global CELLS
CELL_CACHE = {}
CELLS = []
service_auth.reset_globals()
class _ContextAuthPlugin(plugin.BaseAuthPlugin):
"""A keystoneauth auth plugin that uses the values from the Context.
@@ -277,6 +288,33 @@ def get_admin_context(read_deleted="no"):
overwrite=False)
def get_nova_service_user_context():
"""Get a context that will authenticate as the Nova service user.
This will pull authentication parameters from the [<conf_group>]
section of the Nova configuration and load an auth plugin, then create
and return a RequestContext object containing that auth plugin.
Then, code using the RequestContext will call its get_auth_plugin() method
to authenticate with another service.
"""
conf_group = nova.conf.service_token.SERVICE_USER_GROUP
auth = service_auth.get_service_auth_plugin(conf_group)
session = service_auth.get_service_auth_session(conf_group)
if auth is None or session is None:
raise exception.InvalidConfiguration(
'Failed to load auth plugin or session from configuration. '
f'Ensure the [{conf_group}] section of the Nova configuration '
'file is correctly configured for the Nova service user.')
return RequestContext(user_id=auth.get_user_id(session),
project_id=auth.get_project_id(session),
roles=auth.get_access(session).role_names,
user_auth_plugin=auth, overwrite=False)
def is_user_context(context):
"""Indicates if the request context is a normal user."""
if not context:
+3
View File
@@ -486,6 +486,9 @@ def tpm_secret_security_filter(
elif security == 'host':
request_spec.root_required.add(
os_traits.COMPUTE_SECURITY_TPM_SECRET_SECURITY_HOST)
elif security == 'deployment':
request_spec.root_required.add(
os_traits.COMPUTE_SECURITY_TPM_SECRET_SECURITY_DEPLOYMENT)
else:
# We can get here if the requested TPM secret security passed extra
# spec validation but is not otherwise supported in the code at this
+1 -2
View File
@@ -243,8 +243,7 @@ class TestCase(base.BaseTestCase):
# NOTE(danms): Reset the cached list of cells
from nova.compute import api
api.CELLS = []
context.CELL_CACHE = {}
context.CELLS = []
context.reset_globals()
self.computes = {}
self.cell_mappings = {}
+207 -11
View File
@@ -168,6 +168,15 @@ class VTPMServersTest(base.ServersTestBase):
self.key_mgr = crypto._get_key_manager()
# Mock the get_nova_service_user_context() method so we can
# differentiate request contexts for the 'nova' service user.
def fake_get_nova_service_user_context():
return nova_context.RequestContext(user_id='nova')
self.useFixture(fixtures.MockPatch(
'nova.context.get_nova_service_user_context',
fake_get_nova_service_user_context))
def _create_server_with_vtpm(self, secret_security=None,
expected_state='ACTIVE'):
extra_specs = {'hw:tpm_model': 'tpm-tis', 'hw:tpm_version': '1.2'}
@@ -183,14 +192,15 @@ class VTPMServersTest(base.ServersTestBase):
# use the default flavor (i.e. one without vTPM extra specs)
return self._create_server()
def assertInstanceHasSecret(self, server):
def assertInstanceHasSecret(self, server, user_id='fake'):
# user_id='fake' is the normal non-admin user.
ctx = nova_context.get_admin_context()
instance = objects.Instance.get_by_uuid(ctx, server['id'])
self.assertIn('vtpm_secret_uuid', instance.system_metadata)
self.assertEqual(1, len(self.key_mgr._passphrases))
self.assertIn(
instance.system_metadata['vtpm_secret_uuid'],
self.key_mgr._passphrases)
secret_uuid = instance.system_metadata['vtpm_secret_uuid']
self.assertIn(secret_uuid, self.key_mgr._passphrases)
self.assertEqual(user_id, self.key_mgr._contexts[secret_uuid].user_id)
return instance.system_metadata['vtpm_secret_uuid']
def assertInstanceHasNoSecret(self, server):
@@ -297,6 +307,41 @@ class VTPMServersTest(base.ServersTestBase):
self.assertNotIn(instance.system_metadata['vtpm_secret_uuid'],
conn._secrets)
def test_create_server_secret_security_deployment(self):
self.flags(
supported_tpm_secret_security=['deployment'], group='libvirt')
self.start_compute(hostname='tpm-host')
compute = self.computes['tpm-host']
# ensure we are reporting the correct traits
traits = self._get_provider_traits(self.compute_rp_uuids['tpm-host'])
self.assertIn(
'COMPUTE_SECURITY_TPM_SECRET_SECURITY_DEPLOYMENT', traits)
# create a server with vTPM
server = self._create_server_with_vtpm(secret_security='deployment')
# ensure our instance's system_metadata field and key manager inventory
# is correct
self.assertInstanceHasSecret(server, user_id='nova')
# ensure the libvirt secret is defined correctly
ctx = nova_context.get_admin_context()
instance = objects.Instance.get_by_uuid(ctx, server['id'])
self._assert_libvirt_had_secret(
compute, instance.system_metadata['vtpm_secret_uuid'])
# Now delete the server, this delete will fail if the secret ownership
# does not match. And we verified the secret owner is 'nova' above.
self._delete_server(server)
# ensure we deleted the key and undefined the secret now that we no
# longer need it
self.assertEqual(0, len(self.key_mgr._passphrases))
conn = compute.driver._host.get_connection()
self.assertNotIn(instance.system_metadata['vtpm_secret_uuid'],
conn._secrets)
def test_suspend_resume_server(self):
self.start_compute()
@@ -409,7 +454,22 @@ class VTPMServersTest(base.ServersTestBase):
self._test_resize_revert_server__vtpm_to_vtpm(
extra_specs=extra_specs)
def test_resize_server__no_vtpm_to_vtpm(self):
@ddt.data(None, 'user', 'host', 'deployment')
def test_resize_server__no_vtpm_to_vtpm(self, secret_security):
"""Resize a server from a flavor without TPM to a flavor with TPM.
This tests a scenario where the instance does not have a TPM before
the resize but *does* have a TPM after the resize.
A TPM secret security of 'None' means the instance is either:
* A legacy vTPM instance
* A vTPM instance where the user did not specify TPM secret security
In both of these cases, the default TPM secret security policy is
'user'. So 'None' is the equivalent of 'user'.
"""
for host in ('test_compute0', 'test_compute1'):
self.start_compute(host)
@@ -423,6 +483,8 @@ class VTPMServersTest(base.ServersTestBase):
# create a flavor with vTPM
extra_specs = {'hw:tpm_model': 'tpm-tis', 'hw:tpm_version': '1.2'}
if secret_security is not None:
extra_specs['hw:tpm_secret_security'] = secret_security
flavor_id = self._create_flavor(extra_spec=extra_specs)
# TODO(stephenfin): The mock of 'migrate_disk_and_power_off' should
@@ -436,7 +498,8 @@ class VTPMServersTest(base.ServersTestBase):
# ensure our instance's system_metadata field and key manager inventory
# is updated to reflect the new vTPM requirement
self.assertInstanceHasSecret(server)
user_id = 'nova' if secret_security == 'deployment' else 'fake'
self.assertInstanceHasSecret(server, user_id=user_id)
# revert the instance rather than confirming it, and ensure the secret
# is correctly cleaned up
@@ -453,16 +516,32 @@ class VTPMServersTest(base.ServersTestBase):
# ensure we delete the new key since we no longer need it
self.assertInstanceHasNoSecret(server)
def test_resize_server__vtpm_to_no_vtpm(self):
@ddt.data(None, 'user', 'host', 'deployment')
def test_resize_server__vtpm_to_no_vtpm(self, secret_security):
"""Resize a server from a flavor with TPM to a flavor without TPM.
This tests a scenario where the instance has a TPM before the resize
but does *not* have a TPM after the resize.
A TPM secret security of 'None' means the instance is either:
* A legacy vTPM instance
* A vTPM instance where the user did not specify TPM secret security
In both of these cases, the default TPM secret security policy is
'user'. So 'None' is the equivalent of 'user'.
"""
for host in ('test_compute0', 'test_compute1'):
self.start_compute(host)
# create a server with vTPM
server = self._create_server_with_vtpm()
server = self._create_server_with_vtpm(secret_security=secret_security)
self.addCleanup(self._delete_server, server)
# ensure our instance's system_metadata field is correct
self.assertInstanceHasSecret(server)
user_id = 'nova' if secret_security == 'deployment' else 'fake'
self.assertInstanceHasSecret(server, user_id=user_id)
# create a flavor without vTPM
flavor_id = self._create_flavor()
@@ -478,7 +557,8 @@ class VTPMServersTest(base.ServersTestBase):
# ensure we still have the key for the vTPM device in storage in case
# we revert
self.assertInstanceHasSecret(server)
user_id = 'nova' if secret_security == 'deployment' else 'fake'
self.assertInstanceHasSecret(server, user_id=user_id)
# confirm the instance and ensure the secret is correctly cleaned up
@@ -488,13 +568,129 @@ class VTPMServersTest(base.ServersTestBase):
'nova.virt.libvirt.driver.LibvirtDriver'
'.migrate_disk_and_power_off', return_value='{}',
):
# revert back to the old flavor *with* vTPM
# confirm to the new flavor *without* vTPM
server = self._confirm_resize(server)
# ensure we have finally deleted the key for the vTPM device since
# there is no going back now
self.assertInstanceHasNoSecret(server)
@ddt.unpack
@ddt.data(
(None, 'deployment'), ('deployment', None),
('user', 'deployment'), ('deployment', 'user'),
('host', 'deployment'), ('deployment', 'host'))
def test_resize_vtpm_server_secret_security_deployment_unsupported(
self, from_secret_security, to_secret_security):
"""Resizes that require secret ownership changes are not allowed.
This tests a scenario where the instance has a TPM before the resize
and has a TPM after the resize.
A TPM secret security of 'None' means the instance is either:
* A legacy vTPM instance
* A vTPM instance where the user did not specify TPM secret security
In both of these cases, the default TPM secret security policy is
'user'. So 'None' is the equivalent of 'user'.
Until a later patch in the series adds code to convert to and from a
user-owned secret <=> Nova service user owned secret, we will want to
reject requests that would require conversion. Otherwise, these
attempts will fail with secret access permission errors.
"""
for host in ('test_compute0', 'test_compute1'):
self.start_compute(host)
# create a server with vTPM with from_secret_security
server = self._create_server_with_vtpm(
secret_security=from_secret_security)
self.addCleanup(self._delete_server, server)
# ensure our instance's system_metadata field is correct
user_id = 'nova' if from_secret_security == 'deployment' else 'fake'
self.assertInstanceHasSecret(server, user_id=user_id)
# create a flavor with to_secret_security
extra_specs = {'hw:tpm_version': '1.2',
'hw:tpm_model': 'tpm-tis'}
if to_secret_security is not None:
extra_specs['hw:tpm_secret_security'] = to_secret_security
flavor_id = self._create_flavor(extra_spec=extra_specs)
with mock.patch(
'nova.virt.libvirt.driver.LibvirtDriver'
'.migrate_disk_and_power_off', return_value='{}',
):
ex = self.assertRaises(
client.OpenStackApiException, self._resize_server, server,
flavor_id=flavor_id)
self.assertEqual(400, ex.response.status_code)
self.assertIn(
"Resize between 'deployment' TPM secret security and "
"other TPM secret security modes is not supported.",
str(ex))
@ddt.unpack
@ddt.data(
(None, None),
('user', 'user'),
('host', 'host'),
('deployment', 'deployment'),
(None, 'user'), ('user', None),
(None, 'host'), ('host', None),
('user', 'host'), ('host', 'user'))
def test_resize_vtpm_server_secret_security_deployment_supported(
self, from_secret_security, to_secret_security):
"""Resizes that do not require secret ownership changes are allowed.
This tests a scenario where the instance has a TPM before the resize
and has a TPM after the resize.
A TPM secret security of 'None' means the instance is either:
* A legacy vTPM instance
* A vTPM instance where the user did not specify TPM secret security
In both of these cases, the default TPM secret security policy is
'user'. So 'None' is the equivalent of 'user'.
A resize from 'deployment' to 'deployment' is allowed because in both
cases the key manager service secret will be owned by the Nova service
user and no ownership change will be needed.
"""
for host in ('test_compute0', 'test_compute1'):
self.start_compute(host)
# create a server with vTPM with from_secret_security
server = self._create_server_with_vtpm(
secret_security=from_secret_security)
self.addCleanup(self._delete_server, server)
# ensure our instance's system_metadata field is correct
user_id = 'nova' if from_secret_security == 'deployment' else 'fake'
self.assertInstanceHasSecret(server, user_id=user_id)
# create a flavor with to_secret_security
extra_specs = {'hw:tpm_version': '1.2',
'hw:tpm_model': 'tpm-tis'}
if to_secret_security is not None:
extra_specs['hw:tpm_secret_security'] = to_secret_security
flavor_id = self._create_flavor(extra_spec=extra_specs)
with mock.patch(
'nova.virt.libvirt.driver.LibvirtDriver'
'.migrate_disk_and_power_off', return_value='{}',
):
# resize should succeed
self._resize_server(server, flavor_id=flavor_id)
# And the secret should still be as expected.
self.assertInstanceHasSecret(server, user_id=user_id)
def test_create_server_secret_security_unsupported(self):
"""Test when a not supported TPM secret security mode is requested
+2 -1
View File
@@ -1794,7 +1794,8 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase,
self, reclaim_instance_interval, mock_delete_vtpm,
mock_delete_alloc):
self.flags(reclaim_instance_interval=reclaim_instance_interval)
instance = objects.Instance(uuid=uuids.instance)
instance = objects.Instance(uuid=uuids.instance,
flavor=objects.Flavor())
with mock.patch.multiple(
self.compute,
@@ -752,8 +752,12 @@ class TestRequestFilter(test.NoDBTestCase):
@ddt.data(
('flavor', 'user', ot.COMPUTE_SECURITY_TPM_SECRET_SECURITY_USER),
('flavor', 'host', ot.COMPUTE_SECURITY_TPM_SECRET_SECURITY_HOST),
('flavor',
'deployment', ot.COMPUTE_SECURITY_TPM_SECRET_SECURITY_DEPLOYMENT),
('image', 'user', ot.COMPUTE_SECURITY_TPM_SECRET_SECURITY_USER),
('image', 'host', ot.COMPUTE_SECURITY_TPM_SECRET_SECURITY_HOST))
('image', 'host', ot.COMPUTE_SECURITY_TPM_SECRET_SECURITY_HOST),
('image',
'deployment', ot.COMPUTE_SECURITY_TPM_SECRET_SECURITY_DEPLOYMENT))
@ddt.unpack
def test_tpm_secret_security_filter(self, source, secret_security, trait):
# First ensure that tpm_secret_security_filter is included
+110 -1
View File
@@ -15,8 +15,12 @@
import threading
from unittest import mock
import ddt
import futurist.waiters
from keystoneauth1.fixture import plugin as ks_plugin_fixture
from keystoneauth1 import loading as ks_loading
from oslo_config import cfg
from oslo_config import fixture as config_fixture
from oslo_context import context as o_context
from oslo_context import fixture as o_fixture
from oslo_utils.fixture import uuidsentinel as uuids
@@ -24,10 +28,28 @@ from oslo_utils.fixture import uuidsentinel as uuids
from nova import context
from nova import exception
from nova import objects
from nova import service_auth
from nova import test
from nova import utils
class TestPluginWithAccess(ks_plugin_fixture.TestPlugin):
def get_access(self, session):
return mock.Mock(role_names=['service'])
class LoadingFixtureWithAccess(ks_plugin_fixture.LoadingFixture):
def create_plugin(self):
return TestPluginWithAccess(
token=self.token,
endpoint=self.endpoint,
user_id=self.user_id,
project_id=self.project_id)
@ddt.ddt
class ContextTestCase(test.NoDBTestCase):
# NOTE(danms): Avoid any cells setup by claiming we will
# do things ourselves.
@@ -178,6 +200,93 @@ class ContextTestCase(test.NoDBTestCase):
context.get_admin_context()
self.assertIs(o_context.get_current(), ctx1)
@mock.patch('keystoneauth1.loading.load_auth_from_conf_options')
@mock.patch('keystoneauth1.loading.load_session_from_conf_options')
def test_get_nova_service_user_context(self, mock_load_session,
mock_load_auth):
"""Verify the basic get of a Nova service user context."""
# Get a Nova service user context.
ctxt = context.get_nova_service_user_context()
# Verify we called the loading functions as expected.
mock_load_auth.assert_called_once_with(context.CONF, 'service_user')
mock_load_session.assert_called_once_with(context.CONF, 'service_user',
auth=None)
mock_plugin = mock_load_auth.return_value
mock_session = mock_load_session.return_value
# Verify we called the user_id and project_id getting methods as
# expected.
mock_plugin.get_user_id.assert_called_once_with(mock_session)
mock_plugin.get_project_id.assert_called_once_with(mock_session)
# Verify the RequestContext attributes got set as expected.
self.assertEqual(mock_plugin.get_user_id.return_value, ctxt.user_id)
self.assertEqual(mock_plugin.get_project_id.return_value,
ctxt.project_id)
self.assertEqual(mock_plugin, ctxt.user_auth_plugin)
# Get another context to verify we create the context with
# overwrite=False to avoid overwriting the thread local storage.
with mock.patch('nova.context.RequestContext') as mock_context:
ctxt = context.get_nova_service_user_context()
mock_context.assert_called_once_with(
user_id=mock_plugin.get_user_id.return_value,
project_id=mock_plugin.get_project_id.return_value,
roles=mock_plugin.get_access.return_value.role_names,
user_auth_plugin=mock_plugin, overwrite=False)
def test_get_nova_service_user_context_user_project(self):
"""Verify the user_id and project_id get set to what we expect."""
# Use a new config fixture so that the options we register here will
# get unregistered after the test.
conf_fixture = self.useFixture(
config_fixture.Config(conf=cfg.ConfigOpts()))
# Register the auth and session options in the [service_user]
# config section.
oslo_opts = (ks_loading.get_auth_common_conf_options() +
ks_loading.get_session_conf_options() +
ks_loading.get_auth_plugin_conf_options('password'))
conf_fixture.register_opts(oslo_opts, group='service_user')
# Fill in typical values for the Nova service user.
conf_fixture.config(
group='service_user', auth_type='password', username='nova',
project_name='service', auth_url='http://anyhost/auth')
# Use the plugin loading fixture from keystoneauth in order to skip all
# of the real authentication steps of calling Keystone and set expected
# user_id and project_id values.
self.useFixture(LoadingFixtureWithAccess(user_id=uuids.nova,
project_id=uuids.service))
# Verify we get the expected user_id and project_id in the
# RequestContext.
with mock.patch.object(service_auth, 'CONF', conf_fixture.conf):
ctxt = context.get_nova_service_user_context()
self.assertEqual(uuids.nova, ctxt.user_id)
self.assertEqual(uuids.service, ctxt.project_id)
self.assertEqual(['service'], ctxt.roles)
@mock.patch('keystoneauth1.loading.load_auth_from_conf_options')
@mock.patch('keystoneauth1.loading.load_session_from_conf_options')
@ddt.data('auth', 'session')
def test_get_nova_service_user_context_load_fail(
self, to_fail, mock_load_session, mock_load_auth):
if to_fail == 'auth':
mock_load_auth.return_value = None
elif to_fail == 'session':
mock_load_session.return_value = None
ex = self.assertRaises(exception.InvalidConfiguration,
context.get_nova_service_user_context)
msg = (
'Failed to load auth plugin or session from configuration. '
'Ensure the [service_user] section of the Nova configuration '
'file is correctly configured for the Nova service user.')
self.assertIn(msg, str(ex))
def test_convert_from_rc_to_dict(self):
ctx = context.RequestContext(
111, 222, request_id='req-679033b7-1755-4929-bf85-eb3bfaef7e0b',
+2 -1
View File
@@ -279,7 +279,8 @@ class VTPMTest(test.NoDBTestCase):
We should create a new one.
"""
instance = objects.Instance()
instance = objects.Instance(flavor=objects.Flavor(),
image_ref=uuids.image)
instance.uuid = uuids.instance
instance.system_metadata = {}
mock_get_manager.return_value.store.return_value = uuids.secret
@@ -17241,6 +17241,7 @@ class LibvirtConnTestCase(test.NoDBTestCase,
self.flags(swtpm_enabled=True, group='libvirt')
self.useFixture(nova_fixtures.LibvirtImageBackendFixture())
mock_ensure_vtpm.return_value = uuids.secret, mock.sentinel.password
mock_get_info.return_value = hardware.InstanceInfo(
state=power_state.RUNNING)
@@ -21238,6 +21239,24 @@ class LibvirtConnTestCase(test.NoDBTestCase,
mock_host.return_value.create_secret.return_value, secret)
self.assertEqual('host', security)
@mock.patch('nova.context.get_nova_service_user_context')
@mock.patch('nova.crypto.ensure_vtpm_secret')
def test_get_or_create_secret_for_vtpm_security_deployment(
self, mock_ensure_secret, mock_get_ctxt):
# Test that vTPM secret security 'deployment' will use the Nova service
# user auth to create the secret in the key manager service.
mock_ensure_secret.return_value = uuids.secret, mock.sentinel.password
instance = objects.Instance(**self.test_instance)
instance.flavor.extra_specs = {'hw:tpm_secret_security': 'deployment'}
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
drvr._get_or_create_secret_for_vtpm(self.context, instance)
# We should use the service user context.
mock_get_ctxt.assert_called_once_with()
mock_ensure_secret.assert_called_once_with(
mock_get_ctxt.return_value, instance)
@mock.patch('nova.virt.disk.api.clean_lxc_namespace')
@mock.patch('nova.virt.libvirt.driver.LibvirtDriver.get_info')
@mock.patch('nova.virt.disk.api.setup_container')
+9 -9
View File
@@ -134,6 +134,7 @@ from nova.virt.libvirt.volume import remotefs
from nova.virt.libvirt.volume import volume
from nova.virt import netutils
from nova.volume import cinder
from nova import vtpm
try:
# This is optional for unit testing but required at runtime. We check for
@@ -1793,7 +1794,7 @@ class LibvirtDriver(driver.ComputeDriver):
pass
if cleanup_instance_disks:
if hardware.get_tpm_secret_security_constraint(
if vtpm.get_instance_tpm_secret_security(
instance.flavor) == 'host':
self._host.delete_secret('vtpm', instance.uuid)
# Make sure that the instance directory files were successfully
@@ -1964,7 +1965,7 @@ class LibvirtDriver(driver.ComputeDriver):
# secret; the deletion of the instance directory and undefining of
# the domain will take care of the TPM files themselves
LOG.info('New flavor no longer requests vTPM; deleting secret.')
crypto.delete_vtpm_secret(context, instance)
vtpm.delete_secret(context, instance, flavor=instance.old_flavor)
# TODO(stephenfin): Fold this back into its only caller, cleanup_resize
def _cleanup_resize(self, context, instance, network_info):
@@ -4823,7 +4824,7 @@ class LibvirtDriver(driver.ComputeDriver):
# it to hand when generating the XML. This is slightly wasteful
# as we'll perform a redundant key manager API call later when
# we create the domain but the alternative is an ugly mess
crypto.ensure_vtpm_secret(context, instance)
self._get_or_create_secret_for_vtpm(context, instance)
xml = self._get_guest_xml(context, instance, network_info,
disk_info, image_meta,
@@ -8195,8 +8196,7 @@ class LibvirtDriver(driver.ComputeDriver):
For all others, it will call the key manager service API to get or
create a secret and then use it to create a libvirt secret.
"""
security = hardware.get_tpm_secret_security_constraint(
instance.flavor) or 'user'
security = vtpm.get_instance_tpm_secret_security(instance.flavor)
libvirt_secret = None
kwargs = {}
@@ -8210,8 +8210,8 @@ class LibvirtDriver(driver.ComputeDriver):
kwargs = {'ephemeral': False, 'private': False}
if libvirt_secret is None:
secret_uuid, passphrase = crypto.ensure_vtpm_secret(context,
instance)
secret_uuid, passphrase = vtpm.get_or_create_secret(
context, instance)
libvirt_secret = self._host.create_secret(
'vtpm', instance.uuid, password=passphrase, uuid=secret_uuid,
**kwargs)
@@ -12649,7 +12649,7 @@ class LibvirtDriver(driver.ComputeDriver):
elif new_vtpm_config:
# we've requested vTPM in the new flavor and didn't have one
# previously so we need to create a new secret
crypto.ensure_vtpm_secret(context, instance)
self._get_or_create_secret_for_vtpm(context, instance)
def finish_migration(
self,
@@ -12798,7 +12798,7 @@ class LibvirtDriver(driver.ComputeDriver):
# the instance gained a vTPM and must now lose it; delete the vTPM
# secret, knowing that libvirt will take care of everything else on
# the destination side
crypto.delete_vtpm_secret(context, instance)
vtpm.delete_secret(context, instance, flavor=instance.new_flavor)
def finish_revert_migration(
self,
+75
View File
@@ -0,0 +1,75 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import typing as ty
if ty.TYPE_CHECKING:
from nova import objects
from nova import context as nova_context
from nova import crypto
from nova.virt import hardware
def get_instance_tpm_secret_security(flavor):
secret_security = hardware.get_tpm_secret_security_constraint(flavor)
return secret_security or 'user'
def get_or_create_secret(
context: nova_context.RequestContext,
instance: 'objects.Instance',
) -> tuple[str, bytes]:
"""Get or create a secret in the key manager service.
The secret UUID and passphrase will be returned.
"""
use_context = get_request_context(context, instance.flavor)
return crypto.ensure_vtpm_secret(use_context, instance)
def delete_secret(
context: nova_context.RequestContext,
instance: 'objects.Instance',
flavor: ty.Optional['objects.Flavor'] = None,
) -> None:
"""Delete a secret from the key manager service for TPM.
A flavor can be optionally specified to use instead of instance.flavor.
This will be the case for:
* Reverting a no TPM => TPM resize because by the time we get here,
instance.flavor will have already been changed back to the old
flavor.
* Confirming a TPM => no TPM resize because by the time we get here,
instance.flavor will be set to the new flavor.
"""
flavor = flavor or instance.flavor
use_context = get_request_context(context, flavor)
crypto.delete_vtpm_secret(use_context, instance)
def get_request_context(
context: nova_context.RequestContext,
flavor: 'objects.Flavor',
) -> nova_context.RequestContext:
"""Obtain an appropriate RequestContext based on TPM secret security.
The normal user context should be passed in and if TPM secret security
policy for the instance is 'deployment', this will return a Nova service
user context. Otherwise, the normal user context that was passed in will be
returned.
"""
if get_instance_tpm_secret_security(flavor) == 'deployment':
return nova_context.get_nova_service_user_context()
return context