Add handling for vTPM secret permission error

Currently there is no handling for this case so if we get a HTTP 403
from Barbican for a permission error, a very long multi-layered
traceback gets logged in nova-compute. This can happen if an admin
tries to start an instance with vTPM belonging to a different user, for
example.

This adds some basic error handling for this case. Most of the logged
traceback consists RPC oslo.messaging content, so the new
VTPMSecretForbidden exception is added as an expected exception to the
compute manager methods we expect users are more likely to encounter
such as build, start, and reboot.

Other compute manager methods where VTPMSecretForbidden can be raised
are resize, resume, restore, and delete but we will leave these as-is
and let them log tracebacks because we expect them to be rare.

Change-Id: I2ef6df818ed3f63efe2ff9b333c97928d4efa18d
Signed-off-by: melanie witt <melwittt@gmail.com>
This commit is contained in:
melanie witt
2025-10-10 00:06:10 +00:00
parent 7446ba158e
commit b4861a6d2b
4 changed files with 79 additions and 4 deletions
+4 -1
View File
@@ -2736,7 +2736,8 @@ class ComputeManager(manager.Manager):
exception.VirtualInterfaceMacAddressException,
exception.FixedIpInvalidOnHost,
exception.UnableToAutoAllocateNetwork,
exception.NetworksWithQoSPolicyNotSupported) as e:
exception.NetworksWithQoSPolicyNotSupported,
exception.VTPMSecretForbidden) as e:
LOG.exception('Failed to allocate network(s)',
instance=instance)
self._notify_about_instance_usage(context, instance,
@@ -3515,6 +3516,7 @@ class ComputeManager(manager.Manager):
# NOTE(johannes): This is probably better named power_on_instance
# so it matches the driver method, but because of other issues, we
# can't use that name in grizzly.
@messaging.expected_exceptions(exception.VTPMSecretForbidden)
@wrap_exception()
@reverts_task_state
@wrap_instance_event(prefix='compute')
@@ -4384,6 +4386,7 @@ class ComputeManager(manager.Manager):
return share_info
@messaging.expected_exceptions(exception.VTPMSecretForbidden)
@wrap_exception()
@reverts_task_state
@wrap_instance_event(prefix='compute')
+21 -3
View File
@@ -186,6 +186,19 @@ def ensure_vtpm_secret(
the instance's system metadata but could not be found in the key
manager service.
"""
def handle_key_manager_error_forbidden(e):
# Castellan catches the HTTPClientError from barbicanclient and
# re-raises it without its status_code attribute. It also does not
# include the status code in its exception message, so the best we can
# do is look for the word "Forbidden". Example error message:
# "Key manager error: Forbidden: Secret payload retrieval attempt not
# allowed - please review your user/project privileges"
if 'Forbidden' not in str(e):
raise
LOG.error(str(e), instance=instance)
raise exception.VTPMSecretForbidden(str(e)) from None
key_mgr = _get_key_manager()
secret_uuid = instance.system_metadata.get('vtpm_secret_uuid')
@@ -206,6 +219,8 @@ def ensure_vtpm_secret(
"is likely to be unrecoverable.",
secret_uuid, instance=instance)
raise
except castellan_exception.KeyManagerError as e:
handle_key_manager_error_forbidden(e)
# If we get here, the instance has no vtpm_secret_uuid. Create a new one
# and register it with the key manager.
@@ -213,9 +228,12 @@ def ensure_vtpm_secret(
# Castellan ManagedObject
cmo = passphrase.Passphrase(
secret, name="vTPM secret for instance %s" % instance.uuid)
secret_uuid = key_mgr.store(context, cmo)
LOG.debug("Created vTPM secret with UUID %s",
secret_uuid, instance=instance)
try:
secret_uuid = key_mgr.store(context, cmo)
LOG.debug("Created vTPM secret with UUID %s",
secret_uuid, instance=instance)
except castellan_exception.KeyManagerError as e:
handle_key_manager_error_forbidden(e)
instance.system_metadata['vtpm_secret_uuid'] = secret_uuid
instance.save()
+4
View File
@@ -2666,3 +2666,7 @@ class HostConflict(Exception):
class InstanceEventTimeout(Exception):
"""A custom timeout exception to replace eventlet.timeout.Timeout."""
pass
class VTPMSecretForbidden(Forbidden):
pass
+50
View File
@@ -314,6 +314,56 @@ class VTPMTest(test.NoDBTestCase):
self.ctxt, uuids.vtpm,
)
@mock.patch.object(crypto, '_get_key_manager')
def test_ensure_vtpm_secret_create_forbidden(self, mock_get_manager):
"""Check when we fail access to create a secret via castellan.
We should bubble up the error.
"""
instance = objects.Instance(uuid=uuids.instance)
instance.system_metadata = {}
mock_get_manager.return_value.store.side_effect = (
castellan_exception.KeyManagerError(
'Forbidden: Secret payload retrieval attempt not allowed'))
self.assertRaises(
exception.VTPMSecretForbidden,
crypto.ensure_vtpm_secret,
self.ctxt, instance)
@mock.patch.object(crypto, '_get_key_manager')
def test_ensure_vtpm_secret_get_forbidden(self, mock_get_manager):
"""Check when we fail access to retrieve a secret via castellan.
We should bubble up the error.
"""
instance = objects.Instance()
instance.system_metadata = {'vtpm_secret_uuid': uuids.vtpm}
mock_get_manager.return_value.get.side_effect = (
castellan_exception.KeyManagerError(
'Forbidden: Secret payload retrieval attempt not allowed'))
self.assertRaises(
exception.VTPMSecretForbidden,
crypto.ensure_vtpm_secret,
self.ctxt, instance)
@mock.patch.object(crypto, '_get_key_manager')
def test_ensure_vtpm_secret_other_keymanager_error(self, mock_get_manager):
"""Check when we fail for any other key manager error via castellan.
We should bubble up the error.
"""
instance = objects.Instance()
instance.system_metadata = {'vtpm_secret_uuid': uuids.vtpm}
mock_get_manager.return_value.get.side_effect = (
castellan_exception.KeyManagerError('Something else'))
self.assertRaises(
castellan_exception.KeyManagerError,
crypto.ensure_vtpm_secret,
self.ctxt, instance)
class EncryptionSecretTest(test.NoDBTestCase):