Adds initial support for XenAPI (not yet finished)

This commit is contained in:
Ewan Mellor
2010-07-29 00:07:43 +00:00
committed by Tarmac
27 changed files with 996 additions and 506 deletions
+1 -1
View File
@@ -76,7 +76,7 @@ def main():
FLAGS.fake_rabbit = True
FLAGS.redis_db = 8
FLAGS.network_size = 32
FLAGS.fake_libvirt=True
FLAGS.connection_type = 'fake'
FLAGS.fake_network=True
FLAGS.auth_driver='nova.auth.ldapdriver.FakeLdapDriver'
action = argv[1]
+2 -2
View File
@@ -18,10 +18,10 @@
Nova Fakes
==========
The :mod:`fakevirt` Module
The :mod:`virt.fake` Module
--------------------------
.. automodule:: nova.fakevirt
.. automodule:: nova.virt.fake
:members:
:undoc-members:
:show-inheritance:
+30
View File
@@ -0,0 +1,30 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
# Copyright (c) 2010 Citrix Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
The built-in instance properties.
"""
INSTANCE_TYPES = {}
INSTANCE_TYPES['m1.tiny'] = {'memory_mb': 512, 'vcpus': 1, 'local_gb': 0}
INSTANCE_TYPES['m1.small'] = {'memory_mb': 1024, 'vcpus': 1, 'local_gb': 10}
INSTANCE_TYPES['m1.medium'] = {'memory_mb': 2048, 'vcpus': 2, 'local_gb': 10}
INSTANCE_TYPES['m1.large'] = {'memory_mb': 4096, 'vcpus': 4, 'local_gb': 10}
INSTANCE_TYPES['m1.xlarge'] = {'memory_mb': 8192, 'vcpus': 4, 'local_gb': 10}
INSTANCE_TYPES['c1.medium'] = {'memory_mb': 2048, 'vcpus': 4, 'local_gb': 10}
+23 -104
View File
@@ -27,7 +27,6 @@ Instance Monitoring:
import boto
import boto.s3
import datetime
import libxml2
import logging
import os
import rrdtool
@@ -37,12 +36,8 @@ from twisted.internet import defer
from twisted.internet import task
from twisted.application import service
try:
import libvirt
except Exception, err:
logging.warning('no libvirt found')
from nova import flags
from nova.virt import connection as virt_connection
FLAGS = flags.FLAGS
@@ -130,83 +125,6 @@ def init_rrd(instance, name):
*RRD_VALUES[name]
)
def get_disks(domain):
"""
Returns a list of all block devices for this domain.
"""
# TODO(devcamcar): Replace libxml2 with etree.
xml = domain.XMLDesc(0)
doc = None
try:
doc = libxml2.parseDoc(xml)
except:
return []
ctx = doc.xpathNewContext()
disks = []
try:
ret = ctx.xpathEval('/domain/devices/disk')
for node in ret:
devdst = None
for child in node.children:
if child.name == 'target':
devdst = child.prop('dev')
if devdst == None:
continue
disks.append(devdst)
finally:
if ctx != None:
ctx.xpathFreeContext()
if doc != None:
doc.freeDoc()
return disks
def get_interfaces(domain):
"""
Returns a list of all network interfaces for this instance.
"""
# TODO(devcamcar): Replace libxml2 with etree.
xml = domain.XMLDesc(0)
doc = None
try:
doc = libxml2.parseDoc(xml)
except:
return []
ctx = doc.xpathNewContext()
interfaces = []
try:
ret = ctx.xpathEval('/domain/devices/interface')
for node in ret:
devdst = None
for child in node.children:
if child.name == 'target':
devdst = child.prop('dev')
if devdst == None:
continue
interfaces.append(devdst)
finally:
if ctx != None:
ctx.xpathFreeContext()
if doc != None:
doc.freeDoc()
return interfaces
def graph_cpu(instance, duration):
"""
Creates a graph of cpu usage for the specified instance and duration.
@@ -317,10 +235,9 @@ def store_graph(instance_id, filename):
class Instance(object):
def __init__(self, conn, domain):
def __init__(self, conn, instance_id):
self.conn = conn
self.domain = domain
self.instance_id = domain.name()
self.instance_id = instance_id
self.last_updated = datetime.datetime.min
self.cputime = 0
self.cputime_last_updated = None
@@ -385,14 +302,14 @@ class Instance(object):
"""
Returns cpu usage statistics for this instance.
"""
info = self.domain.info()
info = self.conn.get_info(self.instance_id)
# Get the previous values.
cputime_last = self.cputime
cputime_last_updated = self.cputime_last_updated
# Get the raw CPU time used in nanoseconds.
self.cputime = float(info[4])
self.cputime = float(info['cpu_time'])
self.cputime_last_updated = utcnow()
logging.debug('CPU: %d', self.cputime)
@@ -413,8 +330,8 @@ class Instance(object):
logging.debug('cputime_delta = %s', cputime_delta)
# Get the number of virtual cpus in this domain.
vcpus = int(info[3])
vcpus = int(info['num_cpu'])
logging.debug('vcpus = %d', vcpus)
# Calculate CPU % used and cap at 100.
@@ -427,14 +344,13 @@ class Instance(object):
rd = 0
wr = 0
# Get a list of block devices for this instance.
disks = get_disks(self.domain)
disks = self.conn.get_disks(self.instance_id)
# Aggregate the read and write totals.
for disk in disks:
try:
rd_req, rd_bytes, wr_req, wr_bytes, errs = \
self.domain.blockStats(disk)
self.conn.block_stats(self.instance_id, disk)
rd += rd_bytes
wr += wr_bytes
except TypeError:
@@ -451,13 +367,12 @@ class Instance(object):
rx = 0
tx = 0
# Get a list of all network interfaces for this instance.
interfaces = get_interfaces(self.domain)
interfaces = self.conn.get_interfaces(self.instance_id)
# Aggregate the in and out totals.
for interface in interfaces:
try:
stats = self.domain.interfaceStats(interface)
stats = self.conn.interface_stats(self.instance_id, interface)
rx += stats[0]
tx += stats[4]
except TypeError:
@@ -493,20 +408,24 @@ class InstanceMonitor(object, service.Service):
Update resource usage for all running instances.
"""
try:
conn = libvirt.openReadOnly(None)
except libvirt.libvirtError:
logging.exception('unexpected libvirt error')
conn = virt_connection.get_connection(read_only=True)
except Exception, exn:
logging.exception('unexpected exception getting connection')
time.sleep(FLAGS.monitoring_instances_delay)
return
domain_ids = conn.listDomainsID()
domain_ids = conn.list_instances()
try:
self.updateInstances_(conn, domain_ids)
except Exception, exn:
logging.exception('updateInstances_')
def updateInstances_(self, conn, domain_ids):
for domain_id in domain_ids:
if not domain_id in self._instances:
domain = conn.lookupByID(domain_id)
instance = Instance(conn, domain)
instance = Instance(conn, domain_id)
self._instances[domain_id] = instance
logging.debug('Found instance: %s', instance.instance_id)
logging.debug('Found instance: %s', domain_id)
for key in self._instances.keys():
instance = self._instances[key]
+41
View File
@@ -0,0 +1,41 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
# Copyright (c) 2010 Citrix Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""The various power states that a VM can be in."""
NOSTATE = 0x00
RUNNING = 0x01
BLOCKED = 0x02
PAUSED = 0x03
SHUTDOWN = 0x04
SHUTOFF = 0x05
CRASHED = 0x06
def name(code):
d = {
NOSTATE : 'pending',
RUNNING : 'running',
BLOCKED : 'blocked',
PAUSED : 'paused',
SHUTDOWN: 'shutdown',
SHUTOFF : 'shutdown',
CRASHED : 'crashed',
}
return d[code]
+29 -268
View File
@@ -20,95 +20,50 @@
Compute Service:
Runs on each compute host, managing the
hypervisor using libvirt.
hypervisor using the virt module.
"""
import base64
import boto.utils
import json
import logging
import os
import shutil
import sys
import time
from twisted.internet import defer
from twisted.internet import task
try:
import libvirt
except Exception, err:
logging.warning('no libvirt found')
from nova import exception
from nova import fakevirt
from nova import flags
from nova import process
from nova import service
from nova import utils
from nova.auth import signer, manager
from nova.compute import disk
from nova.compute import model
from nova.compute import network
from nova.compute import power_state
from nova.compute.instance_types import INSTANCE_TYPES
from nova.objectstore import image # for image_path flag
from nova.virt import connection as virt_connection
from nova.volume import service as volume_service
FLAGS = flags.FLAGS
flags.DEFINE_string('libvirt_xml_template',
utils.abspath('compute/libvirt.xml.template'),
'Libvirt XML Template')
flags.DEFINE_bool('use_s3', True,
'whether to get images from s3 or use local copy')
flags.DEFINE_string('instances_path', utils.abspath('../instances'),
'where instances are stored on disk')
INSTANCE_TYPES = {}
INSTANCE_TYPES['m1.tiny'] = {'memory_mb': 512, 'vcpus': 1, 'local_gb': 0}
INSTANCE_TYPES['m1.small'] = {'memory_mb': 1024, 'vcpus': 1, 'local_gb': 10}
INSTANCE_TYPES['m1.medium'] = {'memory_mb': 2048, 'vcpus': 2, 'local_gb': 10}
INSTANCE_TYPES['m1.large'] = {'memory_mb': 4096, 'vcpus': 4, 'local_gb': 10}
INSTANCE_TYPES['m1.xlarge'] = {'memory_mb': 8192, 'vcpus': 4, 'local_gb': 10}
INSTANCE_TYPES['c1.medium'] = {'memory_mb': 2048, 'vcpus': 4, 'local_gb': 10}
def _image_path(path=''):
return os.path.join(FLAGS.images_path, path)
def _image_url(path):
return "%s:%s/_images/%s" % (FLAGS.s3_host, FLAGS.s3_port, path)
class ComputeService(service.Service):
"""
Manages the running instances.
"""
def __init__(self):
""" load configuration options for this node and connect to libvirt """
""" load configuration options for this node and connect to the hypervisor"""
super(ComputeService, self).__init__()
self._instances = {}
self._conn = self._get_connection()
self._conn = virt_connection.get_connection()
self.instdir = model.InstanceDirectory()
# TODO(joshua): This needs to ensure system state, specifically: modprobe aoe
def _get_connection(self):
""" returns a libvirt connection object """
# TODO(termie): maybe lazy load after initial check for permissions
# TODO(termie): check whether we can be disconnected
if FLAGS.fake_libvirt:
conn = fakevirt.FakeVirtConnection.instance()
else:
auth = [[libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_NOECHOPROMPT],
'root',
None]
conn = libvirt.openAuth('qemu:///system', auth, 0)
if conn == None:
logging.error('Failed to open connection to the hypervisor')
sys.exit(1)
return conn
def noop(self):
""" simple test of an AMQP message call """
return defer.succeed('PONG')
@@ -124,8 +79,7 @@ class ComputeService(service.Service):
def adopt_instances(self):
""" if there are instances already running, adopt them """
return defer.succeed(0)
instance_names = [self._conn.lookupByID(x).name()
for x in self._conn.listDomainsID()]
instance_names = self._conn.list_instances()
for name in instance_names:
try:
new_inst = Instance.fromName(self._conn, name)
@@ -158,7 +112,7 @@ class ComputeService(service.Service):
logging.exception("model server went away")
yield
# @exception.wrap_exception
@exception.wrap_exception
def run_instance(self, instance_id, **_kwargs):
""" launch a new instance with specified options """
logging.debug("Starting instance %s..." % (instance_id))
@@ -176,8 +130,7 @@ class ComputeService(service.Service):
logging.info("Instances current state is %s", new_inst.state)
if new_inst.is_running():
raise exception.Error("Instance is already running")
d = new_inst.spawn()
return d
new_inst.spawn()
@exception.wrap_exception
def terminate_instance(self, instance_id):
@@ -312,20 +265,6 @@ class Instance(object):
self.datamodel.save()
logging.debug("Finished init of Instance with id of %s" % name)
def toXml(self):
# TODO(termie): cache?
logging.debug("Starting the toXML method")
libvirt_xml = open(FLAGS.libvirt_xml_template).read()
xml_info = self.datamodel.copy()
# TODO(joshua): Make this xml express the attached disks as well
# TODO(termie): lazy lazy hack because xml is annoying
xml_info['nova'] = json.dumps(self.datamodel.copy())
libvirt_xml = libvirt_xml % xml_info
logging.debug("Finished the toXML method")
return libvirt_xml
@classmethod
def fromName(cls, conn, name):
""" use the saved data for reloading the instance """
@@ -336,7 +275,7 @@ class Instance(object):
def set_state(self, state_code, state_description=None):
self.datamodel['state'] = state_code
if not state_description:
state_description = STATE_NAMES[state_code]
state_description = power_state.name(state_code)
self.datamodel['state_description'] = state_description
self.datamodel.save()
@@ -350,37 +289,29 @@ class Instance(object):
return self.datamodel['name']
def is_pending(self):
return (self.state == Instance.NOSTATE or self.state == 'pending')
return (self.state == power_state.NOSTATE or self.state == 'pending')
def is_destroyed(self):
return self.state == Instance.SHUTOFF
return self.state == power_state.SHUTOFF
def is_running(self):
logging.debug("Instance state is: %s" % self.state)
return (self.state == Instance.RUNNING or self.state == 'running')
return (self.state == power_state.RUNNING or self.state == 'running')
def describe(self):
return self.datamodel
def info(self):
logging.debug("Getting info for dom %s" % self.name)
virt_dom = self._conn.lookupByName(self.name)
(state, max_mem, mem, num_cpu, cpu_time) = virt_dom.info()
return {'state': state,
'max_mem': max_mem,
'mem': mem,
'num_cpu': num_cpu,
'cpu_time': cpu_time,
'node_name': FLAGS.node_name}
def basepath(self, path=''):
return os.path.abspath(os.path.join(self.datamodel['basepath'], path))
result = self._conn.get_info(self.name)
result['node_name'] = FLAGS.node_name
return result
def update_state(self):
self.datamodel.update(self.info())
self.set_state(self.state)
self.datamodel.save() # Extra, but harmless
@defer.inlineCallbacks
@exception.wrap_exception
def destroy(self):
if self.is_destroyed():
@@ -388,38 +319,9 @@ class Instance(object):
raise exception.Error('trying to destroy already destroyed'
' instance: %s' % self.name)
self.set_state(Instance.NOSTATE, 'shutting_down')
try:
virt_dom = self._conn.lookupByName(self.name)
virt_dom.destroy()
except Exception, _err:
pass
# If the instance is already terminated, we're still happy
d = defer.Deferred()
d.addCallback(lambda x: self._cleanup())
d.addCallback(lambda x: self.datamodel.destroy())
# TODO(termie): short-circuit me for tests
# WE'LL save this for when we do shutdown,
# instead of destroy - but destroy returns immediately
timer = task.LoopingCall(f=None)
def _wait_for_shutdown():
try:
self.update_state()
if self.state == Instance.SHUTDOWN:
timer.stop()
d.callback(None)
except Exception:
self.set_state(Instance.SHUTDOWN)
timer.stop()
d.callback(None)
timer.f = _wait_for_shutdown
timer.start(interval=0.5, now=True)
return d
def _cleanup(self):
target = os.path.abspath(self.datamodel['basepath'])
logging.info("Deleting instance files at %s", target)
shutil.rmtree(target)
self.set_state(power_state.NOSTATE, 'shutting_down')
yield self._conn.destroy(self)
self.datamodel.destroy()
@defer.inlineCallbacks
@exception.wrap_exception
@@ -430,157 +332,26 @@ class Instance(object):
'instance: %s (state: %s)' % (self.name, self.state))
logging.debug('rebooting instance %s' % self.name)
self.set_state(Instance.NOSTATE, 'rebooting')
yield self._conn.lookupByName(self.name).destroy()
self._conn.createXML(self.toXml(), 0)
d = defer.Deferred()
timer = task.LoopingCall(f=None)
def _wait_for_reboot():
try:
self.update_state()
if self.is_running():
logging.debug('rebooted instance %s' % self.name)
timer.stop()
d.callback(None)
except Exception:
self.set_state(Instance.SHUTDOWN)
timer.stop()
d.callback(None)
timer.f = _wait_for_reboot
timer.start(interval=0.5, now=True)
yield d
def _fetch_s3_image(self, image, path):
url = _image_url('%s/image' % image)
# This should probably move somewhere else, like e.g. a download_as
# method on User objects and at the same time get rewritten to use
# twisted web client.
headers = {}
headers['Date'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())
user_id = self.datamodel['user_id']
user = manager.AuthManager().get_user(user_id)
uri = '/' + url.partition('/')[2]
auth = signer.Signer(user.secret.encode()).s3_authorization(headers, 'GET', uri)
headers['Authorization'] = 'AWS %s:%s' % (user.access, auth)
cmd = ['/usr/bin/curl', '--silent', url]
for (k,v) in headers.iteritems():
cmd += ['-H', '%s: %s' % (k,v)]
cmd += ['-o', path]
return process.SharedPool().execute(executable=cmd[0], args=cmd[1:])
def _fetch_local_image(self, image, path):
source = _image_path('%s/image' % image)
d = process.simple_execute('cp %s %s' % (source, path))
return d
@defer.inlineCallbacks
def _create_image(self, libvirt_xml):
# syntactic nicety
data = self.datamodel
basepath = self.basepath
# ensure directories exist and are writable
yield process.simple_execute(
'mkdir -p %s' % basepath())
yield process.simple_execute(
'chmod 0777 %s' % basepath())
# TODO(termie): these are blocking calls, it would be great
# if they weren't.
logging.info('Creating image for: %s', data['instance_id'])
f = open(basepath('libvirt.xml'), 'w')
f.write(libvirt_xml)
f.close()
if FLAGS.fake_libvirt:
logging.info('fake_libvirt, nothing to do for create_image')
raise defer.returnValue(None);
if FLAGS.use_s3:
_fetch_file = self._fetch_s3_image
else:
_fetch_file = self._fetch_local_image
if not os.path.exists(basepath('disk')):
yield _fetch_file(data['image_id'], basepath('disk-raw'))
if not os.path.exists(basepath('kernel')):
yield _fetch_file(data['kernel_id'], basepath('kernel'))
if not os.path.exists(basepath('ramdisk')):
yield _fetch_file(data['ramdisk_id'], basepath('ramdisk'))
execute = lambda cmd, input=None: \
process.simple_execute(cmd=cmd,
input=input,
error_ok=1)
key = data['key_data']
net = None
if FLAGS.simple_network:
with open(FLAGS.simple_network_template) as f:
net = f.read() % {'address': data['private_dns_name'],
'network': FLAGS.simple_network_network,
'netmask': FLAGS.simple_network_netmask,
'gateway': FLAGS.simple_network_gateway,
'broadcast': FLAGS.simple_network_broadcast,
'dns': FLAGS.simple_network_dns}
if key or net:
logging.info('Injecting data into image %s', data['image_id'])
yield disk.inject_data(basepath('disk-raw'), key, net, execute=execute)
if os.path.exists(basepath('disk')):
yield process.simple_execute(
'rm -f %s' % basepath('disk'))
bytes = (INSTANCE_TYPES[data['instance_type']]['local_gb']
* 1024 * 1024 * 1024)
yield disk.partition(
basepath('disk-raw'), basepath('disk'), bytes, execute=execute)
self.set_state(power_state.NOSTATE, 'rebooting')
yield self._conn.reboot(self)
self.update_state()
@defer.inlineCallbacks
@exception.wrap_exception
def spawn(self):
self.set_state(Instance.NOSTATE, 'spawning')
self.set_state(power_state.NOSTATE, 'spawning')
logging.debug("Starting spawn in Instance")
xml = self.toXml()
self.set_state(Instance.NOSTATE, 'launching')
logging.info('self %s', self)
try:
yield self._create_image(xml)
self._conn.createXML(xml, 0)
# TODO(termie): this should actually register
# a callback to check for successful boot
logging.debug("Instance is running")
local_d = defer.Deferred()
timer = task.LoopingCall(f=None)
def _wait_for_boot():
try:
self.update_state()
if self.is_running():
logging.debug('booted instance %s' % self.name)
timer.stop()
local_d.callback(None)
except Exception:
self.set_state(Instance.SHUTDOWN)
logging.error('Failed to boot instance %s' % self.name)
timer.stop()
local_d.callback(None)
timer.f = _wait_for_boot
timer.start(interval=0.5, now=True)
yield self._conn.spawn(self)
except Exception, ex:
logging.debug(ex)
self.set_state(Instance.SHUTDOWN)
self.set_state(power_state.SHUTDOWN)
self.update_state()
@exception.wrap_exception
def console_output(self):
if not FLAGS.fake_libvirt:
# FIXME: Abstract this for Xen
if FLAGS.connection_type == 'libvirt':
fname = os.path.abspath(
os.path.join(self.datamodel['basepath'], 'console.log'))
with open(fname, 'r') as f:
@@ -588,13 +359,3 @@ class Instance(object):
else:
console = 'FAKE CONSOLE OUTPUT'
return defer.succeed(console)
STATE_NAMES = {
Instance.NOSTATE : 'pending',
Instance.RUNNING : 'running',
Instance.BLOCKED : 'blocked',
Instance.PAUSED : 'paused',
Instance.SHUTDOWN : 'shutdown',
Instance.SHUTOFF : 'shutdown',
Instance.CRASHED : 'crashed',
}
+2 -1
View File
@@ -37,6 +37,7 @@ from nova.auth import rbac
from nova.auth import manager
from nova.compute import model
from nova.compute import network
from nova.compute.instance_types import INSTANCE_TYPES
from nova.compute import service as compute_service
from nova.endpoint import images
from nova.volume import service as volume_service
@@ -102,7 +103,7 @@ class CloudController(object):
result = {}
for instance in self.instdir.all:
if instance['project_id'] == project_id:
line = '%s slots=%d' % (instance['private_dns_name'], compute_service.INSTANCE_TYPES[instance['instance_type']]['vcpus'])
line = '%s slots=%d' % (instance['private_dns_name'], INSTANCE_TYPES[instance['instance_type']]['vcpus'])
if instance['key_name'] in result:
result[instance['key_name']].append(line)
else:
-112
View File
@@ -1,112 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
A fake (in-memory) hypervisor+api. Allows nova testing w/o KVM and libvirt.
"""
import StringIO
from xml.etree import ElementTree
class FakeVirtConnection(object):
# FIXME: networkCreateXML, listNetworks don't do anything since
# they aren't exercised in tests yet
def __init__(self):
self.next_index = 0
self.instances = {}
@classmethod
def instance(cls):
if not hasattr(cls, '_instance'):
cls._instance = cls()
return cls._instance
def lookupByID(self, i):
return self.instances[str(i)]
def listDomainsID(self):
return self.instances.keys()
def listNetworks(self):
return []
def lookupByName(self, instance_id):
for x in self.instances.values():
if x.name() == instance_id:
return x
raise Exception('no instance found for instance_id: %s' % instance_id)
def networkCreateXML(self, xml):
pass
def createXML(self, xml, flags):
# parse the xml :(
xml_stringio = StringIO.StringIO(xml)
my_xml = ElementTree.parse(xml_stringio)
name = my_xml.find('name').text
fake_instance = FakeVirtInstance(conn=self,
index=str(self.next_index),
name=name,
xml=my_xml)
self.instances[str(self.next_index)] = fake_instance
self.next_index += 1
def _removeInstance(self, i):
self.instances.pop(str(i))
class FakeVirtInstance(object):
NOSTATE = 0x00
RUNNING = 0x01
BLOCKED = 0x02
PAUSED = 0x03
SHUTDOWN = 0x04
SHUTOFF = 0x05
CRASHED = 0x06
def __init__(self, conn, index, name, xml):
self._conn = conn
self._destroyed = False
self._name = name
self._index = index
self._state = self.RUNNING
def name(self):
return self._name
def destroy(self):
if self._state == self.SHUTOFF:
raise Exception('instance already destroyed: %s' % self.name())
self._state = self.SHUTDOWN
self._conn._removeInstance(self._index)
def info(self):
return [self._state, 0, 2, 0, 0]
def XMLDesc(self, flags):
return open('fakevirtinstance.xml', 'r').read()
def blockStats(self, disk):
return [0L, 0L, 0L, 0L, null]
def interfaceStats(self, iface):
return [0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L]
+1 -2
View File
@@ -36,6 +36,7 @@ DEFINE_bool = DEFINE_bool
# Define any app-specific flags in their own files, docs at:
# http://code.google.com/p/python-gflags/source/browse/trunk/gflags.py#39
DEFINE_string('connection_type', 'libvirt', 'libvirt, xenapi or fake')
DEFINE_integer('s3_port', 3333, 's3 port')
DEFINE_string('s3_host', '127.0.0.1', 's3 host')
#DEFINE_string('cloud_topic', 'cloud', 'the topic clouds listen on')
@@ -43,8 +44,6 @@ DEFINE_string('compute_topic', 'compute', 'the topic compute nodes listen on')
DEFINE_string('volume_topic', 'volume', 'the topic volume nodes listen on')
DEFINE_string('network_topic', 'network', 'the topic network nodes listen on')
DEFINE_bool('fake_libvirt', False,
'whether to use a fake libvirt or not')
DEFINE_bool('verbose', False, 'show debug output')
DEFINE_boolean('fake_rabbit', False, 'use a fake rabbit')
DEFINE_bool('fake_network', False, 'should we use fake network devices and addresses')
+18 -4
View File
@@ -120,7 +120,21 @@ def get_context(request):
logging.debug("Authentication Failure: %s" % ex)
raise exception.NotAuthorized
class S3(Resource):
class ErrorHandlingResource(Resource):
"""Maps exceptions to 404 / 401 codes. Won't work for exceptions thrown after NOT_DONE_YET is returned."""
# TODO(unassigned) (calling-all-twisted-experts): This needs to be plugged in to the right place in twisted...
# This doesn't look like it's the right place (consider exceptions in getChild; or after NOT_DONE_YET is returned
def render(self, request):
try:
return Resource.render(self, request)
except exception.NotFound:
request.setResponseCode(404)
return ''
except exception.NotAuthorized:
request.setResponseCode(403)
return ''
class S3(ErrorHandlingResource):
"""Implementation of an S3-like storage server based on local files."""
def getChild(self, name, request):
request.context = get_context(request)
@@ -140,7 +154,7 @@ class S3(Resource):
}})
return server.NOT_DONE_YET
class BucketResource(Resource):
class BucketResource(ErrorHandlingResource):
def __init__(self, name):
Resource.__init__(self)
self.name = name
@@ -190,7 +204,7 @@ class BucketResource(Resource):
return ''
class ObjectResource(Resource):
class ObjectResource(ErrorHandlingResource):
def __init__(self, bucket, name):
Resource.__init__(self)
self.bucket = bucket
@@ -231,7 +245,7 @@ class ObjectResource(Resource):
request.setResponseCode(204)
return ''
class ImageResource(Resource):
class ImageResource(ErrorHandlingResource):
isLeaf = True
def __init__(self, name):
+1 -1
View File
@@ -33,7 +33,7 @@ class Context(object):
class AccessTestCase(test.BaseTestCase):
def setUp(self):
super(AccessTestCase, self).setUp()
FLAGS.fake_libvirt = True
FLAGS.connection_type = 'fake'
FLAGS.fake_storage = True
um = manager.AuthManager()
# Make test users
+1 -1
View File
@@ -35,7 +35,7 @@ class AuthTestCase(test.BaseTestCase):
flush_db = False
def setUp(self):
super(AuthTestCase, self).setUp()
self.flags(fake_libvirt=True,
self.flags(connection_type='fake',
fake_storage=True)
self.manager = manager.AuthManager()
+4 -4
View File
@@ -39,7 +39,7 @@ FLAGS = flags.FLAGS
class CloudTestCase(test.BaseTestCase):
def setUp(self):
super(CloudTestCase, self).setUp()
self.flags(fake_libvirt=True,
self.flags(connection_type='fake',
fake_storage=True)
self.conn = rpc.Connection.instance()
@@ -71,7 +71,7 @@ class CloudTestCase(test.BaseTestCase):
manager.AuthManager().delete_user('admin')
def test_console_output(self):
if FLAGS.fake_libvirt:
if FLAGS.connection_type == 'fake':
logging.debug("Can't test instances without a real virtual env.")
return
instance_id = 'foo'
@@ -82,7 +82,7 @@ class CloudTestCase(test.BaseTestCase):
rv = yield self.compute.terminate_instance(instance_id)
def test_run_instances(self):
if FLAGS.fake_libvirt:
if FLAGS.connection_type == 'fake':
logging.debug("Can't test instances without a real virtual env.")
return
image_id = FLAGS.default_image
@@ -103,7 +103,7 @@ class CloudTestCase(test.BaseTestCase):
break
self.assert_(rv)
if not FLAGS.fake_libvirt:
if connection_type != 'fake':
time.sleep(45) # Should use boto for polling here
for reservations in rv['reservationSet']:
# for res_id in reservations.keys():
+1 -1
View File
@@ -57,7 +57,7 @@ class ComputeConnectionTestCase(test.TrialTestCase):
def setUp(self):
logging.getLogger().setLevel(logging.DEBUG)
super(ComputeConnectionTestCase, self).setUp()
self.flags(fake_libvirt=True,
self.flags(connection_type='fake',
fake_storage=True)
self.compute = service.ComputeService()
+1 -1
View File
@@ -20,7 +20,7 @@ from nova import flags
FLAGS = flags.FLAGS
FLAGS.fake_libvirt = True
FLAGS.connection_type = 'fake'
FLAGS.fake_storage = True
FLAGS.fake_rabbit = True
FLAGS.fake_network = True
+1 -1
View File
@@ -34,7 +34,7 @@ FLAGS = flags.FLAGS
class ModelTestCase(test.TrialTestCase):
def setUp(self):
super(ModelTestCase, self).setUp()
self.flags(fake_libvirt=True,
self.flags(connection_type='fake',
fake_storage=True)
def tearDown(self):
+1 -1
View File
@@ -34,7 +34,7 @@ class NetworkTestCase(test.TrialTestCase):
super(NetworkTestCase, self).setUp()
# NOTE(vish): if you change these flags, make sure to change the
# flags in the corresponding section in nova-dhcpbridge
self.flags(fake_libvirt=True,
self.flags(connection_type='fake',
fake_storage=True,
fake_network=True,
auth_driver='nova.auth.ldapdriver.FakeLdapDriver',
+2
View File
@@ -26,6 +26,8 @@ import tempfile
from nova import flags
from nova import objectstore
from nova.objectstore import bucket # for buckets_path flag
from nova.objectstore import image # for images_path flag
from nova import test
from nova.auth import manager
from nova.objectstore.handler import S3
+1 -1
View File
@@ -20,7 +20,7 @@ from nova import flags
FLAGS = flags.FLAGS
FLAGS.fake_libvirt = False
FLAGS.connection_type = 'libvirt'
FLAGS.fake_storage = False
FLAGS.fake_rabbit = False
FLAGS.fake_network = False
+115
View File
@@ -0,0 +1,115 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from nova import exception
from nova import flags
from nova import test
from nova.compute import node
from nova.volume import storage
FLAGS = flags.FLAGS
class StorageTestCase(test.TrialTestCase):
def setUp(self):
logging.getLogger().setLevel(logging.DEBUG)
super(StorageTestCase, self).setUp()
self.mynode = node.Node()
self.mystorage = None
self.flags(connection_type='fake',
fake_storage=True)
self.mystorage = storage.BlockStore()
def test_run_create_volume(self):
vol_size = '0'
user_id = 'fake'
project_id = 'fake'
volume_id = self.mystorage.create_volume(vol_size, user_id, project_id)
# TODO(termie): get_volume returns differently than create_volume
self.assertEqual(volume_id,
storage.get_volume(volume_id)['volume_id'])
rv = self.mystorage.delete_volume(volume_id)
self.assertRaises(exception.Error,
storage.get_volume,
volume_id)
def test_too_big_volume(self):
vol_size = '1001'
user_id = 'fake'
project_id = 'fake'
self.assertRaises(TypeError,
self.mystorage.create_volume,
vol_size, user_id, project_id)
def test_too_many_volumes(self):
vol_size = '1'
user_id = 'fake'
project_id = 'fake'
num_shelves = FLAGS.last_shelf_id - FLAGS.first_shelf_id + 1
total_slots = FLAGS.slots_per_shelf * num_shelves
vols = []
for i in xrange(total_slots):
vid = self.mystorage.create_volume(vol_size, user_id, project_id)
vols.append(vid)
self.assertRaises(storage.NoMoreVolumes,
self.mystorage.create_volume,
vol_size, user_id, project_id)
for id in vols:
self.mystorage.delete_volume(id)
def test_run_attach_detach_volume(self):
# Create one volume and one node to test with
instance_id = "storage-test"
vol_size = "5"
user_id = "fake"
project_id = 'fake'
mountpoint = "/dev/sdf"
volume_id = self.mystorage.create_volume(vol_size, user_id, project_id)
volume_obj = storage.get_volume(volume_id)
volume_obj.start_attach(instance_id, mountpoint)
rv = yield self.mynode.attach_volume(volume_id,
instance_id,
mountpoint)
self.assertEqual(volume_obj['status'], "in-use")
self.assertEqual(volume_obj['attachStatus'], "attached")
self.assertEqual(volume_obj['instance_id'], instance_id)
self.assertEqual(volume_obj['mountpoint'], mountpoint)
self.assertRaises(exception.Error,
self.mystorage.delete_volume,
volume_id)
rv = yield self.mystorage.detach_volume(volume_id)
volume_obj = storage.get_volume(volume_id)
self.assertEqual(volume_obj['status'], "available")
rv = self.mystorage.delete_volume(volume_id)
self.assertRaises(exception.Error,
storage.get_volume,
volume_id)
def test_multi_node(self):
# TODO(termie): Figure out how to test with two nodes,
# each of them having a different FLAG for storage_node
# This will allow us to test cross-node interactions
pass
+1 -1
View File
@@ -34,7 +34,7 @@ class VolumeTestCase(test.TrialTestCase):
super(VolumeTestCase, self).setUp()
self.compute = compute.service.ComputeService()
self.volume = None
self.flags(fake_libvirt=True,
self.flags(connection_type='fake',
fake_storage=True)
self.volume = volume_service.VolumeService()
+15
View File
@@ -0,0 +1,15 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2010 Citrix Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+45
View File
@@ -0,0 +1,45 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
# Copyright (c) 2010 Citrix Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova import flags
from nova.virt import fake
from nova.virt import libvirt_conn
from nova.virt import xenapi
FLAGS = flags.FLAGS
def get_connection(read_only=False):
# TODO(termie): maybe lazy load after initial check for permissions
# TODO(termie): check whether we can be disconnected
t = FLAGS.connection_type
if t == 'fake':
conn = fake.get_connection(read_only)
elif t == 'libvirt':
conn = libvirt_conn.get_connection(read_only)
elif t == 'xenapi':
conn = xenapi.get_connection(read_only)
else:
raise Exception('Unknown connection type "%s"' % t)
if conn is None:
logging.error('Failed to open connection to the hypervisor')
sys.exit(1)
return conn
+81
View File
@@ -0,0 +1,81 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
# Copyright (c) 2010 Citrix Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
A fake (in-memory) hypervisor+api. Allows nova testing w/o a hypervisor.
"""
import logging
from nova.compute import power_state
def get_connection(_):
# The read_only parameter is ignored.
return FakeConnection.instance()
class FakeConnection(object):
def __init__(self):
self.instances = {}
@classmethod
def instance(cls):
if not hasattr(cls, '_instance'):
cls._instance = cls()
return cls._instance
def list_instances(self):
return self.instances.keys()
def spawn(self, instance):
fake_instance = FakeInstance()
self.instances[instance.name] = fake_instance
fake_instance._state = power_state.RUNNING
def reboot(self, instance):
pass
def destroy(self, instance):
del self.instances[instance.name]
def get_info(self, instance_id):
i = self.instances[instance_id]
return {'state': i._state,
'max_mem': 0,
'mem': 0,
'num_cpu': 2,
'cpu_time': 0}
def list_disks(self, instance_id):
return ['A_DISK']
def list_interfaces(self, instance_id):
return ['A_VIF']
def block_stats(self, instance_id, disk_id):
return [0L, 0L, 0L, 0L, null]
def interface_stats(self, instance_id, iface_id):
return [0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L]
class FakeInstance(object):
def __init__(self):
self._state = power_state.NOSTATE
+72
View File
@@ -0,0 +1,72 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
# Copyright (c) 2010 Citrix Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Handling of VM disk images.
"""
import os.path
import time
from nova import flags
from nova import process
from nova.auth import signer
FLAGS = flags.FLAGS
flags.DEFINE_bool('use_s3', True,
'whether to get images from s3 or use local copy')
def fetch(image, path, user):
if FLAGS.use_s3:
f = _fetch_s3_image
else:
f = _fetch_local_image
return f(image, path, user)
def _fetch_s3_image(image, path, user):
url = _image_url('%s/image' % image)
# This should probably move somewhere else, like e.g. a download_as
# method on User objects and at the same time get rewritten to use
# twisted web client.
headers = {}
headers['Date'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())
uri = '/' + url.partition('/')[2]
auth = signer.Signer(user.secret.encode()).s3_authorization(headers, 'GET', uri)
headers['Authorization'] = 'AWS %s:%s' % (user.access, auth)
cmd = ['/usr/bin/curl', '--silent', url]
for (k,v) in headers.iteritems():
cmd += ['-H', '%s: %s' % (k,v)]
cmd += ['-o', path]
return process.SharedPool().execute(executable=cmd[0], args=cmd[1:])
def _fetch_local_image(image, path, _):
source = _image_path('%s/image' % image)
return process.simple_execute('cp %s %s' % (source, path))
def _image_path(path):
return os.path.join(FLAGS.images_path, path)
def _image_url(path):
return "%s:%s/_images/%s" % (FLAGS.s3_host, FLAGS.s3_port, path)
+355
View File
@@ -0,0 +1,355 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
# Copyright (c) 2010 Citrix Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
A connection to a hypervisor (e.g. KVM) through libvirt.
"""
import json
import logging
import os.path
import shutil
import sys
from twisted.internet import defer
from twisted.internet import task
from nova import exception
from nova import flags
from nova import process
from nova import utils
from nova.auth import manager
from nova.compute import disk
from nova.compute import instance_types
from nova.compute import power_state
from nova.virt import images
libvirt = None
libxml2 = None
FLAGS = flags.FLAGS
flags.DEFINE_string('libvirt_xml_template',
utils.abspath('compute/libvirt.xml.template'),
'Libvirt XML Template')
def get_connection(read_only):
# These are loaded late so that there's no need to install these
# libraries when not using libvirt.
global libvirt
global libxml2
if libvirt is None:
libvirt = __import__('libvirt')
if libxml2 is None:
libxml2 = __import__('libxml2')
return LibvirtConnection(read_only)
class LibvirtConnection(object):
def __init__(self, read_only):
auth = [[libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_NOECHOPROMPT],
'root',
None]
if read_only:
self._conn = libvirt.openReadOnly('qemu:///system')
else:
self._conn = libvirt.openAuth('qemu:///system', auth, 0)
def list_instances(self):
return [self._conn.lookupByID(x).name()
for x in self._conn.listDomainsID()]
def destroy(self, instance):
try:
virt_dom = self._conn.lookupByName(instance.name)
virt_dom.destroy()
except Exception, _err:
pass
# If the instance is already terminated, we're still happy
d = defer.Deferred()
d.addCallback(lambda _: self._cleanup(instance))
# FIXME: What does this comment mean?
# TODO(termie): short-circuit me for tests
# WE'LL save this for when we do shutdown,
# instead of destroy - but destroy returns immediately
timer = task.LoopingCall(f=None)
def _wait_for_shutdown():
try:
instance.update_state()
if instance.state == power_state.SHUTDOWN:
timer.stop()
d.callback(None)
except Exception:
instance.set_state(power_state.SHUTDOWN)
timer.stop()
d.callback(None)
timer.f = _wait_for_shutdown
timer.start(interval=0.5, now=True)
return d
def _cleanup(self, instance):
target = os.path.abspath(instance.datamodel['basepath'])
logging.info("Deleting instance files at %s", target)
shutil.rmtree(target)
@defer.inlineCallbacks
@exception.wrap_exception
def reboot(self, instance):
xml = self.toXml(instance)
yield self._conn.lookupByName(instance.name).destroy()
yield self._conn.createXML(xml, 0)
d = defer.Deferred()
timer = task.LoopingCall(f=None)
def _wait_for_reboot():
try:
instance.update_state()
if instance.is_running():
logging.debug('rebooted instance %s' % instance.name)
timer.stop()
d.callback(None)
except Exception, exn:
logging.error('_wait_for_reboot failed: %s' % exn)
instance.set_state(power_state.SHUTDOWN)
timer.stop()
d.callback(None)
timer.f = _wait_for_reboot
timer.start(interval=0.5, now=True)
yield d
@defer.inlineCallbacks
@exception.wrap_exception
def spawn(self, instance):
xml = self.toXml(instance)
instance.set_state(power_state.NOSTATE, 'launching')
yield self._create_image(instance, xml)
yield self._conn.createXML(xml, 0)
# TODO(termie): this should actually register
# a callback to check for successful boot
logging.debug("Instance is running")
local_d = defer.Deferred()
timer = task.LoopingCall(f=None)
def _wait_for_boot():
try:
instance.update_state()
if instance.is_running():
logging.debug('booted instance %s' % instance.name)
timer.stop()
local_d.callback(None)
except Exception, exn:
logging.error("_wait_for_boot exception %s" % exn)
self.set_state(power_state.SHUTDOWN)
logging.error('Failed to boot instance %s' % instance.name)
timer.stop()
local_d.callback(None)
timer.f = _wait_for_boot
timer.start(interval=0.5, now=True)
yield local_d
@defer.inlineCallbacks
def _create_image(self, instance, libvirt_xml):
# syntactic nicety
data = instance.datamodel
basepath = lambda x='': self.basepath(instance, x)
# ensure directories exist and are writable
yield process.simple_execute('mkdir -p %s' % basepath())
yield process.simple_execute('chmod 0777 %s' % basepath())
# TODO(termie): these are blocking calls, it would be great
# if they weren't.
logging.info('Creating image for: %s', data['instance_id'])
f = open(basepath('libvirt.xml'), 'w')
f.write(libvirt_xml)
f.close()
user = manager.AuthManager().get_user(data['user_id'])
if not os.path.exists(basepath('disk')):
yield images.fetch(data['image_id'], basepath('disk-raw'), user)
if not os.path.exists(basepath('kernel')):
yield images.fetch(data['kernel_id'], basepath('kernel'), user)
if not os.path.exists(basepath('ramdisk')):
yield images.fetch(data['ramdisk_id'], basepath('ramdisk'), user)
execute = lambda cmd, input=None: \
process.simple_execute(cmd=cmd,
input=input,
error_ok=1)
key = data['key_data']
net = None
if FLAGS.simple_network:
with open(FLAGS.simple_network_template) as f:
net = f.read() % {'address': data['private_dns_name'],
'network': FLAGS.simple_network_network,
'netmask': FLAGS.simple_network_netmask,
'gateway': FLAGS.simple_network_gateway,
'broadcast': FLAGS.simple_network_broadcast,
'dns': FLAGS.simple_network_dns}
if key or net:
logging.info('Injecting data into image %s', data['image_id'])
yield disk.inject_data(basepath('disk-raw'), key, net, execute=execute)
if os.path.exists(basepath('disk')):
yield process.simple_execute('rm -f %s' % basepath('disk'))
bytes = (instance_types.INSTANCE_TYPES[data['instance_type']]['local_gb']
* 1024 * 1024 * 1024)
yield disk.partition(
basepath('disk-raw'), basepath('disk'), bytes, execute=execute)
def basepath(self, instance, path=''):
return os.path.abspath(os.path.join(instance.datamodel['basepath'], path))
def toXml(self, instance):
# TODO(termie): cache?
logging.debug("Starting the toXML method")
libvirt_xml = open(FLAGS.libvirt_xml_template).read()
xml_info = instance.datamodel.copy()
# TODO(joshua): Make this xml express the attached disks as well
# TODO(termie): lazy lazy hack because xml is annoying
xml_info['nova'] = json.dumps(instance.datamodel.copy())
libvirt_xml = libvirt_xml % xml_info
logging.debug("Finished the toXML method")
return libvirt_xml
def get_info(self, instance_id):
virt_dom = self._conn.lookupByName(instance_id)
(state, max_mem, mem, num_cpu, cpu_time) = virt_dom.info()
return {'state': state,
'max_mem': max_mem,
'mem': mem,
'num_cpu': num_cpu,
'cpu_time': cpu_time}
def get_disks(self, instance_id):
"""
Note that this function takes an instance ID, not an Instance, so
that it can be called by monitor.
Returns a list of all block devices for this domain.
"""
domain = self._conn.lookupByName(instance_id)
# TODO(devcamcar): Replace libxml2 with etree.
xml = domain.XMLDesc(0)
doc = None
try:
doc = libxml2.parseDoc(xml)
except:
return []
ctx = doc.xpathNewContext()
disks = []
try:
ret = ctx.xpathEval('/domain/devices/disk')
for node in ret:
devdst = None
for child in node.children:
if child.name == 'target':
devdst = child.prop('dev')
if devdst == None:
continue
disks.append(devdst)
finally:
if ctx != None:
ctx.xpathFreeContext()
if doc != None:
doc.freeDoc()
return disks
def get_interfaces(self, instance_id):
"""
Note that this function takes an instance ID, not an Instance, so
that it can be called by monitor.
Returns a list of all network interfaces for this instance.
"""
domain = self._conn.lookupByName(instance_id)
# TODO(devcamcar): Replace libxml2 with etree.
xml = domain.XMLDesc(0)
doc = None
try:
doc = libxml2.parseDoc(xml)
except:
return []
ctx = doc.xpathNewContext()
interfaces = []
try:
ret = ctx.xpathEval('/domain/devices/interface')
for node in ret:
devdst = None
for child in node.children:
if child.name == 'target':
devdst = child.prop('dev')
if devdst == None:
continue
interfaces.append(devdst)
finally:
if ctx != None:
ctx.xpathFreeContext()
if doc != None:
doc.freeDoc()
return interfaces
def block_stats(self, instance_id, disk):
"""
Note that this function takes an instance ID, not an Instance, so
that it can be called by monitor.
"""
domain = self._conn.lookupByName(instance_id)
return domain.blockStats(disk)
def interface_stats(self, instance_id, interface):
"""
Note that this function takes an instance ID, not an Instance, so
that it can be called by monitor.
"""
domain = self._conn.lookupByName(instance_id)
return domain.interfaceStats(interface)
+152
View File
@@ -0,0 +1,152 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2010 Citrix Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
A connection to XenServer or Xen Cloud Platform.
"""
import logging
from twisted.internet import defer
from twisted.internet import task
from nova import exception
from nova import flags
from nova import process
from nova.compute import power_state
XenAPI = None
FLAGS = flags.FLAGS
flags.DEFINE_string('xenapi_connection_url',
None,
'URL for connection to XenServer/Xen Cloud Platform. Required if connection_type=xenapi.')
flags.DEFINE_string('xenapi_connection_username',
'root',
'Username for connection to XenServer/Xen Cloud Platform. Used only if connection_type=xenapi.')
flags.DEFINE_string('xenapi_connection_password',
None,
'Password for connection to XenServer/Xen Cloud Platform. Used only if connection_type=xenapi.')
def get_connection(_):
"""Note that XenAPI doesn't have a read-only connection mode, so
the read_only parameter is ignored."""
# This is loaded late so that there's no need to install this
# library when not using XenAPI.
global XenAPI
if XenAPI is None:
XenAPI = __import__('XenAPI')
url = FLAGS.xenapi_connection_url
username = FLAGS.xenapi_connection_username
password = FLAGS.xenapi_connection_password
if not url or password is None:
raise Exception('Must specify xenapi_connection_url, xenapi_connection_username (optionally), and xenapi_connection_password to use connection_type=xenapi')
return XenAPIConnection(url, username, password)
class XenAPIConnection(object):
def __init__(self, url, user, pw):
self._conn = XenAPI.Session(url)
self._conn.login_with_password(user, pw)
def list_instances(self):
result = [self._conn.xenapi.VM.get_name_label(vm) \
for vm in self._conn.xenapi.VM.get_all()]
@defer.inlineCallbacks
@exception.wrap_exception
def spawn(self, instance):
vm = self.lookup(instance.name)
if vm is not None:
raise Exception('Attempted to create non-unique name %s' %
instance.name)
mem = str(long(instance.datamodel['memory_kb']) * 1024)
vcpus = str(instance.datamodel['vcpus'])
rec = {
'name_label': instance.name,
'name_description': '',
'is_a_template': False,
'memory_static_min': '0',
'memory_static_max': mem,
'memory_dynamic_min': mem,
'memory_dynamic_max': mem,
'VCPUs_at_startup': vcpus,
'VCPUs_max': vcpus,
'VCPUs_params': {},
'actions_after_shutdown': 'destroy',
'actions_after_reboot': 'restart',
'actions_after_crash': 'destroy',
'PV_bootloader': '',
'PV_kernel': instance.datamodel['kernel_id'],
'PV_ramdisk': instance.datamodel['ramdisk_id'],
'PV_args': '',
'PV_bootloader_args': '',
'PV_legacy_args': '',
'HVM_boot_policy': '',
'HVM_boot_params': {},
'platform': {},
'PCI_bus': '',
'recommendations': '',
'affinity': '',
'user_version': '0',
'other_config': {},
}
vm = yield self._conn.xenapi.VM.create(rec)
#yield self._conn.xenapi.VM.start(vm, False, False)
def reboot(self, instance):
vm = self.lookup(instance.name)
if vm is None:
raise Exception('instance not present %s' % instance.name)
yield self._conn.xenapi.VM.clean_reboot(vm)
def destroy(self, instance):
vm = self.lookup(instance.name)
if vm is None:
raise Exception('instance not present %s' % instance.name)
yield self._conn.xenapi.VM.destroy(vm)
def get_info(self, instance_id):
vm = self.lookup(instance_id)
if vm is None:
raise Exception('instance not present %s' % instance.name)
rec = self._conn.xenapi.VM.get_record(vm)
return {'state': power_state_from_xenapi[rec['power_state']],
'max_mem': long(rec['memory_static_max']) >> 10,
'mem': long(rec['memory_dynamic_max']) >> 10,
'num_cpu': rec['VCPUs_max'],
'cpu_time': 0}
def lookup(self, i):
vms = self._conn.xenapi.VM.get_by_name_label(i)
n = len(vms)
if n == 0:
return None
elif n > 1:
raise Exception('duplicate name found: %s' % i)
else:
return vms[0]
power_state_from_xenapi = {
'Halted' : power_state.RUNNING, #FIXME
'Running' : power_state.RUNNING,
'Paused' : power_state.PAUSED,
'Suspended': power_state.SHUTDOWN, # FIXME
'Crashed' : power_state.CRASHED
}