Merge "Add support for Keystone v3"

This commit is contained in:
Jenkins
2014-08-07 23:49:53 +00:00
committed by Gerrit Code Review
3 changed files with 711 additions and 169 deletions
+188
View File
@@ -0,0 +1,188 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import json
import uuid
# these are copied from python-keystoneclient tests
BASE_HOST = 'http://keystone.example.com'
BASE_URL = "%s:5000/" % BASE_HOST
UPDATED = '2013-03-06T00:00:00Z'
V2_URL = "%sv2.0" % BASE_URL
V2_DESCRIBED_BY_HTML = {'href': 'http://docs.openstack.org/api/'
'openstack-identity-service/2.0/content/',
'rel': 'describedby',
'type': 'text/html'}
V2_DESCRIBED_BY_PDF = {'href': 'http://docs.openstack.org/api/openstack-ident'
'ity-service/2.0/identity-dev-guide-2.0.pdf',
'rel': 'describedby',
'type': 'application/pdf'}
V2_VERSION = {'id': 'v2.0',
'links': [{'href': V2_URL, 'rel': 'self'},
V2_DESCRIBED_BY_HTML, V2_DESCRIBED_BY_PDF],
'status': 'stable',
'updated': UPDATED}
V3_URL = "%sv3" % BASE_URL
V3_MEDIA_TYPES = [{'base': 'application/json',
'type': 'application/vnd.openstack.identity-v3+json'},
{'base': 'application/xml',
'type': 'application/vnd.openstack.identity-v3+xml'}]
V3_VERSION = {'id': 'v3.0',
'links': [{'href': V3_URL, 'rel': 'self'}],
'media-types': V3_MEDIA_TYPES,
'status': 'stable',
'updated': UPDATED}
def _create_version_list(versions):
return json.dumps({'versions': {'values': versions}})
def _create_single_version(version):
return json.dumps({'version': version})
V3_VERSION_LIST = _create_version_list([V3_VERSION, V2_VERSION])
V2_VERSION_LIST = _create_version_list([V2_VERSION])
V3_VERSION_ENTRY = _create_single_version(V3_VERSION)
V2_VERSION_ENTRY = _create_single_version(V2_VERSION)
GLANCE_ENDPOINT = 'http://glance.example.com/v1'
def _get_normalized_token_data(**kwargs):
ref = copy.deepcopy(kwargs)
# normalized token data
ref['user_id'] = ref.get('user_id', uuid.uuid4().hex)
ref['username'] = ref.get('username', uuid.uuid4().hex)
ref['project_id'] = ref.get('project_id',
ref.get('tenant_id', uuid.uuid4().hex))
ref['project_name'] = ref.get('tenant_name',
ref.get('tenant_name', uuid.uuid4().hex))
ref['user_domain_id'] = ref.get('user_domain_id', uuid.uuid4().hex)
ref['user_domain_name'] = ref.get('user_domain_name', uuid.uuid4().hex)
ref['project_domain_id'] = ref.get('project_domain_id', uuid.uuid4().hex)
ref['project_domain_name'] = ref.get('project_domain_name',
uuid.uuid4().hex)
ref['roles'] = ref.get('roles', [{'name': uuid.uuid4().hex,
'id': uuid.uuid4().hex}])
ref['roles_link'] = ref.get('roles_link', [])
ref['glance_url'] = ref.get('glance_url', GLANCE_ENDPOINT)
return ref
def generate_v2_project_scoped_token(**kwargs):
"""Generate a Keystone V2 token based on auth request."""
ref = _get_normalized_token_data(**kwargs)
o = {'access': {'token': {'id': uuid.uuid4().hex,
'expires': '2099-05-22T00:02:43.941430Z',
'issued_at': '2013-05-21T00:02:43.941473Z',
'tenant': {'enabled': True,
'id': ref.get('project_id'),
'name': ref.get('project_id')
}
},
'user': {'id': ref.get('user_id'),
'name': uuid.uuid4().hex,
'username': ref.get('username'),
'roles': ref.get('roles'),
'roles_links': ref.get('roles_links')
}
}}
# we only care about Glance and Keystone endpoints
o['access']['serviceCatalog'] = [
{'endpoints': [
{'publicURL': ref.get('glance_url'),
'id': uuid.uuid4().hex,
'region': 'RegionOne'
}],
'endpoints_links': [],
'name': 'Glance',
'type': 'keystore'},
{'endpoints': [
{'publicURL': ref.get('auth_url'),
'adminURL': ref.get('auth_url'),
'id': uuid.uuid4().hex,
'region': 'RegionOne'
}],
'endpoint_links': [],
'name': 'keystone',
'type': 'identity'}]
return o
def generate_v3_project_scoped_token(**kwargs):
"""Generate a Keystone V3 token based on auth request."""
ref = _get_normalized_token_data(**kwargs)
o = {'token': {'expires_at': '2099-05-22T00:02:43.941430Z',
'issued_at': '2013-05-21T00:02:43.941473Z',
'methods': ['password'],
'project': {'id': ref.get('project_id'),
'name': ref.get('project_name'),
'domain': {'id': ref.get('project_domain_id'),
'name': ref.get(
'project_domain_name')
}
},
'user': {'id': ref.get('user_id'),
'name': ref.get('username'),
'domain': {'id': ref.get('user_domain_id'),
'name': ref.get('user_domain_name')
}
},
'roles': ref.get('roles')
}}
# we only care about Glance and Keystone endpoints
o['token']['catalog'] = [
{'endpoints': [
{
'id': uuid.uuid4().hex,
'interface': 'public',
'region': 'RegionTwo',
'url': ref.get('glance_url')
}],
'id': uuid.uuid4().hex,
'type': 'keystore'},
{'endpoints': [
{
'id': uuid.uuid4().hex,
'interface': 'public',
'region': 'RegionTwo',
'url': ref.get('auth_url')
},
{
'id': uuid.uuid4().hex,
'interface': 'admin',
'region': 'RegionTwo',
'url': ref.get('auth_url')
}],
'id': uuid.uuid4().hex,
'type': 'identity'}]
# token ID is conveyed via the X-Subject-Token header so we are generating
# one to stash there
token_id = uuid.uuid4().hex
return token_id, o
+207 -10
View File
@@ -25,31 +25,50 @@ from glanceclient import shell as openstack_shell
#NOTE (esheffield) Used for the schema caching tests
from glanceclient.v2 import schemas as schemas
import json
from tests import keystone_client_fixtures
from tests import utils
import keystoneclient
from keystoneclient.openstack.common.apiclient import exceptions as ks_exc
DEFAULT_IMAGE_URL = 'http://127.0.0.1:5000/'
DEFAULT_USERNAME = 'username'
DEFAULT_PASSWORD = 'password'
DEFAULT_TENANT_ID = 'tenant_id'
DEFAULT_TENANT_NAME = 'tenant_name'
DEFAULT_AUTH_URL = 'http://127.0.0.1:5000/v2.0/'
DEFAULT_PROJECT_ID = '0123456789'
DEFAULT_USER_DOMAIN_NAME = 'user_domain_name'
DEFAULT_UNVERSIONED_AUTH_URL = 'http://127.0.0.1:5000/'
DEFAULT_V2_AUTH_URL = 'http://127.0.0.1:5000/v2.0/'
DEFAULT_V3_AUTH_URL = 'http://127.0.0.1:5000/v3/'
DEFAULT_AUTH_TOKEN = ' 3bcc3d3a03f44e3d8377f9247b0ad155'
TEST_SERVICE_URL = 'http://127.0.0.1:5000/'
FAKE_V2_ENV = {'OS_USERNAME': DEFAULT_USERNAME,
'OS_PASSWORD': DEFAULT_PASSWORD,
'OS_TENANT_NAME': DEFAULT_TENANT_NAME,
'OS_AUTH_URL': DEFAULT_V2_AUTH_URL,
'OS_IMAGE_URL': DEFAULT_IMAGE_URL}
FAKE_V3_ENV = {'OS_USERNAME': DEFAULT_USERNAME,
'OS_PASSWORD': DEFAULT_PASSWORD,
'OS_PROJECT_ID': DEFAULT_PROJECT_ID,
'OS_USER_DOMAIN_NAME': DEFAULT_USER_DOMAIN_NAME,
'OS_AUTH_URL': DEFAULT_V3_AUTH_URL,
'OS_IMAGE_URL': DEFAULT_IMAGE_URL}
class ShellTest(utils.TestCase):
# auth environment to use
auth_env = FAKE_V2_ENV.copy()
# expected auth plugin to invoke
auth_plugin = 'keystoneclient.auth.identity.v2.Password'
def setUp(self):
super(ShellTest, self).setUp()
global _old_env
fake_env = {
'OS_USERNAME': DEFAULT_USERNAME,
'OS_PASSWORD': DEFAULT_PASSWORD,
'OS_TENANT_NAME': DEFAULT_TENANT_NAME,
'OS_AUTH_URL': DEFAULT_AUTH_URL,
'OS_IMAGE_URL': DEFAULT_IMAGE_URL,
'OS_AUTH_TOKEN': DEFAULT_AUTH_TOKEN}
_old_env, os.environ = os.environ, fake_env.copy()
_old_env, os.environ = os.environ, self.auth_env
global shell, _shell, assert_called, assert_called_anytime
_shell = openstack_shell.OpenStackImagesShell()
@@ -99,6 +118,184 @@ class ShellTest(utils.TestCase):
targeted_image_url = test_shell._get_image_url(fake_args)
self.assertEqual(expected_image_url, targeted_image_url)
@mock.patch.object(openstack_shell.OpenStackImagesShell,
'_get_versioned_client')
def test_cert_and_key_args_interchangeable(self,
mock_versioned_client):
# make sure --os-cert and --os-key are passed correctly
args = '--os-cert mycert --os-key mykey image-list'
shell(args)
assert mock_versioned_client.called
((api_version, args), kwargs) = mock_versioned_client.call_args
self.assertEqual(args.os_cert, 'mycert')
self.assertEqual(args.os_key, 'mykey')
# make sure we get the same thing with --cert-file and --key-file
args = '--cert-file mycertfile --key-file mykeyfile image-list'
glance_shell = openstack_shell.OpenStackImagesShell()
glance_shell.main(args.split())
assert mock_versioned_client.called
((api_version, args), kwargs) = mock_versioned_client.call_args
self.assertEqual(args.os_cert, 'mycertfile')
self.assertEqual(args.os_key, 'mykeyfile')
@mock.patch('glanceclient.v1.client.Client')
def test_no_auth_with_token_and_image_url_with_v1(self, v1_client):
# test no authentication is required if both token and endpoint url
# are specified
args = ('--os-auth-token mytoken --os-image-url https://image:1234/v1 '
'image-list')
glance_shell = openstack_shell.OpenStackImagesShell()
glance_shell.main(args.split())
assert v1_client.called
(args, kwargs) = v1_client.call_args
self.assertEqual(kwargs['token'], 'mytoken')
self.assertEqual(args[0], 'https://image:1234/v1')
@mock.patch.object(openstack_shell.OpenStackImagesShell, '_cache_schema')
def test_no_auth_with_token_and_image_url_with_v2(self,
cache_schema):
with mock.patch('glanceclient.v2.client.Client') as v2_client:
# test no authentication is required if both token and endpoint url
# are specified
args = ('--os-auth-token mytoken '
'--os-image-url https://image:1234/v2 '
'--os-image-api-version 2 image-list')
glance_shell = openstack_shell.OpenStackImagesShell()
glance_shell.main(args.split())
((args), kwargs) = v2_client.call_args
self.assertEqual(args[0], 'https://image:1234/v2')
self.assertEqual(kwargs['token'], 'mytoken')
def _assert_auth_plugin_args(self, mock_auth_plugin):
# make sure our auth plugin is invoked with the correct args
mock_auth_plugin.assert_called_once_with(
keystone_client_fixtures.V2_URL,
self.auth_env['OS_USERNAME'],
self.auth_env['OS_PASSWORD'],
tenant_name=self.auth_env['OS_TENANT_NAME'],
tenant_id='')
@mock.patch('glanceclient.v1.client.Client')
@mock.patch('keystoneclient.session.Session')
@mock.patch.object(keystoneclient.discover.Discover, 'url_for',
side_effect=[keystone_client_fixtures.V2_URL, None])
def test_auth_plugin_invocation_with_v1(self,
v1_client,
ks_session,
url_for):
with mock.patch(self.auth_plugin) as mock_auth_plugin:
args = 'image-list'
glance_shell = openstack_shell.OpenStackImagesShell()
glance_shell.main(args.split())
self._assert_auth_plugin_args(mock_auth_plugin)
@mock.patch('glanceclient.v2.client.Client')
@mock.patch('keystoneclient.session.Session')
@mock.patch.object(openstack_shell.OpenStackImagesShell, '_cache_schema')
@mock.patch.object(keystoneclient.discover.Discover, 'url_for',
side_effect=[keystone_client_fixtures.V2_URL, None])
def test_auth_plugin_invocation_with_v2(self,
v2_client,
ks_session,
url_for,
cache_schema):
with mock.patch(self.auth_plugin) as mock_auth_plugin:
args = '--os-image-api-version 2 image-list'
glance_shell = openstack_shell.OpenStackImagesShell()
glance_shell.main(args.split())
self._assert_auth_plugin_args(mock_auth_plugin)
@mock.patch('glanceclient.v1.client.Client')
@mock.patch('keystoneclient.session.Session')
@mock.patch.object(keystoneclient.discover.Discover, 'url_for',
side_effect=[keystone_client_fixtures.V2_URL,
keystone_client_fixtures.V3_URL])
def test_auth_plugin_invocation_with_unversioned_auth_url_with_v1(
self, v1_client, ks_session, url_for):
with mock.patch(self.auth_plugin) as mock_auth_plugin:
args = '--os-auth-url %s image-list' % (
keystone_client_fixtures.BASE_URL)
glance_shell = openstack_shell.OpenStackImagesShell()
glance_shell.main(args.split())
self._assert_auth_plugin_args(mock_auth_plugin)
@mock.patch('glanceclient.v2.client.Client')
@mock.patch('keystoneclient.session.Session')
@mock.patch.object(openstack_shell.OpenStackImagesShell, '_cache_schema')
@mock.patch.object(keystoneclient.discover.Discover, 'url_for',
side_effect=[keystone_client_fixtures.V2_URL,
keystone_client_fixtures.V3_URL])
def test_auth_plugin_invocation_with_unversioned_auth_url_with_v2(
self, v2_client, ks_session, cache_schema, url_for):
with mock.patch(self.auth_plugin) as mock_auth_plugin:
args = ('--os-auth-url %s --os-image-api-version 2 '
'image-list') % (keystone_client_fixtures.BASE_URL)
glance_shell = openstack_shell.OpenStackImagesShell()
glance_shell.main(args.split())
self._assert_auth_plugin_args(mock_auth_plugin)
class ShellTestWithKeystoneV3Auth(ShellTest):
# auth environment to use
auth_env = FAKE_V3_ENV.copy()
# expected auth plugin to invoke
auth_plugin = 'keystoneclient.auth.identity.v3.Password'
def _assert_auth_plugin_args(self, mock_auth_plugin):
mock_auth_plugin.assert_called_once_with(
keystone_client_fixtures.V3_URL,
user_id='',
username=self.auth_env['OS_USERNAME'],
password=self.auth_env['OS_PASSWORD'],
user_domain_id='',
user_domain_name=self.auth_env['OS_USER_DOMAIN_NAME'],
project_id=self.auth_env['OS_PROJECT_ID'],
project_name='',
project_domain_id='',
project_domain_name='')
@mock.patch('glanceclient.v1.client.Client')
@mock.patch('keystoneclient.session.Session')
@mock.patch.object(keystoneclient.discover.Discover, 'url_for',
side_effect=[None, keystone_client_fixtures.V3_URL])
def test_auth_plugin_invocation_with_v1(self,
v1_client,
ks_session,
url_for):
with mock.patch(self.auth_plugin) as mock_auth_plugin:
args = 'image-list'
glance_shell = openstack_shell.OpenStackImagesShell()
glance_shell.main(args.split())
self._assert_auth_plugin_args(mock_auth_plugin)
@mock.patch('glanceclient.v2.client.Client')
@mock.patch('keystoneclient.session.Session')
@mock.patch.object(openstack_shell.OpenStackImagesShell, '_cache_schema')
@mock.patch.object(keystoneclient.discover.Discover, 'url_for',
side_effect=[None, keystone_client_fixtures.V3_URL])
def test_auth_plugin_invocation_with_v2(self,
v2_client,
ks_session,
url_for,
cache_schema):
with mock.patch(self.auth_plugin) as mock_auth_plugin:
args = '--os-image-api-version 2 image-list'
glance_shell = openstack_shell.OpenStackImagesShell()
glance_shell.main(args.split())
self._assert_auth_plugin_args(mock_auth_plugin)
@mock.patch('keystoneclient.session.Session')
@mock.patch('keystoneclient.discover.Discover',
side_effect=ks_exc.ClientException())
def test_api_discovery_failed_with_unversioned_auth_url(self,
ks_session,
discover):
args = '--os-auth-url %s image-list' % (
keystone_client_fixtures.BASE_URL)
glance_shell = openstack_shell.OpenStackImagesShell()
self.assertRaises(exc.CommandError, glance_shell.main, args.split())
class ShellCacheSchemaTest(utils.TestCase):
def setUp(self):