From 11d909c2cbf80ce899b7a68ce72897dfb2945fed Mon Sep 17 00:00:00 2001 From: Dan Smith Date: Thu, 3 Oct 2019 11:55:00 -0700 Subject: [PATCH] Add cache_images() to conductor This adds the bulk of the image pre-caching logic to the conductor task manager. It takes an aggregate and list of image ids from the API service and handles the process of calling to the relevant compute nodes to initiate the image downloads, honoring the (new) config knob for overall task parallelism. Related to blueprint image-precache-support Change-Id: Id7c0ab7ae0586d49d88ff2afae149e25e59a3489 --- nova/conductor/api.py | 19 +++++ nova/conductor/manager.py | 70 ++++++++++++++++- nova/conductor/rpcapi.py | 11 +++ nova/conf/__init__.py | 2 + nova/conf/imagecache.py | 47 ++++++++++++ nova/notifications/objects/base.py | 3 +- nova/objects/fields.py | 3 +- .../functional/compute/test_cache_image.py | 76 +++++++++++++++++++ nova/tests/unit/conductor/test_conductor.py | 70 +++++++++++++++++ .../objects/test_notification.py | 2 +- nova/virt/fake.py | 13 ++++ 11 files changed, 312 insertions(+), 4 deletions(-) create mode 100644 nova/conf/imagecache.py create mode 100644 nova/tests/functional/compute/test_cache_image.py diff --git a/nova/conductor/api.py b/nova/conductor/api.py index a7c49d7b9d..3ad5a4b168 100644 --- a/nova/conductor/api.py +++ b/nova/conductor/api.py @@ -20,6 +20,7 @@ import oslo_messaging as messaging from nova import baserpc from nova.conductor import rpcapi import nova.conf +from nova import image CONF = nova.conf.CONF @@ -83,6 +84,7 @@ class ComputeTaskAPI(object): def __init__(self): self.conductor_compute_rpcapi = rpcapi.ComputeTaskAPI() + self.image_api = image.API() # TODO(stephenfin): Remove the 'reservations' parameter since we don't use # reservations anymore @@ -155,3 +157,20 @@ class ComputeTaskAPI(object): preserve_ephemeral=preserve_ephemeral, host=host, request_spec=request_spec) + + def cache_images(self, context, aggregate, image_ids): + """Request images be pre-cached on hosts within an aggregate. + + :param context: The RequestContext + :param aggregate: The objects.Aggregate representing the hosts to + contact + :param image_ids: A list of image ID strings to send to the hosts + """ + for image_id in image_ids: + # Validate that we can get the image by id before we go + # ask a bunch of hosts to do the same. We let this bubble + # up to the API, which catches NovaException for the 4xx and + # otherwise 500s if this fails in some unexpected way. + self.image_api.get(context, image_id) + self.conductor_compute_rpcapi.cache_images(context, aggregate, + image_ids) diff --git a/nova/conductor/manager.py b/nova/conductor/manager.py index e052f3798a..ce29fbb64e 100644 --- a/nova/conductor/manager.py +++ b/nova/conductor/manager.py @@ -16,6 +16,7 @@ import contextlib import copy +import eventlet import functools import sys @@ -230,7 +231,7 @@ class ComputeTaskManager(base.Base): may involve coordinating activities on multiple compute nodes. """ - target = messaging.Target(namespace='compute_task', version='1.20') + target = messaging.Target(namespace='compute_task', version='1.21') def __init__(self): super(ComputeTaskManager, self).__init__() @@ -1629,3 +1630,70 @@ class ComputeTaskManager(base.Base): pass return False return True + + def cache_images(self, context, aggregate, image_ids): + """Cache a set of images on the set of hosts in an aggregate. + + :param context: The RequestContext + :param aggregate: The Aggregate object from the request to constrain + the host list + :param image_id: The IDs of the image to cache + """ + + # TODO(danms): Fix notification sample for IMAGE_CACHE action + compute_utils.notify_about_aggregate_action( + context, aggregate, + fields.NotificationAction.IMAGE_CACHE, + fields.NotificationPhase.START) + + clock = timeutils.StopWatch() + threads = CONF.image_cache.precache_concurrency + fetch_pool = eventlet.GreenPool(size=threads) + + hosts_by_cell = {} + cells_by_uuid = {} + # TODO(danms): Make this a much more efficient bulk query + for hostname in aggregate.hosts: + hmap = objects.HostMapping.get_by_host(context, hostname) + cells_by_uuid.setdefault(hmap.cell_mapping.uuid, hmap.cell_mapping) + hosts_by_cell.setdefault(hmap.cell_mapping.uuid, []) + hosts_by_cell[hmap.cell_mapping.uuid].append(hostname) + + LOG.info('Preparing to request pre-caching of image(s) %(image_ids)s ' + 'on %(hosts)i hosts across %(cells)s cells.', + {'image_ids': ','.join(image_ids), + 'hosts': len(aggregate.hosts), + 'cells': len(hosts_by_cell)}) + clock.start() + + for cell_uuid, hosts in hosts_by_cell.items(): + cell = cells_by_uuid[cell_uuid] + with nova_context.target_cell(context, cell) as target_ctxt: + for host in hosts: + service = objects.Service.get_by_compute_host(target_ctxt, + host) + if not self.servicegroup_api.service_is_up(service): + LOG.info( + 'Skipping image pre-cache request to compute ' + '%(host)r because it is not up', + {'host': host}) + continue + + fetch_pool.spawn_n(self.compute_rpcapi.cache_images, + target_ctxt, + host=host, + image_ids=image_ids) + + # Wait until all those things finish + fetch_pool.waitall() + + clock.stop() + LOG.info('Image pre-cache operation for image(s) %(image_ids)s ' + 'completed in %(time).2f seconds', + {'image_ids': ','.join(image_ids), + 'time': clock.elapsed()}) + + compute_utils.notify_about_aggregate_action( + context, aggregate, + fields.NotificationAction.IMAGE_CACHE, + fields.NotificationPhase.END) diff --git a/nova/conductor/rpcapi.py b/nova/conductor/rpcapi.py index 6a93d92a0e..bc17f6fade 100644 --- a/nova/conductor/rpcapi.py +++ b/nova/conductor/rpcapi.py @@ -20,6 +20,7 @@ from oslo_serialization import jsonutils from oslo_versionedobjects import base as ovo_base import nova.conf +from nova import exception from nova.objects import base as objects_base from nova import profiler from nova import rpc @@ -281,6 +282,7 @@ class ComputeTaskAPI(object): instance. 1.20 - migrate_server() now gets a 'host_list' parameter that represents potential alternate hosts for retries within a cell. + 1.21 - Added cache_images() """ def __init__(self): @@ -436,3 +438,12 @@ class ComputeTaskAPI(object): del kw['request_spec'] cctxt = self.client.prepare(version=version) cctxt.cast(ctxt, 'rebuild_instance', **kw) + + def cache_images(self, ctxt, aggregate, image_ids): + version = '1.21' + if not self.client.can_send_version(version): + raise exception.NovaException('Conductor RPC version pin does not ' + 'allow cache_images() to be called') + cctxt = self.client.prepare(version=version) + cctxt.cast(ctxt, 'cache_images', aggregate=aggregate, + image_ids=image_ids) diff --git a/nova/conf/__init__.py b/nova/conf/__init__.py index 36c3583e92..7cf5b19706 100644 --- a/nova/conf/__init__.py +++ b/nova/conf/__init__.py @@ -35,6 +35,7 @@ from nova.conf import ephemeral_storage from nova.conf import glance from nova.conf import guestfs from nova.conf import hyperv +from nova.conf import imagecache from nova.conf import ironic from nova.conf import key_manager from nova.conf import keystone @@ -88,6 +89,7 @@ glance.register_opts(CONF) guestfs.register_opts(CONF) hyperv.register_opts(CONF) mks.register_opts(CONF) +imagecache.register_opts(CONF) ironic.register_opts(CONF) key_manager.register_opts(CONF) keystone.register_opts(CONF) diff --git a/nova/conf/imagecache.py b/nova/conf/imagecache.py new file mode 100644 index 0000000000..932c9ceee1 --- /dev/null +++ b/nova/conf/imagecache.py @@ -0,0 +1,47 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_config import cfg + +imagecache_group = cfg.OptGroup( + 'image_cache', + title='Image Cache Options', + help=""" +A collection of options specific to image caching. +""") +imagecache_opts = [ + cfg.IntOpt('precache_concurrency', + default=1, + min=1, + help=""" +Maximum number of compute hosts to trigger image precaching in parallel. + +When an image precache request is made, compute nodes will be contacted +to initiate the download. This number constrains the number of those that +will happen in parallel. Higher numbers will cause more computes to work +in parallel and may result in reduced time to complete the operation, but +may also DDoS the image service. Lower numbers will result in more sequential +operation, lower image service load, but likely longer runtime to completion. +"""), +] + + +ALL_OPTS = (imagecache_opts,) + + +def register_opts(conf): + conf.register_group(imagecache_group) + conf.register_opts(imagecache_opts, group=imagecache_group) + + +def list_opts(): + return {imagecache_group: imagecache_opts} diff --git a/nova/notifications/objects/base.py b/nova/notifications/objects/base.py index 18485d583f..3e223776fe 100644 --- a/nova/notifications/objects/base.py +++ b/nova/notifications/objects/base.py @@ -71,7 +71,8 @@ class EventType(NotificationObject): # NotificationActionField enum # Version 1.19: SELECT_DESTINATIONS is added to the NotificationActionField # enum - VERSION = '1.19' + # Version 1.20: IMAGE_CACHE is added to the NotificationActionField enum + VERSION = '1.20' fields = { 'object': fields.StringField(nullable=False), diff --git a/nova/objects/fields.py b/nova/objects/fields.py index a768d0236a..fbd2854389 100644 --- a/nova/objects/fields.py +++ b/nova/objects/fields.py @@ -864,6 +864,7 @@ class NotificationAction(BaseNovaEnum): BUILD_INSTANCES = 'build_instances' MIGRATE_SERVER = 'migrate_server' REBUILD_SERVER = 'rebuild_server' + IMAGE_CACHE = 'cache_images' ALL = (UPDATE, EXCEPTION, DELETE, PAUSE, UNPAUSE, RESIZE, VOLUME_SWAP, SUSPEND, POWER_ON, REBOOT, SHUTDOWN, SNAPSHOT, INTERFACE_ATTACH, @@ -877,7 +878,7 @@ class NotificationAction(BaseNovaEnum): REMOVE_HOST, ADD_MEMBER, UPDATE_METADATA, LOCK, UNLOCK, REBUILD_SCHEDULED, UPDATE_PROP, LIVE_MIGRATION_FORCE_COMPLETE, CONNECT, USAGE, BUILD_INSTANCES, MIGRATE_SERVER, REBUILD_SERVER, - SELECT_DESTINATIONS) + SELECT_DESTINATIONS, IMAGE_CACHE) # TODO(rlrossit): These should be changed over to be a StateMachine enum from diff --git a/nova/tests/functional/compute/test_cache_image.py b/nova/tests/functional/compute/test_cache_image.py new file mode 100644 index 0000000000..844ecae0bd --- /dev/null +++ b/nova/tests/functional/compute/test_cache_image.py @@ -0,0 +1,76 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_utils.fixture import uuidsentinel as uuids + +from nova import context +from nova import objects +from nova import test +from nova.tests.unit import fake_notifier + + +class ImageCacheTest(test.TestCase): + NUMBER_OF_CELLS = 2 + + def setUp(self): + super(ImageCacheTest, self).setUp() + + self.flags(compute_driver='fake.FakeDriverWithCaching') + + fake_notifier.stub_notifier(self) + self.addCleanup(fake_notifier.reset) + self.context = context.get_admin_context() + + self.conductor = self.start_service('conductor') + self.compute1 = self.start_service('compute', host='compute1') + self.compute2 = self.start_service('compute', host='compute2') + self.compute3 = self.start_service('compute', host='compute3', + cell='cell2') + self.compute4 = self.start_service('compute', host='compute4', + cell='cell2') + self.compute5 = self.start_service('compute', host='compute5', + cell='cell2') + + cell2 = self.cell_mappings['cell2'] + with context.target_cell(self.context, cell2) as cctxt: + srv = objects.Service.get_by_compute_host(cctxt, 'compute5') + srv.forced_down = True + srv.save() + + def test_cache_image(self): + """Test caching images by injecting the request directly to + the conductor service and making sure it fans out and calls + the expected nodes. + """ + + aggregate = objects.Aggregate(name='test', + uuid=uuids.aggregate, + id=1, + hosts=['compute1', 'compute3', + 'compute4', 'compute5']) + self.conductor.compute_task_mgr.cache_images( + self.context, aggregate, ['an-image']) + + # NOTE(danms): We expect only three image cache attempts because + # compute5 is marked as forced-down and compute2 is not in the + # requested aggregate. + for host in ['compute1', 'compute3', 'compute4']: + mgr = getattr(self, host) + self.assertEqual(set(['an-image']), mgr.driver.cached_images) + for host in ['compute2', 'compute5']: + mgr = getattr(self, host) + self.assertEqual(set(), mgr.driver.cached_images) + + fake_notifier.wait_for_versioned_notifications( + 'aggregate.cache_images.start') + fake_notifier.wait_for_versioned_notifications( + 'aggregate.cache_images.end') diff --git a/nova/tests/unit/conductor/test_conductor.py b/nova/tests/unit/conductor/test_conductor.py index 0f6689eba2..6ab271de03 100644 --- a/nova/tests/unit/conductor/test_conductor.py +++ b/nova/tests/unit/conductor/test_conductor.py @@ -3567,6 +3567,23 @@ class ConductorTaskRPCAPITestCase(_BaseTaskTestCase, self.context, 'build_instances', **kw) _test() + def test_cache_images(self): + with mock.patch.object(self.conductor, 'client') as client: + self.conductor.cache_images(self.context, mock.sentinel.aggregate, + [mock.sentinel.image]) + client.prepare.return_value.cast.assert_called_once_with( + self.context, 'cache_images', + aggregate=mock.sentinel.aggregate, + image_ids=[mock.sentinel.image]) + client.prepare.assert_called_once_with(version='1.21') + + with mock.patch.object(self.conductor.client, 'can_send_version') as v: + v.return_value = False + self.assertRaises(exc.NovaException, + self.conductor.cache_images, + self.context, mock.sentinel.aggregate, + [mock.sentinel.image]) + class ConductorTaskAPITestCase(_BaseTaskTestCase, test_compute.BaseTestCase): """Compute task API Tests.""" @@ -3591,3 +3608,56 @@ class ConductorTaskAPITestCase(_BaseTaskTestCase, test_compute.BaseTestCase): self.context, inst_obj, {'host': 'destination'}, True, False, None, 'block_migration', 'disk_over_commit', None, request_spec=None) + + def test_cache_images(self): + @mock.patch.object(self.conductor.conductor_compute_rpcapi, + 'cache_images') + @mock.patch.object(self.conductor.image_api, 'get') + def _test(mock_image, mock_cache): + self.conductor.cache_images(self.context, + mock.sentinel.aggregate, + [mock.sentinel.image1, + mock.sentinel.image2]) + mock_image.assert_has_calls([mock.call(self.context, + mock.sentinel.image1), + mock.call(self.context, + mock.sentinel.image2)]) + mock_cache.assert_called_once_with( + self.context, mock.sentinel.aggregate, + [mock.sentinel.image1, mock.sentinel.image2]) + + _test() + + def test_cache_images_fail(self): + @mock.patch.object(self.conductor.conductor_compute_rpcapi, + 'cache_images') + @mock.patch.object(self.conductor.image_api, 'get') + def _test(mock_image, mock_cache): + mock_image.side_effect = test.TestingException() + # We should expect to see non-NovaException errors + # raised directly so the API can 500 for them. + self.assertRaises(test.TestingException, + self.conductor.cache_images, + self.context, + mock.sentinel.aggregate, + [mock.sentinel.image1, + mock.sentinel.image2]) + mock_cache.assert_not_called() + + _test() + + def test_cache_images_missing(self): + @mock.patch.object(self.conductor.conductor_compute_rpcapi, + 'cache_images') + @mock.patch.object(self.conductor.image_api, 'get') + def _test(mock_image, mock_cache): + mock_image.side_effect = exc.ImageNotFound('foo') + self.assertRaises(exc.ImageNotFound, + self.conductor.cache_images, + self.context, + mock.sentinel.aggregate, + [mock.sentinel.image1, + mock.sentinel.image2]) + mock_cache.assert_not_called() + + _test() diff --git a/nova/tests/unit/notifications/objects/test_notification.py b/nova/tests/unit/notifications/objects/test_notification.py index c05287bfa1..47cc29f8ba 100644 --- a/nova/tests/unit/notifications/objects/test_notification.py +++ b/nova/tests/unit/notifications/objects/test_notification.py @@ -375,7 +375,7 @@ notification_object_data = { 'ComputeTaskNotification': '1.0-a73147b93b520ff0061865849d3dfa56', 'ComputeTaskPayload': '1.0-e3d34762c14d131c98337b72e8c600e1', 'DestinationPayload': '1.0-4ccf26318dd18c4377dada2b1e74ec2e', - 'EventType': '1.19-000a76e83b06a9de11d365465a755a5e', + 'EventType': '1.20-4e02a676d3a18cab99579cacd1c91453', 'ExceptionNotification': '1.0-a73147b93b520ff0061865849d3dfa56', 'ExceptionPayload': '1.1-6c43008bd81885a63bc7f7c629f0793b', 'FlavorNotification': '1.0-a73147b93b520ff0061865849d3dfa56', diff --git a/nova/virt/fake.py b/nova/virt/fake.py index e185ad14b0..2bbae8bd1b 100644 --- a/nova/virt/fake.py +++ b/nova/virt/fake.py @@ -1013,3 +1013,16 @@ class FakeDriverWithPciResources(SmallFakeDriver): }, ]) return host_status + + +class FakeDriverWithCaching(FakeDriver): + def __init__(self, *a, **k): + super(FakeDriverWithCaching, self).__init__(*a, **k) + self.cached_images = set() + + def cache_image(self, context, image_id): + if image_id in self.cached_images: + return False + else: + self.cached_images.add(image_id) + return True