Put EC2 API -> eventlet back into trunk, fixing the bits that I missed when I put it into trunk on 9/21.

Note that some of this got into trunk via r291 accidentally because r291 was a branch based off of the trunk that was reverted on 9/22.
This commit is contained in:
Michael Gundlach
2010-09-23 21:18:12 +00:00
committed by Tarmac
18 changed files with 342 additions and 254 deletions
+16 -31
View File
@@ -1,31 +1,28 @@
#!/usr/bin/env python
# pylint: disable-msg=C0103
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Tornado daemon for the main API endpoint.
Nova API daemon.
"""
import logging
import os
import sys
from tornado import httpserver
from tornado import ioloop
# If ../nova/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
@@ -36,28 +33,16 @@ if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
from nova import flags
from nova import server
from nova import utils
from nova.endpoint import admin
from nova.endpoint import api
from nova.endpoint import cloud
from nova import server
FLAGS = flags.FLAGS
flags.DEFINE_integer('api_port', 8773, 'API port')
def main(_argv):
"""Load the controllers and start the tornado I/O loop."""
controllers = {
'Cloud': cloud.CloudController(),
'Admin': admin.AdminController()}
_app = api.APIServerApplication(controllers)
io_inst = ioloop.IOLoop.instance()
http_server = httpserver.HTTPServer(_app)
http_server.listen(FLAGS.cc_port)
logging.debug('Started HTTP server on %s', FLAGS.cc_port)
io_inst.start()
def main(_args):
from nova import api
from nova import wsgi
wsgi.run_server(api.API(), FLAGS.api_port)
if __name__ == '__main__':
utils.default_flagfile()
-45
View File
@@ -1,45 +0,0 @@
#!/usr/bin/env python
# pylint: disable-msg=C0103
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Nova API daemon.
"""
import os
import sys
# If ../nova/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir,
os.pardir))
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
from nova import api
from nova import flags
from nova import utils
from nova import wsgi
FLAGS = flags.FLAGS
flags.DEFINE_integer('api_port', 8773, 'API port')
if __name__ == '__main__':
utils.default_flagfile()
wsgi.run_server(api.API(), FLAGS.api_port)
+1 -2
View File
@@ -73,7 +73,6 @@ from nova import quota
from nova import utils
from nova.auth import manager
from nova.cloudpipe import pipelib
from nova.endpoint import cloud
FLAGS = flags.FLAGS
@@ -84,7 +83,7 @@ class VpnCommands(object):
def __init__(self):
self.manager = manager.AuthManager()
self.pipe = pipelib.CloudPipe(cloud.CloudController())
self.pipe = pipelib.CloudPipe()
def list(self):
"""Print a listing of the VPNs for all projects."""
-8
View File
@@ -172,14 +172,6 @@ Further Challenges
The :mod:`rbac` Module
--------------------------
.. automodule:: nova.auth.rbac
:members:
:undoc-members:
:show-inheritance:
The :mod:`signer` Module
------------------------
+65 -4
View File
@@ -23,23 +23,65 @@ Root WSGI middleware for all API controllers.
import routes
import webob.dec
from nova import flags
from nova import wsgi
from nova.api import cloudpipe
from nova.api import ec2
from nova.api import rackspace
from nova.api.ec2 import metadatarequesthandler
flags.DEFINE_string('rsapi_subdomain', 'rs',
'subdomain running the RS API')
flags.DEFINE_string('ec2api_subdomain', 'ec2',
'subdomain running the EC2 API')
flags.DEFINE_string('FAKE_subdomain', None,
'set to rs or ec2 to fake the subdomain of the host for testing')
FLAGS = flags.FLAGS
class API(wsgi.Router):
"""Routes top-level requests to the appropriate controller."""
def __init__(self):
rsdomain = {'sub_domain': [FLAGS.rsapi_subdomain]}
ec2domain = {'sub_domain': [FLAGS.ec2api_subdomain]}
# If someone wants to pretend they're hitting the RS subdomain
# on their local box, they can set FAKE_subdomain to 'rs', which
# removes subdomain restrictions from the RS routes below.
if FLAGS.FAKE_subdomain == 'rs':
rsdomain = {}
elif FLAGS.FAKE_subdomain == 'ec2':
ec2domain = {}
mapper = routes.Mapper()
mapper.connect("/", controller=self.versions)
mapper.connect("/v1.0/{path_info:.*}", controller=rackspace.API())
mapper.connect("/services/{path_info:.*}", controller=ec2.API())
mapper.sub_domains = True
mapper.connect("/", controller=self.rsapi_versions,
conditions=rsdomain)
mapper.connect("/v1.0/{path_info:.*}", controller=rackspace.API(),
conditions=rsdomain)
mapper.connect("/", controller=self.ec2api_versions,
conditions=ec2domain)
mapper.connect("/services/{path_info:.*}", controller=ec2.API(),
conditions=ec2domain)
mapper.connect("/cloudpipe/{path_info:.*}", controller=cloudpipe.API())
mrh = metadatarequesthandler.MetadataRequestHandler()
for s in ['/latest',
'/2009-04-04',
'/2008-09-01',
'/2008-02-01',
'/2007-12-15',
'/2007-10-10',
'/2007-08-29',
'/2007-03-01',
'/2007-01-19',
'/1.0']:
mapper.connect('%s/{path_info:.*}' % s, controller=mrh,
conditions=ec2domain)
super(API, self).__init__(mapper)
@webob.dec.wsgify
def versions(self, req):
def rsapi_versions(self, req):
"""Respond to a request for all OpenStack API versions."""
response = {
"versions": [
@@ -48,3 +90,22 @@ class API(wsgi.Router):
"application/xml": {
"attributes": dict(version=["status", "id"])}}
return wsgi.Serializer(req.environ, metadata).to_content_type(response)
@webob.dec.wsgify
def ec2api_versions(self, req):
"""Respond to a request for all EC2 versions."""
# available api versions
versions = [
'1.0',
'2007-01-19',
'2007-03-01',
'2007-08-29',
'2007-10-10',
'2007-12-15',
'2008-02-01',
'2008-09-01',
'2009-04-04',
]
return ''.join('%s\n' % v for v in versions)
+69
View File
@@ -0,0 +1,69 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
REST API Request Handlers for CloudPipe
"""
import logging
import urllib
import webob
import webob.dec
import webob.exc
from nova import crypto
from nova import wsgi
from nova.auth import manager
from nova.api.ec2 import cloud
_log = logging.getLogger("api")
_log.setLevel(logging.DEBUG)
class API(wsgi.Application):
def __init__(self):
self.controller = cloud.CloudController()
@webob.dec.wsgify
def __call__(self, req):
if req.method == 'POST':
return self.sign_csr(req)
_log.debug("Cloudpipe path is %s" % req.path_info)
if req.path_info.endswith("/getca/"):
return self.send_root_ca(req)
return webob.exc.HTTPNotFound()
def get_project_id_from_ip(self, ip):
# TODO(eday): This was removed with the ORM branch, fix!
instance = self.controller.get_instance_by_ip(ip)
return instance['project_id']
def send_root_ca(self, req):
_log.debug("Getting root ca")
project_id = self.get_project_id_from_ip(req.remote_addr)
res = webob.Response()
res.headers["Content-Type"] = "text/plain"
res.body = crypto.fetch_ca(project_id)
return res
def sign_csr(self, req):
project_id = self.get_project_id_from_ip(req.remote_addr)
cert = self.str_params['cert']
return crypto.sign_csr(urllib.unquote(cert), project_id)
+7 -3
View File
@@ -25,6 +25,7 @@ import webob.dec
import webob.exc
from nova import exception
from nova import flags
from nova import wsgi
from nova.api.ec2 import apirequest
from nova.api.ec2 import context
@@ -33,6 +34,7 @@ from nova.api.ec2 import cloud
from nova.auth import manager
FLAGS = flags.FLAGS
_log = logging.getLogger("api")
_log.setLevel(logging.DEBUG)
@@ -164,8 +166,8 @@ class Authorizer(wsgi.Middleware):
'ModifyImageAttribute': ['projectmanager', 'sysadmin'],
},
'AdminController': {
# All actions have the same permission: [] (the default)
# admins will be allowed to run them
# All actions have the same permission: ['none'] (the default)
# superusers will be allowed to run them
# all others will get HTTPUnauthorized.
},
}
@@ -175,7 +177,7 @@ class Authorizer(wsgi.Middleware):
context = req.environ['ec2.context']
controller_name = req.environ['ec2.controller'].__class__.__name__
action = req.environ['ec2.action']
allowed_roles = self.action_roles[controller_name].get(action, [])
allowed_roles = self.action_roles[controller_name].get(action, ['none'])
if self._matches_any_role(context, allowed_roles):
return self.application
else:
@@ -183,6 +185,8 @@ class Authorizer(wsgi.Middleware):
def _matches_any_role(self, context, roles):
"""Return True if any role in roles is allowed in context."""
if context.user.is_superuser():
return True
if 'all' in roles:
return True
if 'none' in roles:
+1 -3
View File
@@ -68,10 +68,8 @@ class APIRequest(object):
key = _camelcase_to_underscore(parts[0])
if len(parts) > 1:
d = args.get(key, {})
d[parts[1]] = value[0]
d[parts[1]] = value
value = d
else:
value = value[0]
args[key] = value
for key in args.keys():
+73
View File
@@ -0,0 +1,73 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Metadata request handler."""
import logging
import webob.dec
import webob.exc
from nova.api.ec2 import cloud
class MetadataRequestHandler(object):
"""Serve metadata from the EC2 API."""
def print_data(self, data):
if isinstance(data, dict):
output = ''
for key in data:
if key == '_name':
continue
output += key
if isinstance(data[key], dict):
if '_name' in data[key]:
output += '=' + str(data[key]['_name'])
else:
output += '/'
output += '\n'
return output[:-1] # cut off last \n
elif isinstance(data, list):
return '\n'.join(data)
else:
return str(data)
def lookup(self, path, data):
items = path.split('/')
for item in items:
if item:
if not isinstance(data, dict):
return data
if not item in data:
return None
data = data[item]
return data
@webob.dec.wsgify
def __call__(self, req):
cc = cloud.CloudController()
meta_data = cc.get_metadata(req.remote_addr)
if meta_data is None:
logging.error('Failed to get metadata for ip: %s' % req.remote_addr)
raise webob.exc.HTTPNotFound()
data = self.lookup(req.path_info, meta_data)
if data is None:
raise webob.exc.HTTPNotFound()
return self.print_data(data)
+2 -2
View File
@@ -44,7 +44,7 @@ flags.DEFINE_list('allowed_roles',
# NOTE(vish): a user with one of these roles will be a superuser and
# have access to all api commands
flags.DEFINE_list('superuser_roles', ['cloudadmin'],
'Roles that ignore rbac checking completely')
'Roles that ignore authorization checking completely')
# NOTE(vish): a user with one of these roles will have it for every
# project, even if he or she is not a member of the project
@@ -304,7 +304,7 @@ class AuthManager(object):
return "%s:%s" % (user.access, Project.safe_id(project))
def is_superuser(self, user):
"""Checks for superuser status, allowing user to bypass rbac
"""Checks for superuser status, allowing user to bypass authorization
@type user: User or uid
@param user: User to check.
-59
View File
@@ -1,59 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tornado REST API Request Handlers for CloudPipe
"""
import logging
import urllib
import tornado.web
from nova import crypto
from nova.auth import manager
_log = logging.getLogger("api")
_log.setLevel(logging.DEBUG)
class CloudPipeRequestHandler(tornado.web.RequestHandler):
def get(self, path):
path = self.request.path
_log.debug( "Cloudpipe path is %s" % path)
if path.endswith("/getca/"):
self.send_root_ca()
self.finish()
def get_project_id_from_ip(self, ip):
cc = self.application.controllers['Cloud']
instance = cc.get_instance_by_ip(ip)
instance['project_id']
def send_root_ca(self):
_log.debug( "Getting root ca")
project_id = self.get_project_id_from_ip(self.request.remote_ip)
self.set_header("Content-Type", "text/plain")
self.write(crypto.fetch_ca(project_id))
def post(self, *args, **kwargs):
project_id = self.get_project_id_from_ip(self.request.remote_ip)
cert = self.get_argument('cert', '')
self.write(crypto.sign_csr(urllib.unquote(cert), project_id))
self.finish()
+4 -2
View File
@@ -32,6 +32,8 @@ from nova import exception
from nova import flags
from nova import utils
from nova.auth import manager
# TODO(eday): Eventually changes these to something not ec2-specific
from nova.api.ec2 import cloud
from nova.api.ec2 import context
@@ -42,8 +44,8 @@ flags.DEFINE_string('boot_script_template',
class CloudPipe(object):
def __init__(self, cloud_controller):
self.controller = cloud_controller
def __init__(self):
self.controller = cloud.CloudController()
self.manager = manager.AuthManager()
def launch_vpn_instance(self, project_id):
+27 -22
View File
@@ -46,9 +46,9 @@ LOG.setLevel(logging.DEBUG)
class Connection(carrot_connection.BrokerConnection):
"""Connection instance object"""
@classmethod
def instance(cls):
def instance(cls, new=False):
"""Returns the instance"""
if not hasattr(cls, '_instance'):
if new or not hasattr(cls, '_instance'):
params = dict(hostname=FLAGS.rabbit_host,
port=FLAGS.rabbit_port,
userid=FLAGS.rabbit_userid,
@@ -60,7 +60,10 @@ class Connection(carrot_connection.BrokerConnection):
# NOTE(vish): magic is fun!
# pylint: disable-msg=W0142
cls._instance = cls(**params)
if new:
return cls(**params)
else:
cls._instance = cls(**params)
return cls._instance
@classmethod
@@ -94,8 +97,6 @@ class Consumer(messaging.Consumer):
injected.start()
return injected
attachToTornado = attach_to_tornado
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
"""Wraps the parent fetch with some logic for failed connections"""
# TODO(vish): the logic for failed connections and logging should be
@@ -265,28 +266,32 @@ def call(topic, msg):
msg.update({'_msg_id': msg_id})
LOG.debug("MSG_ID is %s" % (msg_id))
conn = Connection.instance()
d = defer.Deferred()
class WaitMessage(object):
def __call__(self, data, message):
"""Acks message and sets result."""
message.ack()
if data['failure']:
self.result = RemoteError(*data['failure'])
else:
self.result = data['result']
wait_msg = WaitMessage()
conn = Connection.instance(True)
consumer = DirectConsumer(connection=conn, msg_id=msg_id)
consumer.register_callback(wait_msg)
def deferred_receive(data, message):
"""Acks message and callbacks or errbacks"""
message.ack()
if data['failure']:
return d.errback(RemoteError(*data['failure']))
else:
return d.callback(data['result'])
consumer.register_callback(deferred_receive)
injected = consumer.attach_to_tornado()
# clean up after the injected listened and return x
d.addCallback(lambda x: injected.stop() and x or x)
conn = Connection.instance()
publisher = TopicPublisher(connection=conn, topic=topic)
publisher.send(msg)
publisher.close()
return d
try:
consumer.wait(limit=1)
except StopIteration:
pass
consumer.close()
return wait_msg.result
def cast(topic, msg):
+45 -66
View File
@@ -18,12 +18,13 @@
import unittest
import logging
import webob
from nova import exception
from nova import flags
from nova import test
from nova.api import ec2
from nova.auth import manager
from nova.auth import rbac
FLAGS = flags.FLAGS
@@ -72,9 +73,17 @@ class AccessTestCase(test.BaseTestCase):
try:
self.project.add_role(self.testsys, 'sysadmin')
except: pass
self.context = Context()
self.context.project = self.project
#user is set in each test
def noopWSGIApp(environ, start_response):
start_response('200 OK', [])
return ['']
self.mw = ec2.Authorizer(noopWSGIApp)
self.mw.action_roles = {'str': {
'_allow_all': ['all'],
'_allow_none': [],
'_allow_project_manager': ['projectmanager'],
'_allow_sys_and_net': ['sysadmin', 'netadmin'],
'_allow_sysadmin': ['sysadmin']}}
def tearDown(self):
um = manager.AuthManager()
@@ -87,76 +96,46 @@ class AccessTestCase(test.BaseTestCase):
um.delete_user('testsys')
super(AccessTestCase, self).tearDown()
def response_status(self, user, methodName):
context = Context()
context.project = self.project
context.user = user
environ = {'ec2.context' : context,
'ec2.controller': 'some string',
'ec2.action': methodName}
req = webob.Request.blank('/', environ)
resp = req.get_response(self.mw)
return resp.status_int
def shouldAllow(self, user, methodName):
self.assertEqual(200, self.response_status(user, methodName))
def shouldDeny(self, user, methodName):
self.assertEqual(401, self.response_status(user, methodName))
def test_001_allow_all(self):
self.context.user = self.testadmin
self.assertTrue(self._allow_all(self.context))
self.context.user = self.testpmsys
self.assertTrue(self._allow_all(self.context))
self.context.user = self.testnet
self.assertTrue(self._allow_all(self.context))
self.context.user = self.testsys
self.assertTrue(self._allow_all(self.context))
users = [self.testadmin, self.testpmsys, self.testnet, self.testsys]
for user in users:
self.shouldAllow(user, '_allow_all')
def test_002_allow_none(self):
self.context.user = self.testadmin
self.assertTrue(self._allow_none(self.context))
self.context.user = self.testpmsys
self.assertRaises(exception.NotAuthorized, self._allow_none, self.context)
self.context.user = self.testnet
self.assertRaises(exception.NotAuthorized, self._allow_none, self.context)
self.context.user = self.testsys
self.assertRaises(exception.NotAuthorized, self._allow_none, self.context)
self.shouldAllow(self.testadmin, '_allow_none')
users = [self.testpmsys, self.testnet, self.testsys]
for user in users:
self.shouldDeny(user, '_allow_none')
def test_003_allow_project_manager(self):
self.context.user = self.testadmin
self.assertTrue(self._allow_project_manager(self.context))
self.context.user = self.testpmsys
self.assertTrue(self._allow_project_manager(self.context))
self.context.user = self.testnet
self.assertRaises(exception.NotAuthorized, self._allow_project_manager, self.context)
self.context.user = self.testsys
self.assertRaises(exception.NotAuthorized, self._allow_project_manager, self.context)
for user in [self.testadmin, self.testpmsys]:
self.shouldAllow(user, '_allow_project_manager')
for user in [self.testnet, self.testsys]:
self.shouldDeny(user, '_allow_project_manager')
def test_004_allow_sys_and_net(self):
self.context.user = self.testadmin
self.assertTrue(self._allow_sys_and_net(self.context))
self.context.user = self.testpmsys # doesn't have the per project sysadmin
self.assertRaises(exception.NotAuthorized, self._allow_sys_and_net, self.context)
self.context.user = self.testnet
self.assertTrue(self._allow_sys_and_net(self.context))
self.context.user = self.testsys
self.assertTrue(self._allow_sys_and_net(self.context))
def test_005_allow_sys_no_pm(self):
self.context.user = self.testadmin
self.assertTrue(self._allow_sys_no_pm(self.context))
self.context.user = self.testpmsys
self.assertRaises(exception.NotAuthorized, self._allow_sys_no_pm, self.context)
self.context.user = self.testnet
self.assertRaises(exception.NotAuthorized, self._allow_sys_no_pm, self.context)
self.context.user = self.testsys
self.assertTrue(self._allow_sys_no_pm(self.context))
@rbac.allow('all')
def _allow_all(self, context):
return True
@rbac.allow('none')
def _allow_none(self, context):
return True
@rbac.allow('projectmanager')
def _allow_project_manager(self, context):
return True
@rbac.allow('sysadmin', 'netadmin')
def _allow_sys_and_net(self, context):
return True
@rbac.allow('sysadmin')
@rbac.deny('projectmanager')
def _allow_sys_no_pm(self, context):
return True
for user in [self.testadmin, self.testnet, self.testsys]:
self.shouldAllow(user, '_allow_sys_and_net')
# denied because it doesn't have the per project sysadmin
for user in [self.testpmsys]:
self.shouldDeny(user, '_allow_sys_and_net')
if __name__ == "__main__":
# TODO: Implement use_fake as an option
+26 -4
View File
@@ -25,6 +25,7 @@ import stubout
import webob
import webob.dec
import nova.exception
from nova import api
from nova.tests.api.test_helper import *
@@ -36,25 +37,46 @@ class Test(unittest.TestCase):
def tearDown(self): # pylint: disable-msg=C0103
self.stubs.UnsetAll()
def _request(self, url, subdomain, **kwargs):
environ_keys = {'HTTP_HOST': '%s.example.com' % subdomain}
environ_keys.update(kwargs)
req = webob.Request.blank(url, environ_keys)
return req.get_response(api.API())
def test_rackspace(self):
self.stubs.Set(api.rackspace, 'API', APIStub)
result = webob.Request.blank('/v1.0/cloud').get_response(api.API())
result = self._request('/v1.0/cloud', 'rs')
self.assertEqual(result.body, "/cloud")
def test_ec2(self):
self.stubs.Set(api.ec2, 'API', APIStub)
result = webob.Request.blank('/ec2/cloud').get_response(api.API())
result = self._request('/services/cloud', 'ec2')
self.assertEqual(result.body, "/cloud")
def test_not_found(self):
self.stubs.Set(api.ec2, 'API', APIStub)
self.stubs.Set(api.rackspace, 'API', APIStub)
result = webob.Request.blank('/test/cloud').get_response(api.API())
result = self._request('/test/cloud', 'ec2')
self.assertNotEqual(result.body, "/cloud")
def test_query_api_versions(self):
result = webob.Request.blank('/').get_response(api.API())
result = self._request('/', 'rs')
self.assertTrue('CURRENT' in result.body)
def test_metadata(self):
def go(url):
result = self._request(url, 'ec2',
REMOTE_ADDR='128.192.151.2')
# Each should get to the ORM layer and fail to find the IP
self.assertRaises(nova.exception.NotFound, go, '/latest/')
self.assertRaises(nova.exception.NotFound, go, '/2009-04-04/')
self.assertRaises(nova.exception.NotFound, go, '/1.0/')
def test_ec2_root(self):
result = self._request('/', 'ec2')
self.assertTrue('2007-12-15\n' in result.body)
if __name__ == '__main__':
unittest.main()
+5
View File
@@ -25,12 +25,17 @@ import random
import StringIO
import webob
from nova import flags
from nova import test
from nova import api
from nova.api.ec2 import cloud
from nova.auth import manager
FLAGS = flags.FLAGS
FLAGS.FAKE_subdomain = 'ec2'
class FakeHttplibSocket(object):
"""a fake socket implementation for httplib.HTTPResponse, trivial"""
def __init__(self, response_string):
-1
View File
@@ -22,7 +22,6 @@ from M2Crypto import RSA
import StringIO
import time
from tornado import ioloop
from twisted.internet import defer
import unittest
from xml.etree import ElementTree
+1 -2
View File
@@ -49,8 +49,7 @@ from nova import datastore
from nova import flags
from nova import twistd
#TODO(gundlach): rewrite and readd this after merge
#from nova.tests.access_unittest import *
from nova.tests.access_unittest import *
from nova.tests.auth_unittest import *
from nova.tests.api_unittest import *
from nova.tests.cloud_unittest import *