Merge "Refactor service user authentication"

This commit is contained in:
Zuul
2026-01-23 22:30:30 +00:00
committed by Gerrit Code Review
12 changed files with 160 additions and 157 deletions
+1 -1
View File
@@ -116,7 +116,7 @@ class _CyborgClient(object):
ARQ_URL = "/accelerator_requests"
def __init__(self, context):
auth = service_auth.get_auth_plugin(context)
auth = service_auth.get_service_user_token_auth_plugin(context)
self._client = utils.get_ksa_adapter('accelerator', ksa_auth=auth)
def _call_cyborg(self, func, *args, **kwargs):
+7 -16
View File
@@ -18,19 +18,16 @@
import sys
from keystoneauth1 import exceptions as ks_exceptions
from keystoneauth1 import loading as ks_loading
from oslo_log import log as logging
from oslo_serialization import jsonutils
from nova.api.metadata import vendordata
import nova.conf
from nova import service_auth
CONF = nova.conf.CONF
LOG = logging.getLogger(__name__)
_SESSION = None
_ADMIN_AUTH = None
def _load_ks_session(conf):
"""Load session.
@@ -38,24 +35,18 @@ def _load_ks_session(conf):
This is either an authenticated session or a requests session, depending on
what's configured.
"""
global _ADMIN_AUTH
global _SESSION
auth = service_auth.get_service_auth_plugin(
nova.conf.vendordata.vendordata_group.name)
if not _ADMIN_AUTH:
_ADMIN_AUTH = ks_loading.load_auth_from_conf_options(
conf, nova.conf.vendordata.vendordata_group.name)
if not _ADMIN_AUTH:
if not auth:
LOG.warning('Passing insecure dynamic vendordata requests '
'because of missing or incorrect service account '
'configuration.')
if not _SESSION:
_SESSION = ks_loading.load_session_from_conf_options(
conf, nova.conf.vendordata.vendordata_group.name,
auth=_ADMIN_AUTH)
session = service_auth.get_service_auth_session(
nova.conf.vendordata.vendordata_group.name, auth=auth)
return _SESSION
return session
class DynamicVendorData(vendordata.VendorDataDriver):
+4 -11
View File
@@ -34,7 +34,6 @@ import glanceclient
from glanceclient.common import utils as glance_utils
import glanceclient.exc
from glanceclient.v2 import schemas
from keystoneauth1 import loading as ks_loading
from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_utils import excutils
@@ -52,20 +51,14 @@ from nova import utils
LOG = logging.getLogger(__name__)
CONF = nova.conf.CONF
_SESSION = None
def _session_and_auth(context):
# Session is cached, but auth needs to be pulled from context each time.
global _SESSION
session = service_auth.get_service_auth_session(
nova.conf.glance.glance_group.name)
auth = service_auth.get_service_user_token_auth_plugin(context)
if not _SESSION:
_SESSION = ks_loading.load_session_from_conf_options(
CONF, nova.conf.glance.glance_group.name)
auth = service_auth.get_auth_plugin(context)
return _SESSION, auth
return session, auth
def _glanceclient_from_endpoint(context, endpoint, version):
+10 -31
View File
@@ -24,7 +24,6 @@ import inspect
import time
import typing as ty
from keystoneauth1 import loading as ks_loading
from neutronclient.common import exceptions as neutron_client_exc
from neutronclient.v2_0 import client as clientv20
from oslo_concurrency import lockutils
@@ -55,26 +54,15 @@ CONF = nova.conf.CONF
LOG = logging.getLogger(__name__)
_SESSION = None
_ADMIN_AUTH = None
def reset_state():
global _ADMIN_AUTH
global _SESSION
_ADMIN_AUTH = None
_SESSION = None
def _load_auth_plugin(conf):
auth_plugin = ks_loading.load_auth_from_conf_options(conf,
nova.conf.neutron.NEUTRON_GROUP)
def _load_auth_plugin():
auth_plugin = service_auth.get_service_auth_plugin(
nova.conf.neutron.NEUTRON_GROUP)
if auth_plugin:
return auth_plugin
if conf.neutron.auth_type is None:
if CONF.neutron.auth_type is None:
# If we're coming in through a REST API call for something like
# creating a server, the end user is going to get a 500 response
# which is accurate since the system is mis-configured, but we should
@@ -84,7 +72,7 @@ def _load_auth_plugin(conf):
'service endpoint. See the networking service install guide '
'for details: '
'https://docs.openstack.org/neutron/latest/install/')
err_msg = _('Unknown auth type: %s') % conf.neutron.auth_type
err_msg = _('Unknown auth type: %s') % CONF.neutron.auth_type
raise neutron_client_exc.Unauthorized(message=err_msg)
@@ -223,33 +211,24 @@ def _get_auth_plugin(context, admin=False):
# neutron admin tenant credentials if it is an admin context. This is to
# support some services (metadata API) where an admin context is used
# without an auth token.
global _ADMIN_AUTH
user_auth = None
if admin or (context.is_admin and not context.auth_token):
if not _ADMIN_AUTH:
_ADMIN_AUTH = _load_auth_plugin(CONF)
user_auth = _ADMIN_AUTH
user_auth = _load_auth_plugin()
if context.auth_token or user_auth:
# When user_auth = None, user_auth will be extracted from the context.
return service_auth.get_auth_plugin(context, user_auth=user_auth)
return service_auth.get_service_user_token_auth_plugin(
context, user_auth=user_auth)
# We did not get a user token and we should not be using
# an admin token so log an error
raise exception.Unauthorized()
def _get_session():
global _SESSION
if not _SESSION:
_SESSION = ks_loading.load_session_from_conf_options(
CONF, nova.conf.neutron.NEUTRON_GROUP)
return _SESSION
def get_client(context, admin=False):
auth_plugin = _get_auth_plugin(context, admin=admin)
session = _get_session()
session = service_auth.get_service_auth_session(
nova.conf.neutron.NEUTRON_GROUP)
client_args = dict(session=session,
auth=auth_plugin,
global_request_id=context.global_id,
+67 -19
View File
@@ -11,6 +11,11 @@
# under the License.
import typing as ty
if ty.TYPE_CHECKING:
import keystoneauth1.plugin
from keystoneauth1 import loading as ks_loading
from keystoneauth1 import service_token
from oslo_log import log as logging
@@ -21,35 +26,78 @@ import nova.conf
CONF = nova.conf.CONF
LOG = logging.getLogger(__name__)
_SERVICE_AUTH = None
# Auth plugins and auth sessions keyed by configuration group name
_AUTHS = {}
_SESSIONS = {}
def reset_globals():
"""For async unit test consistency."""
global _SERVICE_AUTH
_SERVICE_AUTH = None
global _AUTHS, _SESSIONS
_AUTHS = {}
_SESSIONS = {}
def get_auth_plugin(context, user_auth=None):
def get_service_auth_plugin(
conf_group: str,
) -> 'keystoneauth1.plugin.BaseAuthPlugin':
"""Get an auth plugin for authentication as the service user."""
auth = _AUTHS.get(conf_group)
if not auth:
auth = ks_loading.load_auth_from_conf_options(CONF, conf_group)
_AUTHS[conf_group] = auth
return auth
def get_service_auth_session(
conf_group: str,
auth: ty.Optional['keystoneauth1.plugin.BaseAuthPlugin'] = None,
) -> 'keystoneauth1.session.Session':
"""Get a session for authentication as the service user.
An auth plugin can be optionally passed in to use to authenticate the
session.
"""
session = _SESSIONS.get(conf_group)
if not session:
session = ks_loading.load_session_from_conf_options(
CONF, conf_group, auth=auth)
_SESSIONS[conf_group] = session
return session
def get_service_user_token_auth_plugin(context, user_auth=None):
"""Dynamically get an auth plugin based on service user token config.
This function will use [service_user]send_service_user_token configuration
to determine whether to return either:
* The user's auth from the RequestContext
or
* A wrapper around both the user's auth and the service user's auth
The user's auth may be optionally passed in to use instead grabbing it
from the RequestContext. This comes up in cases where we have an anonymous
RequestContext such as using get_admin_context() in nova-manage commands to
call other service APIs.
This function should only be used for passing service user tokens to APIs.
"""
# user_auth may be passed in when the RequestContext is anonymous, such as
# when get_admin_context() is used for API calls by nova-manage.
user_auth = user_auth or context.get_auth_plugin()
if CONF.service_user.send_service_user_token:
global _SERVICE_AUTH
if not _SERVICE_AUTH:
_SERVICE_AUTH = ks_loading.load_auth_from_conf_options(
CONF,
group=
nova.conf.service_token.SERVICE_USER_GROUP)
if _SERVICE_AUTH is None:
# This indicates a misconfiguration so log a warning and
# return the user_auth.
LOG.warning('Unable to load auth from [service_user] '
'configuration. Ensure "auth_type" is set.')
return user_auth
return service_token.ServiceTokenAuthWrapper(
user_auth=user_auth,
service_auth=_SERVICE_AUTH)
service_auth = get_service_auth_plugin(
nova.conf.service_token.SERVICE_USER_GROUP)
if service_auth is None:
# This indicates a misconfiguration so log a warning and
# return the user_auth.
LOG.warning('Unable to load auth from [service_user] '
'configuration. Ensure "auth_type" is set.')
return user_auth
return service_token.ServiceTokenAuthWrapper(
user_auth=user_auth, service_auth=service_auth)
return user_auth
+4
View File
@@ -62,6 +62,7 @@ from nova.pci import request
from nova import quota
from nova.scheduler.client import report
from nova.scheduler import utils as scheduler_utils
import nova.service_auth
from nova.tests import fixtures as nova_fixtures
from nova.tests.unit import matchers
from nova import utils
@@ -336,6 +337,9 @@ class TestCase(base.BaseTestCase):
# Reset the global identity client
nova.limit.utils.IDENTITY_CLIENT = None
# Reset the global service auths and sessions
nova.service_auth.reset_globals()
def _setup_cells(self):
"""Setup a normal cellsv2 environment.
+5 -4
View File
@@ -361,7 +361,7 @@ class TestGetImageService(test.NoDBTestCase):
class TestCreateGlanceClient(test.NoDBTestCase):
@mock.patch.object(service_auth, 'get_auth_plugin')
@mock.patch.object(service_auth, 'get_service_user_token_auth_plugin')
@mock.patch.object(ks_loading, 'load_session_from_conf_options')
@mock.patch('glanceclient.Client')
def test_glanceclient_with_ks_session(self, mock_client, mock_load,
@@ -375,14 +375,15 @@ class TestCreateGlanceClient(test.NoDBTestCase):
mock_client.side_effect = ["a", "b"]
# Reset the cache, so we know its empty before we start
glance._SESSION = None
service_auth.reset_globals()
result1 = glance._glanceclient_from_endpoint(ctx, endpoint, 2)
result2 = glance._glanceclient_from_endpoint(ctx, endpoint, 2)
# Ensure that session is only loaded once.
mock_load.assert_called_once_with(glance.CONF, "glance")
self.assertEqual(session, glance._SESSION)
mock_load.assert_called_once_with(glance.CONF, "glance", auth=None)
self.assertEqual(session,
service_auth.get_service_auth_session('glance'))
# Ensure new client created every time
client_call = mock.call(2, auth="fake_auth",
endpoint_override=endpoint, session=session,
+15 -16
View File
@@ -73,7 +73,7 @@ class TestNeutronClient(test.NoDBTestCase):
def setUp(self):
super(TestNeutronClient, self).setUp()
neutronapi.reset_state()
service_auth.reset_globals()
self.addCleanup(service_auth.reset_globals)
def test_ksa_adapter_loading_defaults(self):
@@ -142,12 +142,9 @@ class TestNeutronClient(test.NoDBTestCase):
self.assertIsInstance(cl.httpclient.auth,
service_token.ServiceTokenAuthWrapper)
@mock.patch('nova.service_auth._SERVICE_AUTH')
@mock.patch('nova.network.neutron._ADMIN_AUTH')
@mock.patch('nova.service_auth.get_service_auth_plugin')
@mock.patch.object(ks_loading, 'load_auth_from_conf_options')
def test_admin_with_service_token(
self, mock_load, mock_admin_auth, mock_service_auth
):
def test_admin_with_service_token(self, mock_load, mock_service_auth):
self.flags(send_service_user_token=True, group='service_user')
admin_context = context.get_admin_context()
@@ -155,8 +152,10 @@ class TestNeutronClient(test.NoDBTestCase):
cl = neutronapi.get_client(admin_context)
self.assertIsInstance(cl.httpclient.auth,
service_token.ServiceTokenAuthWrapper)
self.assertEqual(mock_admin_auth, cl.httpclient.auth.user_auth)
self.assertEqual(mock_service_auth, cl.httpclient.auth.service_auth)
self.assertEqual(mock_service_auth.return_value,
cl.httpclient.auth.user_auth)
self.assertEqual(mock_service_auth.return_value,
cl.httpclient.auth.service_auth)
@mock.patch.object(client.Client, "list_networks",
side_effect=exceptions.Unauthorized())
@@ -215,7 +214,7 @@ class TestNeutronClient(test.NoDBTestCase):
neutronapi.get_client,
my_context)
@mock.patch('nova.network.neutron._ADMIN_AUTH')
@mock.patch('nova.service_auth.get_service_auth_plugin')
@mock.patch.object(client.Client, "list_networks", new=mock.Mock())
def test_reuse_admin_token(self, m):
self.flags(endpoint_override='http://anyhost/', group='neutron')
@@ -227,7 +226,7 @@ class TestNeutronClient(test.NoDBTestCase):
def token_vals(*args, **kwargs):
return tokens.pop()
m.get_token.side_effect = token_vals
m.return_value.get_token.side_effect = token_vals
client1 = neutronapi.get_client(my_context, True)
client1.list_networks(retrieve_all=False)
@@ -243,7 +242,7 @@ class TestNeutronClient(test.NoDBTestCase):
mock_load_from_conf.return_value = None
from neutronclient.common import exceptions as neutron_client_exc
self.assertRaises(neutron_client_exc.Unauthorized,
neutronapi._load_auth_plugin, CONF)
neutronapi._load_auth_plugin)
mock_log_err.assert_called()
self.assertIn('The [neutron] section of your nova configuration file',
mock_log_err.call_args[0][0])
@@ -9247,7 +9246,7 @@ class TestNeutronClientForAdminScenarios(test.NoDBTestCase):
auth_token='token')
# clean global
neutronapi.reset_state()
service_auth.reset_globals()
if admin_context:
# Note that the context does not contain a token but is
@@ -9260,7 +9259,7 @@ class TestNeutronClientForAdminScenarios(test.NoDBTestCase):
# the context has an auth_token.
context_client = neutronapi.get_client(my_context, True)
admin_auth = neutronapi._ADMIN_AUTH
admin_auth = service_auth.get_service_auth_plugin('neutron')
self.assertEqual(CONF.neutron.auth_url, admin_auth.auth_url)
self.assertEqual(CONF.neutron.password, admin_auth.password)
@@ -9278,12 +9277,12 @@ class TestNeutronClientForAdminScenarios(test.NoDBTestCase):
self.assertIsNone(admin_auth.tenant_id)
self.assertIsNone(admin_auth.user_id)
self.assertEqual(CONF.neutron.timeout,
neutronapi._SESSION.timeout)
auth_session = service_auth.get_service_auth_session('neutron')
self.assertEqual(CONF.neutron.timeout, auth_session.timeout)
self.assertEqual(
token_value,
context_client.httpclient.auth.get_token(neutronapi._SESSION))
context_client.httpclient.auth.get_token(auth_session))
self.assertEqual(
CONF.neutron.endpoint_override,
context_client.httpclient.get_endpoint())
+7 -4
View File
@@ -21,6 +21,7 @@ from requests_mock.contrib import fixture
import nova.conf
from nova import context
from nova import exception
from nova import service_auth
from nova import test
from nova.volume import cinder
@@ -73,7 +74,7 @@ class BaseCinderTestCase(object):
def setUp(self):
super(BaseCinderTestCase, self).setUp()
cinder.reset_globals()
service_auth.reset_globals()
self.requests = self.useFixture(fixture.Fixture())
self.api = cinder.API()
@@ -84,7 +85,7 @@ class BaseCinderTestCase(object):
def flags(self, *args, **kwargs):
super(BaseCinderTestCase, self).flags(*args, **kwargs)
cinder.reset_globals()
service_auth.reset_globals()
def create_client(self):
return cinder.cinderclient(self.context)
@@ -128,7 +129,8 @@ class CinderV1TestCase(test.NoDBTestCase):
return_value='http://localhost:8776/v1/%(project_id)s')
fake_session = mock.Mock(get_endpoint=get_endpoint)
ctxt = context.get_context()
with mock.patch.object(cinder, '_SESSION', fake_session):
with mock.patch.object(service_auth, 'get_service_auth_session',
return_value=fake_session):
self.assertRaises(exception.UnsupportedCinderAPIVersion,
cinder.cinderclient, ctxt)
get_api_version.assert_called_once_with(get_endpoint.return_value)
@@ -148,7 +150,8 @@ class CinderV2TestCase(test.NoDBTestCase):
return_value='http://localhost:8776/v2/%(project_id)s')
fake_session = mock.Mock(get_endpoint=get_endpoint)
ctxt = context.get_context()
with mock.patch.object(cinder, '_SESSION', fake_session):
with mock.patch.object(service_auth, 'get_service_auth_session',
return_value=fake_session):
self.assertRaises(exception.UnsupportedCinderAPIVersion,
cinder.cinderclient, ctxt)
get_api_version.assert_called_once_with(get_endpoint.return_value)
+10 -8
View File
@@ -28,41 +28,43 @@ class ServiceAuthTestCase(test.NoDBTestCase):
self.addCleanup(service_auth.reset_globals)
@mock.patch.object(ks_loading, 'load_auth_from_conf_options')
def test_get_auth_plugin_no_wraps(self, mock_load):
def test_get_service_user_token_auth_plugin_no_wraps(self, mock_load):
context = mock.MagicMock()
context.get_auth_plugin.return_value = "fake"
result = service_auth.get_auth_plugin(context)
result = service_auth.get_service_user_token_auth_plugin(context)
self.assertEqual("fake", result)
mock_load.assert_not_called()
@mock.patch.object(ks_loading, 'load_auth_from_conf_options')
def test_get_auth_plugin_wraps(self, mock_load):
def test_get_service_user_token_auth_plugin_wraps(self, mock_load):
self.flags(send_service_user_token=True, group='service_user')
result = service_auth.get_auth_plugin(self.ctx)
result = service_auth.get_service_user_token_auth_plugin(self.ctx)
self.assertIsInstance(result, service_token.ServiceTokenAuthWrapper)
@mock.patch.object(ks_loading, 'load_auth_from_conf_options',
return_value=None)
def test_get_auth_plugin_wraps_bad_config(self, mock_load):
def test_get_service_user_token_auth_plugin_wraps_bad_config(
self, mock_load):
"""Tests the case that send_service_user_token is True but there
is some misconfiguration with the [service_user] section which makes
KSA return None for the service user auth.
"""
self.flags(send_service_user_token=True, group='service_user')
result = service_auth.get_auth_plugin(self.ctx)
result = service_auth.get_service_user_token_auth_plugin(self.ctx)
self.assertEqual(1, mock_load.call_count)
self.assertNotIsInstance(result, service_token.ServiceTokenAuthWrapper)
@mock.patch.object(ks_loading, 'load_auth_from_conf_options',
new=mock.Mock())
def test_get_auth_plugin_user_auth(self):
def test_get_service_user_token_auth_plugin_user_auth(self):
self.flags(send_service_user_token=True, group='service_user')
user_auth = mock.Mock()
result = service_auth.get_auth_plugin(self.ctx, user_auth=user_auth)
result = service_auth.get_service_user_token_auth_plugin(
self.ctx, user_auth=user_auth)
self.assertEqual(user_auth, result.user_auth)
+12 -9
View File
@@ -27,6 +27,7 @@ from oslo_utils import timeutils
import nova.conf
from nova import context
from nova import exception
from nova import service_auth
from nova import test
from nova.tests.unit.fake_instance import fake_instance_obj
from nova.volume import cinder
@@ -1219,7 +1220,7 @@ class CinderClientTestCase(test.NoDBTestCase):
def setUp(self):
super(CinderClientTestCase, self).setUp()
cinder.reset_globals()
service_auth.reset_globals()
self.ctxt = context.RequestContext('fake-user', 'fake-project')
# Mock out the keystoneauth stuff.
self.mock_session = mock.Mock(autospec=session.Session)
@@ -1301,27 +1302,29 @@ class CinderClientTestCase(test.NoDBTestCase):
def test_load_auth_plugin_failed(self, mock_load_from_conf, mock_log_err):
mock_load_from_conf.return_value = None
self.assertRaises(cinder_exception.Unauthorized,
cinder._load_auth_plugin, CONF)
cinder._load_auth_plugin)
mock_log_err.assert_called()
self.assertIn('The [cinder] section of your nova configuration file',
mock_log_err.call_args[0][0])
@mock.patch('nova.volume.cinder._ADMIN_AUTH')
@mock.patch('nova.service_auth.get_service_auth_plugin')
def test_admin_context_without_token(self,
mock_admin_auth):
mock_admin_auth.return_value = '_FAKE_ADMIN_AUTH'
admin_ctx = context.get_admin_context()
params = cinder._get_cinderclient_parameters(admin_ctx)
self.assertEqual(params[0], mock_admin_auth)
self.assertEqual(params[0], mock_admin_auth.return_value)
@mock.patch('nova.service_auth._SERVICE_AUTH')
@mock.patch('nova.volume.cinder._ADMIN_AUTH')
@mock.patch('nova.service_auth.get_service_user_token_auth_plugin')
@mock.patch('nova.service_auth.get_service_auth_plugin')
def test_admin_context_without_user_token_but_with_service_token(
self, mock_admin_auth, mock_service_auth
):
self.flags(send_service_user_token=True, group='service_user')
admin_ctx = context.get_admin_context()
params = cinder._get_cinderclient_parameters(admin_ctx)
self.assertEqual(mock_admin_auth, params[0].user_auth)
self.assertEqual(mock_service_auth, params[0].service_auth)
self.assertEqual(
mock_service_auth.return_value.user_auth, params[0].user_auth)
self.assertEqual(
mock_service_auth.return_value.service_auth,
params[0].service_auth)
+18 -38
View File
@@ -28,7 +28,6 @@ from cinderclient import api_versions as cinder_api_versions
from cinderclient import client as cinder_client
from cinderclient import exceptions as cinder_exception
from keystoneauth1 import exceptions as keystone_exception
from keystoneauth1 import loading as ks_loading
from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_utils import excutils
@@ -46,45 +45,23 @@ CONF = nova.conf.CONF
LOG = logging.getLogger(__name__)
_ADMIN_AUTH = None
_SESSION = None
def reset_globals():
"""Testing method to reset globals.
"""
global _ADMIN_AUTH
global _SESSION
_ADMIN_AUTH = None
_SESSION = None
def _load_auth_plugin(conf):
auth_plugin = ks_loading.load_auth_from_conf_options(conf,
nova.conf.cinder.cinder_group.name)
def _load_auth_plugin():
auth_plugin = service_auth.get_service_auth_plugin(
nova.conf.cinder.cinder_group.name)
if auth_plugin:
return auth_plugin
if conf.cinder.auth_type is None:
if CONF.cinder.auth_type is None:
LOG.error('The [cinder] section of your nova configuration file '
'must be configured for authentication with the '
'block-storage service endpoint.')
err_msg = _('Unknown auth type: %s') % conf.cinder.auth_type
err_msg = _('Unknown auth type: %s') % CONF.cinder.auth_type
raise cinder_exception.Unauthorized(401, message=err_msg)
def _load_session():
global _SESSION
if not _SESSION:
_SESSION = ks_loading.load_session_from_conf_options(
CONF, nova.conf.cinder.cinder_group.name)
def _get_auth(context):
global _ADMIN_AUTH
# NOTE(lixipeng): Auth token is none when call
# cinder API from compute periodic tasks, context
# from them generated from 'context.get_admin_context'
@@ -92,17 +69,16 @@ def _get_auth(context):
# So add load_auth_plugin when this condition appear.
user_auth = None
if context.is_admin and not context.auth_token:
if not _ADMIN_AUTH:
_ADMIN_AUTH = _load_auth_plugin(CONF)
user_auth = _ADMIN_AUTH
user_auth = _load_auth_plugin()
# When user_auth = None, user_auth will be extracted from the context.
return service_auth.get_auth_plugin(context, user_auth=user_auth)
return service_auth.get_service_user_token_auth_plugin(
context, user_auth=user_auth)
# NOTE(efried): Bug #1752152
# This method is copied/adapted from cinderclient.client.get_server_version so
# we can use _SESSION.get rather than a raw requests.get to retrieve the
# we can use Session.get rather than a raw requests.get to retrieve the
# version document. This enables HTTPS by gleaning cert info from the session
# config.
def _get_server_version(context, url):
@@ -116,7 +92,8 @@ def _get_server_version(context, url):
min_version = "2.0"
current_version = "2.0"
_load_session()
session = service_auth.get_service_auth_session(
nova.conf.cinder.cinder_group.name)
auth = _get_auth(context)
try:
@@ -142,7 +119,7 @@ def _get_server_version(context, url):
# leave as is without cropping.
version_url = url
response = _SESSION.get(version_url, auth=auth)
response = session.get(version_url, auth=auth)
data = jsonutils.loads(response.text)
versions = data['versions']
for version in versions:
@@ -190,7 +167,8 @@ def _check_microversion(context, url, microversion):
def _get_cinderclient_parameters(context):
_load_session()
session = service_auth.get_service_auth_session(
nova.conf.cinder.cinder_group.name)
auth = _get_auth(context)
@@ -208,7 +186,7 @@ def _get_cinderclient_parameters(context):
if CONF.cinder.endpoint_template:
url = CONF.cinder.endpoint_template % context.to_dict()
else:
url = _SESSION.get_endpoint(auth, **service_parameters)
url = session.get_endpoint(auth, **service_parameters)
return auth, service_parameters, url
@@ -268,8 +246,10 @@ def cinderclient(context, microversion=None, skip_version_check=False,
if check_only:
return
session = service_auth.get_service_auth_session(
nova.conf.cinder.cinder_group.name)
return cinder_client.Client(version,
session=_SESSION,
session=session,
auth=auth,
endpoint_override=endpoint_override,
connect_retries=CONF.cinder.http_retries,