Merge "remove nova code related to Quantum v1 API"

This commit is contained in:
Jenkins
2012-08-12 02:15:37 +00:00
committed by Gerrit Code Review
16 changed files with 105 additions and 2838 deletions
-15
View File
@@ -625,21 +625,6 @@ class NetworkCommands(object):
network.project_id,
network.uuid)
def quantum_list(self):
"""List all created networks with Quantum-relevant fields"""
_fmt = "%-32s\t%-10s\t%-10s\t%s , %s"
print _fmt % (_('uuid'),
_('project'),
_('priority'),
_('cidr_v4'),
_('cidr_v6'))
for network in db.network_get_all(context.get_admin_context()):
print _fmt % (network.uuid,
network.project_id,
network.priority,
network.cidr,
network.cidr_v6)
@args('--fixed_range', dest="fixed_range", metavar='<x.x.x.x/yy>',
help='Network to delete')
@args('--uuid', dest='uuid', metavar='<uuid>',
+1 -47
View File
@@ -348,7 +348,7 @@
# volume_api_class=nova.volume.api.API
#### (StrOpt) The full class name of the volume API class to use
# security_group_handler=nova.network.quantum.sg.NullSecurityGroupHandler
# security_group_handler=nova.network.quantumv2.sg.NullSecurityGroupHandler
#### (StrOpt) The full class name of the security group handler class
# default_access_ip_network_name=<None>
@@ -1011,52 +1011,6 @@
# l3_lib=nova.network.l3.LinuxNetL3
#### (StrOpt) Indicates underlying L3 management library
######## defined in nova.network.quantum.manager ########
# quantum_ipam_lib=nova.network.quantum.nova_ipam_lib
#### (StrOpt) Indicates underlying IP address management library
# use_melange_mac_generation=false
#### (BoolOpt) Use Melange for assignment of MAC addresses
# quantum_use_dhcp=false
#### (BoolOpt) Whether or not to enable DHCP for networks
# quantum_use_port_security=false
#### (BoolOpt) Whether or not to enable port security
# quantum_port_security_include_link_local=false
#### (BoolOpt) Add the link local address to the port security list
######## defined in nova.network.quantum.melange_connection ########
# melange_host=127.0.0.1
#### (StrOpt) HOST for connecting to melange
# melange_port=9898
#### (IntOpt) PORT for connecting to melange
# melange_num_retries=0
#### (IntOpt) Number retries when contacting melange
######## defined in nova.network.quantum.quantum_connection ########
# quantum_connection_host=127.0.0.1
#### (StrOpt) HOST for connecting to quantum
# quantum_connection_port=9696
#### (IntOpt) PORT for connecting to quantum
# quantum_default_tenant_id=default
#### (StrOpt) Default tenant id when creating quantum networks
# quantum_request_timeout=20
#### (IntOpt) Maximum amount of time to wait for quantum request
######## defined in nova.notifier.api ########
# default_notification_level=INFO
+1 -1
View File
@@ -410,7 +410,7 @@ global_opts = [
default='nova.volume.api.API',
help='The full class name of the volume API class to use'),
cfg.StrOpt('security_group_handler',
default='nova.network.quantum.sg.NullSecurityGroupHandler',
default='nova.network.sg.NullSecurityGroupHandler',
help='The full class name of the security group handler class'),
cfg.StrOpt('default_access_ip_network_name',
default=None,
+1 -1
View File
@@ -789,7 +789,7 @@ class NetworkManager(manager.SchedulerDependentManager):
# NOTE(tr3buchet: unless manager subclassing NetworkManager has
# already imported ipam, import nova ipam here
if not hasattr(self, 'ipam'):
self._import_ipam_lib('nova.network.quantum.nova_ipam_lib')
self._import_ipam_lib('nova.network.nova_ipam_lib')
l3_lib = kwargs.get("l3_lib", FLAGS.l3_lib)
self.l3driver = importutils.import_object(l3_lib)
+101
View File
@@ -0,0 +1,101 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Nicira Networks, Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova import db
from nova import exception
from nova import ipv6
from nova.openstack.common import log as logging
LOG = logging.getLogger(__name__)
def get_ipam_lib(net_man):
return QuantumNovaIPAMLib(net_man)
class QuantumNovaIPAMLib(object):
"""Implements Quantum IP Address Management (IPAM) interface
using the local Nova database. This implementation is inline
with how IPAM is used by other NetworkManagers.
"""
def __init__(self, net_manager):
"""Holds a reference to the "parent" network manager, used
to take advantage of various FlatManager methods to avoid
code duplication.
"""
self.net_manager = net_manager
def get_subnets_by_net_id(self, context, tenant_id, net_id, _vif_id=None):
"""Returns information about the IPv4 and IPv6 subnets
associated with a Quantum Network UUID.
"""
n = db.network_get_by_uuid(context.elevated(), net_id)
subnet_v4 = {
'network_id': n['uuid'],
'cidr': n['cidr'],
'gateway': n['gateway'],
'broadcast': n['broadcast'],
'netmask': n['netmask'],
'version': 4,
'dns1': n['dns1'],
'dns2': n['dns2']}
#TODO(tr3buchet): I'm noticing we've assumed here that all dns is v4.
# this is probably bad as there is no way to add v6
# dns to nova
subnet_v6 = {
'network_id': n['uuid'],
'cidr': n['cidr_v6'],
'gateway': n['gateway_v6'],
'broadcast': None,
'netmask': n['netmask_v6'],
'version': 6,
'dns1': None,
'dns2': None}
return [subnet_v4, subnet_v6]
def get_routes_by_ip_block(self, context, block_id, project_id):
"""Returns the list of routes for the IP block"""
return []
def get_v4_ips_by_interface(self, context, net_id, vif_id, project_id):
"""Returns a list of IPv4 address strings associated with
the specified virtual interface, based on the fixed_ips table.
"""
# TODO(tr3buchet): link fixed_ips to vif by uuid so only 1 db call
vif_rec = db.virtual_interface_get_by_uuid(context, vif_id)
fixed_ips = db.fixed_ips_by_virtual_interface(context,
vif_rec['id'])
return [fixed_ip['address'] for fixed_ip in fixed_ips]
def get_v6_ips_by_interface(self, context, net_id, vif_id, project_id):
"""Returns a list containing a single IPv6 address strings
associated with the specified virtual interface.
"""
admin_context = context.elevated()
network = db.network_get_by_uuid(admin_context, net_id)
vif_rec = db.virtual_interface_get_by_uuid(context, vif_id)
if network['cidr_v6']:
ip = ipv6.to_global(network['cidr_v6'],
vif_rec['address'],
project_id)
return [ip]
return []
def get_floating_ips_by_fixed_address(self, context, fixed_address):
return db.floating_ip_get_by_fixed_address(context, fixed_address)
-16
View File
@@ -1,16 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Nicira Networks
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-312
View File
@@ -1,312 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Citrix Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# @author: Tyler Smith, Cisco Systems
import httplib
import socket
import urllib
from nova.openstack.common import jsonutils
# FIXME(danwent): All content in this file should be removed once the
# packaging work for the quantum client libraries is complete.
# At that point, we will be able to just install the libraries as a
# dependency and import from quantum.client.* and quantum.common.*
# Until then, we have simplified versions of these classes in this file.
class JSONSerializer(object):
"""This is a simple json-only serializer to use until we can just grab
the standard serializer from the quantum library.
"""
def serialize(self, data, content_type):
return jsonutils.dumps(data)
def deserialize(self, data, content_type):
return jsonutils.loads(data)
# Quantum API v1.0 uses 420 + 430 for network + port not found
# Quantum API v1.1 uses 404 for network + port not found
NOT_FOUND_CODES = set((404, 420, 430))
# The full client lib will expose more
# granular exceptions, for now, just try to distinguish
# between the cases we care about.
class QuantumNotFoundException(Exception):
"""Indicates that Quantum Server returned a not-found error code"""
pass
class QuantumServerException(Exception):
"""Indicates any non-404 error from Quantum Server"""
pass
class QuantumIOException(Exception):
"""Indicates network IO trouble reaching Quantum Server"""
pass
class api_call(object):
"""A Decorator to add support for format and tenant overriding"""
def __init__(self, func):
self.func = func
def __get__(self, instance, owner):
def with_params(*args, **kwargs):
"""Temporarily set format and tenant for this request"""
(format, tenant) = (instance.format, instance.tenant)
instance.format = kwargs.pop('format', instance.format)
instance.tenant = kwargs.pop('tenant', instance.tenant)
ret = None
try:
ret = self.func(instance, *args, **kwargs)
finally:
(instance.format, instance.tenant) = (format, tenant)
return ret
return with_params
class Client(object):
"""A base client class - derived from Glance.BaseClient"""
action_prefix = '/v1.1/tenants/{tenant_id}'
"""Action query strings"""
networks_path = "/networks"
network_path = "/networks/%s"
ports_path = "/networks/%s/ports"
port_path = "/networks/%s/ports/%s"
attachment_path = "/networks/%s/ports/%s/attachment"
def __init__(self, host="127.0.0.1", port=9696, use_ssl=False, tenant=None,
format="xml", testing_stub=None, key_file=None,
cert_file=None, logger=None, timeout=None):
"""Creates a new client to some service.
:param host: The host where service resides
:param port: The port where service resides
:param use_ssl: True to use SSL, False to use HTTP
:param tenant: The tenant ID to make requests with
:param format: The format to query the server with
:param testing_stub: A class that stubs basic server methods for tests
:param key_file: The SSL key file to use if use_ssl is true
:param cert_file: The SSL cert file to use if use_ssl is true
:param logger: logging object to be used by client library
"""
self.host = host
self.port = port
self.use_ssl = use_ssl
self.tenant = tenant
self.format = format
self.connection = None
self.testing_stub = testing_stub
self.key_file = key_file
self.cert_file = cert_file
self.logger = logger
self.timeout = timeout
def get_connection_type(self):
"""Returns the proper connection type"""
if self.testing_stub:
return self.testing_stub
elif self.use_ssl:
return httplib.HTTPSConnection
else:
return httplib.HTTPConnection
def do_request(self, method, action, body=None,
headers=None, params=None):
"""Connects to the server and issues a request.
Returns the result data, or raises an appropriate exception if
HTTP status code is not 2xx
:param method: HTTP method ("GET", "POST", "PUT", etc...)
:param body: string of data to send, or None (default)
:param headers: mapping of key/value pairs to add as headers
:param params: dictionary of key/value pairs to add to append
to action
"""
# Ensure we have a tenant id
if not self.tenant:
raise Exception(_("Tenant ID not set"))
# Add format and tenant_id
action += ".%s" % self.format
action = Client.action_prefix + action
action = action.replace('{tenant_id}', self.tenant)
if isinstance(params, dict):
action += '?' + urllib.urlencode(params)
try:
connection_type = self.get_connection_type()
headers = headers or {"Content-Type":
"application/%s" % self.format}
# Open connection and send request, handling SSL certs
kwargs = {}
if self.use_ssl:
if self.key_file:
kwargs['key_file'] = self.key_file
if self.cert_file:
kwargs['cert_file'] = self.cert_file
if self.timeout:
kwargs['timeout'] = self.timeout
c = connection_type(self.host, self.port, **kwargs)
if self.logger:
self.logger.debug(
_("Quantum Client Request: %(method)s %(action)s"),
locals())
if body:
self.logger.debug(body)
c.request(method, action, body, headers)
res = c.getresponse()
status_code = self.get_status_code(res)
data = res.read()
if self.logger:
self.logger.debug("Quantum Client Reply (code = %s) :\n %s" %
(str(status_code), data))
if status_code in NOT_FOUND_CODES:
raise QuantumNotFoundException(
_("Quantum entity not found: %s"), data)
if status_code in (httplib.OK,
httplib.CREATED,
httplib.ACCEPTED,
httplib.NO_CONTENT):
if data is not None and len(data):
return self.deserialize(data, status_code)
else:
raise QuantumServerException(
_("Server %(status_code)s error: %(data)s")
% locals())
except (socket.error, IOError), e:
raise QuantumIOException(_("Unable to connect to "
"server. Got error: %s") % e)
def get_status_code(self, response):
"""Returns the integer status code from the response, which
can be either a Webob.Response (used in testing) or httplib.Response
"""
if hasattr(response, 'status_int'):
return response.status_int
else:
return response.status
def serialize(self, data):
if not data:
return None
elif isinstance(data, dict):
return JSONSerializer().serialize(data, self.content_type())
else:
raise Exception(_("unable to deserialize object of type = '%s'") %
type(data))
def deserialize(self, data, status_code):
return JSONSerializer().deserialize(data, self.content_type())
def content_type(self, format=None):
if not format:
format = self.format
return "application/%s" % (format)
@api_call
def list_networks(self, filter_ops=None):
"""Fetches a list of all networks for a tenant"""
return self.do_request("GET", self.networks_path, params=filter_ops)
@api_call
def show_network_details(self, network):
"""Fetches the details of a certain network"""
return self.do_request("GET", self.network_path % (network))
@api_call
def create_network(self, body=None):
"""Creates a new network"""
body = self.serialize(body)
return self.do_request("POST", self.networks_path, body=body)
@api_call
def update_network(self, network, body=None):
"""Updates a network"""
body = self.serialize(body)
return self.do_request("PUT", self.network_path % (network), body=body)
@api_call
def delete_network(self, network):
"""Deletes the specified network"""
return self.do_request("DELETE", self.network_path % (network))
@api_call
def list_ports(self, network, filter_ops=None):
"""Fetches a list of ports on a given network"""
return self.do_request("GET", self.ports_path % (network),
params=filter_ops)
@api_call
def show_port_details(self, network, port):
"""Fetches the details of a certain port"""
return self.do_request("GET", self.port_path % (network, port))
@api_call
def create_port(self, network, body=None):
"""Creates a new port on a given network"""
body = self.serialize(body)
return self.do_request("POST", self.ports_path % (network), body=body)
@api_call
def delete_port(self, network, port):
"""Deletes the specified port from a network"""
return self.do_request("DELETE", self.port_path % (network, port))
@api_call
def set_port_state(self, network, port, body=None):
"""Sets the state of the specified port"""
body = self.serialize(body)
return self.do_request("PUT",
self.port_path % (network, port), body=body)
@api_call
def show_port_attachment(self, network, port):
"""Fetches the attachment-id associated with the specified port"""
return self.do_request("GET", self.attachment_path % (network, port))
@api_call
def attach_resource(self, network, port, body=None):
"""Sets the attachment-id of the specified port"""
body = self.serialize(body)
return self.do_request("PUT",
self.attachment_path % (network, port), body=body)
@api_call
def detach_resource(self, network, port):
"""Removes the attachment-id of the specified port"""
return self.do_request("DELETE",
self.attachment_path % (network, port))
-171
View File
@@ -1,171 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 Nicira Networks
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# @author: Dan Wendlandt Nicira Networks
from nova.network.quantum import client
from nova import utils
#TODO(danwent): would be nice to have these functions raise QuantumIOErrors
# to make sure such errors are caught and reported properly
# this is a fake quantum client that just stores all data in a nested dict
#
# example:
#
#{'<tenant>':
# { '<net-uuid>' : { 'name' : "t1-net1",
# 'ports' : [ { '<port-uuid'> :
# {'state': 'Active',
# 'attachment': 'iface-id'},
# { 'state': 'Down',
# 'attachment' : None}}]
# }
# }
# }
class FakeClient(object):
"""A fake Quantum Client for testing"""
def __init__(self, logger=None, **kwargs):
"""Creates a new client to some service.
:param logger: logging object to be used by client library
"""
self.logger = logger
self.tenant_map = {}
def _get_tenant_nets(self, tenant):
if tenant is None:
raise Exception("'tenant' cannot be None")
return self.tenant_map.setdefault(tenant, {})
def _verify_net(self, network, nets):
if network not in nets:
raise client.QuantumNotFoundException("no network with uuid %s"
% network)
def _verify_net_and_port(self, network, port, nets):
if network not in nets:
raise client.QuantumNotFoundException("no network with uuid %s"
% network)
if port not in nets[network]['ports']:
raise client.QuantumNotFoundException("no port with uuid %s"
% port)
def list_networks(self, tenant=None, filter_ops=None):
"""Fetches a list of all networks for a tenant"""
nets = self._get_tenant_nets(tenant)
if filter_ops:
raise Exception("Need to implement filters %s in "
"quantum fake client" % filter_ops)
return {"networks": [{"id": uuid} for uuid in nets.keys()]}
def show_network_details(self, network, tenant=None):
"""Fetches the details of a certain network"""
nets = self._get_tenant_nets(tenant)
self._verify_net(network, nets)
return {"network": {"id": network, "name": nets[network]["name"]}}
def create_network(self, body=None, tenant=None):
"""Creates a new network"""
nets = self._get_tenant_nets(tenant)
uuid = str(utils.gen_uuid())
name = body["network"]["name"]
nets[uuid] = {"ports": {}, "name": name}
return {"network": {"id": uuid}}
def update_network(self, network, body=None, tenant=None):
"""Updates a network"""
nets = self._get_tenant_nets(tenant)
self._verify_net(network, nets)
nets[network]['name'] = body["network"]["name"]
return {"network": {"id": network, "name": nets[networks]["name"]}}
def delete_network(self, network, tenant=None):
"""Deletes the specified network"""
nets = self._get_tenant_nets(tenant)
self._verify_net(network, nets)
del nets[network]
def list_ports(self, network, filter_ops=None, tenant=None):
"""Fetches a list of ports on a given network"""
nets = self._get_tenant_nets(tenant)
self._verify_net(network, nets)
ports = nets[network]['ports'].items()
if filter_ops and 'attachment' in filter_ops:
a = filter_ops.pop('attachment')
ports = [p for p in ports if p[1]['attachment'] == a]
if filter_ops:
raise Exception("Need to implement files %s in "
"quantum fake client" % filter_ops)
return {"ports": [{'id': p[0]} for p in ports]}
def show_port_details(self, network, port, tenant=None):
"""Fetches the details of a certain port"""
nets = self._get_tenant_nets(tenant)
self._verify_net_and_port(network, port, nets)
p = nets[network]['ports'][port]
return {"port": {"state": p["state"], "id": port}}
def create_port(self, network, body=None, tenant=None):
"""Creates a new port on a given network"""
nets = self._get_tenant_nets(tenant)
self._verify_net(network, nets)
uuid = str(utils.gen_uuid())
nets[network]['ports'][uuid] = {'attachment': None,
'state': body['port']['state']}
return {"port": {"id": uuid}}
def delete_port(self, network, port, tenant=None):
"""Deletes the specified port from a network"""
nets = self._get_tenant_nets(tenant)
self._verify_net_and_port(network, port, nets)
del nets[network]['ports'][port]
def set_port_state(self, network, port, body=None, tenant=None):
"""Sets the state of the specified port"""
nets = self._get_tenant_nets(tenant)
self._verify_net_and_port(network, port, nets)
new_state = body['port']['state']
nets[network]['ports'][port]['state'] = new_state
return {"port": {"state": new_state}}
def show_port_attachment(self, network, port, tenant=None):
"""Fetches the attachment-id associated with the specified port"""
nets = self._get_tenant_nets(tenant)
self._verify_net_and_port(network, port, nets)
p = nets[network]['ports'][port]
if p['attachment'] is not None:
return {"attachment": {"id": p['attachment']}}
else:
return {"attachment": {}}
def attach_resource(self, network, port, body=None, tenant=None):
nets = self._get_tenant_nets(tenant)
self._verify_net_and_port(network, port, nets)
nets[network]['ports'][port]['attachment'] = body['attachment']['id']
def detach_resource(self, network, port, tenant=None):
"""Removes the attachment-id of the specified port"""
nets = self._get_tenant_nets(tenant)
self._verify_net_and_port(network, port, nets)
nets[network]['ports'][port]['attachment'] = None
-762
View File
@@ -1,762 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Nicira Networks, Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
import netaddr
from nova import context
from nova import db
from nova import exception
from nova import flags
from nova.network import manager
from nova.network.quantum import melange_ipam_lib
from nova.network.quantum import quantum_connection
from nova.openstack.common import cfg
from nova.openstack.common import log as logging
from nova.openstack.common import rpc
from nova import utils
LOG = logging.getLogger(__name__)
quantum_opts = [
cfg.StrOpt('quantum_ipam_lib',
default='nova.network.quantum.nova_ipam_lib',
help="Indicates underlying IP address management library"),
# TODO(Vek): Eventually, this needs to mean more than just using
# Melange for assignment of MAC addresses (with an
# appropriate flag name change, of course), but this is all
# it does right now
cfg.BoolOpt('use_melange_mac_generation',
default=False,
help="Use Melange for assignment of MAC addresses"),
cfg.BoolOpt('quantum_use_dhcp',
default=False,
help='Whether or not to enable DHCP for networks'),
cfg.BoolOpt('quantum_use_port_security',
default=False,
help='Whether or not to enable port security'),
cfg.BoolOpt('quantum_port_security_include_link_local',
default=False,
help='Add the link local address to the port security list'),
]
FLAGS = flags.FLAGS
FLAGS.register_opts(quantum_opts)
class QuantumManager(manager.FloatingIP, manager.FlatManager):
"""NetworkManager class that communicates with a Quantum service
via a web services API to provision VM network connectivity.
For IP Address management, QuantumManager can be configured to
use either Nova's local DB or the Melange IPAM service.
"""
DHCP = FLAGS.quantum_use_dhcp
def __init__(self, q_conn=None, ipam_lib=None, *args, **kwargs):
"""Initialize two key libraries, the connection to a
Quantum service, and the library for implementing IPAM.
Calls inherited FlatManager constructor.
"""
if not q_conn:
q_conn = quantum_connection.QuantumClientConnection()
self.q_conn = q_conn
if not ipam_lib:
ipam_lib = FLAGS.quantum_ipam_lib
self._import_ipam_lib(ipam_lib)
super(QuantumManager, self).__init__(*args, **kwargs)
def init_host(self):
# Initialize general L3 networking
self.l3driver.initialize()
super(QuantumManager, self).init_host()
# Initialize floating ip support (only works for nova ipam currently)
if FLAGS.quantum_ipam_lib == 'nova.network.quantum.nova_ipam_lib':
LOG.debug("Initializing FloatingIP support")
self.init_host_floating_ips()
# Set up all the forwarding rules for any network that has a
# gateway set.
networks = self.get_all_networks(context.get_admin_context())
cidrs = []
for net in networks:
# Don't update host information for network that does not
# belong to you
if net['host'] != self.host:
continue
if net['gateway']:
LOG.debug("Initializing NAT: %s (cidr: %s, gw: %s)" % (
net['label'], net['cidr'], net['gateway']))
cidrs.append(net['cidr'])
self._update_network_host(context.get_admin_context(),
net['uuid'])
# .. and for each network
for c in cidrs:
self.l3driver.initialize_network(c)
# Similar to FlatDHCPMananger, except we check for quantum_use_dhcp flag
# before we try to update_dhcp
def _setup_network_on_host(self, context, network):
"""Sets up network on this host."""
network['dhcp_server'] = self._get_dhcp_ip(context, network)
self.l3driver.initialize_gateway(network)
if FLAGS.quantum_use_dhcp and not FLAGS.fake_network:
dev = self.driver.get_dev(network)
self.driver.update_dhcp(context, dev, network)
if FLAGS.use_ipv6:
self.driver.update_ra(context, dev, network)
gateway = utils.get_my_linklocal(dev)
self.db.network_update(context, network['id'],
{'gateway_v6': gateway})
def _update_network_host(self, context, net_uuid):
"""Set the host column in the networks table: note that this won't
work with multi-host but QuantumManager doesn't support that
anyways. The floating IPs mixin required network['host'] to be
set."""
entry = db.network_get_by_uuid(context.elevated(), net_uuid)
entry['host'] = self.host
db.network_update(context.elevated(), entry['id'], entry)
def _get_nova_id(self, instance=None):
# When creating the network we need to pass in an identifier for
# this zone. Some Quantum plugins need this information in order
# to set up appropriate networking.
if instance and instance['availability_zone']:
return instance['availability_zone']
else:
return FLAGS.node_availability_zone
def get_all_networks(self, context):
networks = []
networks.extend(self.ipam.get_global_networks(context))
networks.extend(self.ipam.get_project_networks(context))
return networks
def create_networks(self, context, label, cidr, multi_host, num_networks,
network_size, cidr_v6, gateway, gateway_v6, bridge,
bridge_interface, dns1=None, dns2=None, uuid=None,
**kwargs):
"""Unlike other NetworkManagers, with QuantumManager, each
create_networks calls should create only a single network.
Two scenarios exist:
- no 'uuid' is specified, in which case we contact
Quantum and create a new network.
- an existing 'uuid' is specified, corresponding to
a Quantum network created out of band.
In both cases, we initialize a subnet using the IPAM lib.
"""
# Enforce Configuration sanity.
#
# These flags are passed in from bin/nova-manage. The script
# collects the arguments and then passes them in through this
# function call. Note that in some cases, the script pre-processes
# the arguments, and sets them to a default value if a parameter's
# value was not specified on the command line. For pre-processed
# parameters, the most effective check to see if the user passed it
# in is to see if is different from the default value. (This
# does miss the use case where the user passes in the default value
# on the command line -- but it is unavoidable.)
if multi_host != FLAGS.multi_host:
# User specified it on the command line.
raise Exception(_("QuantumManager does not use 'multi_host'"
" parameter."))
if num_networks != 1:
raise Exception(_("QuantumManager requires that only one"
" network is created per call"))
if network_size != int(FLAGS.network_size):
# User specified it on the command line.
LOG.warning("Ignoring unnecessary parameter 'network_size'")
if kwargs.get('vlan_start', None):
if kwargs['vlan_start'] != int(FLAGS.vlan_start):
# User specified it on the command line.
LOG.warning(_("QuantumManager does not use 'vlan_start'"
" parameter."))
if kwargs.get('vpn_start', None):
if kwargs['vpn_start'] != int(FLAGS.vpn_start):
# User specified it on the command line.
LOG.warning(_("QuantumManager does not use 'vpn_start'"
" parameter."))
if bridge is not None and len(bridge) > 0:
LOG.warning(_("QuantumManager does not use 'bridge'"
" parameter."))
if bridge_interface is not None and len(bridge_interface) > 0:
LOG.warning(_("QuantumManager does not use 'bridge_interface'"
" parameter."))
if gateway is not None and len(gateway) > 0:
if gateway.split('.')[3] != '1':
raise Exception(_("QuantumManager requires a valid (.1)"
" gateway address."))
q_tenant_id = kwargs["project_id"] or FLAGS.quantum_default_tenant_id
quantum_net_id = uuid
# If a uuid was specified with the network it should have already been
# created in Quantum, so make sure.
if quantum_net_id:
if not self.q_conn.network_exists(q_tenant_id, quantum_net_id):
raise Exception(_("Unable to find existing quantum "
"network for tenant '%(q_tenant_id)s' "
"with net-id '%(quantum_net_id)s'") %
locals())
else:
nova_id = self._get_nova_id()
quantum_net_id = self.q_conn.create_network(q_tenant_id, label,
nova_id=nova_id)
ipam_tenant_id = kwargs.get("project_id", None)
priority = kwargs.get("priority", 0)
# NOTE(tr3buchet): this call creates a nova network in the nova db
self.ipam.create_subnet(context, label, ipam_tenant_id,
quantum_net_id, priority, cidr,
gateway, gateway_v6, cidr_v6, dns1, dns2)
self._update_network_host(context, quantum_net_id)
# Initialize forwarding
self.l3driver.initialize_network(cidr)
return [{'uuid': quantum_net_id}]
def delete_network(self, context, fixed_range, uuid):
"""Lookup network by uuid, delete both the IPAM
subnet and the corresponding Quantum network.
The fixed_range parameter is kept here for interface compatibility
but is not used.
"""
net_ref = db.network_get_by_uuid(context.elevated(), uuid)
project_id = net_ref['project_id']
q_tenant_id = project_id or FLAGS.quantum_default_tenant_id
net_uuid = net_ref['uuid']
# Check for any attached ports on the network and fail the deletion if
# there is anything but the gateway port attached. If it is only the
# gateway port, unattach and delete it.
ports = self.q_conn.get_attached_ports(q_tenant_id, net_uuid)
num_ports = len(ports)
gw_interface_id = self.driver.get_dev(net_ref)
gw_port_uuid = None
if gw_interface_id is not None:
gw_port_uuid = self.q_conn.get_port_by_attachment(q_tenant_id,
net_uuid, gw_interface_id)
if gw_port_uuid:
num_ports -= 1
if num_ports > 0:
raise exception.NetworkBusy(network=net_uuid)
# only delete gw ports if we are going to finish deleting network
if gw_port_uuid:
self.q_conn.detach_and_delete_port(q_tenant_id,
net_uuid,
gw_port_uuid)
self.l3driver.remove_gateway(net_ref)
# Now we can delete the network
self.q_conn.delete_network(q_tenant_id, net_uuid)
LOG.debug("Deleting network %s for tenant: %s" %
(net_uuid, q_tenant_id))
self.ipam.delete_subnets_by_net_id(context, net_uuid, project_id)
# Get rid of dnsmasq
if FLAGS.quantum_use_dhcp:
if net_ref['host'] == self.host:
self.kill_dhcp(net_ref)
else:
topic = rpc.queue_get_for(context,
FLAGS.network_topic,
net_ref['host'])
rpc.call(context, topic, {'method': 'kill_dhcp',
'args': {'net_ref': net_ref}})
def kill_dhcp(self, net_ref):
dev = self.driver.get_dev(net_ref)
if self.driver._device_exists(dev):
self.driver.kill_dhcp(dev)
def allocate_for_instance(self, context, **kwargs):
"""Called by compute when it is creating a new VM.
There are three key tasks:
- Determine the number and order of vNICs to create
- Allocate IP addresses
- Create ports on a Quantum network and attach vNICs.
We support two approaches to determining vNICs:
- By default, a VM gets a vNIC for any network belonging
to the VM's project, and a vNIC for any "global" network
that has a NULL project_id. vNIC order is determined
by the network's 'priority' field.
- If the 'os-create-server-ext' was used to create the VM,
only the networks in 'requested_networks' are used to
create vNICs, and the vNIC order is determiend by the
order in the requested_networks array.
For each vNIC, use the FlatManager to create the entries
in the virtual_interfaces table, contact Quantum to
create a port and attachment the vNIC, and use the IPAM
lib to allocate IP addresses.
"""
instance_id = kwargs['instance_id']
rxtx_factor = kwargs['rxtx_factor']
host = kwargs['host']
project_id = kwargs['project_id']
LOG.debug(_("network allocations for instance %s"), project_id)
requested_networks = kwargs.get('requested_networks')
instance = db.instance_get(context, instance_id)
net_proj_pairs = self.ipam.get_project_and_global_net_ids(context,
project_id)
if requested_networks:
# need to figure out if a requested network is owned
# by the tenant, or by the provider
# Note: these are the only possible options, as the compute
# API already validated networks using validate_network()
proj_net_ids = set([p[0] for p in net_proj_pairs if p[1]])
net_proj_pairs = []
for net_id, _i in requested_networks:
if net_id in proj_net_ids:
net_proj_pairs.append((net_id, project_id))
else:
net_proj_pairs.append((net_id, None))
# Create a port via quantum and attach the vif
for proj_pair in net_proj_pairs:
network = self.get_network(context, proj_pair)
# TODO(tr3buchet): broken. Virtual interfaces require an integer
# network ID and it is not nullable
vif_rec = self.add_virtual_interface(context,
instance['uuid'],
network['id'],
project_id)
# talk to Quantum API to create and attach port.
nova_id = self._get_nova_id(instance)
# Tell the ipam library to allocate an IP
ips = self.ipam.allocate_fixed_ips(context, project_id,
network['quantum_net_id'], network['net_tenant_id'],
vif_rec)
pairs = []
# Set up port security if enabled
if FLAGS.quantum_use_port_security:
if FLAGS.quantum_port_security_include_link_local:
mac = netaddr.EUI(vif_rec['address'])
ips.append(str(mac.ipv6_link_local()))
pairs = [{'mac_address': vif_rec['address'],
'ip_address': ip} for ip in ips]
self.q_conn.create_and_attach_port(network['net_tenant_id'],
network['quantum_net_id'],
vif_rec['uuid'],
vm_id=instance['uuid'],
rxtx_factor=rxtx_factor,
nova_id=nova_id,
allowed_address_pairs=pairs)
# Set up/start the dhcp server for this network if necessary
if FLAGS.quantum_use_dhcp:
if network['host'] == self.host:
self.enable_dhcp(context, network['quantum_net_id'],
network, vif_rec, network['net_tenant_id'])
else:
topic = rpc.queue_get_for(context,
FLAGS.network_topic, network['host'])
rpc.call(context, topic, {'method': 'enable_dhcp',
'args': {'quantum_net_id': network['quantum_net_id'],
'network_ref': network,
'vif_rec': vif_rec,
'project_id': network['net_tenant_id']}})
return self.get_instance_nw_info(context, instance_id,
instance['uuid'],
rxtx_factor, host,
project_id=project_id)
def get_network(self, context, proj_pair):
(quantum_net_id, net_tenant_id) = proj_pair
net_tenant_id = net_tenant_id or FLAGS.quantum_default_tenant_id
# FIXME(danwent): We'd like to have the manager be
# completely decoupled from the nova networks table.
# However, other parts of nova sometimes go behind our
# back and access network data directly from the DB. So
# for now, the quantum manager knows that there is a nova
# networks DB table and accesses it here. updating the
# virtual_interfaces table to use UUIDs would be one
# solution, but this would require significant work
# elsewhere.
admin_context = context.elevated()
# We may not be able to get a network_ref here if this network
# isn't in the database (i.e. it came from Quantum).
network_ref = db.network_get_by_uuid(admin_context,
quantum_net_id)
if network_ref is None:
network_ref = {}
network_ref = {"uuid": quantum_net_id,
"project_id": net_tenant_id,
# NOTE(bgh): We need to document this somewhere but since
# we don't know the priority of any networks we get from
# quantum we just give them a priority of 0. If its
# necessary to specify the order of the vifs and what
# network they map to then the user will have to use the
# OSCreateServer extension and specify them explicitly.
#
# In the future users will be able to tag quantum networks
# with a priority .. and at that point we can update the
# code here to reflect that.
"priority": 0,
"id": 'NULL',
"label": "quantum-net-%s" % quantum_net_id}
network_ref['net_tenant_id'] = net_tenant_id
network_ref['quantum_net_id'] = quantum_net_id
return network_ref
@manager.wrap_check_policy
def get_instance_uuids_by_ip_filter(self, context, filters):
# This is not returning the instance IDs like the method name
# would make you think; it is matching the return format of
# the method it's overriding.
instance_ids = self.ipam.get_instance_ids_by_ip_address(
context, filters.get('ip'))
instances = [db.instance_get(context, id) for id in instance_ids]
return [{'instance_uuid':instance.uuid} for instance in instances]
@utils.synchronized('quantum-enable-dhcp')
def enable_dhcp(self, context, quantum_net_id, network_ref, vif_rec,
project_id):
LOG.info("Using DHCP for network: %s" % network_ref['label'])
# Figure out the ipam tenant id for this subnet: We need to
# query for the tenant_id since the network could be created
# with the project_id as the tenant or the default tenant.
ipam_tenant_id = self.ipam.get_tenant_id_by_net_id(context,
quantum_net_id, vif_rec['uuid'], project_id)
# Figure out what subnets correspond to this network
subnets = self.ipam.get_subnets_by_net_id(context,
ipam_tenant_id, quantum_net_id, vif_rec['uuid'])
# Set up (or find) the dhcp server for each of the subnets
# returned above (both v4 and v6).
for subnet in subnets:
if subnet is None or subnet['cidr'] is None:
continue
# Fill in some of the network fields that we would have
# previously gotten from the network table (they'll be
# passed to the linux_net functions).
network_ref['cidr'] = subnet['cidr']
n = netaddr.IPNetwork(subnet['cidr'])
# NOTE(tr3buchet): should probably not always assume first+1
network_ref['dhcp_server'] = netaddr.IPAddress(n.first + 1)
# TODO(bgh): Melange should probably track dhcp_start
# TODO(tr3buchet): melange should store dhcp_server as well
if (not 'dhcp_start' in network_ref or
network_ref['dhcp_start'] is None):
network_ref['dhcp_start'] = netaddr.IPAddress(n.first + 2)
network_ref['broadcast'] = netaddr.IPAddress(n.broadcast)
network_ref['gateway'] = subnet['gateway']
# Construct the interface id that we'll use for the bridge
interface_id = self.driver.get_dev(network_ref)
network_ref['bridge'] = interface_id
# Query quantum to see if we've already created a port for
# the gateway device and attached the device to the port.
# If we haven't then we need to intiialize it and create
# it. This device will be the one serving dhcp via
# dnsmasq.
q_tenant_id = project_id or FLAGS.quantum_default_tenant_id
port = self.q_conn.get_port_by_attachment(q_tenant_id,
quantum_net_id, interface_id)
if not port: # No dhcp server has been started
self.l3driver.initialize_gateway(network_ref)
LOG.debug("Intializing DHCP for network: %s" %
network_ref)
self.q_conn.create_and_attach_port(q_tenant_id,
quantum_net_id, interface_id)
hosts = self.get_dhcp_hosts_text(context,
subnet['network_id'], project_id)
self.driver.update_dhcp_hostfile_with_text(interface_id, hosts)
self.driver.restart_dhcp(context, interface_id, network_ref)
def add_virtual_interface(self, context, instance_uuid, network_id,
net_tenant_id):
# If we're not using melange, use the default means...
if FLAGS.use_melange_mac_generation:
return self._add_virtual_interface(context, instance_uuid,
network_id, net_tenant_id)
return super(QuantumManager, self).add_virtual_interface(context,
instance_uuid,
network_id)
def _add_virtual_interface(self, context, instance_uuid, network_id,
net_tenant_id):
vif = {'instance_uuid': instance_uuid,
'network_id': network_id,
'uuid': str(utils.gen_uuid())}
# TODO(Vek): Ideally, we would have a VirtualInterface class
# that would take care of delegating to whoever it
# needs to get information from. We'll look at
# this after Trey's refactorings...
m_ipam = melange_ipam_lib.get_ipam_lib(self)
vif['address'] = m_ipam.create_vif(vif['uuid'],
vif['instance_uuid'],
net_tenant_id)
return self.db.virtual_interface_create(context, vif)
def get_instance_nw_info(self, context, instance_id, instance_uuid,
rxtx_factor, host, **kwargs):
"""This method is used by compute to fetch all network data
that should be used when creating the VM.
The method simply loops through all virtual interfaces
stored in the nova DB and queries the IPAM lib to get
the associated IP data.
The format of returned data is 'defined' by the initial
set of NetworkManagers found in nova/network/manager.py .
Ideally this 'interface' will be more formally defined
in the future.
"""
project_id = kwargs['project_id']
vifs = db.virtual_interface_get_by_instance(context, instance_uuid)
net_tenant_dict = dict((net_id, tenant_id)
for (net_id, tenant_id)
in self.ipam.get_project_and_global_net_ids(
context, project_id))
networks = {}
for vif in vifs:
if vif.get('network_id') is not None:
network = db.network_get(context.elevated(), vif['network_id'])
net_tenant_id = net_tenant_dict[network['uuid']]
if net_tenant_id is None:
net_tenant_id = FLAGS.quantum_default_tenant_id
network = {'id': network['id'],
'uuid': network['uuid'],
'bridge': '', # Quantum ignores this field
'label': network['label'],
'injected': FLAGS.flat_injected,
'project_id': net_tenant_id}
networks[vif['uuid']] = network
# update instance network cache and return network_info
nw_info = self.build_network_info_model(context, vifs, networks,
rxtx_factor, host)
db.instance_info_cache_update(context, instance_uuid,
{'network_info': nw_info.json()})
return nw_info
def deallocate_for_instance(self, context, **kwargs):
"""Called when a VM is terminated. Loop through each virtual
interface in the Nova DB and remove the Quantum port and
clear the IP allocation using the IPAM. Finally, remove
the virtual interfaces from the Nova DB.
"""
instance_id = kwargs.get('instance_id')
project_id = kwargs.pop('project_id', None)
admin_context = context.elevated()
instance = db.instance_get(context, instance_id)
vifs = db.virtual_interface_get_by_instance(admin_context,
instance['uuid'])
for vif in vifs:
network = db.network_get(admin_context, vif['network_id'])
self.deallocate_port(vif['uuid'], network['uuid'], project_id,
instance_id)
ipam_tenant_id = self.deallocate_ip_address(context,
network['uuid'], project_id, vif, instance_id)
if FLAGS.quantum_use_dhcp:
if network['host'] == self.host:
self.update_dhcp(context, ipam_tenant_id, network,
vif, project_id)
else:
topic = rpc.queue_get_for(context,
FLAGS.network_topic, network['host'])
rpc.call(context, topic, {'method': 'update_dhcp',
'args': {'ipam_tenant_id': ipam_tenant_id,
'network_ref': network,
'vif_ref': vif,
'project_id': network['project_id']}})
db.virtual_interface_delete(admin_context, vif['id'])
def deallocate_port(self, interface_id, net_id, q_tenant_id, instance_id):
port_id = None
try:
port_id = self.q_conn.get_port_by_attachment(q_tenant_id,
net_id, interface_id)
if not port_id:
q_tenant_id = FLAGS.quantum_default_tenant_id
port_id = self.q_conn.get_port_by_attachment(
q_tenant_id, net_id, interface_id)
if not port_id:
LOG.error("Unable to find port with attachment: %s" %
(interface_id))
else:
self.q_conn.detach_and_delete_port(q_tenant_id,
net_id, port_id)
except Exception:
# except anything so the rest of deallocate can succeed
LOG.exception(_('port deallocation failed for instance: '
'|%(instance_id)s|, port_id: |%(port_id)s|') % locals())
def deallocate_ip_address(self, context, net_id,
project_id, vif_ref, instance_id):
ipam_tenant_id = None
try:
ipam_tenant_id = self.ipam.get_tenant_id_by_net_id(context,
net_id,
vif_ref['uuid'],
project_id)
self.ipam.deallocate_ips_by_vif(context, ipam_tenant_id,
net_id, vif_ref)
except Exception:
# except anything so the rest of deallocate can succeed
vif_uuid = vif_ref['uuid']
msg = _('ipam deallocation failed for instance: '
'|%(instance_id)s|, vif_uuid: |%(vif_uuid)s|') % locals()
LOG.exception(msg)
return ipam_tenant_id
# enable_dhcp() could also use this
#
# Use semaphore to :
# 1) avoid race between restart_dhcps
# 2) avoid race between update_dhcp_hostfile_with_texts
@utils.synchronized('quantum-restart-dhcp')
def _atomic_restart_dhcp(self, context, dev, hosts, network_ref):
self.driver.update_dhcp_hostfile_with_text(dev, hosts)
# Since we only update hosts file, explict kill_dhcp() isn't needed.
# restart_dhcp() will reload hostfile if dnsmasq is already running,
# or start new dnsmasq
self.driver.restart_dhcp(context, dev, network_ref)
# TODO(bgh): At some point we should consider merging enable_dhcp() and
# update_dhcp()
# TODO(tr3buchet): agree, i'm curious why they differ even now..
def update_dhcp(self, context, ipam_tenant_id, network_ref, vif_ref,
project_id):
# Figure out what subnet corresponds to this network/vif
subnets = self.ipam.get_subnets_by_net_id(context,
ipam_tenant_id, network_ref['uuid'], vif_ref['uuid'])
for subnet in subnets:
if subnet is None:
continue
# Fill in some of the network fields that we would have
# previously gotten from the network table (they'll be
# passed to the linux_net functions).
if subnet['cidr']:
network_ref['cidr'] = subnet['cidr']
n = netaddr.IPNetwork(network_ref['cidr'])
network_ref['dhcp_server'] = netaddr.IPAddress(n.first + 1)
network_ref['dhcp_start'] = netaddr.IPAddress(n.first + 2)
network_ref['broadcast'] = netaddr.IPAddress(n.broadcast)
network_ref['gateway'] = netaddr.IPAddress(n.first + 1)
dev = self.driver.get_dev(network_ref)
# And remove the dhcp mappings for the subnet
hosts = self.get_dhcp_hosts_text(context,
subnet['network_id'], project_id)
self._atomic_restart_dhcp(context, dev, hosts, network_ref)
def validate_networks(self, context, networks):
"""Validates that this tenant has quantum networks with the associated
UUIDs. This is called by the 'os-create-server-ext' API extension
code so that we can return an API error code to the caller if they
request an invalid network.
"""
if networks is None:
return
project_id = context.project_id
for (net_id, _i) in networks:
# TODO(bgh): At some point we should figure out whether or
# not we want the verify_subnet_exists call to be optional.
if not self.ipam.verify_subnet_exists(context, project_id,
net_id):
raise exception.NetworkNotFound(network_id=net_id)
is_tenant_net = self.q_conn.network_exists(project_id, net_id)
is_provider_net = self.q_conn.network_exists(
FLAGS.quantum_default_tenant_id,
net_id)
if not (is_tenant_net or is_provider_net):
raise exception.NetworkNotFound(network_id=net_id)
def get_dhcp_hosts_text(self, context, subnet_id, project_id=None):
ips = self.ipam.get_allocated_ips(context, subnet_id, project_id)
hosts_text = ""
admin_context = context.elevated()
for ip in ips:
address, vif_id = ip
vif = db.virtual_interface_get_by_uuid(admin_context, vif_id)
mac_address = vif['address']
text = "%s,%s.%s,%s\n" % (mac_address, "host-" + address,
FLAGS.dhcp_domain, address)
hosts_text += text
LOG.debug("DHCP hosts: %s" % hosts_text)
return hosts_text
def get_dhcp_leases(self, context, network_ref):
"""Return a network's hosts config in dnsmasq leasefile format."""
subnet_id = network_ref['uuid']
project_id = network_ref['project_id']
ips = self.ipam.get_allocated_ips(context, subnet_id, project_id)
leases_text = ""
admin_context = context.elevated()
for ip in ips:
address, vif_id = ip
vif = db.virtual_interface_get_by_uuid(admin_context, vif_id)
mac_address = vif['address']
text = ("%s %s %s %s *\n" % (int(time.time()) -
FLAGS.dhcp_lease_time,
mac_address, address, '*'))
leases_text += text
LOG.debug("DHCP leases: %s" % leases_text)
return leases_text
def setup_networks_on_host(self, *args, **kwargs):
# no host specific setup is needed in quantum manager
pass
-202
View File
@@ -1,202 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import httplib
import socket
import time
import urllib
from nova import exception
from nova import flags
from nova.openstack.common import cfg
from nova.openstack.common import jsonutils
from nova.openstack.common import log as logging
melange_opts = [
cfg.StrOpt('melange_host',
default='127.0.0.1',
help='HOST for connecting to melange'),
cfg.IntOpt('melange_port',
default=9898,
help='PORT for connecting to melange'),
cfg.IntOpt('melange_num_retries',
default=0,
help='Number retries when contacting melange'),
]
FLAGS = flags.FLAGS
FLAGS.register_opts(melange_opts)
LOG = logging.getLogger(__name__)
json_content_type = {'Content-type': "application/json"}
# FIXME(danwent): talk to the Melange folks about creating a
# client lib that we can import as a library, instead of
# have to have all of the client code in here.
class MelangeConnection(object):
def __init__(self, host=None, port=None, use_ssl=False):
if host is None:
host = FLAGS.melange_host
if port is None:
port = FLAGS.melange_port
self.host = host
self.port = port
self.use_ssl = use_ssl
self.version = "v0.1"
def get(self, path, params=None, headers=None):
return self.do_request("GET", path, params=params, headers=headers,
retries=FLAGS.melange_num_retries)
def post(self, path, body=None, headers=None):
return self.do_request("POST", path, body=body, headers=headers)
def delete(self, path, headers=None):
return self.do_request("DELETE", path, headers=headers)
def _get_connection(self):
if self.use_ssl:
return httplib.HTTPSConnection(self.host, self.port)
else:
return httplib.HTTPConnection(self.host, self.port)
def do_request(self, method, path, body=None, headers=None, params=None,
content_type=".json", retries=0):
headers = headers or {}
params = params or {}
url = "/%s/%s%s" % (self.version, path, content_type)
if params:
url += "?%s" % urllib.urlencode(params)
for i in xrange(retries + 1):
connection = self._get_connection()
try:
connection.request(method, url, body, headers)
response = connection.getresponse()
response_str = response.read()
if response.status < 400:
return response_str
raise Exception(_("Server returned error: %s") % response_str)
except (socket.error, IOError), e:
LOG.exception(_('Connection error contacting melange'
' service, retrying'))
time.sleep(1)
raise exception.MelangeConnectionFailed(
reason=_("Maximum attempts reached"))
def allocate_ip(self, network_id, network_tenant_id, vif_id,
project_id=None, mac_address=None):
LOG.info(_("allocate IP on network |%(network_id)s| "
"belonging to |%(network_tenant_id)s| "
"to this vif |%(vif_id)s| with mac |%(mac_address)s| "
"belonging to |%(project_id)s| ") % locals())
tenant_scope = "/tenants/%s" % (network_tenant_id
if network_tenant_id else "")
network_info = dict(network=dict(mac_address=mac_address,
tenant_id=project_id))
request_body = jsonutils.dumps(network_info) if mac_address else None
url = ("ipam%(tenant_scope)s/networks/%(network_id)s/"
"interfaces/%(vif_id)s/ip_allocations" % locals())
response = self.post(url, body=request_body, headers=json_content_type)
return jsonutils.loads(response)['ip_addresses']
def create_block(self, network_id, cidr,
project_id=None, gateway=None, dns1=None, dns2=None):
tenant_scope = "/tenants/%s" % project_id if project_id else ""
url = "ipam%(tenant_scope)s/ip_blocks" % locals()
req_params = dict(ip_block=dict(cidr=cidr, network_id=network_id,
type='private', gateway=gateway,
dns1=dns1, dns2=dns2))
self.post(url,
body=jsonutils.dumps(req_params),
headers=json_content_type)
def delete_block(self, block_id, project_id=None):
tenant_scope = "/tenants/%s" % project_id if project_id else ""
url = "ipam%(tenant_scope)s/ip_blocks/%(block_id)s" % locals()
self.delete(url, headers=json_content_type)
def get_blocks(self, project_id=None):
tenant_scope = "/tenants/%s" % project_id if project_id else ""
url = "ipam%(tenant_scope)s/ip_blocks" % locals()
response = self.get(url, headers=json_content_type)
return jsonutils.loads(response)
def get_routes(self, block_id, project_id=None):
tenant_scope = "/tenants/%s" % project_id if project_id else ""
url = ("ipam%(tenant_scope)s/ip_blocks/%(block_id)s/ip_routes" %
locals())
response = self.get(url, headers=json_content_type)
return jsonutils.loads(response)['ip_routes']
def get_allocated_ips(self, network_id, vif_id, project_id=None):
tenant_scope = "/tenants/%s" % project_id if project_id else ""
url = ("ipam%(tenant_scope)s/networks/%(network_id)s/"
"interfaces/%(vif_id)s/ip_allocations" % locals())
response = self.get(url, headers=json_content_type)
return jsonutils.loads(response)['ip_addresses']
def get_allocated_ips_by_address(self, address):
url = "ipam/allocated_ip_addresses"
response = self.get(url, params={'address': address},
headers=json_content_type)
return jsonutils.loads(response).get('ip_addresses', [])
def get_allocated_ips_for_network(self, network_id, project_id=None):
tenant_scope = "/tenants/%s" % project_id if project_id else ""
url = ("ipam%(tenant_scope)s/allocated_ip_addresses" % locals())
# TODO(bgh): This request fails if you add the ".json" to the end so
# it has to call do_request itself. Melange bug?
response = self.do_request("GET", url, content_type="")
return jsonutils.loads(response)['ip_addresses']
def deallocate_ips(self, network_id, vif_id, project_id=None):
tenant_scope = "/tenants/%s" % project_id if project_id else ""
url = ("ipam%(tenant_scope)s/networks/%(network_id)s/"
"interfaces/%(vif_id)s/ip_allocations" % locals())
self.delete(url, headers=json_content_type)
def create_vif(self, vif_id, instance_id, project_id=None):
url = "ipam/interfaces"
request_body = dict(interface=dict(id=vif_id, tenant_id=project_id,
device_id=instance_id))
response = self.post(url,
body=jsonutils.dumps(request_body),
headers=json_content_type)
return jsonutils.loads(response)['interface']['mac_address']
-261
View File
@@ -1,261 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Nicira Networks, Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from nova import db
from nova import exception
from nova import flags
from nova.network.quantum import melange_connection
from nova.openstack.common import log as logging
LOG = logging.getLogger(__name__)
FLAGS = flags.FLAGS
def get_ipam_lib(net_man):
return QuantumMelangeIPAMLib()
class QuantumMelangeIPAMLib(object):
"""Implements Quantum IP Address Management (IPAM) interface
using the Melange service, which is access using the Melange
web services API.
"""
def __init__(self):
"""Initialize class used to connect to Melange server"""
self.m_conn = melange_connection.MelangeConnection()
def create_subnet(self, context, label, project_id,
quantum_net_id, priority, cidr=None,
gateway=None, gateway_v6=None, cidr_v6=None,
dns1=None, dns2=None):
"""Contact Melange and create a subnet for any non-NULL
IPv4 or IPv6 subnets.
Also create an entry in the Nova networks DB, but only
to store values not represented in Melange or to
temporarily provide compatibility with Nova code that
accesses IPAM data directly via the DB (e.g., nova-api)
"""
tenant_id = project_id or FLAGS.quantum_default_tenant_id
if cidr:
self.m_conn.create_block(quantum_net_id, cidr,
project_id=tenant_id,
gateway=gateway,
dns1=dns1, dns2=dns2)
if cidr_v6:
self.m_conn.create_block(quantum_net_id, cidr_v6,
project_id=tenant_id,
gateway=gateway_v6,
dns1=dns1, dns2=dns2)
net = {"uuid": quantum_net_id,
"project_id": tenant_id,
"priority": priority,
"label": label}
if FLAGS.quantum_use_dhcp:
if cidr:
n = netaddr.IPNetwork(cidr)
net['dhcp_start'] = netaddr.IPAddress(n.first + 2)
else:
net['dhcp_start'] = None
admin_context = context.elevated()
network = db.network_create_safe(admin_context, net)
def allocate_fixed_ips(self, context, project_id, quantum_net_id,
network_tenant_id, vif_ref):
"""Pass call to allocate fixed IP on to Melange"""
ips = self.m_conn.allocate_ip(quantum_net_id, network_tenant_id,
vif_ref['uuid'], project_id,
vif_ref['address'])
return [ip['address'] for ip in ips]
def delete_subnets_by_net_id(self, context, net_id, project_id):
"""Find Melange block associated with the Quantum UUID,
then tell Melange to delete that block.
"""
admin_context = context.elevated()
tenant_id = project_id or FLAGS.quantum_default_tenant_id
all_blocks = self.m_conn.get_blocks(tenant_id)
for b in all_blocks['ip_blocks']:
if b['network_id'] == net_id:
self.m_conn.delete_block(b['id'], tenant_id)
network = db.network_get_by_uuid(admin_context, net_id)
db.network_delete_safe(context, network['id'])
def get_networks_by_tenant(self, admin_context, tenant_id):
nets = {}
blocks = self.m_conn.get_blocks(tenant_id)
for ip_block in blocks['ip_blocks']:
network_id = ip_block['network_id']
network = db.network_get_by_uuid(admin_context, network_id)
nets[network_id] = network
return nets.values()
def get_global_networks(self, admin_context):
return self.get_networks_by_tenant(admin_context,
FLAGS.quantum_default_tenant_id)
def get_project_networks(self, admin_context):
try:
nets = db.network_get_all(admin_context.elevated())
except exception.NoNetworksFound:
return []
# only return networks with a project_id set
return [net for net in nets if net['project_id']]
def get_project_and_global_net_ids(self, context, project_id):
"""Fetches all networks associated with this project, or
that are "global" (i.e., have no project set).
Returns list sorted by 'priority' (lowest integer value
is highest priority).
"""
if project_id is None:
raise Exception(_("get_project_and_global_net_ids must be called"
" with a non-null project_id"))
admin_context = context.elevated()
# Decorate with priority
priority_nets = []
for tenant_id in (project_id, FLAGS.quantum_default_tenant_id):
nets = self.get_networks_by_tenant(admin_context, tenant_id)
for network in nets:
priority = network['priority']
priority_nets.append((priority, network['uuid'], tenant_id))
# Sort by priority
priority_nets.sort()
# Undecorate
return [(network_id, tenant_id)
for priority, network_id, tenant_id in priority_nets]
def get_tenant_id_by_net_id(self, context, net_id, vif_id, project_id):
ipam_tenant_id = None
tenant_ids = [FLAGS.quantum_default_tenant_id, project_id, None]
# This is confusing, if there are IPs for the given net, vif,
# tenant trifecta we assume that is the tenant for that network
for tid in tenant_ids:
try:
self.m_conn.get_allocated_ips(net_id, vif_id, tid)
except Exception:
continue
ipam_tenant_id = tid
break
return ipam_tenant_id
# TODO(bgh): Rename this method .. it's now more of a
# "get_subnets_by_net_id_and_vif_id" method, but we could probably just
# call it "get_subnets".
def get_subnets_by_net_id(self, context, tenant_id, net_id, vif_id):
"""Returns information about the IPv4 and IPv6 subnets
associated with a Quantum Network UUID.
"""
subnets = []
ips = self.m_conn.get_allocated_ips(net_id, vif_id, tenant_id)
for ip_address in ips:
block = ip_address['ip_block']
subnet = {'network_id': block['network_id'],
'id': block['id'],
'cidr': block['cidr'],
'gateway': block['gateway'],
'broadcast': block['broadcast'],
'netmask': block['netmask'],
'dns1': block['dns1'],
'dns2': block['dns2']}
if ip_address['version'] == 4:
subnet['version'] = 4
else:
subnet['version'] = 6
subnets.append(subnet)
return subnets
def get_routes_by_ip_block(self, context, block_id, project_id):
"""Returns the list of routes for the IP block"""
return self.m_conn.get_routes(block_id, project_id)
def get_v4_ips_by_interface(self, context, net_id, vif_id, project_id):
"""Returns a list of IPv4 address strings associated with
the specified virtual interface.
"""
return self._get_ips_by_interface(context, net_id, vif_id,
project_id, 4)
def get_v6_ips_by_interface(self, context, net_id, vif_id, project_id):
"""Returns a list of IPv6 address strings associated with
the specified virtual interface.
"""
return self._get_ips_by_interface(context, net_id, vif_id,
project_id, 6)
def _get_ips_by_interface(self, context, net_id, vif_id, project_id,
ip_version):
"""Helper method to fetch v4 or v6 addresses for a particular
virtual interface.
"""
tenant_id = project_id or FLAGS.quantum_default_tenant_id
ip_list = self.m_conn.get_allocated_ips(net_id, vif_id, tenant_id)
return [ip['address'] for ip in ip_list
if netaddr.IPNetwork(ip['address']).version == ip_version]
def get_instance_ids_by_ip_address(self, context, address):
ips = self.m_conn.get_allocated_ips_by_address(address)
# TODO(aaron.lee): melange should be storing & returning instance_uuid!
return [ip.get('used_by_device') for ip in ips]
def verify_subnet_exists(self, context, project_id, quantum_net_id):
"""Confirms that a subnet exists that is associated with the
specified Quantum Network UUID.
"""
# TODO(bgh): Would be nice if we could just do something like:
# GET /ipam/tenants/{tenant_id}/networks/{network_id}/ instead
# of searching through all the blocks. Checking for a 404
# will then determine whether it exists.
tenant_id = project_id or FLAGS.quantum_default_tenant_id
all_blocks = self.m_conn.get_blocks(tenant_id)
for b in all_blocks['ip_blocks']:
if b['network_id'] == quantum_net_id:
return True
return False
def deallocate_ips_by_vif(self, context, project_id, net_id, vif_ref):
"""Deallocate all fixed IPs associated with the specified
virtual interface.
"""
tenant_id = project_id or FLAGS.quantum_default_tenant_id
self.m_conn.deallocate_ips(net_id, vif_ref['uuid'], tenant_id)
def get_allocated_ips(self, context, subnet_id, project_id):
ips = self.m_conn.get_allocated_ips_for_network(subnet_id, project_id)
return [(ip['address'], ip['interface_id']) for ip in ips]
def create_vif(self, vif_id, instance_id, project_id=None):
"""Create a new vif with the specified information.
"""
tenant_id = project_id or FLAGS.quantum_default_tenant_id
return self.m_conn.create_vif(vif_id, instance_id, tenant_id)
def get_floating_ips_by_fixed_address(self, context, fixed_address):
"""This call is not supported in quantum yet"""
return []
-159
View File
@@ -1,159 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Nicira Networks
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova import flags
from nova.network.quantum import client as quantum_client
from nova.openstack.common import cfg
from nova.openstack.common import log as logging
LOG = logging.getLogger(__name__)
quantum_opts = [
cfg.StrOpt('quantum_connection_host',
default='127.0.0.1',
help='HOST for connecting to quantum'),
cfg.IntOpt('quantum_connection_port',
default=9696,
help='PORT for connecting to quantum'),
cfg.StrOpt('quantum_default_tenant_id',
default="default",
help='Default tenant id when creating quantum networks'),
cfg.IntOpt('quantum_request_timeout',
default=20,
help='Maximum amount of time to wait for quantum request'),
]
FLAGS = flags.FLAGS
FLAGS.register_opts(quantum_opts)
class QuantumClientConnection(object):
"""Abstracts connection to Quantum service into higher level
operations performed by the QuantumManager.
Separating this out as a class also let's us create a 'fake'
version of this class for unit tests.
"""
def __init__(self, client=None):
"""Initialize Quantum client class based on flags."""
if client:
self.client = client
else:
self.client = quantum_client.Client(FLAGS.quantum_connection_host,
FLAGS.quantum_connection_port,
timeout=FLAGS.quantum_request_timeout,
format="json",
logger=LOG)
def create_network(self, tenant_id, network_name, **kwargs):
"""Create network using specified name, return Quantum
network UUID.
"""
data = {'network': {'name': network_name}}
for kw in kwargs:
data['network'][kw] = kwargs[kw]
resdict = self.client.create_network(data, tenant=tenant_id)
return resdict["network"]["id"]
def get_network_name(self, tenant_id, network_id):
net = self.client.show_network_details(network_id, tenant=tenant_id)
return net["network"]["name"]
def delete_network(self, tenant_id, net_id):
"""Deletes Quantum network with specified UUID."""
self.client.delete_network(net_id, tenant=tenant_id)
def network_exists(self, tenant_id, net_id):
"""Determine if a Quantum network exists for the
specified tenant.
"""
try:
self.client.show_network_details(net_id, tenant=tenant_id)
return True
except quantum_client.QuantumNotFoundException:
# Not really an error. Real errors will be propogated to caller
return False
def get_networks(self, tenant_id):
"""Retrieve all networks for this tenant"""
return self.client.list_networks(tenant=tenant_id)
def create_and_attach_port(self, tenant_id, net_id, interface_id,
**kwargs):
"""Creates a Quantum port on the specified network, sets
status to ACTIVE to enable traffic, and attaches the
vNIC with the specified interface-id.
"""
LOG.debug(_("Connecting interface %(interface_id)s to "
"net %(net_id)s for %(tenant_id)s"), locals())
port_data = {'port': {'state': 'ACTIVE'}}
for kw in kwargs:
port_data['port'][kw] = kwargs[kw]
resdict = self.client.create_port(net_id, port_data, tenant=tenant_id)
port_id = resdict["port"]["id"]
attach_data = {'attachment': {'id': interface_id}}
self.client.attach_resource(net_id, port_id, attach_data,
tenant=tenant_id)
def detach_and_delete_port(self, tenant_id, net_id, port_id):
"""Detach and delete the specified Quantum port."""
LOG.debug(_("Deleting port %(port_id)s on net %(net_id)s"
" for %(tenant_id)s"), locals())
self.client.detach_resource(net_id, port_id, tenant=tenant_id)
self.client.delete_port(net_id, port_id, tenant=tenant_id)
def get_port_by_attachment(self, tenant_id, net_id, attachment_id):
"""Given a tenant and network, search for the port UUID that
has the specified interface-id attachment.
"""
port_list = []
try:
port_list_resdict = self.client.list_ports(net_id,
tenant=tenant_id,
filter_ops={'attachment': attachment_id})
port_list = port_list_resdict["ports"]
except quantum_client.QuantumNotFoundException:
return None
port_list_len = len(port_list)
if port_list_len == 1:
return port_list[0]['id']
elif port_list_len > 1:
raise Exception("Expected single port with attachment "
"%(attachment_id)s, found %(port_list_len)s" % locals())
return None
def get_attached_ports(self, tenant_id, network_id):
rv = []
port_list = self.client.list_ports(network_id, tenant=tenant_id)
for p in port_list["ports"]:
try:
port_id = p["id"]
port = self.client.show_port_attachment(network_id,
port_id, tenant=tenant_id)
# Skip ports without an attachment
if "id" not in port["attachment"]:
continue
rv.append({'port-id': port_id,
'attachment': port["attachment"]["id"]})
except quantum_client.QuantumNotFoundException:
pass
return rv
+1 -1
View File
@@ -20,7 +20,7 @@ from nova import db
from nova import exception
from nova import flags
from nova.network import manager as network_manager
from nova.network.quantum import nova_ipam_lib
from nova.network import nova_ipam_lib
from nova import utils
-647
View File
@@ -1,647 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011,2012 Nicira, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mox
from nova import context
from nova import db
from nova.db.sqlalchemy import models
from nova.db.sqlalchemy import session as sql_session
from nova import exception
from nova import flags
from nova.network.quantum import client as quantum_client
from nova.network.quantum import fake_client
from nova.network.quantum import manager as quantum_manager
from nova.network.quantum import melange_connection
from nova.network.quantum import melange_ipam_lib
from nova.network.quantum import quantum_connection
from nova.openstack.common import log as logging
from nova import test
from nova import utils
LOG = logging.getLogger(__name__)
FLAGS = flags.FLAGS
networks = [{'label': 'project1-net1',
'injected': False,
'multi_host': False,
'cidr': '100.168.0.0/24',
'cidr_v6': '100:1db8::/64',
'gateway_v6': '100:1db8::1',
'netmask_v6': '64',
'netmask': '255.255.255.0',
'bridge': None,
'bridge_interface': None,
'gateway': '100.168.0.1',
'broadcast': '100.168.0.255',
'dns1': '8.8.8.8',
'vlan': None,
'host': None,
'vpn_public_address': None,
'project_id': 'fake_project1',
'priority': 1},
{'label': 'project2-net1',
'injected': False,
'multi_host': False,
'cidr': '101.168.1.0/24',
'cidr_v6': '101:1db9::/64',
'gateway_v6': '101:1db9::1',
'netmask_v6': '64',
'netmask': '255.255.255.0',
'bridge': None,
'bridge_interface': None,
'gateway': '101.168.1.1',
'broadcast': '101.168.1.255',
'dns1': '8.8.8.8',
'vlan': None,
'host': None,
'project_id': 'fake_project2',
'priority': 1},
{'label': "public",
'injected': False,
'multi_host': False,
'cidr': '102.0.0.0/24',
'cidr_v6': '102:1dba::/64',
'gateway_v6': '102:1dba::1',
'netmask_v6': '64',
'netmask': '255.255.255.0',
'bridge': None,
'bridge_interface': None,
'gateway': '102.0.0.1',
'broadcast': '102.0.0.255',
'dns1': '8.8.8.8',
'vlan': None,
'host': None,
'project_id': None,
'priority': 0},
{'label': "project2-net2",
'injected': False,
'multi_host': False,
'cidr': '103.0.0.0/24',
'cidr_v6': '103:1dbb::/64',
'gateway_v6': '103:1dbb::1',
'netmask_v6': '64',
'netmask': '255.255.255.0',
'bridge': None,
'bridge_interface': None,
'gateway': '103.0.0.1',
'broadcast': '103.0.0.255',
'dns1': '8.8.8.8',
'vlan': None,
'host': None,
'project_id': "fake_project2",
'priority': 2}]
class QuantumConnectionTestCase(test.TestCase):
def test_connection(self):
fc = fake_client.FakeClient(LOG)
qc = quantum_connection.QuantumClientConnection(client=fc)
t = "tenant1"
net1_name = "net1"
net1_uuid = qc.create_network(t, net1_name)
self.assertEquals(net1_name, qc.get_network_name(t, net1_uuid))
self.assertTrue(qc.network_exists(t, net1_uuid))
self.assertFalse(qc.network_exists(t, "fake-uuid"))
self.assertFalse(qc.network_exists("fake-tenant", net1_uuid))
nets = qc.get_networks(t)['networks']
self.assertEquals(len(nets), 1)
self.assertEquals(nets[0]['id'], net1_uuid)
num_ports = 10
for i in range(0, num_ports):
qc.create_and_attach_port(t, net1_uuid,
'iface' + str(i), state='ACTIVE')
self.assertEquals(len(qc.get_attached_ports(t, net1_uuid)), num_ports)
for i in range(0, num_ports):
port_uuid = qc.get_port_by_attachment(t, net1_uuid,
'iface' + str(i))
self.assertTrue(port_uuid)
qc.detach_and_delete_port(t, net1_uuid, port_uuid)
self.assertEquals(len(qc.get_attached_ports(t, net1_uuid)), 0)
# test port not found
qc.create_and_attach_port(t, net1_uuid, 'foo', state='ACTIVE')
port_uuid = qc.get_port_by_attachment(t, net1_uuid, 'foo')
qc.detach_and_delete_port(t, net1_uuid, port_uuid)
self.assertRaises(quantum_client.QuantumNotFoundException,
qc.detach_and_delete_port, t,
net1_uuid, port_uuid)
qc.delete_network(t, net1_uuid)
self.assertFalse(qc.network_exists(t, net1_uuid))
self.assertEquals(len(qc.get_networks(t)['networks']), 0)
self.assertRaises(quantum_client.QuantumNotFoundException,
qc.get_network_name, t, net1_uuid)
# this is a base class to be used by other QuantumManager Test classes
class QuantumNovaTestCase(test.TestCase):
def setUp(self):
super(QuantumNovaTestCase, self).setUp()
self.flags(quantum_use_dhcp=True)
self.flags(l3_lib="nova.network.l3.LinuxNetL3")
linuxdrv = "nova.network.linux_net.LinuxOVSInterfaceDriver"
self.flags(linuxnet_interface_driver=linuxdrv)
fc = fake_client.FakeClient(LOG)
qc = quantum_connection.QuantumClientConnection(client=fc)
self.net_man = quantum_manager.QuantumManager(
ipam_lib="nova.network.quantum.nova_ipam_lib",
q_conn=qc)
def func(arg1, arg2):
pass
def func2(arg1, arg2, arg3):
pass
def func1(arg1):
pass
self.net_man.driver.update_dhcp_hostfile_with_text = func
self.net_man.driver.restart_dhcp = func2
self.net_man.driver.kill_dhcp = func1
# Tests seem to create some networks by default, which
# we don't want. So we delete them.
ctx = context.RequestContext('user1', 'fake_project1').elevated()
for n in db.network_get_all(ctx):
db.network_delete_safe(ctx, n['id'])
# Other unit tests (e.g., test_compute.py) have a nasty
# habit of of creating fixed IPs and not cleaning up, which
# can confuse these tests, so we remove all existing fixed
# ips before starting.
session = sql_session.get_session()
result = session.query(models.FixedIp).all()
with session.begin():
for fip_ref in result:
session.delete(fip_ref)
self.net_man.init_host()
def _create_network(self, n):
ctx = context.RequestContext('user1', n['project_id'])
nwks = self.net_man.create_networks(
ctx,
label=n['label'], cidr=n['cidr'],
multi_host=n['multi_host'],
num_networks=1, network_size=256,
cidr_v6=n['cidr_v6'],
gateway=n['gateway'],
gateway_v6=n['gateway_v6'], bridge=None,
bridge_interface=None, dns1=n['dns1'],
project_id=n['project_id'],
priority=n['priority'])
n['uuid'] = nwks[0]['uuid']
class QuantumAllocationTestCase(QuantumNovaTestCase):
def test_get_network_in_db(self):
context = self.mox.CreateMockAnything()
context.elevated().AndReturn('elevated')
self.mox.StubOutWithMock(db, 'network_get_by_uuid')
self.net_man.context = context
db.network_get_by_uuid('elevated', 'quantum_net_id').AndReturn(
{'uuid': 1})
self.mox.ReplayAll()
network = self.net_man.get_network(context, ('quantum_net_id',
'net_tenant_id'))
self.assertEquals(network['quantum_net_id'], 'quantum_net_id')
self.assertEquals(network['uuid'], 1)
def test_get_network_not_in_db(self):
context = self.mox.CreateMockAnything()
context.elevated().AndReturn('elevated')
self.mox.StubOutWithMock(db, 'network_get_by_uuid')
self.net_man.context = context
db.network_get_by_uuid('elevated', 'quantum_net_id').AndReturn(None)
self.mox.ReplayAll()
network = self.net_man.get_network(context, ('quantum_net_id',
'net_tenant_id'))
self.assertEquals(network['quantum_net_id'], 'quantum_net_id')
self.assertEquals(network['uuid'], 'quantum_net_id')
class QuantumDeallocationTestCase(QuantumNovaTestCase):
def test_deallocate_port(self):
quantum = self.mox.CreateMock(
quantum_connection.QuantumClientConnection)
quantum.get_port_by_attachment('q_tenant_id', 'net_id',
'interface_id').AndReturn('port_id')
quantum.detach_and_delete_port('q_tenant_id', 'net_id', 'port_id')
self.net_man.q_conn = quantum
self.mox.ReplayAll()
self.net_man.deallocate_port('interface_id', 'net_id', 'q_tenant_id',
'instance_id')
def test_deallocate_port_logs_error(self):
quantum = self.mox.CreateMock(
quantum_connection.QuantumClientConnection)
quantum.get_port_by_attachment('q_tenant_id', 'net_id',
'interface_id').AndRaise(Exception)
self.net_man.q_conn = quantum
self.mox.StubOutWithMock(quantum_manager.LOG, 'exception')
quantum_manager.LOG.exception(mox.Regex(r'port deallocation failed'))
self.mox.ReplayAll()
self.net_man.deallocate_port('interface_id', 'net_id', 'q_tenant_id',
'instance_id')
def test_deallocate_ip_address(self):
ipam = self.mox.CreateMock(melange_ipam_lib.QuantumMelangeIPAMLib)
ipam.get_tenant_id_by_net_id('context', 'net_id', {'uuid': 1},
'project_id').AndReturn('ipam_tenant_id')
self.net_man.ipam = ipam
self.mox.ReplayAll()
self.net_man.deallocate_ip_address('context', 'net_id', 'project_id',
{'uuid': 1}, 'instance_id')
def test_deallocate_ip_address_2(self):
ipam = self.mox.CreateMock(melange_ipam_lib.QuantumMelangeIPAMLib)
ipam.get_tenant_id_by_net_id('context', 'net_id', {'uuid': 1},
'project_id').AndRaise(Exception())
self.net_man.ipam = ipam
self.mox.StubOutWithMock(quantum_manager.LOG, 'exception')
quantum_manager.LOG.exception(mox.Regex(r'ipam deallocation failed'))
self.mox.ReplayAll()
self.net_man.deallocate_ip_address('context', 'net_id', 'project_id',
{'uuid': 1}, 'instance_id')
class QuantumManagerTestCase(QuantumNovaTestCase):
def test_create_and_delete_nets(self):
self._create_nets()
self._delete_nets()
def _create_nets(self):
for n in networks:
self._create_network(n)
def _delete_nets(self):
for n in networks:
ctx = context.RequestContext('user1', n['project_id'])
self.net_man.delete_network(ctx, None, n['uuid'])
self.assertRaises(exception.NoNetworksFound,
db.network_get_all, ctx.elevated())
def _validate_nw_info(self, nw_info, expected_net_labels):
self.assertEquals(len(nw_info), len(expected_net_labels))
ctx = context.RequestContext('user1', 'foo').elevated()
all_net_map = {}
for n in db.network_get_all(ctx):
all_net_map[n['label']] = n
for i in range(0, len(nw_info)):
vif = nw_info[i]
net = all_net_map[expected_net_labels[i]]
# simple test assumes that each starting prefix is unique
expected_v4_cidr_start = net['cidr'].split(".")[0].lower()
expected_v6_cidr_start = net['cidr_v6'].split(":")[0].lower()
for subnet in vif['network']['subnets']:
addr = subnet['ips'][0]['address']
if subnet['version'] == 4:
address_start = addr.split(".")[0].lower()
self.assertTrue(expected_v4_cidr_start, address_start)
else:
address_start = addr.split(":")[0].lower()
self.assertTrue(expected_v6_cidr_start, address_start)
# confirm that there is a DHCP device on corresponding net
for l in expected_net_labels:
n = all_net_map[l]
tenant_id = (n['project_id'] or
FLAGS.quantum_default_tenant_id)
ports = self.net_man.q_conn.get_attached_ports(
tenant_id, n['uuid'])
self.assertEquals(len(ports), 2) # gw + instance VIF
# make sure we aren't allowed to delete network with
# active port
self.assertRaises(exception.NetworkBusy,
self.net_man.delete_network,
ctx, None, n['uuid'])
def _check_vifs(self, expect_num_vifs):
ctx = context.RequestContext('user1', "").elevated()
self.assertEqual(len(db.virtual_interface_get_all(ctx)),
expect_num_vifs)
def _allocate_and_deallocate_instance(self, project_id, requested_networks,
expected_labels):
ctx = context.RequestContext('user1', project_id)
self._check_vifs(0)
instance_ref = db.instance_create(ctx,
{"project_id": project_id})
nw_info = self.net_man.allocate_for_instance(ctx.elevated(),
instance_id=instance_ref['id'], host="",
rxtx_factor=3,
project_id=project_id,
requested_networks=requested_networks)
self._check_vifs(len(nw_info))
self._validate_nw_info(nw_info, expected_labels)
nw_info = self.net_man.get_instance_nw_info(ctx, instance_ref['id'],
instance_ref['uuid'],
instance_ref['instance_type_id'], "",
project_id=project_id)
self._check_vifs(len(nw_info))
self._validate_nw_info(nw_info, expected_labels)
port_net_pairs = []
for vif in nw_info:
nid = vif['network']['id']
pid = self.net_man.q_conn.get_port_by_attachment(
project_id, nid, vif['id'])
if pid is None:
pid = self.net_man.q_conn.get_port_by_attachment(
FLAGS.quantum_default_tenant_id,
nid, vif['id'])
self.assertTrue(pid is not None)
port_net_pairs.append((pid, nid))
self.net_man.deallocate_for_instance(ctx,
instance_id=instance_ref['id'],
project_id=project_id)
for pid, nid in port_net_pairs:
self.assertRaises(quantum_client.QuantumNotFoundException,
self.net_man.q_conn.detach_and_delete_port,
project_id, nid, pid)
self.assertRaises(quantum_client.QuantumNotFoundException,
self.net_man.q_conn.detach_and_delete_port,
FLAGS.quantum_default_tenant_id, nid, pid)
self._check_vifs(0)
def test_allocate_and_deallocate_instance_static(self):
self._create_nets()
self._allocate_and_deallocate_instance("fake_project1", None,
['public', 'project1-net1'])
self._delete_nets()
def test_allocate_and_deallocate_instance_dynamic(self):
self._create_nets()
project_id = "fake_project2"
ctx = context.RequestContext('user1', project_id)
all_valid_networks = self.net_man.ipam.get_project_and_global_net_ids(
ctx, project_id)
requested_networks = [(n[0], None) for n in all_valid_networks]
self.net_man.validate_networks(ctx, requested_networks)
label_map = {}
for n in db.network_get_all(ctx.elevated()):
label_map[n['uuid']] = n['label']
expected_labels = [label_map[uid] for uid, _i in requested_networks]
self._allocate_and_deallocate_instance(project_id, requested_networks,
expected_labels)
self._delete_nets()
def test_validate_bad_network(self):
ctx = context.RequestContext('user1', 'fake_project1')
self.assertRaises(exception.NetworkNotFound,
self.net_man.validate_networks, ctx, [("", None)])
def test_create_net_external_uuid(self):
"""Tests use case where network can be created directly via
Quantum API, then the UUID is passed in via nova-manage"""
project_id = "foo_project"
ctx = context.RequestContext('user1', project_id)
net_id = self.net_man.q_conn.create_network(project_id, 'net1')
self.net_man.create_networks(
ctx,
label='achtungbaby',
cidr="9.9.9.0/24",
multi_host=False,
num_networks=1,
network_size=256,
cidr_v6=None,
gateway="9.9.9.1",
gateway_v6=None,
bridge=None,
bridge_interface=None,
dns1="8.8.8.8",
project_id=project_id,
priority=9,
uuid=net_id)
net = db.network_get_by_uuid(ctx.elevated(), net_id)
self.assertTrue(net is not None)
self.assertEquals(net['uuid'], net_id)
def test_create_net_external_uuid_and_host_is_set(self):
"""Make sure network['host'] is set when creating a network via the
network manager"""
project_id = "foo_project"
ctx = context.RequestContext('user1', project_id)
net_id = self.net_man.q_conn.create_network(project_id, 'net2')
self.net_man.create_networks(
ctx, label='achtungbaby2', cidr="9.9.8.0/24", multi_host=False,
num_networks=1, network_size=256, cidr_v6=None,
gateway="9.9.8.1", gateway_v6=None, bridge=None,
bridge_interface=None, dns1="8.8.8.8", project_id=project_id,
priority=8, uuid=net_id)
net = db.network_get_by_uuid(ctx.elevated(), net_id)
self.assertTrue(net is not None)
self.assertEquals(net['uuid'], net_id)
self.assertTrue(net['host'] != None)
class QuantumNovaMACGenerationTestCase(QuantumNovaTestCase):
def test_local_mac_address_creation(self):
self.flags(use_melange_mac_generation=False)
fake_mac = "ab:cd:ef:ab:cd:ef"
self.stubs.Set(utils, "generate_mac_address",
lambda: fake_mac)
project_id = "fake_project1"
ctx = context.RequestContext('user1', project_id)
self._create_network(networks[0])
all_valid_networks = self.net_man.ipam.get_project_and_global_net_ids(
ctx, project_id)
requested_networks = [(n[0], None) for n in all_valid_networks]
instance_ref = db.api.instance_create(ctx,
{"project_id": project_id})
nw_info = self.net_man.allocate_for_instance(ctx,
instance_id=instance_ref['id'], host="",
rxtx_factor=3,
project_id=project_id,
requested_networks=requested_networks)
self.assertEqual(nw_info[0]['address'], fake_mac)
def test_melange_mac_address_creation(self):
self.flags(use_melange_mac_generation=True)
fake_mac = "ab:cd:ef:ab:cd:ef"
self.stubs.Set(melange_connection.MelangeConnection, "create_vif",
lambda w, x, y, z: fake_mac)
project_id = "fake_project1"
ctx = context.RequestContext('user1', project_id)
self._create_network(networks[0])
all_valid_networks = self.net_man.ipam.get_project_and_global_net_ids(
ctx, project_id)
requested_networks = [(n[0], None) for n in all_valid_networks]
instance_ref = db.api.instance_create(ctx,
{"project_id": project_id})
nw_info = self.net_man.allocate_for_instance(ctx,
instance_id=instance_ref['id'], host="",
rxtx_factor=3,
project_id=project_id,
requested_networks=requested_networks)
self.assertEqual(nw_info[0]['address'], fake_mac)
class QuantumNovaPortSecurityTestCase(QuantumNovaTestCase):
def test_port_security(self):
self.flags(use_melange_mac_generation=True)
self.flags(quantum_use_port_security=True)
fake_mac = "ab:cd:ef:ab:cd:ef"
self.stubs.Set(melange_connection.MelangeConnection, "create_vif",
lambda w, x, y, z: fake_mac)
project_id = "fake_project1"
ctx = context.RequestContext('user1', project_id)
self._create_network(networks[0])
all_valid_networks = self.net_man.ipam.get_project_and_global_net_ids(
ctx, project_id)
requested_networks = [(n[0], None) for n in all_valid_networks]
instance_ref = db.api.instance_create(ctx,
{"project_id": project_id})
oldfunc = self.net_man.q_conn.create_and_attach_port
# Make sure we get the appropriate mac set in allowed_address_pairs
# if port security is enabled.
def _instrumented_create_and_attach_port(tenant_id, net_id,
interface_id, **kwargs):
self.assertTrue('allowed_address_pairs' in kwargs.keys())
pairs = kwargs['allowed_address_pairs']
self.assertTrue(pairs[0]['mac_address'] == fake_mac)
self.net_man.q_conn.create_and_attach_port = oldfunc
return oldfunc(tenant_id, net_id, interface_id, **kwargs)
_port_attach = _instrumented_create_and_attach_port
self.net_man.q_conn.create_and_attach_port = _port_attach
nw_info = self.net_man.allocate_for_instance(ctx,
instance_id=instance_ref['id'], host="",
rxtx_factor=3,
project_id=project_id,
requested_networks=requested_networks)
self.assertEqual(nw_info[0]['address'], fake_mac)
def test_port_security_negative(self):
self.flags(use_melange_mac_generation=True)
self.flags(quantum_use_port_security=False)
fake_mac = "ab:cd:ef:ab:cd:ef"
self.stubs.Set(melange_connection.MelangeConnection, "create_vif",
lambda w, x, y, z: fake_mac)
project_id = "fake_project1"
ctx = context.RequestContext('user1', project_id)
self._create_network(networks[0])
all_valid_networks = self.net_man.ipam.get_project_and_global_net_ids(
ctx, project_id)
requested_networks = [(n[0], None) for n in all_valid_networks]
instance_ref = db.api.instance_create(ctx,
{"project_id": project_id})
oldfunc = self.net_man.q_conn.create_and_attach_port
# Make sure no pairs are passed in if port security is turned off
def _instrumented_create_and_attach_port(tenant_id, net_id,
interface_id, **kwargs):
self.assertTrue('allowed_address_pairs' in kwargs.keys())
pairs = kwargs['allowed_address_pairs']
self.assertTrue(len(pairs) == 0)
self.net_man.q_conn.create_and_attach_port = oldfunc
return oldfunc(tenant_id, net_id, interface_id, **kwargs)
_port_attach = _instrumented_create_and_attach_port
self.net_man.q_conn.create_and_attach_port = _port_attach
nw_info = self.net_man.allocate_for_instance(ctx,
instance_id=instance_ref['id'], host="",
rxtx_factor=3,
project_id=project_id,
requested_networks=requested_networks)
self.assertEqual(nw_info[0]['address'], fake_mac)
class QuantumMelangeTestCase(test.TestCase):
def setUp(self):
super(QuantumMelangeTestCase, self).setUp()
fc = fake_client.FakeClient(LOG)
qc = quantum_connection.QuantumClientConnection(client=fc)
self.net_man = quantum_manager.QuantumManager(
ipam_lib="nova.network.quantum.nova_ipam_lib",
q_conn=qc)
def test_get_instance_uuids_by_ip_filter(self):
fake_context = context.RequestContext('user', 'project')
address = '1.2.3.4'
filters = {'ip': address}
self.net_man.ipam = self.mox.CreateMockAnything()
self.net_man.ipam.get_instance_ids_by_ip_address(fake_context,
address).AndReturn(['instance_id'])
instance = self.mox.CreateMockAnything()
instance.uuid = 'instance_uuid'
self.mox.StubOutWithMock(db, 'instance_get')
db.instance_get(fake_context, 'instance_id').AndReturn(instance)
self.mox.ReplayAll()
uuids = self.net_man.get_instance_uuids_by_ip_filter(fake_context,
filters)
self.assertEquals(uuids, [{'instance_uuid':'instance_uuid'}])
-243
View File
@@ -1,243 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova import db
from nova import flags
from nova.network.quantum import melange_connection
from nova.network.quantum import melange_ipam_lib
from nova import test
FLAGS = flags.FLAGS
class MelangeIpamLibTestCase(test.TestCase):
def setUp(self):
super(MelangeIpamLibTestCase, self).setUp()
self.m_conn = self.mox.CreateMock(melange_connection.MelangeConnection)
self.ipam = melange_ipam_lib.QuantumMelangeIPAMLib()
self.ipam.m_conn = self.m_conn
def _block_list(self, id='id', cidr='cidr', network_id='network_id'):
return {'ip_blocks': [{'id': id,
'cidr': cidr,
'network_id': network_id}]}
def test_allocate_fixed_ips_extracts_address(self):
self.m_conn.allocate_ip('quantum_net_id', 'network_tenant_id',
'vif_ref_uuid', 'project_id',
'vif_ref_address').AndReturn(
[{'address': 'ip_address'}])
self.mox.ReplayAll()
ips = self.ipam.allocate_fixed_ips('context',
'project_id',
'quantum_net_id',
'network_tenant_id',
{'uuid': 'vif_ref_uuid',
'address': 'vif_ref_address'})
self.assertEqual(ips[0], 'ip_address')
def test_delete_subnets_by_net_id_deletes_block(self):
context = self.mox.CreateMockAnything()
context.elevated().AndReturn('elevated')
self.m_conn.get_blocks('project_id').AndReturn(
self._block_list(id='block_id'))
self.m_conn.delete_block('block_id', 'project_id')
self.mox.StubOutWithMock(db, 'network_get_by_uuid')
db.network_get_by_uuid('elevated', 'network_id').AndReturn(
{'id': 'network_id'})
self.mox.StubOutWithMock(db, 'network_delete_safe')
db.network_delete_safe(context, 'network_id')
self.mox.ReplayAll()
self.ipam.delete_subnets_by_net_id(context, 'network_id', 'project_id')
def test_get_networks_by_tenant_gets_all_networks(self):
block_list = self._block_list(network_id='net_1')
block_list['ip_blocks'] += self._block_list(
network_id='net_2')['ip_blocks']
self.m_conn.get_blocks('tenant_id').AndReturn(block_list)
self.mox.StubOutWithMock(db, 'network_get_by_uuid')
db.network_get_by_uuid('admin_context', 'net_1').AndReturn('network1')
db.network_get_by_uuid('admin_context', 'net_2').AndReturn('network2')
self.mox.ReplayAll()
values = self.ipam.get_networks_by_tenant('admin_context', 'tenant_id')
self.assertEquals(values, ['network1', 'network2'])
def test_get_global_networks(self):
FLAGS.quantum_default_tenant_id = 'quantum_default_tenant_id'
self.mox.StubOutWithMock(self.ipam, 'get_networks_by_tenant')
self.ipam.get_networks_by_tenant('admin_context',
'quantum_default_tenant_id')
self.mox.ReplayAll()
self.ipam.get_global_networks('admin_context')
def test_get_project_networks(self):
context = self.mox.CreateMockAnything()
context.elevated().AndReturn('elevated')
networks = [{'project_id': 1}, {'project_id': None}]
self.mox.StubOutWithMock(db, 'network_get_all')
db.network_get_all('elevated').AndReturn(networks)
self.mox.ReplayAll()
values = self.ipam.get_project_networks(context)
self.assertEquals(values, [networks[0]])
def test_get_project_and_global_net_ids__by_priority(self):
context = self.mox.CreateMockAnything()
context.elevated().AndReturn('elevated')
FLAGS.quantum_default_tenant_id = 'default_tenant_id'
net1 = {'uuid': 'net1_uuid', 'priority': 'net1_priority'}
net2 = {'uuid': 'net2_uuid', 'priority': 'net2_priority'}
self.mox.StubOutWithMock(self.ipam, 'get_networks_by_tenant')
self.ipam.get_networks_by_tenant('elevated',
'project_id').AndReturn([net1])
self.ipam.get_networks_by_tenant('elevated',
'default_tenant_id').AndReturn([net2])
self.mox.ReplayAll()
self.ipam.get_project_and_global_net_ids(context, 'project_id')
def test_get_tenant_id_by_net_id_returns_id(self):
FLAGS.quantum_default_tenant_id = 'qdti'
self.m_conn.get_allocated_ips('net_id', 'vif_id',
'qdti').AndReturn({})
self.mox.ReplayAll()
value = self.ipam.get_tenant_id_by_net_id('context', 'net_id',
'vif_id', 'project_id')
self.assertEqual(value, 'qdti')
def test_get_tenant_id_by_net_id_returns_none_if_none_found(self):
FLAGS.quantum_default_tenant_id = 'qdti'
self.m_conn.get_allocated_ips('net_id', 'vif_id',
'qdti').AndRaise(KeyError())
self.m_conn.get_allocated_ips('net_id', 'vif_id',
'project_id').AndRaise(KeyError())
self.m_conn.get_allocated_ips('net_id', 'vif_id',
None).AndRaise(KeyError())
self.mox.ReplayAll()
value = self.ipam.get_tenant_id_by_net_id('context', 'net_id',
'vif_id', 'project_id')
self.assertEqual(value, None)
def test_get_subnets_by_net_id(self):
ips = [{'ip_block': {'network_id': 'network_id',
'id': 'id',
'cidr': 'cidr',
'gateway': 'gateway',
'broadcast': 'broadcast',
'netmask': 'netmask',
'dns1': 'dns1',
'dns2': 'dns2'},
'version': 4}]
self.m_conn.get_allocated_ips('net_id', 'vif_id',
'tenant_id').AndReturn(ips)
self.mox.ReplayAll()
value = self.ipam.get_subnets_by_net_id('context', 'tenant_id',
'net_id', 'vif_id')
self.assertEquals(value[0]['cidr'], 'cidr')
def test_get_routes_by_ip_block(self):
self.m_conn.get_routes('block_id', 'project_id')
self.mox.ReplayAll()
self.ipam.get_routes_by_ip_block('context', 'block_id', 'project_id')
def test_get_v4_ips_by_interface(self):
self.mox.StubOutWithMock(self.ipam, '_get_ips_by_interface')
self.ipam._get_ips_by_interface('context', 'net_id', 'vif_id',
'project_id', 4)
self.mox.ReplayAll()
self.ipam.get_v4_ips_by_interface('context', 'net_id', 'vif_id',
'project_id')
def test_get_v6_ips_by_interface(self):
self.mox.StubOutWithMock(self.ipam, '_get_ips_by_interface')
self.ipam._get_ips_by_interface('context', 'net_id', 'vif_id',
'project_id', 6)
self.mox.ReplayAll()
self.ipam.get_v6_ips_by_interface('context', 'net_id', 'vif_id',
'project_id')
def test_get_ips_by_interface(self):
ips = [{'address': '10.10.10.10'}, {'address': '2001::CAFE'}]
self.m_conn.get_allocated_ips('net_id', 'vif_id',
'tenant_id').AndReturn(ips)
self.m_conn.get_allocated_ips('net_id', 'vif_id',
'tenant_id').AndReturn(ips)
self.mox.ReplayAll()
values = self.ipam._get_ips_by_interface('context', 'net_id', 'vif_id',
'tenant_id', 4)
self.assertEquals(values, ["10.10.10.10"])
values = self.ipam._get_ips_by_interface('context', 'net_id', 'vif_id',
'tenant_id', 6)
self.assertEquals(values, ["2001::CAFE"])
def test_get_instance_ids_by_ip_address(self):
ips = [{'used_by_device': 'some_vif_uuid'}]
self.m_conn.get_allocated_ips_by_address('ip').AndReturn(ips)
self.mox.ReplayAll()
uuid = self.ipam.get_instance_ids_by_ip_address('context', 'ip')
self.assertEqual(uuid, ['some_vif_uuid'])
def test_verify_subnet_exists(self):
blocks = {'ip_blocks': [{'network_id': 'quantum_net_id'}]}
self.m_conn.get_blocks('tenant_id').AndReturn(blocks)
self.mox.ReplayAll()
value = self.ipam.verify_subnet_exists('context', 'tenant_id',
'quantum_net_id')
self.assertEquals(value, True)
def test_deallocate_ips_by_vif(self):
self.m_conn.deallocate_ips('net_id', 'uuid', 'tenant_id')
self.mox.ReplayAll()
self.ipam.deallocate_ips_by_vif('context', 'tenant_id', 'net_id',
{'uuid': 'uuid'})
def test_get_allocated_ips(self):
ips = [{'address': 'ip_address', 'interface_id': 'interface_id'}]
self.m_conn.get_allocated_ips_for_network('subnet_id',
'project_id').AndReturn(ips)
self.mox.ReplayAll()
self.ipam.get_allocated_ips('context', 'subnet_id', 'project_id')
def test_create_vif(self):
self.m_conn.create_vif('vif_id', 'instance_id', 'project_id')
self.mox.ReplayAll()
self.ipam.create_vif('vif_id', 'instance_id', 'project_id')
def test_get_floating_ips_by_fixed_address(self):
value = self.ipam.get_floating_ips_by_fixed_address('context',
'fixed_address')
self.assertEquals(value, [])