655 lines
22 KiB
Python
655 lines
22 KiB
Python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
|
|
# Copyright 2010 United States Government as represented by the
|
|
# Administrator of the National Aeronautics and Space Administration.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
"""
|
|
Implementation of SQLAlchemy backend
|
|
"""
|
|
|
|
from nova import db
|
|
from nova import exception
|
|
from nova import flags
|
|
from nova.db.sqlalchemy import models
|
|
from nova.db.sqlalchemy.session import managed_session
|
|
from sqlalchemy import or_
|
|
from sqlalchemy.sql import func
|
|
|
|
FLAGS = flags.FLAGS
|
|
|
|
# NOTE(vish): disabling docstring pylint because the docstrings are
|
|
# in the interface definition
|
|
# pylint: disable-msg=C0111
|
|
|
|
###################
|
|
|
|
|
|
def service_destroy(context, service_id):
|
|
service_ref = service_get(context, service_id)
|
|
service_ref.delete()
|
|
|
|
def service_get(_context, service_id):
|
|
return models.Service.find(service_id)
|
|
|
|
|
|
def service_get_all_by_topic(context, topic):
|
|
with managed_session() as session:
|
|
return session.query(models.Service) \
|
|
.filter_by(deleted=False) \
|
|
.filter_by(topic=topic) \
|
|
.all()
|
|
|
|
|
|
def _service_get_all_topic_subquery(_context, session, topic, subq, label):
|
|
sort_value = getattr(subq.c, label)
|
|
return session.query(models.Service, sort_value) \
|
|
.filter_by(topic=topic) \
|
|
.filter_by(deleted=False) \
|
|
.outerjoin((subq, models.Service.host == subq.c.host)) \
|
|
.order_by(sort_value) \
|
|
.all()
|
|
|
|
|
|
def service_get_all_compute_sorted(context):
|
|
with managed_session() as session:
|
|
# NOTE(vish): The intended query is below
|
|
# SELECT services.*, inst_count.instance_count
|
|
# FROM services LEFT OUTER JOIN
|
|
# (SELECT host, count(*) AS instance_count
|
|
# FROM instances GROUP BY host) AS inst_count
|
|
# ON services.host == inst_count.host
|
|
topic = 'compute'
|
|
label = 'instance_count'
|
|
subq = session.query(models.Instance.host,
|
|
func.count('*').label(label)) \
|
|
.filter_by(deleted=False) \
|
|
.group_by(models.Instance.host) \
|
|
.subquery()
|
|
return _service_get_all_topic_subquery(context,
|
|
session,
|
|
topic,
|
|
subq,
|
|
label)
|
|
|
|
|
|
def service_get_all_network_sorted(context):
|
|
with managed_session() as session:
|
|
topic = 'network'
|
|
label = 'network_count'
|
|
subq = session.query(models.Network.host,
|
|
func.count('*').label(label)) \
|
|
.filter_by(deleted=False) \
|
|
.group_by(models.Network.host) \
|
|
.subquery()
|
|
return _service_get_all_topic_subquery(context,
|
|
session,
|
|
topic,
|
|
subq,
|
|
label)
|
|
|
|
|
|
def service_get_all_volume_sorted(context):
|
|
with managed_session() as session:
|
|
topic = 'volume'
|
|
label = 'volume_count'
|
|
subq = session.query(models.Volume.host,
|
|
func.count('*').label(label)) \
|
|
.filter_by(deleted=False) \
|
|
.group_by(models.Volume.host) \
|
|
.subquery()
|
|
return _service_get_all_topic_subquery(context,
|
|
session,
|
|
topic,
|
|
subq,
|
|
label)
|
|
|
|
|
|
def service_get_by_args(_context, host, binary):
|
|
return models.Service.find_by_args(host, binary)
|
|
|
|
|
|
def service_create(_context, values):
|
|
service_ref = models.Service()
|
|
for (key, value) in values.iteritems():
|
|
service_ref[key] = value
|
|
service_ref.save()
|
|
return service_ref.id
|
|
|
|
|
|
def service_update(context, service_id, values):
|
|
service_ref = service_get(context, service_id)
|
|
for (key, value) in values.iteritems():
|
|
service_ref[key] = value
|
|
service_ref.save()
|
|
|
|
|
|
###################
|
|
|
|
|
|
def floating_ip_allocate_address(_context, host, project_id):
|
|
with managed_session(autocommit=False) as session:
|
|
floating_ip_ref = session.query(models.FloatingIp) \
|
|
.filter_by(host=host) \
|
|
.filter_by(fixed_ip_id=None) \
|
|
.filter_by(deleted=False) \
|
|
.with_lockmode('update') \
|
|
.first()
|
|
# NOTE(vish): if with_lockmode isn't supported, as in sqlite,
|
|
# then this has concurrency issues
|
|
if not floating_ip_ref:
|
|
raise db.NoMoreAddresses()
|
|
floating_ip_ref['project_id'] = project_id
|
|
session.add(floating_ip_ref)
|
|
session.commit()
|
|
return floating_ip_ref['address']
|
|
|
|
|
|
def floating_ip_create(_context, address, host):
|
|
floating_ip_ref = models.FloatingIp()
|
|
floating_ip_ref['address'] = address
|
|
floating_ip_ref['host'] = host
|
|
floating_ip_ref.save()
|
|
return floating_ip_ref
|
|
|
|
|
|
def floating_ip_fixed_ip_associate(_context, floating_address, fixed_address):
|
|
with managed_session(autocommit=False) as session:
|
|
floating_ip_ref = models.FloatingIp.find_by_str(floating_address,
|
|
session=session)
|
|
fixed_ip_ref = models.FixedIp.find_by_str(fixed_address,
|
|
session=session)
|
|
floating_ip_ref.fixed_ip = fixed_ip_ref
|
|
floating_ip_ref.save(session=session)
|
|
session.commit()
|
|
|
|
|
|
def floating_ip_disassociate(_context, address):
|
|
with managed_session(autocommit=False) as session:
|
|
floating_ip_ref = models.FloatingIp.find_by_str(address,
|
|
session=session)
|
|
fixed_ip_ref = floating_ip_ref.fixed_ip
|
|
if fixed_ip_ref:
|
|
fixed_ip_address = fixed_ip_ref['address']
|
|
else:
|
|
fixed_ip_address = None
|
|
floating_ip_ref.fixed_ip = None
|
|
floating_ip_ref.save(session=session)
|
|
session.commit()
|
|
return fixed_ip_address
|
|
|
|
|
|
def floating_ip_deallocate(_context, address):
|
|
with managed_session(autocommit=False) as session:
|
|
floating_ip_ref = models.FloatingIp.find_by_str(address,
|
|
session=session)
|
|
floating_ip_ref['project_id'] = None
|
|
floating_ip_ref.save(session=session)
|
|
|
|
|
|
def floating_ip_get_by_address(_context, address):
|
|
return models.FloatingIp.find_by_str(address)
|
|
|
|
|
|
def floating_ip_get_instance(_context, address):
|
|
with managed_session() as session:
|
|
floating_ip_ref = models.FloatingIp.find_by_str(address,
|
|
session=session)
|
|
return floating_ip_ref.fixed_ip.instance
|
|
|
|
|
|
###################
|
|
|
|
|
|
def fixed_ip_allocate(_context, network_id):
|
|
with managed_session(autocommit=False) as session:
|
|
network_or_none = or_(models.FixedIp.network_id == network_id,
|
|
models.FixedIp.network_id == None)
|
|
fixed_ip_ref = session.query(models.FixedIp) \
|
|
.filter(network_or_none) \
|
|
.filter_by(reserved=False) \
|
|
.filter_by(allocated=False) \
|
|
.filter_by(leased=False) \
|
|
.filter_by(deleted=False) \
|
|
.with_lockmode('update') \
|
|
.first()
|
|
# NOTE(vish): if with_lockmode isn't supported, as in sqlite,
|
|
# then this has concurrency issues
|
|
if not fixed_ip_ref:
|
|
raise db.NoMoreAddresses()
|
|
if not fixed_ip_ref.network:
|
|
fixed_ip_ref.network = models.Network.find(network_id)
|
|
fixed_ip_ref['allocated'] = True
|
|
session.add(fixed_ip_ref)
|
|
session.commit()
|
|
return fixed_ip_ref['address']
|
|
|
|
|
|
def fixed_ip_create(_context, values):
|
|
fixed_ip_ref = models.FixedIp()
|
|
for (key, value) in values.iteritems():
|
|
fixed_ip_ref[key] = value
|
|
fixed_ip_ref.save()
|
|
return fixed_ip_ref['address']
|
|
|
|
|
|
def fixed_ip_get_by_address(_context, address):
|
|
return models.FixedIp.find_by_str(address)
|
|
|
|
|
|
def fixed_ip_get_instance(_context, address):
|
|
with managed_session() as session:
|
|
return models.FixedIp.find_by_str(address, session=session).instance
|
|
|
|
|
|
def fixed_ip_get_network(_context, address):
|
|
with managed_session() as session:
|
|
return models.FixedIp.find_by_str(address, session=session).network
|
|
|
|
|
|
def fixed_ip_deallocate(context, address):
|
|
fixed_ip_ref = fixed_ip_get_by_address(context, address)
|
|
fixed_ip_ref['allocated'] = False
|
|
fixed_ip_ref.save()
|
|
|
|
|
|
def fixed_ip_instance_associate(_context, address, instance_id):
|
|
with managed_session(autocommit=False) as session:
|
|
fixed_ip_ref = models.FixedIp.find_by_str(address, session=session)
|
|
instance_ref = models.Instance.find(instance_id, session=session)
|
|
fixed_ip_ref.instance = instance_ref
|
|
fixed_ip_ref.save(session=session)
|
|
session.commit()
|
|
|
|
|
|
def fixed_ip_instance_disassociate(_context, address):
|
|
with managed_session(autocommit=False) as session:
|
|
fixed_ip_ref = models.FixedIp.find_by_str(address, session=session)
|
|
fixed_ip_ref.instance = None
|
|
fixed_ip_ref.save(session=session)
|
|
session.commit()
|
|
|
|
|
|
def fixed_ip_update(context, address, values):
|
|
fixed_ip_ref = fixed_ip_get_by_address(context, address)
|
|
for (key, value) in values.iteritems():
|
|
fixed_ip_ref[key] = value
|
|
fixed_ip_ref.save()
|
|
|
|
|
|
###################
|
|
|
|
|
|
def instance_create(_context, values):
|
|
instance_ref = models.Instance()
|
|
for (key, value) in values.iteritems():
|
|
instance_ref[key] = value
|
|
instance_ref.save()
|
|
return instance_ref.id
|
|
|
|
|
|
def instance_destroy(context, instance_id):
|
|
instance_ref = instance_get(context, instance_id)
|
|
instance_ref.delete()
|
|
|
|
|
|
def instance_get(_context, instance_id):
|
|
return models.Instance.find(instance_id)
|
|
|
|
|
|
def instance_get_all(_context):
|
|
return models.Instance.all()
|
|
|
|
|
|
def instance_get_by_project(_context, project_id):
|
|
with managed_session() as session:
|
|
return session.query(models.Instance) \
|
|
.filter_by(project_id=project_id) \
|
|
.filter_by(deleted=False) \
|
|
.all()
|
|
|
|
|
|
def instance_get_by_reservation(_context, reservation_id):
|
|
with managed_session() as session:
|
|
return session.query(models.Instance) \
|
|
.filter_by(reservation_id=reservation_id) \
|
|
.filter_by(deleted=False) \
|
|
.all()
|
|
|
|
|
|
def instance_get_by_str(_context, str_id):
|
|
return models.Instance.find_by_str(str_id)
|
|
|
|
|
|
def instance_get_fixed_address(_context, instance_id):
|
|
with managed_session() as session:
|
|
instance_ref = models.Instance.find(instance_id, session=session)
|
|
if not instance_ref.fixed_ip:
|
|
return None
|
|
return instance_ref.fixed_ip['address']
|
|
|
|
|
|
def instance_get_floating_address(_context, instance_id):
|
|
with managed_session() as session:
|
|
instance_ref = models.Instance.find(instance_id, session=session)
|
|
if not instance_ref.fixed_ip:
|
|
return None
|
|
if not instance_ref.fixed_ip.floating_ips:
|
|
return None
|
|
# NOTE(vish): this just returns the first floating ip
|
|
return instance_ref.fixed_ip.floating_ips[0]['address']
|
|
|
|
|
|
def instance_get_host(context, instance_id):
|
|
instance_ref = instance_get(context, instance_id)
|
|
return instance_ref['host']
|
|
|
|
|
|
def instance_is_vpn(context, instance_id):
|
|
instance_ref = instance_get(context, instance_id)
|
|
return instance_ref['image_id'] == FLAGS.vpn_image_id
|
|
|
|
|
|
def instance_state(context, instance_id, state, description=None):
|
|
instance_ref = instance_get(context, instance_id)
|
|
instance_ref.set_state(state, description)
|
|
|
|
|
|
def instance_update(context, instance_id, values):
|
|
instance_ref = instance_get(context, instance_id)
|
|
for (key, value) in values.iteritems():
|
|
instance_ref[key] = value
|
|
instance_ref.save()
|
|
|
|
|
|
###################
|
|
|
|
|
|
def network_count(_context):
|
|
return models.Network.count()
|
|
|
|
|
|
def network_count_allocated_ips(_context, network_id):
|
|
with managed_session() as session:
|
|
return session.query(models.FixedIp) \
|
|
.filter_by(network_id=network_id) \
|
|
.filter_by(allocated=True) \
|
|
.filter_by(deleted=False) \
|
|
.count()
|
|
|
|
|
|
def network_count_available_ips(_context, network_id):
|
|
with managed_session() as session:
|
|
return session.query(models.FixedIp) \
|
|
.filter_by(network_id=network_id) \
|
|
.filter_by(allocated=False) \
|
|
.filter_by(reserved=False) \
|
|
.filter_by(deleted=False) \
|
|
.count()
|
|
|
|
|
|
def network_count_reserved_ips(_context, network_id):
|
|
with managed_session() as session:
|
|
return session.query(models.FixedIp) \
|
|
.filter_by(network_id=network_id) \
|
|
.filter_by(reserved=True) \
|
|
.filter_by(deleted=False) \
|
|
.count()
|
|
|
|
|
|
def network_create(_context, values):
|
|
network_ref = models.Network()
|
|
for (key, value) in values.iteritems():
|
|
network_ref[key] = value
|
|
network_ref.save()
|
|
return network_ref
|
|
|
|
|
|
def network_destroy(_context, network_id):
|
|
with managed_session(autocommit=False) as session:
|
|
# TODO(vish): do we have to use sql here?
|
|
session.execute('update networks set deleted=1 where id=:id',
|
|
{'id': network_id})
|
|
session.execute('update fixed_ips set deleted=1 where network_id=:id',
|
|
{'id': network_id})
|
|
session.execute('update floating_ips set deleted=1 '
|
|
'where fixed_ip_id in '
|
|
'(select id from fixed_ips '
|
|
'where network_id=:id)',
|
|
{'id': network_id})
|
|
session.execute('update network_indexes set network_id=NULL '
|
|
'where network_id=:id',
|
|
{'id': network_id})
|
|
session.commit()
|
|
|
|
|
|
def network_get(_context, network_id):
|
|
return models.Network.find(network_id)
|
|
|
|
|
|
# pylint: disable-msg=C0103
|
|
def network_get_associated_fixed_ips(_context, network_id):
|
|
with managed_session() as session:
|
|
return session.query(models.FixedIp) \
|
|
.filter_by(network_id=network_id) \
|
|
.filter(models.FixedIp.instance_id != None) \
|
|
.filter_by(deleted=False) \
|
|
.all()
|
|
|
|
|
|
def network_get_by_bridge(_context, bridge):
|
|
with managed_session() as session:
|
|
rv = session.query(models.Network) \
|
|
.filter_by(bridge=bridge) \
|
|
.filter_by(deleted=False) \
|
|
.first()
|
|
if not rv:
|
|
raise exception.NotFound('No network for bridge %s' % bridge)
|
|
return rv
|
|
|
|
|
|
def network_get_host(context, network_id):
|
|
network_ref = network_get(context, network_id)
|
|
return network_ref['host']
|
|
|
|
|
|
def network_get_index(_context, network_id):
|
|
with managed_session(autocommit=False) as session:
|
|
network_index = session.query(models.NetworkIndex) \
|
|
.filter_by(network_id=None) \
|
|
.filter_by(deleted=False) \
|
|
.with_lockmode('update') \
|
|
.first()
|
|
if not network_index:
|
|
raise db.NoMoreNetworks()
|
|
network_index['network'] = models.Network.find(network_id,
|
|
session=session)
|
|
session.add(network_index)
|
|
session.commit()
|
|
return network_index['index']
|
|
|
|
|
|
def network_index_count(_context):
|
|
return models.NetworkIndex.count()
|
|
|
|
|
|
def network_index_create(_context, values):
|
|
network_index_ref = models.NetworkIndex()
|
|
for (key, value) in values.iteritems():
|
|
network_index_ref[key] = value
|
|
network_index_ref.save()
|
|
|
|
|
|
def network_set_host(_context, network_id, host_id):
|
|
with managed_session(autocommit=False) as session:
|
|
network = session.query(models.Network) \
|
|
.filter_by(id=network_id) \
|
|
.filter_by(deleted=False) \
|
|
.with_lockmode('update') \
|
|
.first()
|
|
if not network:
|
|
raise exception.NotFound("Couldn't find network with %s" %
|
|
network_id)
|
|
# NOTE(vish): if with_lockmode isn't supported, as in sqlite,
|
|
# then this has concurrency issues
|
|
if network.host:
|
|
session.commit()
|
|
return network['host']
|
|
network['host'] = host_id
|
|
session.add(network)
|
|
session.commit()
|
|
return network['host']
|
|
|
|
|
|
def network_update(context, network_id, values):
|
|
network_ref = network_get(context, network_id)
|
|
for (key, value) in values.iteritems():
|
|
network_ref[key] = value
|
|
network_ref.save()
|
|
|
|
|
|
###################
|
|
|
|
|
|
def project_get_network(_context, project_id):
|
|
with managed_session() as session:
|
|
rv = session.query(models.Network) \
|
|
.filter_by(project_id=project_id) \
|
|
.filter_by(deleted=False) \
|
|
.first()
|
|
if not rv:
|
|
raise exception.NotFound('No network for project: %s' % project_id)
|
|
return rv
|
|
|
|
|
|
###################
|
|
|
|
|
|
def queue_get_for(_context, topic, physical_node_id):
|
|
# FIXME(ja): this should be servername?
|
|
return "%s.%s" % (topic, physical_node_id)
|
|
|
|
###################
|
|
|
|
|
|
def export_device_count(_context):
|
|
return models.ExportDevice.count()
|
|
|
|
|
|
def export_device_create(_context, values):
|
|
export_device_ref = models.ExportDevice()
|
|
for (key, value) in values.iteritems():
|
|
export_device_ref[key] = value
|
|
export_device_ref.save()
|
|
return export_device_ref
|
|
|
|
|
|
###################
|
|
|
|
|
|
def volume_allocate_shelf_and_blade(_context, volume_id):
|
|
with managed_session(autocommit=False) as session:
|
|
export_device = session.query(models.ExportDevice) \
|
|
.filter_by(volume=None) \
|
|
.filter_by(deleted=False) \
|
|
.with_lockmode('update') \
|
|
.first()
|
|
# NOTE(vish): if with_lockmode isn't supported, as in sqlite,
|
|
# then this has concurrency issues
|
|
if not export_device:
|
|
raise db.NoMoreBlades()
|
|
export_device.volume_id = volume_id
|
|
session.add(export_device)
|
|
session.commit()
|
|
return (export_device.shelf_id, export_device.blade_id)
|
|
|
|
|
|
def volume_attached(context, volume_id, instance_id, mountpoint):
|
|
volume_ref = volume_get(context, volume_id)
|
|
volume_ref.instance_id = instance_id
|
|
volume_ref['status'] = 'in-use'
|
|
volume_ref['mountpoint'] = mountpoint
|
|
volume_ref['attach_status'] = 'attached'
|
|
volume_ref.save()
|
|
|
|
|
|
def volume_create(_context, values):
|
|
volume_ref = models.Volume()
|
|
for (key, value) in values.iteritems():
|
|
volume_ref[key] = value
|
|
volume_ref.save()
|
|
return volume_ref
|
|
|
|
|
|
def volume_destroy(_context, volume_id):
|
|
with managed_session(autocommit=False) as session:
|
|
# TODO(vish): do we have to use sql here?
|
|
session.execute('update volumes set deleted=1 where id=:id',
|
|
{'id': volume_id})
|
|
session.execute('update export_devices set volume_id=NULL '
|
|
'where volume_id=:id',
|
|
{'id': volume_id})
|
|
session.commit()
|
|
|
|
|
|
def volume_detached(context, volume_id):
|
|
volume_ref = volume_get(context, volume_id)
|
|
volume_ref['instance_id'] = None
|
|
volume_ref['mountpoint'] = None
|
|
volume_ref['status'] = 'available'
|
|
volume_ref['attach_status'] = 'detached'
|
|
volume_ref.save()
|
|
|
|
|
|
def volume_get(_context, volume_id):
|
|
return models.Volume.find(volume_id)
|
|
|
|
|
|
def volume_get_all(_context):
|
|
return models.Volume.all()
|
|
|
|
|
|
def volume_get_by_project(_context, project_id):
|
|
with managed_session() as session:
|
|
return session.query(models.Volume) \
|
|
.filter_by(project_id=project_id) \
|
|
.filter_by(deleted=False) \
|
|
.all()
|
|
|
|
|
|
def volume_get_by_str(_context, str_id):
|
|
return models.Volume.find_by_str(str_id)
|
|
|
|
|
|
def volume_get_host(context, volume_id):
|
|
volume_ref = volume_get(context, volume_id)
|
|
return volume_ref['host']
|
|
|
|
|
|
def volume_get_shelf_and_blade(_context, volume_id):
|
|
with managed_session() as session:
|
|
export_device = session.query(models.ExportDevice) \
|
|
.filter_by(volume_id=volume_id) \
|
|
.first()
|
|
if not export_device:
|
|
raise exception.NotFound()
|
|
return (export_device.shelf_id, export_device.blade_id)
|
|
|
|
|
|
def volume_update(context, volume_id, values):
|
|
volume_ref = volume_get(context, volume_id)
|
|
for (key, value) in values.iteritems():
|
|
volume_ref[key] = value
|
|
volume_ref.save()
|