Admin API + Worker Tracking.

This commit is contained in:
Todd Willey
2010-06-25 18:55:14 -04:00
parent 8a2f1763cb
commit 849282175c
10 changed files with 534 additions and 71 deletions
+4 -2
View File
@@ -20,7 +20,7 @@
# under the License.
"""
Tornado daemon for the main API endpoint.
Tornado daemon for the main API endpoint.
"""
import logging
@@ -34,6 +34,7 @@ from nova import rpc
from nova import server
from nova import utils
from nova.auth import users
from nova.compute import model
from nova.endpoint import admin
from nova.endpoint import api
from nova.endpoint import cloud
@@ -43,9 +44,10 @@ FLAGS = flags.FLAGS
def main(_argv):
user_manager = users.UserManager()
host_manager = model.Host
controllers = {
'Cloud': cloud.CloudController(),
'Admin': admin.AdminController(user_manager)
'Admin': admin.AdminController(user_manager, host_manager)
}
_app = api.APIServerApplication(user_manager, controllers)
+2 -2
View File
@@ -75,8 +75,8 @@ def main():
topic='%s.%s' % (FLAGS.compute_topic, FLAGS.node_name),
proxy=n)
# heartbeat = task.LoopingCall(n.report_state)
# heartbeat.start(interval=FLAGS.node_report_state_interval, now=False)
pulse = task.LoopingCall(n.report_state, FLAGS.node_name, 'nova-compute')
pulse.start(interval=FLAGS.node_report_state_interval, now=False)
injected = consumer_all.attach_to_twisted()
injected = consumer_node.attach_to_twisted()
+31 -2
View File
@@ -21,9 +21,11 @@
Nova User API client library.
"""
import base64
from nova import vendor
import boto
from boto.ec2.regioninfo import RegionInfo
import base64
class UserInfo(object):
""" Information about a Nova user
@@ -57,6 +59,30 @@ class UserInfo(object):
elif name == 'secretkey':
self.secretkey = str(value)
class HostInfo(object):
"""
Information about a Nova Host:
Disk stats
Running Instances
Memory stats
CPU stats
Network address info
Firewall info
Bridge and devices
"""
def __init__(self, connection=None):
self.connection = connection
self.hostname = None
def __repr__(self):
return 'Host:%s' % self.hostname
def startElement(self, name, attrs, connection):
return None
def endElement(self, name, value, connection):
setattr(self, name, value)
class NovaAdminClient(object):
def __init__(self, clc_ip='127.0.0.1', region='nova', access_key='admin',
@@ -91,7 +117,7 @@ class NovaAdminClient(object):
def get_users(self):
""" grabs the list of all users """
return self.apiconn.get_list('DescribeUsers', {}, (['item', UserInfo]))
return self.apiconn.get_list('DescribeUsers', {}, [('item', UserInfo)])
def get_user(self, name):
""" grab a single user by name """
@@ -116,3 +142,6 @@ class NovaAdminClient(object):
""" returns the content of a zip file containing novarc and access credentials. """
return self.apiconn.get_object('GenerateX509ForUser', {'Name': username}, UserInfo).file
def get_hosts(self):
return self.apiconn.get_list('DescribeHosts', {}, [('item', HostInfo)])
+1 -1
View File
@@ -328,7 +328,7 @@ class UserManager(object):
user = self.get_user_from_access_key(access_key)
if user == None:
raise exception.NotFound('No user found for access key')
raise exception.NotFound('No user found for access key %s' % access_key)
if project_name is '':
project_name = user.name
+245 -36
View File
@@ -43,16 +43,30 @@ True
"""
import logging
import time
from nova import vendor
import redis
from nova import datastore
from nova import exception
from nova import flags
from nova import utils
FLAGS = flags.FLAGS
class ConnectionError(exception.Error):
pass
def absorb_connection_error(fn):
def _wrapper(*args, **kwargs):
try:
return fn(*args, **kwargs)
except redis.exceptions.ConnectionError, ce:
raise ConnectionError(str(ce))
return _wrapper
# TODO(ja): singleton instance of the directory
class InstanceDirectory(object):
"""an api for interacting with the global state of instances """
@@ -64,6 +78,7 @@ class InstanceDirectory(object):
def __getitem__(self, item):
return self.get(item)
@absorb_connection_error
def by_project(self, project):
""" returns a list of instance objects for a project """
for instance_id in datastore.Redis.instance().smembers('project:%s:instances' % project):
@@ -87,10 +102,12 @@ class InstanceDirectory(object):
""" returns the instance a volume is attached to """
pass
@absorb_connection_error
def exists(self, instance_id):
return datastore.Redis.instance().sismember('instances', instance_id)
@property
@absorb_connection_error
def all(self):
""" returns a list of all instances """
for instance_id in datastore.Redis.instance().smembers('instances'):
@@ -101,34 +118,66 @@ class InstanceDirectory(object):
instance_id = utils.generate_uid('i')
return self.get(instance_id)
class Instance(object):
""" Wrapper around stored properties of an instance """
def __init__(self, instance_id):
""" loads an instance from the datastore if exists """
self.instance_id = instance_id
class BasicModel(object):
@absorb_connection_error
def __init__(self):
self.initial_state = {}
self.state = datastore.Redis.instance().hgetall(self.__redis_key)
if self.state:
self.initial_state = self.state
else:
self.state = {'state': 0,
'state_description': 'pending',
'instance_id': instance_id,
'node_name': 'unassigned',
'project_id': 'unassigned',
'user_id': 'unassigned'
}
self.state = self.default_state()
def default_state(self):
""" You probably want to define this in your subclass """
return {}
@classmethod
def lookup(cls, identifier):
rv = cls(identifier)
if rv.new_record():
return None
else:
return rv
@classmethod
@absorb_connection_error
def all(cls):
""" yields all objects in the store """
redis_set = cls._redis_set_name(cls.__name__)
for identifier in datastore.Redis.instance().smembers(redis_set):
yield cls(identifier)
@classmethod
@absorb_connection_error
def associated_to(cls, foreign_type, foreign_id):
redis_set = cls._redis_association_name(foreign_type, foreign_id)
for identifier in datastore.Redis.instance().smembers(redis_set):
yield cls(identifier)
@classmethod
def _redis_set_name(cls, kls_name):
# stupidly pluralize (for compatiblity with previous codebase)
return kls_name.lower() + "s"
@classmethod
def _redis_association_name(cls, foreign_type, foreign_id):
return cls._redis_set_name(
"%s:%s:%s" %
(foreign_type, foreign_id, cls.__name__)
)
@property
def identifier(self):
""" You DEFINITELY want to define this in your subclass """
raise Exception("Your sublcass should define identifier")
@property
def __redis_key(self):
""" Magic string for instance keys """
return 'instance:%s' % self.instance_id
return '%s:%s' % (self.__class__.__name__.lower(), self.identifier)
def __repr__(self):
return "<Instance:%s>" % self.instance_id
return "<%s:%s>" % (self.__class__.__name__, self.identifier)
def keys(self):
return self.state.keys()
@@ -157,12 +206,59 @@ class Instance(object):
def __delitem__(self, item):
""" We don't support this """
raise Exception("Silly monkey, Instances NEED all their properties.")
raise Exception("Silly monkey, models NEED all their properties.")
def new_record(self):
return self.initial_state == {}
@absorb_connection_error
def add_to_index(self):
set_name = self.__class__._redis_set_name(self.__class__.__name__)
datastore.Redis.instance().sadd(set_name, self.identifier)
@absorb_connection_error
def remove_from_index(self):
set_name = self.__class__._redis_set_name(self.__class__.__name__)
datastore.Redis.instance().srem(set_name, self.identifier)
@absorb_connection_error
def associate_with(self, foreign_type, foreign_id):
# note the extra 's' on the end is for plurality
# to match the old data without requiring a migration of any sort
self.add_associated_model_to_its_set(foreign_type, foreign_id)
redis_set = self.__class__._redis_association_name(
foreign_type,
foreign_id
)
datastore.Redis.instance().sadd(redis_set, self.identifier)
@absorb_connection_error
def unassociate_with(self, foreign_type, foreign_id):
redis_set = self.__class__._redis_association_name(
foreign_type,
foreign_id
)
datastore.Redis.instance().srem(redis_set, self.identifier)
def add_associated_model_to_its_set(self, my_type, my_id):
table = globals()
klsname = my_type.capitalize()
if table.has_key(klsname):
my_class = table[klsname]
my_inst = my_class(my_id)
my_inst.save()
else:
logging.warning(
"no model class for %s when building association from %s" %
(klsname, self)
)
@absorb_connection_error
def save(self):
""" update the directory with the state from this instance
make sure you've set the project_id and user_id before you call save
for the first time.
"""
update the directory with the state from this model
also add it to the index of items of the same type
then set the initial_state = state so new changes are tracked
"""
# TODO(ja): implement hmset in redis-py and use it
# instead of multiple calls to hset
@@ -170,29 +266,53 @@ class Instance(object):
# if (not self.initial_state.has_key(key)
# or self.initial_state[key] != val):
datastore.Redis.instance().hset(self.__redis_key, key, val)
if self.initial_state == {}:
datastore.Redis.instance().sadd('project:%s:instances' % self.project,
self.instance_id)
datastore.Redis.instance().sadd('instances', self.instance_id)
self.add_to_index()
self.initial_state = self.state
return True
@absorb_connection_error
def destroy(self):
"""
deletes all related records from datastore.
does NOT do anything to running libvirt state.
"""
logging.info(
"Destroying datamodel for %s %s",
(self.__class__.__name__, self.identifier)
)
datastore.Redis.instance().delete(self.__redis_key)
return True
class Instance(BasicModel):
""" Wrapper around stored properties of an instance """
def __init__(self, instance_id):
""" loads an instance from the datastore if exists """
# set instance data before super call since it uses default_state
self.instance_id = instance_id
super(Instance, self).__init__()
def default_state(self):
return {
'state': 0,
'state_description': 'pending',
'instance_id': self.instance_id,
'node_name': 'unassigned',
'project_id': 'unassigned',
'user_id': 'unassigned'
}
@property
def identifier(self):
return self.instance_id
@property
def project(self):
if self.state.get('project_id', None):
return self.state['project_id']
return self.state.get('owner_id', 'unassigned')
def destroy(self):
""" deletes all related records from datastore.
does NOT do anything to running libvirt state.
"""
logging.info("Destroying datamodel for instance %s", self.instance_id)
datastore.Redis.instance().srem('project:%s:instances' % self.project,
self.instance_id)
datastore.Redis.instance().srem('instances', self.instance_id)
return True
@property
def volumes(self):
""" returns a list of attached volumes """
@@ -203,6 +323,95 @@ class Instance(object):
""" Returns a reservation object """
pass
def save(self):
""" Call into superclass to save object, then save associations """
# XXX: doesn't track migration between projects, just adds the first one
should_update_project = self.new_record()
success = super(Instance, self).save()
if success and should_update_project:
self.associate_with("project", self.project)
return True
def destroy(self):
""" Destroy associations, then destroy the object """
self.unassociate_with("project", self.project)
return super(Instance, self).destroy()
class Host(BasicModel):
"""
A Host is the base machine that runs many virtualized Instance.
Hosts are usually controlled vi nova.compute.node.Node, this model
just stores stats about a host in redis.
"""
def __init__(self, hostname):
""" loads an instance from the datastore if exists """
# set instance data before super call since it uses default_state
self.hostname = hostname
super(Host, self).__init__()
def default_state(self):
return {
"hostname": self.hostname
}
@property
def identifier(self):
return self.hostname
class Worker(BasicModel):
"""
A Worker is a job (compute, api, network, ...) that runs on a host.
"""
def __init__(self, host_or_combined, binpath=None):
""" loads an instance from the datastore if exists """
# set instance data before super call since it uses default_state
# since loading from datastore expects a combined key that
# is equivilent to identifier, we need to expect that, while
# maintaining meaningful semantics (2 arguments) when creating
# from within other code like the bin/nova-* scripts
if binpath:
self.hostname = host_or_combined
self.binary = binpath
else:
self.hostname, self.binary = host_or_combined.split(":")
super(Worker, self).__init__()
def default_state(self):
return {
"hostname": self.hostname,
"binary": self.binary,
"updated_at": utils.timestamp()
}
@property
def identifier(self):
return "%s:%s" % (self.hostname, self.binary)
def save(self):
""" Call into superclass to save object, then save associations """
# XXX: doesn't clear out from host list after crash, termination, etc
success = super(Worker, self).save()
if success:
self.associate_with("host", self.hostname)
return True
def destroy(self):
""" Destroy associations, then destroy the object """
self.unassociate_with("host", self.hostname)
return super(Worker, self).destroy()
def heartbeat(self):
self['updated_at'] = utils.timestamp()
self.save()
return True
@classmethod
def by_host(cls, hostname):
for x in cls.associated_to("host", hostname):
yield x
if __name__ == "__main__":
import doctest
+13 -3
View File
@@ -142,9 +142,19 @@ class Node(object, service.Service):
return retval
@defer.inlineCallbacks
def report_state(self):
logging.debug("Reporting State")
return
def report_state(self, hostname, worker):
try:
record = model.Worker(hostname, worker)
record.heartbeat()
if getattr(self, "model_disconnected", False):
self.model_disconnected = False
logging.error("Recovered model server connection!")
except model.ConnectionError, ex:
if not getattr(self, "model_disconnected", False):
self.model_disconnected = True
logging.exception("model server went away")
yield
# @exception.wrap_exception
def run_instance(self, instance_id, **_kwargs):
+18 -25
View File
@@ -25,7 +25,7 @@ Admin API controller, exposed through http via the api worker.
import base64
def user_dict(user, base64_file=None):
"""Convert the user object to a result dict"""
""" Convert the user object to a result dict """
if user:
return {
'username': user.id,
@@ -36,23 +36,17 @@ def user_dict(user, base64_file=None):
else:
return {}
def node_dict(node):
"""Convert a node object to a result dict"""
if node:
return {
'node_id': node.id,
'workers': ", ".join(node.workers),
'disks': ", ".join(node.disks),
'ram': node.memory,
'load_average' : node.load_average,
}
def host_dict(host):
""" Convert a host model object to a result dict """
if host:
return host.state
else:
return {}
def admin_only(target):
"""Decorator for admin-only API calls"""
""" Decorator for admin-only API calls """
def wrapper(*args, **kwargs):
"""Internal wrapper method for admin-only API calls"""
""" Internal wrapper method for admin-only API calls """
context = args[1]
if context.user.is_admin():
return target(*args, **kwargs)
@@ -63,27 +57,26 @@ def admin_only(target):
class AdminController(object):
"""
API Controller for users, node status, and worker mgmt.
API Controller for users, hosts, nodes, and workers.
Trivial admin_only wrapper will be replaced with RBAC,
allowing project managers to administer project users.
"""
def __init__(self, user_manager, node_manager=None):
def __init__(self, user_manager, host_manager):
self.user_manager = user_manager
self.node_manager = node_manager
self.host_manager = host_manager
def __str__(self):
return 'AdminController'
@admin_only
def describe_user(self, _context, name, **_kwargs):
"""Returns user data, including access and secret keys.
"""
""" Returns user data, including access and secret keys. """
return user_dict(self.user_manager.get_user(name))
@admin_only
def describe_users(self, _context, **_kwargs):
"""Returns all users - should be changed to deal with a list.
"""
""" Returns all users - should be changed to deal with a list. """
return {'userSet':
[user_dict(u) for u in self.user_manager.get_users()] }
@@ -116,7 +109,7 @@ class AdminController(object):
return user_dict(user, base64.b64encode(project.get_credentials(user)))
@admin_only
def describe_nodes(self, _context, **_kwargs):
def describe_hosts(self, _context, **_kwargs):
"""Returns status info for all nodes. Includes:
* Disk Space
* Instance List
@@ -125,11 +118,11 @@ class AdminController(object):
* DHCP servers running
* Iptables / bridges
"""
return {'nodeSet':
[node_dict(n) for n in self.node_manager.get_nodes()] }
return {'hostSet':
[host_dict(h) for h in self.host_manager.all()] }
@admin_only
def describe_node(self, _context, name, **_kwargs):
def describe_host(self, _context, name, **_kwargs):
"""Returns status info for single node.
"""
return node_dict(self.node_manager.get_node(name))
return host_dict(self.host_manager.lookup(name))
+213
View File
@@ -0,0 +1,213 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright [2010] [Anso Labs, LLC]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from nova import vendor
from twisted.internet import defer
from nova import exception
from nova import flags
from nova import test
from nova import utils
from nova.compute import model
from nova.compute import node
FLAGS = flags.FLAGS
class ModelTestCase(test.TrialTestCase):
def setUp(self):
logging.getLogger().setLevel(logging.DEBUG)
super(ModelTestCase, self).setUp()
self.flags(fake_libvirt=True,
fake_storage=True,
fake_users=True)
def tearDown(self):
model.Instance('i-test').destroy()
model.Host('testhost').destroy()
model.Worker('testhost', 'nova-testworker').destroy()
def create_instance(self):
inst = model.Instance('i-test')
inst['reservation_id'] = 'r-test'
inst['launch_time'] = '10'
inst['user_id'] = 'fake'
inst['project_id'] = 'fake'
inst['instance_type'] = 'm1.tiny'
inst['node_name'] = FLAGS.node_name
inst['mac_address'] = utils.generate_mac()
inst['ami_launch_index'] = 0
inst.save()
return inst
def create_host(self):
host = model.Host('testhost')
host.save()
return host
def create_worker(self):
worker = model.Worker('testhost', 'nova-testworker')
worker.save()
return worker
@defer.inlineCallbacks
def test_create_instance(self):
""" store with create_instace, then test that a load finds it """
instance = yield self.create_instance()
old = yield model.Instance(instance.identifier)
self.assertEqual(False, old.new_record())
@defer.inlineCallbacks
def test_delete_instance(self):
""" create, then destroy, then make sure loads a new record """
instance = yield self.create_instance()
yield instance.destroy()
newinst = yield model.Instance('i-test')
self.assertEqual(True, newinst.new_record())
@defer.inlineCallbacks
def test_instance_added_to_set(self):
""" create, then check that it is listed for the project """
instance = yield self.create_instance()
found = False
for x in model.InstanceDirectory().all:
if x.identifier == 'i-test':
found = True
self.assertEqual(True, found)
@defer.inlineCallbacks
def test_instance_associates_project(self):
""" create, then check that it is listed for the project """
instance = yield self.create_instance()
found = False
for x in model.InstanceDirectory().by_project(instance.project):
if x.identifier == 'i-test':
found = True
self.assertEqual(True, found)
@defer.inlineCallbacks
def test_host_class_finds_hosts(self):
host = yield self.create_host()
self.assertEqual('testhost', model.Host.lookup('testhost').identifier)
@defer.inlineCallbacks
def test_host_class_doesnt_find_missing_hosts(self):
rv = yield model.Host.lookup('woahnelly')
self.assertEqual(None, rv)
@defer.inlineCallbacks
def test_create_host(self):
""" store with create_host, then test that a load finds it """
host = yield self.create_host()
old = yield model.Host(host.identifier)
self.assertEqual(False, old.new_record())
@defer.inlineCallbacks
def test_delete_host(self):
""" create, then destroy, then make sure loads a new record """
instance = yield self.create_host()
yield instance.destroy()
newinst = yield model.Host('testhost')
self.assertEqual(True, newinst.new_record())
@defer.inlineCallbacks
def test_host_added_to_set(self):
""" create, then check that it is included in list """
instance = yield self.create_host()
found = False
for x in model.Host.all():
if x.identifier == 'testhost':
found = True
self.assertEqual(True, found)
@defer.inlineCallbacks
def test_create_worker_two_args(self):
""" create a worker with two arguments """
w = yield self.create_worker()
self.assertEqual(
False,
model.Worker('testhost', 'nova-testworker').new_record()
)
@defer.inlineCallbacks
def test_create_worker_single_arg(self):
""" Create a worker using the combined host:bin format """
w = yield model.Worker("testhost:nova-testworker")
w.save()
self.assertEqual(
False,
model.Worker('testhost:nova-testworker').new_record()
)
@defer.inlineCallbacks
def test_equality_of_worker_single_and_double_args(self):
""" Create a worker using the combined host:bin arg, find with 2 """
w = yield model.Worker("testhost:nova-testworker")
w.save()
self.assertEqual(
False,
model.Worker('testhost', 'nova-testworker').new_record()
)
@defer.inlineCallbacks
def test_equality_worker_of_double_and_single_args(self):
""" Create a worker using the combined host:bin arg, find with 2 """
w = yield self.create_worker()
self.assertEqual(
False,
model.Worker('testhost:nova-testworker').new_record()
)
@defer.inlineCallbacks
def test_delete_worker(self):
""" create, then destroy, then make sure loads a new record """
instance = yield self.create_worker()
yield instance.destroy()
newinst = yield model.Worker('testhost', 'nova-testworker')
self.assertEqual(True, newinst.new_record())
@defer.inlineCallbacks
def test_worker_heartbeat(self):
""" Create a worker, sleep, heartbeat, check for update """
w = yield self.create_worker()
ts = w['updated_at']
yield time.sleep(2)
w.heartbeat()
w2 = model.Worker('testhost', 'nova-testworker')
ts2 = w2['updated_at']
self.assertEqual(True, (ts2 > ts))
@defer.inlineCallbacks
def test_worker_added_to_set(self):
""" create, then check that it is included in list """
instance = yield self.create_worker()
found = False
for x in model.Worker.all():
if x.identifier == 'testhost:nova-testworker':
found = True
self.assertEqual(True, found)
@defer.inlineCallbacks
def test_worker_associates_host(self):
""" create, then check that it is listed for the host """
instance = yield self.create_worker()
found = False
for x in model.Worker.by_host('testhost'):
if x.identifier == 'testhost:nova-testworker':
found = True
self.assertEqual(True, found)
+6
View File
@@ -29,6 +29,7 @@ import os.path
import inspect
import subprocess
import random
import time
from nova import flags
@@ -114,3 +115,8 @@ def get_my_ip():
(addr, port) = csock.getsockname()
csock.close()
return addr
def timestamp(at=None):
if not at:
at = time.gmtime()
return time.strftime("%Y-%m-%dT%H:%M:%SZ", at)
+1
View File
@@ -61,6 +61,7 @@ from nova.tests.storage_unittest import *
from nova.tests.users_unittest import *
from nova.tests.datastore_unittest import *
from nova.tests.validator_unittest import *
from nova.tests.model_unittest import *
FLAGS = flags.FLAGS