From 880019baafe23f46e59dee7debadf49b9177dc70 Mon Sep 17 00:00:00 2001 From: Artom Lifshitz Date: Mon, 17 Feb 2025 18:22:58 -0500 Subject: [PATCH] TPM: support instances with `deployment` secret security This means authenticating as the Nova service user to Barbican, so that the latter can make the secret owned by Nova. This requires the [service_user] config section to be set. An API block is also added to prevent resizes to change to or from the ``deployment`` TPM secret security mode. This is because doing so would require conversion of secret ownership to or from the user to the Nova service user. The change is complicated and will be implemented as a separate patch later in the series. Resizing from ``deployment`` TPM secret security mode to ``deployment`` TPM secret security mode is allowed. Related to blueprint vtpm-live-migration Change-Id: I007f9993451d9197f53dee9a5fd29daa307ebe6b Signed-off-by: melanie witt --- nova/compute/api.py | 38 +++ nova/compute/manager.py | 7 +- nova/conf/libvirt.py | 5 +- nova/context.py | 38 +++ nova/scheduler/request_filter.py | 3 + nova/test.py | 3 +- nova/tests/functional/libvirt/test_vtpm.py | 218 +++++++++++++++++- nova/tests/unit/compute/test_compute_mgr.py | 3 +- .../unit/scheduler/test_request_filter.py | 6 +- nova/tests/unit/test_context.py | 111 ++++++++- nova/tests/unit/test_crypto.py | 3 +- nova/tests/unit/virt/libvirt/test_driver.py | 19 ++ nova/virt/libvirt/driver.py | 18 +- nova/vtpm.py | 75 ++++++ 14 files changed, 518 insertions(+), 29 deletions(-) create mode 100644 nova/vtpm.py diff --git a/nova/compute/api.py b/nova/compute/api.py index c2f51dd2bc..2cdaab6e96 100644 --- a/nova/compute/api.py +++ b/nova/compute/api.py @@ -87,6 +87,7 @@ from nova import servicegroup from nova import utils from nova.virt import hardware from nova.volume import cinder +from nova import vtpm LOG = logging.getLogger(__name__) @@ -4197,6 +4198,39 @@ class API: return node + @staticmethod + def _validate_vtpm_secret_security(current_flavor, new_flavor, image_meta): + """Validate whether or not the requested resize is supported for vTPM. + + This will reject requests that would require secret ownership + conversions for the time being. Secret ownership changes will be + complex, so we plan to add support for it as a separate follow-up + enhancement. + + TODO(melwitt): Remove this when support for key manager service secret + ownership conversions is added. + """ + if not (hardware.get_vtpm_constraint(current_flavor, image_meta) and + hardware.get_vtpm_constraint(new_flavor, image_meta)): + # If either of the flavors has no vTPM at all, we don't need to + # validate anything because no secret ownership change would be + # involved. + return + + from_security = vtpm.get_instance_tpm_secret_security(current_flavor) + to_security = vtpm.get_instance_tpm_secret_security(new_flavor) + + if (from_security != to_security and + (from_security == 'deployment' or to_security == 'deployment')): + # Resizing to 'deployment' TPM secret security from any other + # mode or resizing to any other mode from 'deployment' TPM secret + # security would involve converting key manager service secret + # ownership from the user to the Nova service user or from the Nova + # service user to the user, and we don't support that yet. + msg = _("Resize between 'deployment' TPM secret security and " + "other TPM secret security modes is not supported.") + raise exception.OperationNotSupportedForVTPM(msg) + @block_shares_not_supported() # TODO(stephenfin): This logic would be so much easier to grok if we # finally split resize and cold migration into separate code paths @@ -4356,6 +4390,10 @@ class API: self._check_compute_service_for_mixed_instance( request_spec.numa_topology, min_comp_ver) + if not same_flavor: + self._validate_vtpm_secret_security(current_flavor, new_flavor, + instance.image_meta) + instance.task_state = task_states.RESIZE_PREP instance.progress = 0 instance.auto_disk_config = auto_disk_config or False diff --git a/nova/compute/manager.py b/nova/compute/manager.py index 5d43ce370d..bb1c68650d 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -67,7 +67,6 @@ from nova.compute import vm_states from nova import conductor import nova.conf import nova.context -from nova import crypto from nova import exception from nova import exception_wrapper from nova.i18n import _ @@ -99,6 +98,7 @@ import nova.virt.node from nova.virt import storage_users from nova.virt import virtapi from nova.volume import cinder +from nova import vtpm CONF = nova.conf.CONF @@ -970,6 +970,9 @@ class ComputeManager(manager.Manager): self.host, action=fields.NotificationAction.DELETE, phase=fields.NotificationPhase.END, bdms=bdms) + def _complete_deletion_vtpm(self, context, instance): + vtpm.delete_secret(context, instance) + def _complete_deletion(self, context, instance): self._update_resource_tracker(context, instance) @@ -985,7 +988,7 @@ class ComputeManager(manager.Manager): self._delete_scheduler_instance_info(context, instance.uuid) # Delete the vTPM secret in the key manager service if needed. - crypto.delete_vtpm_secret(context, instance) + self._complete_deletion_vtpm(context, instance) def _validate_pinning_configuration(self, instances): if not self.driver.capabilities.get('supports_pcpus', False): diff --git a/nova/conf/libvirt.py b/nova/conf/libvirt.py index 4c6f47fc97..d65cdccbe5 100644 --- a/nova/conf/libvirt.py +++ b/nova/conf/libvirt.py @@ -1636,7 +1636,7 @@ Related options: * ``swtpm_user`` must also be set. """), cfg.ListOpt('supported_tpm_secret_security', - default=['user', 'host'], + default=['user', 'host', 'deployment'], help=""" The list of TPM security policies supported by this compute host. If a value is absent, it is not supported by this host, and any instance that requests it @@ -1652,6 +1652,9 @@ Possible values are: accessed by anyone else. The Libvirt secret is public and persistent. It can be read by anyone with sufficient access on the host. The instance can be live-migrated and automatically resumed after host reboot. +* ``deployment``: The Barbican secret is owned by the Nova service user. The + Libvirt secret is private and non-persistent. The instance can be + live-migrated and resumed automatically after host reboot. """), cfg.BoolOpt( 'use_default_aio_mode_for_volumes', diff --git a/nova/context.py b/nova/context.py index cf8dd78668..4b2c5b2eb5 100644 --- a/nova/context.py +++ b/nova/context.py @@ -28,13 +28,16 @@ from oslo_db.sqlalchemy import enginefacade from oslo_log import log as logging from oslo_utils import timeutils +import nova.conf from nova import exception from nova.i18n import _ from nova import objects from nova import policy +from nova import service_auth from nova import utils LOG = logging.getLogger(__name__) +CONF = nova.conf.CONF CELL_CACHE = {} # NOTE(melwitt): Used for the scatter-gather utility to indicate we timed out # waiting for a result from a cell. @@ -47,6 +50,14 @@ CELLS = [] CELL_TIMEOUT = 60 +def reset_globals(): + global CELL_CACHE + global CELLS + CELL_CACHE = {} + CELLS = [] + service_auth.reset_globals() + + class _ContextAuthPlugin(plugin.BaseAuthPlugin): """A keystoneauth auth plugin that uses the values from the Context. @@ -277,6 +288,33 @@ def get_admin_context(read_deleted="no"): overwrite=False) +def get_nova_service_user_context(): + """Get a context that will authenticate as the Nova service user. + + This will pull authentication parameters from the [] + section of the Nova configuration and load an auth plugin, then create + and return a RequestContext object containing that auth plugin. + + Then, code using the RequestContext will call its get_auth_plugin() method + to authenticate with another service. + """ + conf_group = nova.conf.service_token.SERVICE_USER_GROUP + + auth = service_auth.get_service_auth_plugin(conf_group) + session = service_auth.get_service_auth_session(conf_group) + + if auth is None or session is None: + raise exception.InvalidConfiguration( + 'Failed to load auth plugin or session from configuration. ' + f'Ensure the [{conf_group}] section of the Nova configuration ' + 'file is correctly configured for the Nova service user.') + + return RequestContext(user_id=auth.get_user_id(session), + project_id=auth.get_project_id(session), + roles=auth.get_access(session).role_names, + user_auth_plugin=auth, overwrite=False) + + def is_user_context(context): """Indicates if the request context is a normal user.""" if not context: diff --git a/nova/scheduler/request_filter.py b/nova/scheduler/request_filter.py index d3a1bba879..2103e4037c 100644 --- a/nova/scheduler/request_filter.py +++ b/nova/scheduler/request_filter.py @@ -486,6 +486,9 @@ def tpm_secret_security_filter( elif security == 'host': request_spec.root_required.add( os_traits.COMPUTE_SECURITY_TPM_SECRET_SECURITY_HOST) + elif security == 'deployment': + request_spec.root_required.add( + os_traits.COMPUTE_SECURITY_TPM_SECRET_SECURITY_DEPLOYMENT) else: # We can get here if the requested TPM secret security passed extra # spec validation but is not otherwise supported in the code at this diff --git a/nova/test.py b/nova/test.py index b22652e2cb..6e1cb4ae1c 100644 --- a/nova/test.py +++ b/nova/test.py @@ -243,8 +243,7 @@ class TestCase(base.BaseTestCase): # NOTE(danms): Reset the cached list of cells from nova.compute import api api.CELLS = [] - context.CELL_CACHE = {} - context.CELLS = [] + context.reset_globals() self.computes = {} self.cell_mappings = {} diff --git a/nova/tests/functional/libvirt/test_vtpm.py b/nova/tests/functional/libvirt/test_vtpm.py index 0d70caa877..c4b90b4939 100644 --- a/nova/tests/functional/libvirt/test_vtpm.py +++ b/nova/tests/functional/libvirt/test_vtpm.py @@ -168,6 +168,15 @@ class VTPMServersTest(base.ServersTestBase): self.key_mgr = crypto._get_key_manager() + # Mock the get_nova_service_user_context() method so we can + # differentiate request contexts for the 'nova' service user. + def fake_get_nova_service_user_context(): + return nova_context.RequestContext(user_id='nova') + + self.useFixture(fixtures.MockPatch( + 'nova.context.get_nova_service_user_context', + fake_get_nova_service_user_context)) + def _create_server_with_vtpm(self, secret_security=None, expected_state='ACTIVE'): extra_specs = {'hw:tpm_model': 'tpm-tis', 'hw:tpm_version': '1.2'} @@ -183,14 +192,15 @@ class VTPMServersTest(base.ServersTestBase): # use the default flavor (i.e. one without vTPM extra specs) return self._create_server() - def assertInstanceHasSecret(self, server): + def assertInstanceHasSecret(self, server, user_id='fake'): + # user_id='fake' is the normal non-admin user. ctx = nova_context.get_admin_context() instance = objects.Instance.get_by_uuid(ctx, server['id']) self.assertIn('vtpm_secret_uuid', instance.system_metadata) self.assertEqual(1, len(self.key_mgr._passphrases)) - self.assertIn( - instance.system_metadata['vtpm_secret_uuid'], - self.key_mgr._passphrases) + secret_uuid = instance.system_metadata['vtpm_secret_uuid'] + self.assertIn(secret_uuid, self.key_mgr._passphrases) + self.assertEqual(user_id, self.key_mgr._contexts[secret_uuid].user_id) return instance.system_metadata['vtpm_secret_uuid'] def assertInstanceHasNoSecret(self, server): @@ -297,6 +307,41 @@ class VTPMServersTest(base.ServersTestBase): self.assertNotIn(instance.system_metadata['vtpm_secret_uuid'], conn._secrets) + def test_create_server_secret_security_deployment(self): + self.flags( + supported_tpm_secret_security=['deployment'], group='libvirt') + self.start_compute(hostname='tpm-host') + compute = self.computes['tpm-host'] + + # ensure we are reporting the correct traits + traits = self._get_provider_traits(self.compute_rp_uuids['tpm-host']) + self.assertIn( + 'COMPUTE_SECURITY_TPM_SECRET_SECURITY_DEPLOYMENT', traits) + + # create a server with vTPM + server = self._create_server_with_vtpm(secret_security='deployment') + + # ensure our instance's system_metadata field and key manager inventory + # is correct + self.assertInstanceHasSecret(server, user_id='nova') + + # ensure the libvirt secret is defined correctly + ctx = nova_context.get_admin_context() + instance = objects.Instance.get_by_uuid(ctx, server['id']) + self._assert_libvirt_had_secret( + compute, instance.system_metadata['vtpm_secret_uuid']) + + # Now delete the server, this delete will fail if the secret ownership + # does not match. And we verified the secret owner is 'nova' above. + self._delete_server(server) + + # ensure we deleted the key and undefined the secret now that we no + # longer need it + self.assertEqual(0, len(self.key_mgr._passphrases)) + conn = compute.driver._host.get_connection() + self.assertNotIn(instance.system_metadata['vtpm_secret_uuid'], + conn._secrets) + def test_suspend_resume_server(self): self.start_compute() @@ -409,7 +454,22 @@ class VTPMServersTest(base.ServersTestBase): self._test_resize_revert_server__vtpm_to_vtpm( extra_specs=extra_specs) - def test_resize_server__no_vtpm_to_vtpm(self): + @ddt.data(None, 'user', 'host', 'deployment') + def test_resize_server__no_vtpm_to_vtpm(self, secret_security): + """Resize a server from a flavor without TPM to a flavor with TPM. + + This tests a scenario where the instance does not have a TPM before + the resize but *does* have a TPM after the resize. + + A TPM secret security of 'None' means the instance is either: + + * A legacy vTPM instance + + * A vTPM instance where the user did not specify TPM secret security + + In both of these cases, the default TPM secret security policy is + 'user'. So 'None' is the equivalent of 'user'. + """ for host in ('test_compute0', 'test_compute1'): self.start_compute(host) @@ -423,6 +483,8 @@ class VTPMServersTest(base.ServersTestBase): # create a flavor with vTPM extra_specs = {'hw:tpm_model': 'tpm-tis', 'hw:tpm_version': '1.2'} + if secret_security is not None: + extra_specs['hw:tpm_secret_security'] = secret_security flavor_id = self._create_flavor(extra_spec=extra_specs) # TODO(stephenfin): The mock of 'migrate_disk_and_power_off' should @@ -436,7 +498,8 @@ class VTPMServersTest(base.ServersTestBase): # ensure our instance's system_metadata field and key manager inventory # is updated to reflect the new vTPM requirement - self.assertInstanceHasSecret(server) + user_id = 'nova' if secret_security == 'deployment' else 'fake' + self.assertInstanceHasSecret(server, user_id=user_id) # revert the instance rather than confirming it, and ensure the secret # is correctly cleaned up @@ -453,16 +516,32 @@ class VTPMServersTest(base.ServersTestBase): # ensure we delete the new key since we no longer need it self.assertInstanceHasNoSecret(server) - def test_resize_server__vtpm_to_no_vtpm(self): + @ddt.data(None, 'user', 'host', 'deployment') + def test_resize_server__vtpm_to_no_vtpm(self, secret_security): + """Resize a server from a flavor with TPM to a flavor without TPM. + + This tests a scenario where the instance has a TPM before the resize + but does *not* have a TPM after the resize. + + A TPM secret security of 'None' means the instance is either: + + * A legacy vTPM instance + + * A vTPM instance where the user did not specify TPM secret security + + In both of these cases, the default TPM secret security policy is + 'user'. So 'None' is the equivalent of 'user'. + """ for host in ('test_compute0', 'test_compute1'): self.start_compute(host) # create a server with vTPM - server = self._create_server_with_vtpm() + server = self._create_server_with_vtpm(secret_security=secret_security) self.addCleanup(self._delete_server, server) # ensure our instance's system_metadata field is correct - self.assertInstanceHasSecret(server) + user_id = 'nova' if secret_security == 'deployment' else 'fake' + self.assertInstanceHasSecret(server, user_id=user_id) # create a flavor without vTPM flavor_id = self._create_flavor() @@ -478,7 +557,8 @@ class VTPMServersTest(base.ServersTestBase): # ensure we still have the key for the vTPM device in storage in case # we revert - self.assertInstanceHasSecret(server) + user_id = 'nova' if secret_security == 'deployment' else 'fake' + self.assertInstanceHasSecret(server, user_id=user_id) # confirm the instance and ensure the secret is correctly cleaned up @@ -488,13 +568,129 @@ class VTPMServersTest(base.ServersTestBase): 'nova.virt.libvirt.driver.LibvirtDriver' '.migrate_disk_and_power_off', return_value='{}', ): - # revert back to the old flavor *with* vTPM + # confirm to the new flavor *without* vTPM server = self._confirm_resize(server) # ensure we have finally deleted the key for the vTPM device since # there is no going back now self.assertInstanceHasNoSecret(server) + @ddt.unpack + @ddt.data( + (None, 'deployment'), ('deployment', None), + ('user', 'deployment'), ('deployment', 'user'), + ('host', 'deployment'), ('deployment', 'host')) + def test_resize_vtpm_server_secret_security_deployment_unsupported( + self, from_secret_security, to_secret_security): + """Resizes that require secret ownership changes are not allowed. + + This tests a scenario where the instance has a TPM before the resize + and has a TPM after the resize. + + A TPM secret security of 'None' means the instance is either: + + * A legacy vTPM instance + + * A vTPM instance where the user did not specify TPM secret security + + In both of these cases, the default TPM secret security policy is + 'user'. So 'None' is the equivalent of 'user'. + + Until a later patch in the series adds code to convert to and from a + user-owned secret <=> Nova service user owned secret, we will want to + reject requests that would require conversion. Otherwise, these + attempts will fail with secret access permission errors. + """ + for host in ('test_compute0', 'test_compute1'): + self.start_compute(host) + + # create a server with vTPM with from_secret_security + server = self._create_server_with_vtpm( + secret_security=from_secret_security) + self.addCleanup(self._delete_server, server) + + # ensure our instance's system_metadata field is correct + user_id = 'nova' if from_secret_security == 'deployment' else 'fake' + self.assertInstanceHasSecret(server, user_id=user_id) + + # create a flavor with to_secret_security + extra_specs = {'hw:tpm_version': '1.2', + 'hw:tpm_model': 'tpm-tis'} + if to_secret_security is not None: + extra_specs['hw:tpm_secret_security'] = to_secret_security + flavor_id = self._create_flavor(extra_spec=extra_specs) + + with mock.patch( + 'nova.virt.libvirt.driver.LibvirtDriver' + '.migrate_disk_and_power_off', return_value='{}', + ): + ex = self.assertRaises( + client.OpenStackApiException, self._resize_server, server, + flavor_id=flavor_id) + self.assertEqual(400, ex.response.status_code) + self.assertIn( + "Resize between 'deployment' TPM secret security and " + "other TPM secret security modes is not supported.", + str(ex)) + + @ddt.unpack + @ddt.data( + (None, None), + ('user', 'user'), + ('host', 'host'), + ('deployment', 'deployment'), + (None, 'user'), ('user', None), + (None, 'host'), ('host', None), + ('user', 'host'), ('host', 'user')) + def test_resize_vtpm_server_secret_security_deployment_supported( + self, from_secret_security, to_secret_security): + """Resizes that do not require secret ownership changes are allowed. + + This tests a scenario where the instance has a TPM before the resize + and has a TPM after the resize. + + A TPM secret security of 'None' means the instance is either: + + * A legacy vTPM instance + + * A vTPM instance where the user did not specify TPM secret security + + In both of these cases, the default TPM secret security policy is + 'user'. So 'None' is the equivalent of 'user'. + + A resize from 'deployment' to 'deployment' is allowed because in both + cases the key manager service secret will be owned by the Nova service + user and no ownership change will be needed. + """ + for host in ('test_compute0', 'test_compute1'): + self.start_compute(host) + + # create a server with vTPM with from_secret_security + server = self._create_server_with_vtpm( + secret_security=from_secret_security) + self.addCleanup(self._delete_server, server) + + # ensure our instance's system_metadata field is correct + user_id = 'nova' if from_secret_security == 'deployment' else 'fake' + self.assertInstanceHasSecret(server, user_id=user_id) + + # create a flavor with to_secret_security + extra_specs = {'hw:tpm_version': '1.2', + 'hw:tpm_model': 'tpm-tis'} + if to_secret_security is not None: + extra_specs['hw:tpm_secret_security'] = to_secret_security + flavor_id = self._create_flavor(extra_spec=extra_specs) + + with mock.patch( + 'nova.virt.libvirt.driver.LibvirtDriver' + '.migrate_disk_and_power_off', return_value='{}', + ): + # resize should succeed + self._resize_server(server, flavor_id=flavor_id) + + # And the secret should still be as expected. + self.assertInstanceHasSecret(server, user_id=user_id) + def test_create_server_secret_security_unsupported(self): """Test when a not supported TPM secret security mode is requested diff --git a/nova/tests/unit/compute/test_compute_mgr.py b/nova/tests/unit/compute/test_compute_mgr.py index ceb04aa336..fab9a9719a 100644 --- a/nova/tests/unit/compute/test_compute_mgr.py +++ b/nova/tests/unit/compute/test_compute_mgr.py @@ -1794,7 +1794,8 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase, self, reclaim_instance_interval, mock_delete_vtpm, mock_delete_alloc): self.flags(reclaim_instance_interval=reclaim_instance_interval) - instance = objects.Instance(uuid=uuids.instance) + instance = objects.Instance(uuid=uuids.instance, + flavor=objects.Flavor()) with mock.patch.multiple( self.compute, diff --git a/nova/tests/unit/scheduler/test_request_filter.py b/nova/tests/unit/scheduler/test_request_filter.py index b04dfaaa3f..0163bd35d9 100644 --- a/nova/tests/unit/scheduler/test_request_filter.py +++ b/nova/tests/unit/scheduler/test_request_filter.py @@ -752,8 +752,12 @@ class TestRequestFilter(test.NoDBTestCase): @ddt.data( ('flavor', 'user', ot.COMPUTE_SECURITY_TPM_SECRET_SECURITY_USER), ('flavor', 'host', ot.COMPUTE_SECURITY_TPM_SECRET_SECURITY_HOST), + ('flavor', + 'deployment', ot.COMPUTE_SECURITY_TPM_SECRET_SECURITY_DEPLOYMENT), ('image', 'user', ot.COMPUTE_SECURITY_TPM_SECRET_SECURITY_USER), - ('image', 'host', ot.COMPUTE_SECURITY_TPM_SECRET_SECURITY_HOST)) + ('image', 'host', ot.COMPUTE_SECURITY_TPM_SECRET_SECURITY_HOST), + ('image', + 'deployment', ot.COMPUTE_SECURITY_TPM_SECRET_SECURITY_DEPLOYMENT)) @ddt.unpack def test_tpm_secret_security_filter(self, source, secret_security, trait): # First ensure that tpm_secret_security_filter is included diff --git a/nova/tests/unit/test_context.py b/nova/tests/unit/test_context.py index fb6b9fe517..a387cf201c 100644 --- a/nova/tests/unit/test_context.py +++ b/nova/tests/unit/test_context.py @@ -15,8 +15,12 @@ import threading from unittest import mock - +import ddt import futurist.waiters +from keystoneauth1.fixture import plugin as ks_plugin_fixture +from keystoneauth1 import loading as ks_loading +from oslo_config import cfg +from oslo_config import fixture as config_fixture from oslo_context import context as o_context from oslo_context import fixture as o_fixture from oslo_utils.fixture import uuidsentinel as uuids @@ -24,10 +28,28 @@ from oslo_utils.fixture import uuidsentinel as uuids from nova import context from nova import exception from nova import objects +from nova import service_auth from nova import test from nova import utils +class TestPluginWithAccess(ks_plugin_fixture.TestPlugin): + + def get_access(self, session): + return mock.Mock(role_names=['service']) + + +class LoadingFixtureWithAccess(ks_plugin_fixture.LoadingFixture): + + def create_plugin(self): + return TestPluginWithAccess( + token=self.token, + endpoint=self.endpoint, + user_id=self.user_id, + project_id=self.project_id) + + +@ddt.ddt class ContextTestCase(test.NoDBTestCase): # NOTE(danms): Avoid any cells setup by claiming we will # do things ourselves. @@ -178,6 +200,93 @@ class ContextTestCase(test.NoDBTestCase): context.get_admin_context() self.assertIs(o_context.get_current(), ctx1) + @mock.patch('keystoneauth1.loading.load_auth_from_conf_options') + @mock.patch('keystoneauth1.loading.load_session_from_conf_options') + def test_get_nova_service_user_context(self, mock_load_session, + mock_load_auth): + """Verify the basic get of a Nova service user context.""" + # Get a Nova service user context. + ctxt = context.get_nova_service_user_context() + + # Verify we called the loading functions as expected. + mock_load_auth.assert_called_once_with(context.CONF, 'service_user') + mock_load_session.assert_called_once_with(context.CONF, 'service_user', + auth=None) + mock_plugin = mock_load_auth.return_value + mock_session = mock_load_session.return_value + + # Verify we called the user_id and project_id getting methods as + # expected. + mock_plugin.get_user_id.assert_called_once_with(mock_session) + mock_plugin.get_project_id.assert_called_once_with(mock_session) + + # Verify the RequestContext attributes got set as expected. + self.assertEqual(mock_plugin.get_user_id.return_value, ctxt.user_id) + self.assertEqual(mock_plugin.get_project_id.return_value, + ctxt.project_id) + self.assertEqual(mock_plugin, ctxt.user_auth_plugin) + + # Get another context to verify we create the context with + # overwrite=False to avoid overwriting the thread local storage. + with mock.patch('nova.context.RequestContext') as mock_context: + ctxt = context.get_nova_service_user_context() + mock_context.assert_called_once_with( + user_id=mock_plugin.get_user_id.return_value, + project_id=mock_plugin.get_project_id.return_value, + roles=mock_plugin.get_access.return_value.role_names, + user_auth_plugin=mock_plugin, overwrite=False) + + def test_get_nova_service_user_context_user_project(self): + """Verify the user_id and project_id get set to what we expect.""" + # Use a new config fixture so that the options we register here will + # get unregistered after the test. + conf_fixture = self.useFixture( + config_fixture.Config(conf=cfg.ConfigOpts())) + + # Register the auth and session options in the [service_user] + # config section. + oslo_opts = (ks_loading.get_auth_common_conf_options() + + ks_loading.get_session_conf_options() + + ks_loading.get_auth_plugin_conf_options('password')) + conf_fixture.register_opts(oslo_opts, group='service_user') + + # Fill in typical values for the Nova service user. + conf_fixture.config( + group='service_user', auth_type='password', username='nova', + project_name='service', auth_url='http://anyhost/auth') + + # Use the plugin loading fixture from keystoneauth in order to skip all + # of the real authentication steps of calling Keystone and set expected + # user_id and project_id values. + self.useFixture(LoadingFixtureWithAccess(user_id=uuids.nova, + project_id=uuids.service)) + + # Verify we get the expected user_id and project_id in the + # RequestContext. + with mock.patch.object(service_auth, 'CONF', conf_fixture.conf): + ctxt = context.get_nova_service_user_context() + self.assertEqual(uuids.nova, ctxt.user_id) + self.assertEqual(uuids.service, ctxt.project_id) + self.assertEqual(['service'], ctxt.roles) + + @mock.patch('keystoneauth1.loading.load_auth_from_conf_options') + @mock.patch('keystoneauth1.loading.load_session_from_conf_options') + @ddt.data('auth', 'session') + def test_get_nova_service_user_context_load_fail( + self, to_fail, mock_load_session, mock_load_auth): + if to_fail == 'auth': + mock_load_auth.return_value = None + elif to_fail == 'session': + mock_load_session.return_value = None + + ex = self.assertRaises(exception.InvalidConfiguration, + context.get_nova_service_user_context) + msg = ( + 'Failed to load auth plugin or session from configuration. ' + 'Ensure the [service_user] section of the Nova configuration ' + 'file is correctly configured for the Nova service user.') + self.assertIn(msg, str(ex)) + def test_convert_from_rc_to_dict(self): ctx = context.RequestContext( 111, 222, request_id='req-679033b7-1755-4929-bf85-eb3bfaef7e0b', diff --git a/nova/tests/unit/test_crypto.py b/nova/tests/unit/test_crypto.py index 2cdde1697b..2bf7206be3 100644 --- a/nova/tests/unit/test_crypto.py +++ b/nova/tests/unit/test_crypto.py @@ -279,7 +279,8 @@ class VTPMTest(test.NoDBTestCase): We should create a new one. """ - instance = objects.Instance() + instance = objects.Instance(flavor=objects.Flavor(), + image_ref=uuids.image) instance.uuid = uuids.instance instance.system_metadata = {} mock_get_manager.return_value.store.return_value = uuids.secret diff --git a/nova/tests/unit/virt/libvirt/test_driver.py b/nova/tests/unit/virt/libvirt/test_driver.py index 7ba862e4d5..44ee4cf2ca 100644 --- a/nova/tests/unit/virt/libvirt/test_driver.py +++ b/nova/tests/unit/virt/libvirt/test_driver.py @@ -17241,6 +17241,7 @@ class LibvirtConnTestCase(test.NoDBTestCase, self.flags(swtpm_enabled=True, group='libvirt') self.useFixture(nova_fixtures.LibvirtImageBackendFixture()) + mock_ensure_vtpm.return_value = uuids.secret, mock.sentinel.password mock_get_info.return_value = hardware.InstanceInfo( state=power_state.RUNNING) @@ -21238,6 +21239,24 @@ class LibvirtConnTestCase(test.NoDBTestCase, mock_host.return_value.create_secret.return_value, secret) self.assertEqual('host', security) + @mock.patch('nova.context.get_nova_service_user_context') + @mock.patch('nova.crypto.ensure_vtpm_secret') + def test_get_or_create_secret_for_vtpm_security_deployment( + self, mock_ensure_secret, mock_get_ctxt): + # Test that vTPM secret security 'deployment' will use the Nova service + # user auth to create the secret in the key manager service. + mock_ensure_secret.return_value = uuids.secret, mock.sentinel.password + instance = objects.Instance(**self.test_instance) + instance.flavor.extra_specs = {'hw:tpm_secret_security': 'deployment'} + + drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False) + drvr._get_or_create_secret_for_vtpm(self.context, instance) + + # We should use the service user context. + mock_get_ctxt.assert_called_once_with() + mock_ensure_secret.assert_called_once_with( + mock_get_ctxt.return_value, instance) + @mock.patch('nova.virt.disk.api.clean_lxc_namespace') @mock.patch('nova.virt.libvirt.driver.LibvirtDriver.get_info') @mock.patch('nova.virt.disk.api.setup_container') diff --git a/nova/virt/libvirt/driver.py b/nova/virt/libvirt/driver.py index 2ae028acbf..131c6e9e57 100644 --- a/nova/virt/libvirt/driver.py +++ b/nova/virt/libvirt/driver.py @@ -134,6 +134,7 @@ from nova.virt.libvirt.volume import remotefs from nova.virt.libvirt.volume import volume from nova.virt import netutils from nova.volume import cinder +from nova import vtpm try: # This is optional for unit testing but required at runtime. We check for @@ -1793,7 +1794,7 @@ class LibvirtDriver(driver.ComputeDriver): pass if cleanup_instance_disks: - if hardware.get_tpm_secret_security_constraint( + if vtpm.get_instance_tpm_secret_security( instance.flavor) == 'host': self._host.delete_secret('vtpm', instance.uuid) # Make sure that the instance directory files were successfully @@ -1964,7 +1965,7 @@ class LibvirtDriver(driver.ComputeDriver): # secret; the deletion of the instance directory and undefining of # the domain will take care of the TPM files themselves LOG.info('New flavor no longer requests vTPM; deleting secret.') - crypto.delete_vtpm_secret(context, instance) + vtpm.delete_secret(context, instance, flavor=instance.old_flavor) # TODO(stephenfin): Fold this back into its only caller, cleanup_resize def _cleanup_resize(self, context, instance, network_info): @@ -4823,7 +4824,7 @@ class LibvirtDriver(driver.ComputeDriver): # it to hand when generating the XML. This is slightly wasteful # as we'll perform a redundant key manager API call later when # we create the domain but the alternative is an ugly mess - crypto.ensure_vtpm_secret(context, instance) + self._get_or_create_secret_for_vtpm(context, instance) xml = self._get_guest_xml(context, instance, network_info, disk_info, image_meta, @@ -8195,8 +8196,7 @@ class LibvirtDriver(driver.ComputeDriver): For all others, it will call the key manager service API to get or create a secret and then use it to create a libvirt secret. """ - security = hardware.get_tpm_secret_security_constraint( - instance.flavor) or 'user' + security = vtpm.get_instance_tpm_secret_security(instance.flavor) libvirt_secret = None kwargs = {} @@ -8210,8 +8210,8 @@ class LibvirtDriver(driver.ComputeDriver): kwargs = {'ephemeral': False, 'private': False} if libvirt_secret is None: - secret_uuid, passphrase = crypto.ensure_vtpm_secret(context, - instance) + secret_uuid, passphrase = vtpm.get_or_create_secret( + context, instance) libvirt_secret = self._host.create_secret( 'vtpm', instance.uuid, password=passphrase, uuid=secret_uuid, **kwargs) @@ -12649,7 +12649,7 @@ class LibvirtDriver(driver.ComputeDriver): elif new_vtpm_config: # we've requested vTPM in the new flavor and didn't have one # previously so we need to create a new secret - crypto.ensure_vtpm_secret(context, instance) + self._get_or_create_secret_for_vtpm(context, instance) def finish_migration( self, @@ -12798,7 +12798,7 @@ class LibvirtDriver(driver.ComputeDriver): # the instance gained a vTPM and must now lose it; delete the vTPM # secret, knowing that libvirt will take care of everything else on # the destination side - crypto.delete_vtpm_secret(context, instance) + vtpm.delete_secret(context, instance, flavor=instance.new_flavor) def finish_revert_migration( self, diff --git a/nova/vtpm.py b/nova/vtpm.py new file mode 100644 index 0000000000..3a6b351285 --- /dev/null +++ b/nova/vtpm.py @@ -0,0 +1,75 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import typing as ty + +if ty.TYPE_CHECKING: + from nova import objects + +from nova import context as nova_context +from nova import crypto +from nova.virt import hardware + + +def get_instance_tpm_secret_security(flavor): + secret_security = hardware.get_tpm_secret_security_constraint(flavor) + return secret_security or 'user' + + +def get_or_create_secret( + context: nova_context.RequestContext, + instance: 'objects.Instance', +) -> tuple[str, bytes]: + """Get or create a secret in the key manager service. + + The secret UUID and passphrase will be returned. + """ + use_context = get_request_context(context, instance.flavor) + return crypto.ensure_vtpm_secret(use_context, instance) + + +def delete_secret( + context: nova_context.RequestContext, + instance: 'objects.Instance', + flavor: ty.Optional['objects.Flavor'] = None, +) -> None: + """Delete a secret from the key manager service for TPM. + + A flavor can be optionally specified to use instead of instance.flavor. + This will be the case for: + + * Reverting a no TPM => TPM resize because by the time we get here, + instance.flavor will have already been changed back to the old + flavor. + + * Confirming a TPM => no TPM resize because by the time we get here, + instance.flavor will be set to the new flavor. + """ + flavor = flavor or instance.flavor + use_context = get_request_context(context, flavor) + crypto.delete_vtpm_secret(use_context, instance) + + +def get_request_context( + context: nova_context.RequestContext, + flavor: 'objects.Flavor', +) -> nova_context.RequestContext: + """Obtain an appropriate RequestContext based on TPM secret security. + + The normal user context should be passed in and if TPM secret security + policy for the instance is 'deployment', this will return a Nova service + user context. Otherwise, the normal user context that was passed in will be + returned. + """ + if get_instance_tpm_secret_security(flavor) == 'deployment': + return nova_context.get_nova_service_user_context() + return context