Merge "Add handling for vTPM secret permission error"

This commit is contained in:
Zuul
2025-11-05 20:23:18 +00:00
committed by Gerrit Code Review
4 changed files with 79 additions and 4 deletions
+4 -1
View File
@@ -2736,7 +2736,8 @@ class ComputeManager(manager.Manager):
exception.VirtualInterfaceMacAddressException,
exception.FixedIpInvalidOnHost,
exception.UnableToAutoAllocateNetwork,
exception.NetworksWithQoSPolicyNotSupported) as e:
exception.NetworksWithQoSPolicyNotSupported,
exception.VTPMSecretForbidden) as e:
LOG.exception('Failed to allocate network(s)',
instance=instance)
self._notify_about_instance_usage(context, instance,
@@ -3515,6 +3516,7 @@ class ComputeManager(manager.Manager):
# NOTE(johannes): This is probably better named power_on_instance
# so it matches the driver method, but because of other issues, we
# can't use that name in grizzly.
@messaging.expected_exceptions(exception.VTPMSecretForbidden)
@wrap_exception()
@reverts_task_state
@wrap_instance_event(prefix='compute')
@@ -4384,6 +4386,7 @@ class ComputeManager(manager.Manager):
return share_info
@messaging.expected_exceptions(exception.VTPMSecretForbidden)
@wrap_exception()
@reverts_task_state
@wrap_instance_event(prefix='compute')
+21 -3
View File
@@ -186,6 +186,19 @@ def ensure_vtpm_secret(
the instance's system metadata but could not be found in the key
manager service.
"""
def handle_key_manager_error_forbidden(e):
# Castellan catches the HTTPClientError from barbicanclient and
# re-raises it without its status_code attribute. It also does not
# include the status code in its exception message, so the best we can
# do is look for the word "Forbidden". Example error message:
# "Key manager error: Forbidden: Secret payload retrieval attempt not
# allowed - please review your user/project privileges"
if 'Forbidden' not in str(e):
raise
LOG.error(str(e), instance=instance)
raise exception.VTPMSecretForbidden(str(e)) from None
key_mgr = _get_key_manager()
secret_uuid = instance.system_metadata.get('vtpm_secret_uuid')
@@ -206,6 +219,8 @@ def ensure_vtpm_secret(
"is likely to be unrecoverable.",
secret_uuid, instance=instance)
raise
except castellan_exception.KeyManagerError as e:
handle_key_manager_error_forbidden(e)
# If we get here, the instance has no vtpm_secret_uuid. Create a new one
# and register it with the key manager.
@@ -213,9 +228,12 @@ def ensure_vtpm_secret(
# Castellan ManagedObject
cmo = passphrase.Passphrase(
secret, name="vTPM secret for instance %s" % instance.uuid)
secret_uuid = key_mgr.store(context, cmo)
LOG.debug("Created vTPM secret with UUID %s",
secret_uuid, instance=instance)
try:
secret_uuid = key_mgr.store(context, cmo)
LOG.debug("Created vTPM secret with UUID %s",
secret_uuid, instance=instance)
except castellan_exception.KeyManagerError as e:
handle_key_manager_error_forbidden(e)
instance.system_metadata['vtpm_secret_uuid'] = secret_uuid
instance.save()
+4
View File
@@ -2666,3 +2666,7 @@ class HostConflict(Exception):
class InstanceEventTimeout(Exception):
"""A custom timeout exception to replace eventlet.timeout.Timeout."""
pass
class VTPMSecretForbidden(Forbidden):
pass
+50
View File
@@ -314,6 +314,56 @@ class VTPMTest(test.NoDBTestCase):
self.ctxt, uuids.vtpm,
)
@mock.patch.object(crypto, '_get_key_manager')
def test_ensure_vtpm_secret_create_forbidden(self, mock_get_manager):
"""Check when we fail access to create a secret via castellan.
We should bubble up the error.
"""
instance = objects.Instance(uuid=uuids.instance)
instance.system_metadata = {}
mock_get_manager.return_value.store.side_effect = (
castellan_exception.KeyManagerError(
'Forbidden: Secret payload retrieval attempt not allowed'))
self.assertRaises(
exception.VTPMSecretForbidden,
crypto.ensure_vtpm_secret,
self.ctxt, instance)
@mock.patch.object(crypto, '_get_key_manager')
def test_ensure_vtpm_secret_get_forbidden(self, mock_get_manager):
"""Check when we fail access to retrieve a secret via castellan.
We should bubble up the error.
"""
instance = objects.Instance()
instance.system_metadata = {'vtpm_secret_uuid': uuids.vtpm}
mock_get_manager.return_value.get.side_effect = (
castellan_exception.KeyManagerError(
'Forbidden: Secret payload retrieval attempt not allowed'))
self.assertRaises(
exception.VTPMSecretForbidden,
crypto.ensure_vtpm_secret,
self.ctxt, instance)
@mock.patch.object(crypto, '_get_key_manager')
def test_ensure_vtpm_secret_other_keymanager_error(self, mock_get_manager):
"""Check when we fail for any other key manager error via castellan.
We should bubble up the error.
"""
instance = objects.Instance()
instance.system_metadata = {'vtpm_secret_uuid': uuids.vtpm}
mock_get_manager.return_value.get.side_effect = (
castellan_exception.KeyManagerError('Something else'))
self.assertRaises(
castellan_exception.KeyManagerError,
crypto.ensure_vtpm_secret,
self.ctxt, instance)
class EncryptionSecretTest(test.NoDBTestCase):