diff --git a/nova/compute/api.py b/nova/compute/api.py index c2f51dd2bc..2cdaab6e96 100644 --- a/nova/compute/api.py +++ b/nova/compute/api.py @@ -87,6 +87,7 @@ from nova import servicegroup from nova import utils from nova.virt import hardware from nova.volume import cinder +from nova import vtpm LOG = logging.getLogger(__name__) @@ -4197,6 +4198,39 @@ class API: return node + @staticmethod + def _validate_vtpm_secret_security(current_flavor, new_flavor, image_meta): + """Validate whether or not the requested resize is supported for vTPM. + + This will reject requests that would require secret ownership + conversions for the time being. Secret ownership changes will be + complex, so we plan to add support for it as a separate follow-up + enhancement. + + TODO(melwitt): Remove this when support for key manager service secret + ownership conversions is added. + """ + if not (hardware.get_vtpm_constraint(current_flavor, image_meta) and + hardware.get_vtpm_constraint(new_flavor, image_meta)): + # If either of the flavors has no vTPM at all, we don't need to + # validate anything because no secret ownership change would be + # involved. + return + + from_security = vtpm.get_instance_tpm_secret_security(current_flavor) + to_security = vtpm.get_instance_tpm_secret_security(new_flavor) + + if (from_security != to_security and + (from_security == 'deployment' or to_security == 'deployment')): + # Resizing to 'deployment' TPM secret security from any other + # mode or resizing to any other mode from 'deployment' TPM secret + # security would involve converting key manager service secret + # ownership from the user to the Nova service user or from the Nova + # service user to the user, and we don't support that yet. + msg = _("Resize between 'deployment' TPM secret security and " + "other TPM secret security modes is not supported.") + raise exception.OperationNotSupportedForVTPM(msg) + @block_shares_not_supported() # TODO(stephenfin): This logic would be so much easier to grok if we # finally split resize and cold migration into separate code paths @@ -4356,6 +4390,10 @@ class API: self._check_compute_service_for_mixed_instance( request_spec.numa_topology, min_comp_ver) + if not same_flavor: + self._validate_vtpm_secret_security(current_flavor, new_flavor, + instance.image_meta) + instance.task_state = task_states.RESIZE_PREP instance.progress = 0 instance.auto_disk_config = auto_disk_config or False diff --git a/nova/compute/manager.py b/nova/compute/manager.py index 5d43ce370d..bb1c68650d 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -67,7 +67,6 @@ from nova.compute import vm_states from nova import conductor import nova.conf import nova.context -from nova import crypto from nova import exception from nova import exception_wrapper from nova.i18n import _ @@ -99,6 +98,7 @@ import nova.virt.node from nova.virt import storage_users from nova.virt import virtapi from nova.volume import cinder +from nova import vtpm CONF = nova.conf.CONF @@ -970,6 +970,9 @@ class ComputeManager(manager.Manager): self.host, action=fields.NotificationAction.DELETE, phase=fields.NotificationPhase.END, bdms=bdms) + def _complete_deletion_vtpm(self, context, instance): + vtpm.delete_secret(context, instance) + def _complete_deletion(self, context, instance): self._update_resource_tracker(context, instance) @@ -985,7 +988,7 @@ class ComputeManager(manager.Manager): self._delete_scheduler_instance_info(context, instance.uuid) # Delete the vTPM secret in the key manager service if needed. - crypto.delete_vtpm_secret(context, instance) + self._complete_deletion_vtpm(context, instance) def _validate_pinning_configuration(self, instances): if not self.driver.capabilities.get('supports_pcpus', False): diff --git a/nova/conf/libvirt.py b/nova/conf/libvirt.py index 4c6f47fc97..d65cdccbe5 100644 --- a/nova/conf/libvirt.py +++ b/nova/conf/libvirt.py @@ -1636,7 +1636,7 @@ Related options: * ``swtpm_user`` must also be set. """), cfg.ListOpt('supported_tpm_secret_security', - default=['user', 'host'], + default=['user', 'host', 'deployment'], help=""" The list of TPM security policies supported by this compute host. If a value is absent, it is not supported by this host, and any instance that requests it @@ -1652,6 +1652,9 @@ Possible values are: accessed by anyone else. The Libvirt secret is public and persistent. It can be read by anyone with sufficient access on the host. The instance can be live-migrated and automatically resumed after host reboot. +* ``deployment``: The Barbican secret is owned by the Nova service user. The + Libvirt secret is private and non-persistent. The instance can be + live-migrated and resumed automatically after host reboot. """), cfg.BoolOpt( 'use_default_aio_mode_for_volumes', diff --git a/nova/context.py b/nova/context.py index cf8dd78668..4b2c5b2eb5 100644 --- a/nova/context.py +++ b/nova/context.py @@ -28,13 +28,16 @@ from oslo_db.sqlalchemy import enginefacade from oslo_log import log as logging from oslo_utils import timeutils +import nova.conf from nova import exception from nova.i18n import _ from nova import objects from nova import policy +from nova import service_auth from nova import utils LOG = logging.getLogger(__name__) +CONF = nova.conf.CONF CELL_CACHE = {} # NOTE(melwitt): Used for the scatter-gather utility to indicate we timed out # waiting for a result from a cell. @@ -47,6 +50,14 @@ CELLS = [] CELL_TIMEOUT = 60 +def reset_globals(): + global CELL_CACHE + global CELLS + CELL_CACHE = {} + CELLS = [] + service_auth.reset_globals() + + class _ContextAuthPlugin(plugin.BaseAuthPlugin): """A keystoneauth auth plugin that uses the values from the Context. @@ -277,6 +288,33 @@ def get_admin_context(read_deleted="no"): overwrite=False) +def get_nova_service_user_context(): + """Get a context that will authenticate as the Nova service user. + + This will pull authentication parameters from the [] + section of the Nova configuration and load an auth plugin, then create + and return a RequestContext object containing that auth plugin. + + Then, code using the RequestContext will call its get_auth_plugin() method + to authenticate with another service. + """ + conf_group = nova.conf.service_token.SERVICE_USER_GROUP + + auth = service_auth.get_service_auth_plugin(conf_group) + session = service_auth.get_service_auth_session(conf_group) + + if auth is None or session is None: + raise exception.InvalidConfiguration( + 'Failed to load auth plugin or session from configuration. ' + f'Ensure the [{conf_group}] section of the Nova configuration ' + 'file is correctly configured for the Nova service user.') + + return RequestContext(user_id=auth.get_user_id(session), + project_id=auth.get_project_id(session), + roles=auth.get_access(session).role_names, + user_auth_plugin=auth, overwrite=False) + + def is_user_context(context): """Indicates if the request context is a normal user.""" if not context: diff --git a/nova/scheduler/request_filter.py b/nova/scheduler/request_filter.py index d3a1bba879..2103e4037c 100644 --- a/nova/scheduler/request_filter.py +++ b/nova/scheduler/request_filter.py @@ -486,6 +486,9 @@ def tpm_secret_security_filter( elif security == 'host': request_spec.root_required.add( os_traits.COMPUTE_SECURITY_TPM_SECRET_SECURITY_HOST) + elif security == 'deployment': + request_spec.root_required.add( + os_traits.COMPUTE_SECURITY_TPM_SECRET_SECURITY_DEPLOYMENT) else: # We can get here if the requested TPM secret security passed extra # spec validation but is not otherwise supported in the code at this diff --git a/nova/test.py b/nova/test.py index 6b19c89591..d785efed27 100644 --- a/nova/test.py +++ b/nova/test.py @@ -244,8 +244,7 @@ class TestCase(base.BaseTestCase): # NOTE(danms): Reset the cached list of cells from nova.compute import api api.CELLS = [] - context.CELL_CACHE = {} - context.CELLS = [] + context.reset_globals() self.computes = {} self.cell_mappings = {} diff --git a/nova/tests/functional/libvirt/test_vtpm.py b/nova/tests/functional/libvirt/test_vtpm.py index 0d70caa877..c4b90b4939 100644 --- a/nova/tests/functional/libvirt/test_vtpm.py +++ b/nova/tests/functional/libvirt/test_vtpm.py @@ -168,6 +168,15 @@ class VTPMServersTest(base.ServersTestBase): self.key_mgr = crypto._get_key_manager() + # Mock the get_nova_service_user_context() method so we can + # differentiate request contexts for the 'nova' service user. + def fake_get_nova_service_user_context(): + return nova_context.RequestContext(user_id='nova') + + self.useFixture(fixtures.MockPatch( + 'nova.context.get_nova_service_user_context', + fake_get_nova_service_user_context)) + def _create_server_with_vtpm(self, secret_security=None, expected_state='ACTIVE'): extra_specs = {'hw:tpm_model': 'tpm-tis', 'hw:tpm_version': '1.2'} @@ -183,14 +192,15 @@ class VTPMServersTest(base.ServersTestBase): # use the default flavor (i.e. one without vTPM extra specs) return self._create_server() - def assertInstanceHasSecret(self, server): + def assertInstanceHasSecret(self, server, user_id='fake'): + # user_id='fake' is the normal non-admin user. ctx = nova_context.get_admin_context() instance = objects.Instance.get_by_uuid(ctx, server['id']) self.assertIn('vtpm_secret_uuid', instance.system_metadata) self.assertEqual(1, len(self.key_mgr._passphrases)) - self.assertIn( - instance.system_metadata['vtpm_secret_uuid'], - self.key_mgr._passphrases) + secret_uuid = instance.system_metadata['vtpm_secret_uuid'] + self.assertIn(secret_uuid, self.key_mgr._passphrases) + self.assertEqual(user_id, self.key_mgr._contexts[secret_uuid].user_id) return instance.system_metadata['vtpm_secret_uuid'] def assertInstanceHasNoSecret(self, server): @@ -297,6 +307,41 @@ class VTPMServersTest(base.ServersTestBase): self.assertNotIn(instance.system_metadata['vtpm_secret_uuid'], conn._secrets) + def test_create_server_secret_security_deployment(self): + self.flags( + supported_tpm_secret_security=['deployment'], group='libvirt') + self.start_compute(hostname='tpm-host') + compute = self.computes['tpm-host'] + + # ensure we are reporting the correct traits + traits = self._get_provider_traits(self.compute_rp_uuids['tpm-host']) + self.assertIn( + 'COMPUTE_SECURITY_TPM_SECRET_SECURITY_DEPLOYMENT', traits) + + # create a server with vTPM + server = self._create_server_with_vtpm(secret_security='deployment') + + # ensure our instance's system_metadata field and key manager inventory + # is correct + self.assertInstanceHasSecret(server, user_id='nova') + + # ensure the libvirt secret is defined correctly + ctx = nova_context.get_admin_context() + instance = objects.Instance.get_by_uuid(ctx, server['id']) + self._assert_libvirt_had_secret( + compute, instance.system_metadata['vtpm_secret_uuid']) + + # Now delete the server, this delete will fail if the secret ownership + # does not match. And we verified the secret owner is 'nova' above. + self._delete_server(server) + + # ensure we deleted the key and undefined the secret now that we no + # longer need it + self.assertEqual(0, len(self.key_mgr._passphrases)) + conn = compute.driver._host.get_connection() + self.assertNotIn(instance.system_metadata['vtpm_secret_uuid'], + conn._secrets) + def test_suspend_resume_server(self): self.start_compute() @@ -409,7 +454,22 @@ class VTPMServersTest(base.ServersTestBase): self._test_resize_revert_server__vtpm_to_vtpm( extra_specs=extra_specs) - def test_resize_server__no_vtpm_to_vtpm(self): + @ddt.data(None, 'user', 'host', 'deployment') + def test_resize_server__no_vtpm_to_vtpm(self, secret_security): + """Resize a server from a flavor without TPM to a flavor with TPM. + + This tests a scenario where the instance does not have a TPM before + the resize but *does* have a TPM after the resize. + + A TPM secret security of 'None' means the instance is either: + + * A legacy vTPM instance + + * A vTPM instance where the user did not specify TPM secret security + + In both of these cases, the default TPM secret security policy is + 'user'. So 'None' is the equivalent of 'user'. + """ for host in ('test_compute0', 'test_compute1'): self.start_compute(host) @@ -423,6 +483,8 @@ class VTPMServersTest(base.ServersTestBase): # create a flavor with vTPM extra_specs = {'hw:tpm_model': 'tpm-tis', 'hw:tpm_version': '1.2'} + if secret_security is not None: + extra_specs['hw:tpm_secret_security'] = secret_security flavor_id = self._create_flavor(extra_spec=extra_specs) # TODO(stephenfin): The mock of 'migrate_disk_and_power_off' should @@ -436,7 +498,8 @@ class VTPMServersTest(base.ServersTestBase): # ensure our instance's system_metadata field and key manager inventory # is updated to reflect the new vTPM requirement - self.assertInstanceHasSecret(server) + user_id = 'nova' if secret_security == 'deployment' else 'fake' + self.assertInstanceHasSecret(server, user_id=user_id) # revert the instance rather than confirming it, and ensure the secret # is correctly cleaned up @@ -453,16 +516,32 @@ class VTPMServersTest(base.ServersTestBase): # ensure we delete the new key since we no longer need it self.assertInstanceHasNoSecret(server) - def test_resize_server__vtpm_to_no_vtpm(self): + @ddt.data(None, 'user', 'host', 'deployment') + def test_resize_server__vtpm_to_no_vtpm(self, secret_security): + """Resize a server from a flavor with TPM to a flavor without TPM. + + This tests a scenario where the instance has a TPM before the resize + but does *not* have a TPM after the resize. + + A TPM secret security of 'None' means the instance is either: + + * A legacy vTPM instance + + * A vTPM instance where the user did not specify TPM secret security + + In both of these cases, the default TPM secret security policy is + 'user'. So 'None' is the equivalent of 'user'. + """ for host in ('test_compute0', 'test_compute1'): self.start_compute(host) # create a server with vTPM - server = self._create_server_with_vtpm() + server = self._create_server_with_vtpm(secret_security=secret_security) self.addCleanup(self._delete_server, server) # ensure our instance's system_metadata field is correct - self.assertInstanceHasSecret(server) + user_id = 'nova' if secret_security == 'deployment' else 'fake' + self.assertInstanceHasSecret(server, user_id=user_id) # create a flavor without vTPM flavor_id = self._create_flavor() @@ -478,7 +557,8 @@ class VTPMServersTest(base.ServersTestBase): # ensure we still have the key for the vTPM device in storage in case # we revert - self.assertInstanceHasSecret(server) + user_id = 'nova' if secret_security == 'deployment' else 'fake' + self.assertInstanceHasSecret(server, user_id=user_id) # confirm the instance and ensure the secret is correctly cleaned up @@ -488,13 +568,129 @@ class VTPMServersTest(base.ServersTestBase): 'nova.virt.libvirt.driver.LibvirtDriver' '.migrate_disk_and_power_off', return_value='{}', ): - # revert back to the old flavor *with* vTPM + # confirm to the new flavor *without* vTPM server = self._confirm_resize(server) # ensure we have finally deleted the key for the vTPM device since # there is no going back now self.assertInstanceHasNoSecret(server) + @ddt.unpack + @ddt.data( + (None, 'deployment'), ('deployment', None), + ('user', 'deployment'), ('deployment', 'user'), + ('host', 'deployment'), ('deployment', 'host')) + def test_resize_vtpm_server_secret_security_deployment_unsupported( + self, from_secret_security, to_secret_security): + """Resizes that require secret ownership changes are not allowed. + + This tests a scenario where the instance has a TPM before the resize + and has a TPM after the resize. + + A TPM secret security of 'None' means the instance is either: + + * A legacy vTPM instance + + * A vTPM instance where the user did not specify TPM secret security + + In both of these cases, the default TPM secret security policy is + 'user'. So 'None' is the equivalent of 'user'. + + Until a later patch in the series adds code to convert to and from a + user-owned secret <=> Nova service user owned secret, we will want to + reject requests that would require conversion. Otherwise, these + attempts will fail with secret access permission errors. + """ + for host in ('test_compute0', 'test_compute1'): + self.start_compute(host) + + # create a server with vTPM with from_secret_security + server = self._create_server_with_vtpm( + secret_security=from_secret_security) + self.addCleanup(self._delete_server, server) + + # ensure our instance's system_metadata field is correct + user_id = 'nova' if from_secret_security == 'deployment' else 'fake' + self.assertInstanceHasSecret(server, user_id=user_id) + + # create a flavor with to_secret_security + extra_specs = {'hw:tpm_version': '1.2', + 'hw:tpm_model': 'tpm-tis'} + if to_secret_security is not None: + extra_specs['hw:tpm_secret_security'] = to_secret_security + flavor_id = self._create_flavor(extra_spec=extra_specs) + + with mock.patch( + 'nova.virt.libvirt.driver.LibvirtDriver' + '.migrate_disk_and_power_off', return_value='{}', + ): + ex = self.assertRaises( + client.OpenStackApiException, self._resize_server, server, + flavor_id=flavor_id) + self.assertEqual(400, ex.response.status_code) + self.assertIn( + "Resize between 'deployment' TPM secret security and " + "other TPM secret security modes is not supported.", + str(ex)) + + @ddt.unpack + @ddt.data( + (None, None), + ('user', 'user'), + ('host', 'host'), + ('deployment', 'deployment'), + (None, 'user'), ('user', None), + (None, 'host'), ('host', None), + ('user', 'host'), ('host', 'user')) + def test_resize_vtpm_server_secret_security_deployment_supported( + self, from_secret_security, to_secret_security): + """Resizes that do not require secret ownership changes are allowed. + + This tests a scenario where the instance has a TPM before the resize + and has a TPM after the resize. + + A TPM secret security of 'None' means the instance is either: + + * A legacy vTPM instance + + * A vTPM instance where the user did not specify TPM secret security + + In both of these cases, the default TPM secret security policy is + 'user'. So 'None' is the equivalent of 'user'. + + A resize from 'deployment' to 'deployment' is allowed because in both + cases the key manager service secret will be owned by the Nova service + user and no ownership change will be needed. + """ + for host in ('test_compute0', 'test_compute1'): + self.start_compute(host) + + # create a server with vTPM with from_secret_security + server = self._create_server_with_vtpm( + secret_security=from_secret_security) + self.addCleanup(self._delete_server, server) + + # ensure our instance's system_metadata field is correct + user_id = 'nova' if from_secret_security == 'deployment' else 'fake' + self.assertInstanceHasSecret(server, user_id=user_id) + + # create a flavor with to_secret_security + extra_specs = {'hw:tpm_version': '1.2', + 'hw:tpm_model': 'tpm-tis'} + if to_secret_security is not None: + extra_specs['hw:tpm_secret_security'] = to_secret_security + flavor_id = self._create_flavor(extra_spec=extra_specs) + + with mock.patch( + 'nova.virt.libvirt.driver.LibvirtDriver' + '.migrate_disk_and_power_off', return_value='{}', + ): + # resize should succeed + self._resize_server(server, flavor_id=flavor_id) + + # And the secret should still be as expected. + self.assertInstanceHasSecret(server, user_id=user_id) + def test_create_server_secret_security_unsupported(self): """Test when a not supported TPM secret security mode is requested diff --git a/nova/tests/unit/compute/test_compute_mgr.py b/nova/tests/unit/compute/test_compute_mgr.py index ceb04aa336..fab9a9719a 100644 --- a/nova/tests/unit/compute/test_compute_mgr.py +++ b/nova/tests/unit/compute/test_compute_mgr.py @@ -1794,7 +1794,8 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase, self, reclaim_instance_interval, mock_delete_vtpm, mock_delete_alloc): self.flags(reclaim_instance_interval=reclaim_instance_interval) - instance = objects.Instance(uuid=uuids.instance) + instance = objects.Instance(uuid=uuids.instance, + flavor=objects.Flavor()) with mock.patch.multiple( self.compute, diff --git a/nova/tests/unit/scheduler/test_request_filter.py b/nova/tests/unit/scheduler/test_request_filter.py index b04dfaaa3f..0163bd35d9 100644 --- a/nova/tests/unit/scheduler/test_request_filter.py +++ b/nova/tests/unit/scheduler/test_request_filter.py @@ -752,8 +752,12 @@ class TestRequestFilter(test.NoDBTestCase): @ddt.data( ('flavor', 'user', ot.COMPUTE_SECURITY_TPM_SECRET_SECURITY_USER), ('flavor', 'host', ot.COMPUTE_SECURITY_TPM_SECRET_SECURITY_HOST), + ('flavor', + 'deployment', ot.COMPUTE_SECURITY_TPM_SECRET_SECURITY_DEPLOYMENT), ('image', 'user', ot.COMPUTE_SECURITY_TPM_SECRET_SECURITY_USER), - ('image', 'host', ot.COMPUTE_SECURITY_TPM_SECRET_SECURITY_HOST)) + ('image', 'host', ot.COMPUTE_SECURITY_TPM_SECRET_SECURITY_HOST), + ('image', + 'deployment', ot.COMPUTE_SECURITY_TPM_SECRET_SECURITY_DEPLOYMENT)) @ddt.unpack def test_tpm_secret_security_filter(self, source, secret_security, trait): # First ensure that tpm_secret_security_filter is included diff --git a/nova/tests/unit/test_context.py b/nova/tests/unit/test_context.py index fb6b9fe517..a387cf201c 100644 --- a/nova/tests/unit/test_context.py +++ b/nova/tests/unit/test_context.py @@ -15,8 +15,12 @@ import threading from unittest import mock - +import ddt import futurist.waiters +from keystoneauth1.fixture import plugin as ks_plugin_fixture +from keystoneauth1 import loading as ks_loading +from oslo_config import cfg +from oslo_config import fixture as config_fixture from oslo_context import context as o_context from oslo_context import fixture as o_fixture from oslo_utils.fixture import uuidsentinel as uuids @@ -24,10 +28,28 @@ from oslo_utils.fixture import uuidsentinel as uuids from nova import context from nova import exception from nova import objects +from nova import service_auth from nova import test from nova import utils +class TestPluginWithAccess(ks_plugin_fixture.TestPlugin): + + def get_access(self, session): + return mock.Mock(role_names=['service']) + + +class LoadingFixtureWithAccess(ks_plugin_fixture.LoadingFixture): + + def create_plugin(self): + return TestPluginWithAccess( + token=self.token, + endpoint=self.endpoint, + user_id=self.user_id, + project_id=self.project_id) + + +@ddt.ddt class ContextTestCase(test.NoDBTestCase): # NOTE(danms): Avoid any cells setup by claiming we will # do things ourselves. @@ -178,6 +200,93 @@ class ContextTestCase(test.NoDBTestCase): context.get_admin_context() self.assertIs(o_context.get_current(), ctx1) + @mock.patch('keystoneauth1.loading.load_auth_from_conf_options') + @mock.patch('keystoneauth1.loading.load_session_from_conf_options') + def test_get_nova_service_user_context(self, mock_load_session, + mock_load_auth): + """Verify the basic get of a Nova service user context.""" + # Get a Nova service user context. + ctxt = context.get_nova_service_user_context() + + # Verify we called the loading functions as expected. + mock_load_auth.assert_called_once_with(context.CONF, 'service_user') + mock_load_session.assert_called_once_with(context.CONF, 'service_user', + auth=None) + mock_plugin = mock_load_auth.return_value + mock_session = mock_load_session.return_value + + # Verify we called the user_id and project_id getting methods as + # expected. + mock_plugin.get_user_id.assert_called_once_with(mock_session) + mock_plugin.get_project_id.assert_called_once_with(mock_session) + + # Verify the RequestContext attributes got set as expected. + self.assertEqual(mock_plugin.get_user_id.return_value, ctxt.user_id) + self.assertEqual(mock_plugin.get_project_id.return_value, + ctxt.project_id) + self.assertEqual(mock_plugin, ctxt.user_auth_plugin) + + # Get another context to verify we create the context with + # overwrite=False to avoid overwriting the thread local storage. + with mock.patch('nova.context.RequestContext') as mock_context: + ctxt = context.get_nova_service_user_context() + mock_context.assert_called_once_with( + user_id=mock_plugin.get_user_id.return_value, + project_id=mock_plugin.get_project_id.return_value, + roles=mock_plugin.get_access.return_value.role_names, + user_auth_plugin=mock_plugin, overwrite=False) + + def test_get_nova_service_user_context_user_project(self): + """Verify the user_id and project_id get set to what we expect.""" + # Use a new config fixture so that the options we register here will + # get unregistered after the test. + conf_fixture = self.useFixture( + config_fixture.Config(conf=cfg.ConfigOpts())) + + # Register the auth and session options in the [service_user] + # config section. + oslo_opts = (ks_loading.get_auth_common_conf_options() + + ks_loading.get_session_conf_options() + + ks_loading.get_auth_plugin_conf_options('password')) + conf_fixture.register_opts(oslo_opts, group='service_user') + + # Fill in typical values for the Nova service user. + conf_fixture.config( + group='service_user', auth_type='password', username='nova', + project_name='service', auth_url='http://anyhost/auth') + + # Use the plugin loading fixture from keystoneauth in order to skip all + # of the real authentication steps of calling Keystone and set expected + # user_id and project_id values. + self.useFixture(LoadingFixtureWithAccess(user_id=uuids.nova, + project_id=uuids.service)) + + # Verify we get the expected user_id and project_id in the + # RequestContext. + with mock.patch.object(service_auth, 'CONF', conf_fixture.conf): + ctxt = context.get_nova_service_user_context() + self.assertEqual(uuids.nova, ctxt.user_id) + self.assertEqual(uuids.service, ctxt.project_id) + self.assertEqual(['service'], ctxt.roles) + + @mock.patch('keystoneauth1.loading.load_auth_from_conf_options') + @mock.patch('keystoneauth1.loading.load_session_from_conf_options') + @ddt.data('auth', 'session') + def test_get_nova_service_user_context_load_fail( + self, to_fail, mock_load_session, mock_load_auth): + if to_fail == 'auth': + mock_load_auth.return_value = None + elif to_fail == 'session': + mock_load_session.return_value = None + + ex = self.assertRaises(exception.InvalidConfiguration, + context.get_nova_service_user_context) + msg = ( + 'Failed to load auth plugin or session from configuration. ' + 'Ensure the [service_user] section of the Nova configuration ' + 'file is correctly configured for the Nova service user.') + self.assertIn(msg, str(ex)) + def test_convert_from_rc_to_dict(self): ctx = context.RequestContext( 111, 222, request_id='req-679033b7-1755-4929-bf85-eb3bfaef7e0b', diff --git a/nova/tests/unit/test_crypto.py b/nova/tests/unit/test_crypto.py index 2cdde1697b..2bf7206be3 100644 --- a/nova/tests/unit/test_crypto.py +++ b/nova/tests/unit/test_crypto.py @@ -279,7 +279,8 @@ class VTPMTest(test.NoDBTestCase): We should create a new one. """ - instance = objects.Instance() + instance = objects.Instance(flavor=objects.Flavor(), + image_ref=uuids.image) instance.uuid = uuids.instance instance.system_metadata = {} mock_get_manager.return_value.store.return_value = uuids.secret diff --git a/nova/tests/unit/virt/libvirt/test_driver.py b/nova/tests/unit/virt/libvirt/test_driver.py index 7ba862e4d5..44ee4cf2ca 100644 --- a/nova/tests/unit/virt/libvirt/test_driver.py +++ b/nova/tests/unit/virt/libvirt/test_driver.py @@ -17241,6 +17241,7 @@ class LibvirtConnTestCase(test.NoDBTestCase, self.flags(swtpm_enabled=True, group='libvirt') self.useFixture(nova_fixtures.LibvirtImageBackendFixture()) + mock_ensure_vtpm.return_value = uuids.secret, mock.sentinel.password mock_get_info.return_value = hardware.InstanceInfo( state=power_state.RUNNING) @@ -21238,6 +21239,24 @@ class LibvirtConnTestCase(test.NoDBTestCase, mock_host.return_value.create_secret.return_value, secret) self.assertEqual('host', security) + @mock.patch('nova.context.get_nova_service_user_context') + @mock.patch('nova.crypto.ensure_vtpm_secret') + def test_get_or_create_secret_for_vtpm_security_deployment( + self, mock_ensure_secret, mock_get_ctxt): + # Test that vTPM secret security 'deployment' will use the Nova service + # user auth to create the secret in the key manager service. + mock_ensure_secret.return_value = uuids.secret, mock.sentinel.password + instance = objects.Instance(**self.test_instance) + instance.flavor.extra_specs = {'hw:tpm_secret_security': 'deployment'} + + drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False) + drvr._get_or_create_secret_for_vtpm(self.context, instance) + + # We should use the service user context. + mock_get_ctxt.assert_called_once_with() + mock_ensure_secret.assert_called_once_with( + mock_get_ctxt.return_value, instance) + @mock.patch('nova.virt.disk.api.clean_lxc_namespace') @mock.patch('nova.virt.libvirt.driver.LibvirtDriver.get_info') @mock.patch('nova.virt.disk.api.setup_container') diff --git a/nova/virt/libvirt/driver.py b/nova/virt/libvirt/driver.py index 2ae028acbf..131c6e9e57 100644 --- a/nova/virt/libvirt/driver.py +++ b/nova/virt/libvirt/driver.py @@ -134,6 +134,7 @@ from nova.virt.libvirt.volume import remotefs from nova.virt.libvirt.volume import volume from nova.virt import netutils from nova.volume import cinder +from nova import vtpm try: # This is optional for unit testing but required at runtime. We check for @@ -1793,7 +1794,7 @@ class LibvirtDriver(driver.ComputeDriver): pass if cleanup_instance_disks: - if hardware.get_tpm_secret_security_constraint( + if vtpm.get_instance_tpm_secret_security( instance.flavor) == 'host': self._host.delete_secret('vtpm', instance.uuid) # Make sure that the instance directory files were successfully @@ -1964,7 +1965,7 @@ class LibvirtDriver(driver.ComputeDriver): # secret; the deletion of the instance directory and undefining of # the domain will take care of the TPM files themselves LOG.info('New flavor no longer requests vTPM; deleting secret.') - crypto.delete_vtpm_secret(context, instance) + vtpm.delete_secret(context, instance, flavor=instance.old_flavor) # TODO(stephenfin): Fold this back into its only caller, cleanup_resize def _cleanup_resize(self, context, instance, network_info): @@ -4823,7 +4824,7 @@ class LibvirtDriver(driver.ComputeDriver): # it to hand when generating the XML. This is slightly wasteful # as we'll perform a redundant key manager API call later when # we create the domain but the alternative is an ugly mess - crypto.ensure_vtpm_secret(context, instance) + self._get_or_create_secret_for_vtpm(context, instance) xml = self._get_guest_xml(context, instance, network_info, disk_info, image_meta, @@ -8195,8 +8196,7 @@ class LibvirtDriver(driver.ComputeDriver): For all others, it will call the key manager service API to get or create a secret and then use it to create a libvirt secret. """ - security = hardware.get_tpm_secret_security_constraint( - instance.flavor) or 'user' + security = vtpm.get_instance_tpm_secret_security(instance.flavor) libvirt_secret = None kwargs = {} @@ -8210,8 +8210,8 @@ class LibvirtDriver(driver.ComputeDriver): kwargs = {'ephemeral': False, 'private': False} if libvirt_secret is None: - secret_uuid, passphrase = crypto.ensure_vtpm_secret(context, - instance) + secret_uuid, passphrase = vtpm.get_or_create_secret( + context, instance) libvirt_secret = self._host.create_secret( 'vtpm', instance.uuid, password=passphrase, uuid=secret_uuid, **kwargs) @@ -12649,7 +12649,7 @@ class LibvirtDriver(driver.ComputeDriver): elif new_vtpm_config: # we've requested vTPM in the new flavor and didn't have one # previously so we need to create a new secret - crypto.ensure_vtpm_secret(context, instance) + self._get_or_create_secret_for_vtpm(context, instance) def finish_migration( self, @@ -12798,7 +12798,7 @@ class LibvirtDriver(driver.ComputeDriver): # the instance gained a vTPM and must now lose it; delete the vTPM # secret, knowing that libvirt will take care of everything else on # the destination side - crypto.delete_vtpm_secret(context, instance) + vtpm.delete_secret(context, instance, flavor=instance.new_flavor) def finish_revert_migration( self, diff --git a/nova/vtpm.py b/nova/vtpm.py new file mode 100644 index 0000000000..3a6b351285 --- /dev/null +++ b/nova/vtpm.py @@ -0,0 +1,75 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import typing as ty + +if ty.TYPE_CHECKING: + from nova import objects + +from nova import context as nova_context +from nova import crypto +from nova.virt import hardware + + +def get_instance_tpm_secret_security(flavor): + secret_security = hardware.get_tpm_secret_security_constraint(flavor) + return secret_security or 'user' + + +def get_or_create_secret( + context: nova_context.RequestContext, + instance: 'objects.Instance', +) -> tuple[str, bytes]: + """Get or create a secret in the key manager service. + + The secret UUID and passphrase will be returned. + """ + use_context = get_request_context(context, instance.flavor) + return crypto.ensure_vtpm_secret(use_context, instance) + + +def delete_secret( + context: nova_context.RequestContext, + instance: 'objects.Instance', + flavor: ty.Optional['objects.Flavor'] = None, +) -> None: + """Delete a secret from the key manager service for TPM. + + A flavor can be optionally specified to use instead of instance.flavor. + This will be the case for: + + * Reverting a no TPM => TPM resize because by the time we get here, + instance.flavor will have already been changed back to the old + flavor. + + * Confirming a TPM => no TPM resize because by the time we get here, + instance.flavor will be set to the new flavor. + """ + flavor = flavor or instance.flavor + use_context = get_request_context(context, flavor) + crypto.delete_vtpm_secret(use_context, instance) + + +def get_request_context( + context: nova_context.RequestContext, + flavor: 'objects.Flavor', +) -> nova_context.RequestContext: + """Obtain an appropriate RequestContext based on TPM secret security. + + The normal user context should be passed in and if TPM secret security + policy for the instance is 'deployment', this will return a Nova service + user context. Otherwise, the normal user context that was passed in will be + returned. + """ + if get_instance_tpm_secret_security(flavor) == 'deployment': + return nova_context.get_nova_service_user_context() + return context