diff --git a/contrib/clean-on-delete.py b/contrib/clean-on-delete.py deleted file mode 100755 index 2a097b18b5..0000000000 --- a/contrib/clean-on-delete.py +++ /dev/null @@ -1,355 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2025 Red Hat -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import argparse -import collections -import logging -import socket -import subprocess -import threading -import time - -from keystoneauth1 import exceptions -from keystoneauth1 import loading -from openstack import connection -from oslo_concurrency import lockutils -from oslo_config import cfg -import oslo_messaging -from oslo_serialization import jsonutils - -LOG = logging.getLogger('cleaner') -loading.register_session_conf_options(cfg.CONF, 'placement') # noqa -loading.register_auth_conf_options(cfg.CONF, 'placement') # noqa -loading.register_adapter_conf_options(cfg.CONF, 'placement') # noqa -cfg.CONF.register_opts([cfg.StrOpt('host')]) # noqa - - -def link_for_rp(rp, linktype): - """Return the URL for a named link relation""" - for link in rp.links: - if link['rel'] == linktype: - # This link will have the full path as used to access it, - # but keystoneauth will re-add the baseurl (i.e. /placement - # to it) - return link['href'].split('/', 2)[2] - - -def periodic_thread(cleaner, delay): - while True: - time.sleep(delay) - LOG.info('Running periodic clean') - try: - cleaner.trigger() - except Exception as e: - LOG.exception('Failed to run periodic cleaning: %s', e) - - -class CleaningEndpoint: - """A notification listener endpoint that triggers cleaner on delete.""" - filter_rule = oslo_messaging.NotificationFilter( - event_type=r'instance\.delete\.end$') - - def __init__(self, cleaner, publisher_filter): - self.cleaner = cleaner - self.publisher_filter = publisher_filter - - def info(self, ctxt, publisher_id, event_type, payload, metadata): - if publisher_id == self.publisher_filter: - LOG.info('Instance deleted on %s', publisher_id) - self.cleaner.trigger() - else: - LOG.debug('Ignoring notification from %s != %s', - publisher_id, self.publisher_filter) - - audit = info - debug = info - warn = info - error = info - critical = info - sample = info - - -class Cleaner: - """Base class for a cleaner for a given device type""" - - def match(self, pci_addr): - """Determine if a PCI device matches this type""" - return False - - def clean(self, rp, pci_addr): - """Clean a specific device""" - raise NotImplementedError() - - -class GenericCleaner(Cleaner): - def match(self, pci_addr): - return True - - def clean(self, rp, pci_addr): - cmd = [self.CMD, pci_addr, rp.id] - LOG.debug('Running %r', ' '.join(cmd)) - subprocess.check_call(cmd) - - @classmethod - def for_command(cls, cmd): - class CommandCleaner(cls): - CMD = cmd - - return CommandCleaner - - -class NVMECleaner(Cleaner): - def __init__(self): - output = subprocess.check_output('nvme list -v -o json', shell=True) - data = jsonutils.loads(output) - self._nvmes = collections.defaultdict(list) - for device in data['Devices']: - for controller in device['Controllers']: - for ns in controller['Namespaces']: - self._nvmes[controller['Address']].append(ns['NameSpace']) - LOG.debug('Found NVMe devices: %s', self._nvmes) - - def match(self, pci_addr): - return pci_addr in self._nvmes - - def clean(self, rp, pci_addr): - devices = self._nvmes[pci_addr] - # NOTE(danms): This may not be good enough (we may need to use the - # "sanitize" option), or we may need to provide more options based - # on what the NVMe supports. - LOG.info('Cleaning %s', ','.join(devices)) - for nsdev in devices: - cmd = 'nvme format -f -s1 /dev/%s' % nsdev - LOG.debug('Running %r to clean %s (%s %s)', cmd, nsdev, - rp.id, pci_addr) - subprocess.check_call(cmd, shell=True) - - -class CleaningWatcher(): - def __init__(self, client, rp_name, cleaners, dry_run=False): - self.client = client - self.endpoint = CleaningEndpoint(self, 'nova-compute:%s' % rp_name) - self.rp = client.find_resource_provider(rp_name) - self.cleaners = cleaners - self.dryrun = dry_run - - def _getjson(self, url): - return self.client.session.get( - url, microversion='1.6', - endpoint_filter={'service_type': 'placement'}).json() - - def _putjson(self, url, data): - return self.client.session.put( - url, json=data, microversion='1.6', - endpoint_filter={'service_type': 'placement'}) - - def traits_for_rp(self, rp): - # NOTE(danms): openstacksdk has no support for resource provider - # traits, so query directly here. - url = link_for_rp(rp, 'traits') - return self._getjson(url)['traits'] - - def allocations_for_rp(self, rp): - # NOTE(danms): openstacksdk has no support for resource provider - # allocations, so query directly here. - url = link_for_rp(rp, 'allocations') - return self._getjson(url)['allocations'] - - def rp_inventory_to_clean(self, rp): - inventory = list(self.client.resource_provider_inventories(rp)) - if len(inventory) != 1: - # This has OTU traitage, but doesn't look like a PCI-in-placement - # provider, so skip it for safety. - LOG.warning( - 'Skipping provider %s because it has %i inventories', - rp.id, len(inventory)) - return - # OTU devices are "burned" when they have reserved == total - is_reserved = inventory[0].reserved == inventory[0].total - allocs = self.allocations_for_rp(rp) - LOG.debug('Allocations for %s: %s', rp.id, allocs) - LOG.debug('Inventory for %s: %s', rp.id, inventory) - # A provider needs cleaning if it is reserved and has no allocations - if is_reserved and not allocs: - return inventory[0] - - def pci_addr_for_rp(self, rp): - # This format is defined in the PCI-in-placement work as: - # hostname_DDDD:BB:SS.FF - host, addr = rp.name.split('_') - return addr - - def unreserve(self, inventory): - LOG.info('Unreserving %s on %s', inventory.resource_class, - inventory.resource_provider_id) - # NOTE(danms): openstacksdk apparently can't change *just* reserved - # for a provider inventory. Placement requires total be in the payload, - # which the sdk will not include if it thinks it is unchanged. So, - # manually PUT the inventory here. - url = '/'.join([ - inventory.base_path % { - 'resource_provider_id': inventory.resource_provider_id}, - inventory.resource_class]) - inventory = { - 'resource_provider_generation': ( - inventory.resource_provider_generation), - 'reserved': 0, - 'total': inventory.total, - } - - r = self._putjson(url, inventory) - r.raise_for_status() - - def run_cleaner(self, rp, pci_addr, cleaner, inventory): - if self.dryrun: - LOG.info('Would use %s to clean %s %s', - cleaner.__class__.__name__, - inventory.resource_provider_id, - inventory.resource_class) - return - try: - cleaner.clean(rp, pci_addr) - self.unreserve(inventory) - except subprocess.CalledProcessError as e: - LOG.error('Failed to clean %s (%s) - action returned %i', - pci_addr, rp.id, e.returncode) - except exceptions.HttpError as e: - LOG.error('Failed to unreserve %s: %s', rp.id, e) - except Exception as e: - LOG.exception('Unexpected error cleaning %s (%s): %s', - pci_addr, rp.id, e) - - @lockutils.synchronized('cleaner') - def trigger(self): - children = self.client.resource_providers(in_tree=self.rp.id) - children = list(children) - cleaners = [cleaner_cls() for cleaner_cls in self.cleaners] - for child in children: - is_otu = 'HW_PCI_ONE_TIME_USE' in self.traits_for_rp(child) - inventory_to_clean = self.rp_inventory_to_clean(child) - needs_cleaning = is_otu and inventory_to_clean - LOG.debug('Provider %s needs cleaning: %s (otu=%s)', - child.id, needs_cleaning, is_otu) - if not needs_cleaning: - continue - pci_addr = self.pci_addr_for_rp(child) - matched_cleaners = [cleaner for cleaner in cleaners - if cleaner.match(pci_addr)] - for cleaner in matched_cleaners: - self.run_cleaner(child, pci_addr, cleaner, inventory_to_clean) - if not matched_cleaners and needs_cleaning: - # Unsupported device type, don't clean - LOG.warning('Child provider %s for %s needs cleaning but no ' - 'cleaning implementations matched!', - child.id, pci_addr) - - -class LoggerEndpoint: - """A notification endpoint that just logs each event received for debug""" - - def info(self, ctxt, publisher_id, event_type, payload, metadata): - LOG.info('Event: %s %s', publisher_id, event_type) - - -def main(): - p = argparse.ArgumentParser() - p.add_argument('--topic', default=None, - help=('Notification queue. Defaults to ' - 'notifications-$provider_name for recommended ' - 'per-compute topic configuration')) - p.add_argument('--debug', action='store_true', - help='Enable verbose debug logging') - p.add_argument('--provider-name', default=None, - help=('Parent provider name for this compute node ' - '(if unspecified, attempt to detect)')) - p.add_argument('--one', action='store_true', - help='Run one cleaning pass and exit') - p.add_argument('--dry-run', action='store_true', - help='Do not actually clean, just print what would be done') - p.add_argument('--pool', default=None, - help=('oslo.messaging receiver pool name (see caveats ' - 'about this usage)')) - p.add_argument('--periodic', type=int, default=0, - help='Scan for missed required cleaning every N minutes') - p.add_argument('--nvme', action='store_true', - help=('Clean NVMe devices with `nvme format` on all ' - 'namespaces')) - p.add_argument('--generic-cmd', - help=('Run this command for each PCI device needing to be ' - 'cleaned. Arguments are "pci_addr rp_uuid"')) - args = p.parse_args() - cfg.CONF([], project='nova') - - auth = loading.load_auth_from_conf_options(cfg.CONF, 'placement') - sess = loading.load_session_from_conf_options(cfg.CONF, 'placement', - auth=auth) - placement = connection.Connection(session=sess, - oslo_conf=cfg.CONF).placement - - logging.basicConfig(level=args.debug and logging.DEBUG or logging.INFO) - - if not args.provider_name: - if cfg.CONF.host: - args.provider_name = cfg.CONF.host - else: - args.provider_name = socket.gethostname().split('.')[0] - if not args.topic: - args.topic = 'notifications-%s' % args.provider_name - - # This could be dynamic based on arguments to control what things we do - cleaners = [] - if args.nvme: - cleaners.append(NVMECleaner) - if args.generic_cmd: - cleaners.append(GenericCleaner.for_command(args.generic_cmd)) - if len(cleaners) > 1: - # NOTE(danms): This should be supportable, but it complicates our - # "when to unreserve" logic, so just keep this simple at the moment. - print('Only one type of cleaner may be enabled at a time') - return 1 - c = CleaningWatcher(placement, args.provider_name, cleaners, - dry_run=args.dry_run) - # Trigger at least once at startup before we potentially start processing - # notifications - c.trigger() - - if args.one: - return - - if args.periodic: - periodic = threading.Thread(target=periodic_thread, - args=(c, args.periodic * 60)) - periodic.daemon = True - periodic.start() - - transport = oslo_messaging.get_notification_transport(cfg.CONF) - targets = [oslo_messaging.Target(topic=args.topic, exchange='nova')] - endpoints = [c.endpoint] - if args.debug: - endpoints.insert(0, LoggerEndpoint()) - - server = oslo_messaging.get_notification_listener(transport, targets, - endpoints, - pool=args.pool) - server.start() - try: - server.wait() - except KeyboardInterrupt: - pass - - -if __name__ == '__main__': - main()