Merge "Change MQ targeting to honor only what is in the context"
This commit is contained in:
@@ -370,8 +370,12 @@ def set_target_cell(context, cell_mapping):
|
||||
"""
|
||||
# avoid circular import
|
||||
from nova import db
|
||||
from nova import rpc
|
||||
db_connection_string = cell_mapping.database_connection
|
||||
context.db_connection = db.create_context_manager(db_connection_string)
|
||||
if not cell_mapping.transport_url.startswith('none'):
|
||||
context.mq_connection = rpc.create_transport(
|
||||
cell_mapping.transport_url)
|
||||
|
||||
|
||||
@contextmanager
|
||||
@@ -386,8 +390,10 @@ def target_cell(context, cell_mapping):
|
||||
:param cell_mapping: A objects.CellMapping object
|
||||
"""
|
||||
original_db_connection = context.db_connection
|
||||
original_mq_connection = context.mq_connection
|
||||
set_target_cell(context, cell_mapping)
|
||||
try:
|
||||
yield context
|
||||
finally:
|
||||
context.db_connection = original_db_connection
|
||||
context.mq_connection = original_mq_connection
|
||||
|
||||
+20
-66
@@ -34,13 +34,11 @@ from oslo_messaging.rpc import dispatcher
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_service import periodic_task
|
||||
from oslo_utils import importutils
|
||||
from oslo_utils import timeutils
|
||||
|
||||
import nova.conf
|
||||
import nova.context
|
||||
import nova.exception
|
||||
from nova.i18n import _
|
||||
from nova import objects
|
||||
|
||||
profiler = importutils.try_import("osprofiler.profiler")
|
||||
|
||||
@@ -395,27 +393,14 @@ class LegacyValidatingNotifier(object):
|
||||
getattr(self.notifier, priority)(ctxt, event_type, payload)
|
||||
|
||||
|
||||
class ClientWrapper(object):
|
||||
def __init__(self, client):
|
||||
self._client = client
|
||||
self.last_access_time = timeutils.utcnow()
|
||||
|
||||
@property
|
||||
def client(self):
|
||||
self.last_access_time = timeutils.utcnow()
|
||||
return self._client
|
||||
|
||||
|
||||
class ClientRouter(periodic_task.PeriodicTasks):
|
||||
"""Creates and caches RPC clients that route to cells or the default.
|
||||
|
||||
The default client connects to the API cell message queue. The rest of the
|
||||
clients connect to compute cell message queues.
|
||||
"""Creates RPC clients that honor the context's RPC transport
|
||||
or provides a default.
|
||||
"""
|
||||
|
||||
def __init__(self, default_client):
|
||||
super(ClientRouter, self).__init__(CONF)
|
||||
self.clients = {}
|
||||
self.clients['default'] = ClientWrapper(default_client)
|
||||
self.default_client = default_client
|
||||
self.target = default_client.target
|
||||
self.version_cap = default_client.version_cap
|
||||
# NOTE(melwitt): Cells v1 does its own serialization and won't
|
||||
@@ -424,55 +409,24 @@ class ClientRouter(periodic_task.PeriodicTasks):
|
||||
# Prevent this empty context from overwriting the thread local copy
|
||||
self.run_periodic_tasks(nova.context.RequestContext(overwrite=False))
|
||||
|
||||
def _client(self, context, cell_mapping=None):
|
||||
if cell_mapping:
|
||||
client_id = cell_mapping.uuid
|
||||
def _client(self, context, transport=None):
|
||||
if transport:
|
||||
return messaging.RPCClient(transport, self.target,
|
||||
version_cap=self.version_cap,
|
||||
serializer=self.serializer)
|
||||
else:
|
||||
client_id = 'default'
|
||||
|
||||
try:
|
||||
client = self.clients[client_id].client
|
||||
except KeyError:
|
||||
transport = create_transport(cell_mapping.transport_url)
|
||||
client = messaging.RPCClient(transport, self.target,
|
||||
version_cap=self.version_cap,
|
||||
serializer=self.serializer)
|
||||
self.clients[client_id] = ClientWrapper(client)
|
||||
|
||||
return client
|
||||
|
||||
@periodic_task.periodic_task
|
||||
def _remove_stale_clients(self, context):
|
||||
timeout = 60
|
||||
|
||||
def stale(client_id, last_access_time):
|
||||
if timeutils.is_older_than(last_access_time, timeout):
|
||||
LOG.debug('Removing stale RPC client: %s as it was last '
|
||||
'accessed at %s', client_id, last_access_time)
|
||||
return True
|
||||
return False
|
||||
|
||||
# Never expire the default client
|
||||
items_copy = list(self.clients.items())
|
||||
for client_id, client_wrapper in items_copy:
|
||||
if (client_id != 'default' and
|
||||
stale(client_id, client_wrapper.last_access_time)):
|
||||
del self.clients[client_id]
|
||||
return self.default_client
|
||||
|
||||
def by_instance(self, context, instance):
|
||||
try:
|
||||
cell_mapping = objects.InstanceMapping.get_by_instance_uuid(
|
||||
context, instance.uuid).cell_mapping
|
||||
except nova.exception.InstanceMappingNotFound:
|
||||
# Not a cells v2 deployment
|
||||
cell_mapping = None
|
||||
return self._client(context, cell_mapping=cell_mapping)
|
||||
"""Deprecated."""
|
||||
if context.mq_connection:
|
||||
return self._client(context, transport=context.mq_connection)
|
||||
else:
|
||||
return self.default_client
|
||||
|
||||
def by_host(self, context, host):
|
||||
try:
|
||||
cell_mapping = objects.HostMapping.get_by_host(
|
||||
context, host).cell_mapping
|
||||
except nova.exception.HostMappingNotFound:
|
||||
# Not a cells v2 deployment
|
||||
cell_mapping = None
|
||||
return self._client(context, cell_mapping=cell_mapping)
|
||||
"""Deprecated."""
|
||||
if context.mq_connection:
|
||||
return self._client(context, transport=context.mq_connection)
|
||||
else:
|
||||
return self.default_client
|
||||
|
||||
@@ -115,7 +115,7 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
|
||||
|
||||
# This test wants to run the real prepare function, so must use
|
||||
# a real client object
|
||||
default_client = rpcapi.router.clients['default'].client
|
||||
default_client = rpcapi.router.default_client
|
||||
|
||||
orig_prepare = default_client.prepare
|
||||
base_version = rpcapi.router.target.version
|
||||
|
||||
@@ -290,18 +290,24 @@ class ContextTestCase(test.NoDBTestCase):
|
||||
mock_authorize.assert_called_once_with(ctxt, mock.sentinel.rule,
|
||||
mock.sentinel.target)
|
||||
|
||||
@mock.patch('nova.rpc.create_transport')
|
||||
@mock.patch('nova.db.create_context_manager')
|
||||
def test_target_cell(self, mock_create_ctxt_mgr):
|
||||
mock_create_ctxt_mgr.return_value = mock.sentinel.cm
|
||||
def test_target_cell(self, mock_create_ctxt_mgr, mock_rpc):
|
||||
mock_create_ctxt_mgr.return_value = mock.sentinel.cdb
|
||||
mock_rpc.return_value = mock.sentinel.cmq
|
||||
ctxt = context.RequestContext('111',
|
||||
'222',
|
||||
roles=['admin', 'weasel'])
|
||||
# Verify the existing db_connection, if any, is restored
|
||||
ctxt.db_connection = mock.sentinel.db_conn
|
||||
mapping = objects.CellMapping(database_connection='fake://')
|
||||
ctxt.mq_connection = mock.sentinel.mq_conn
|
||||
mapping = objects.CellMapping(database_connection='fake://',
|
||||
transport_url='fake://')
|
||||
with context.target_cell(ctxt, mapping):
|
||||
self.assertEqual(ctxt.db_connection, mock.sentinel.cm)
|
||||
self.assertEqual(ctxt.db_connection, mock.sentinel.cdb)
|
||||
self.assertEqual(ctxt.mq_connection, mock.sentinel.cmq)
|
||||
self.assertEqual(mock.sentinel.db_conn, ctxt.db_connection)
|
||||
self.assertEqual(mock.sentinel.mq_conn, ctxt.mq_connection)
|
||||
|
||||
def test_get_context(self):
|
||||
ctxt = context.get_context()
|
||||
|
||||
+24
-134
@@ -12,18 +12,15 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import copy
|
||||
import datetime
|
||||
|
||||
import fixtures
|
||||
import mock
|
||||
import oslo_messaging as messaging
|
||||
from oslo_messaging.rpc import dispatcher
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import fixture as utils_fixture
|
||||
import testtools
|
||||
|
||||
from nova import context
|
||||
from nova import exception
|
||||
from nova import objects
|
||||
from nova import rpc
|
||||
from nova import test
|
||||
@@ -445,179 +442,72 @@ class TestProfilerRequestContextSerializer(test.NoDBTestCase):
|
||||
|
||||
|
||||
class TestClientRouter(test.NoDBTestCase):
|
||||
@mock.patch('nova.objects.InstanceMapping.get_by_instance_uuid')
|
||||
@mock.patch('nova.rpc.create_transport')
|
||||
@mock.patch('oslo_messaging.RPCClient')
|
||||
def test_by_instance(self, mock_rpcclient, mock_create, mock_get):
|
||||
def test_by_instance(self, mock_rpcclient):
|
||||
default_client = mock.Mock()
|
||||
cell_client = mock.Mock()
|
||||
mock_rpcclient.return_value = cell_client
|
||||
ctxt = mock.Mock()
|
||||
cm = objects.CellMapping(uuid=uuids.cell_mapping,
|
||||
transport_url='fake:///')
|
||||
mock_get.return_value = objects.InstanceMapping(cell_mapping=cm)
|
||||
ctxt.mq_connection = mock.sentinel.transport
|
||||
instance = objects.Instance(uuid=uuids.instance)
|
||||
|
||||
router = rpc.ClientRouter(default_client)
|
||||
client = router.by_instance(ctxt, instance)
|
||||
|
||||
mock_get.assert_called_once_with(ctxt, instance.uuid)
|
||||
# verify a client was created by ClientRouter
|
||||
mock_rpcclient.assert_called_once_with(
|
||||
mock_create.return_value, default_client.target,
|
||||
mock.sentinel.transport, default_client.target,
|
||||
version_cap=default_client.version_cap,
|
||||
serializer=default_client.serializer)
|
||||
# verify cell client was returned
|
||||
self.assertEqual(cell_client, client)
|
||||
|
||||
# reset and check that cached client is returned the second time
|
||||
mock_rpcclient.reset_mock()
|
||||
mock_create.reset_mock()
|
||||
mock_get.reset_mock()
|
||||
|
||||
client = router.by_instance(ctxt, instance)
|
||||
mock_get.assert_called_once_with(ctxt, instance.uuid)
|
||||
mock_rpcclient.assert_not_called()
|
||||
mock_create.assert_not_called()
|
||||
self.assertEqual(cell_client, client)
|
||||
|
||||
@mock.patch('nova.objects.HostMapping.get_by_host')
|
||||
@mock.patch('nova.rpc.create_transport')
|
||||
@mock.patch('oslo_messaging.RPCClient')
|
||||
def test_by_host(self, mock_rpcclient, mock_create, mock_get):
|
||||
default_client = mock.Mock()
|
||||
cell_client = mock.Mock()
|
||||
mock_rpcclient.return_value = cell_client
|
||||
ctxt = mock.Mock()
|
||||
cm = objects.CellMapping(uuid=uuids.cell_mapping,
|
||||
transport_url='fake:///')
|
||||
mock_get.return_value = objects.HostMapping(cell_mapping=cm)
|
||||
host = 'fake-host'
|
||||
|
||||
router = rpc.ClientRouter(default_client)
|
||||
client = router.by_host(ctxt, host)
|
||||
|
||||
mock_get.assert_called_once_with(ctxt, host)
|
||||
# verify a client was created by ClientRouter
|
||||
mock_rpcclient.assert_called_once_with(
|
||||
mock_create.return_value, default_client.target,
|
||||
version_cap=default_client.version_cap,
|
||||
serializer=default_client.serializer)
|
||||
# verify cell client was returned
|
||||
self.assertEqual(cell_client, client)
|
||||
|
||||
# reset and check that cached client is returned the second time
|
||||
mock_rpcclient.reset_mock()
|
||||
mock_create.reset_mock()
|
||||
mock_get.reset_mock()
|
||||
|
||||
client = router.by_host(ctxt, host)
|
||||
mock_get.assert_called_once_with(ctxt, host)
|
||||
mock_rpcclient.assert_not_called()
|
||||
mock_create.assert_not_called()
|
||||
self.assertEqual(cell_client, client)
|
||||
|
||||
@mock.patch('nova.objects.InstanceMapping.get_by_instance_uuid',
|
||||
side_effect=exception.InstanceMappingNotFound(uuid=uuids.instance))
|
||||
@mock.patch('nova.rpc.create_transport')
|
||||
@mock.patch('oslo_messaging.RPCClient')
|
||||
def test_by_instance_not_found(self, mock_rpcclient, mock_create,
|
||||
mock_get):
|
||||
def test_by_instance_untargeted(self, mock_rpcclient):
|
||||
default_client = mock.Mock()
|
||||
cell_client = mock.Mock()
|
||||
mock_rpcclient.return_value = cell_client
|
||||
ctxt = mock.Mock()
|
||||
ctxt.mq_connection = None
|
||||
instance = objects.Instance(uuid=uuids.instance)
|
||||
|
||||
router = rpc.ClientRouter(default_client)
|
||||
client = router.by_instance(ctxt, instance)
|
||||
|
||||
mock_get.assert_called_once_with(ctxt, instance.uuid)
|
||||
mock_rpcclient.assert_not_called()
|
||||
mock_create.assert_not_called()
|
||||
# verify default client was returned
|
||||
self.assertEqual(default_client, client)
|
||||
self.assertEqual(router.default_client, client)
|
||||
self.assertFalse(mock_rpcclient.called)
|
||||
|
||||
@mock.patch('nova.objects.HostMapping.get_by_host',
|
||||
side_effect=exception.HostMappingNotFound(name='fake-host'))
|
||||
@mock.patch('nova.rpc.create_transport')
|
||||
@mock.patch('oslo_messaging.RPCClient')
|
||||
def test_by_host_not_found(self, mock_rpcclient, mock_create, mock_get):
|
||||
def test_by_host(self, mock_rpcclient):
|
||||
default_client = mock.Mock()
|
||||
cell_client = mock.Mock()
|
||||
mock_rpcclient.return_value = cell_client
|
||||
ctxt = mock.Mock()
|
||||
ctxt.mq_connection = mock.sentinel.transport
|
||||
host = 'fake-host'
|
||||
|
||||
router = rpc.ClientRouter(default_client)
|
||||
client = router.by_host(ctxt, host)
|
||||
|
||||
mock_get.assert_called_once_with(ctxt, host)
|
||||
mock_rpcclient.assert_not_called()
|
||||
mock_create.assert_not_called()
|
||||
# verify default client was returned
|
||||
self.assertEqual(default_client, client)
|
||||
# verify a client was created by ClientRouter
|
||||
mock_rpcclient.assert_called_once_with(
|
||||
mock.sentinel.transport, default_client.target,
|
||||
version_cap=default_client.version_cap,
|
||||
serializer=default_client.serializer)
|
||||
# verify cell client was returned
|
||||
self.assertEqual(cell_client, client)
|
||||
|
||||
@mock.patch('nova.objects.InstanceMapping.get_by_instance_uuid')
|
||||
@mock.patch('nova.rpc.create_transport')
|
||||
@mock.patch('oslo_messaging.RPCClient')
|
||||
def test_remove_stale_clients(self, mock_rpcclient, mock_create, mock_get):
|
||||
t0 = datetime.datetime(2016, 8, 9, 0, 0, 0)
|
||||
time_fixture = self.useFixture(utils_fixture.TimeFixture(t0))
|
||||
|
||||
def test_by_host_untargeted(self, mock_rpcclient):
|
||||
default_client = mock.Mock()
|
||||
cell_client = mock.Mock()
|
||||
mock_rpcclient.return_value = cell_client
|
||||
ctxt = mock.Mock()
|
||||
|
||||
cm1 = objects.CellMapping(uuid=uuids.cell_mapping1,
|
||||
transport_url='fake:///')
|
||||
cm2 = objects.CellMapping(uuid=uuids.cell_mapping2,
|
||||
transport_url='fake:///')
|
||||
cm3 = objects.CellMapping(uuid=uuids.cell_mapping3,
|
||||
transport_url='fake:///')
|
||||
mock_get.side_effect = [objects.InstanceMapping(cell_mapping=cm1),
|
||||
objects.InstanceMapping(cell_mapping=cm2),
|
||||
objects.InstanceMapping(cell_mapping=cm3),
|
||||
objects.InstanceMapping(cell_mapping=cm3)]
|
||||
instance1 = objects.Instance(uuid=uuids.instance1)
|
||||
instance2 = objects.Instance(uuid=uuids.instance2)
|
||||
instance3 = objects.Instance(uuid=uuids.instance3)
|
||||
ctxt.mq_connection = None
|
||||
host = 'fake-host'
|
||||
|
||||
router = rpc.ClientRouter(default_client)
|
||||
cell1_client = router.by_instance(ctxt, instance1)
|
||||
cell2_client = router.by_instance(ctxt, instance2)
|
||||
client = router.by_host(ctxt, host)
|
||||
|
||||
# default client, cell1 client, cell2 client
|
||||
self.assertEqual(3, len(router.clients))
|
||||
expected = {'default': default_client,
|
||||
uuids.cell_mapping1: cell1_client,
|
||||
uuids.cell_mapping2: cell2_client}
|
||||
for client_id, client in expected.items():
|
||||
self.assertEqual(client, router.clients[client_id].client)
|
||||
|
||||
# expire cell1 client and cell2 client
|
||||
time_fixture.advance_time_seconds(80)
|
||||
|
||||
# add cell3 client
|
||||
cell3_client = router.by_instance(ctxt, instance3)
|
||||
|
||||
router._remove_stale_clients(ctxt)
|
||||
|
||||
# default client, cell3 client
|
||||
expected = {'default': default_client,
|
||||
uuids.cell_mapping3: cell3_client}
|
||||
self.assertEqual(2, len(router.clients))
|
||||
for client_id, client in expected.items():
|
||||
self.assertEqual(client, router.clients[client_id].client)
|
||||
|
||||
# expire cell3 client
|
||||
time_fixture.advance_time_seconds(80)
|
||||
|
||||
# access cell3 client to refresh it
|
||||
cell3_client = router.by_instance(ctxt, instance3)
|
||||
|
||||
router._remove_stale_clients(ctxt)
|
||||
|
||||
# default client and cell3 client should be there
|
||||
self.assertEqual(2, len(router.clients))
|
||||
for client_id, client in expected.items():
|
||||
self.assertEqual(client, router.clients[client_id].client)
|
||||
self.assertEqual(router.default_client, client)
|
||||
self.assertFalse(mock_rpcclient.called)
|
||||
|
||||
Reference in New Issue
Block a user