From 14199b7e7083c36e081079cc4960542651bc4099 Mon Sep 17 00:00:00 2001 From: Matthew Booth Date: Thu, 3 Nov 2016 13:08:13 +0000 Subject: [PATCH] libvirt: Call host connection callbacks asynchronously Host.get_connection() does two types of work when initialising a new connection: 1. Register event handling callbacks on the connection itself. 2. Call _set_host_enabled in the driver to enable/disable the compute service. The first is essential to run before the connection is used for anything. The second needs only to run eventually once the connection is established. This patch creates a new helper thread which runs connection up/down callbacks asynchronously to the caller. This means that the caller of get_connection() returns as soon as the connection is usable, and the driver callback happens concurrently. This is a minor improvement currently, but will be more useful when we add additional work to the connection up/down event. It also means that the callback runs without holding _wrapped_conn_lock, which it doesn't need. This means the callback itself can call get_connection() without deadlocking, and use the connection which was just initialised. In updating the callback logic, we also fix a minor bug: previously we would not generate down/up events if we detected a failed connection and succeeded in creating a new one on the first attempt. This is currently only of limited importance, as we are only marking the service down or up for the scheduler, but could be more significant when we add additional work to the callback. 
Change-Id: Idf0f20d711f015e9f1331d5f65397aca2d67951a --- nova/tests/unit/virt/libvirt/test_driver.py | 1 + nova/tests/unit/virt/libvirt/test_host.py | 68 ++++++++++++++++ nova/tests/unit/virt/test_virt_drivers.py | 4 + nova/virt/libvirt/host.py | 86 ++++++++++++++++----- 4 files changed, 139 insertions(+), 20 deletions(-) diff --git a/nova/tests/unit/virt/libvirt/test_driver.py b/nova/tests/unit/virt/libvirt/test_driver.py index ea21da60ff..1396c26206 100644 --- a/nova/tests/unit/virt/libvirt/test_driver.py +++ b/nova/tests/unit/virt/libvirt/test_driver.py @@ -10982,6 +10982,7 @@ class LibvirtConnTestCase(test.NoDBTestCase): drvr.init_host("wibble") drvr.get_num_instances() + drvr._host._dispatch_conn_event() self.assertFalse(service_mock.disabled) self.assertIsNone(service_mock.disabled_reason) diff --git a/nova/tests/unit/virt/libvirt/test_host.py b/nova/tests/unit/virt/libvirt/test_host.py index a1627ac1c0..456560dc81 100644 --- a/nova/tests/unit/virt/libvirt/test_host.py +++ b/nova/tests/unit/virt/libvirt/test_host.py @@ -33,6 +33,11 @@ from nova.virt.libvirt import guest as libvirt_guest from nova.virt.libvirt import host +class StringMatcher(object): + def __eq__(self, other): + return isinstance(other, six.string_types) + + class FakeVirtDomain(object): def __init__(self, id=-1, name=None): @@ -301,6 +306,69 @@ class HostTestCase(test.NoDBTestCase): self.assertEqual(self.connect_calls, 1) self.assertEqual(self.register_calls, 1) + @mock.patch.object(host.Host, "_connect") + def test_conn_event(self, mock_conn): + handler = mock.MagicMock() + h = host.Host("qemu:///system", conn_event_handler=handler) + + h.get_connection() + h._dispatch_conn_event() + + handler.assert_called_once_with(True, None) + + @mock.patch.object(host.Host, "_connect") + def test_conn_event_fail(self, mock_conn): + handler = mock.MagicMock() + h = host.Host("qemu:///system", conn_event_handler=handler) + mock_conn.side_effect = fakelibvirt.libvirtError('test') + + 
self.assertRaises(exception.HypervisorUnavailable, h.get_connection) + h._dispatch_conn_event() + + handler.assert_called_once_with(False, StringMatcher()) + + # Attempt to get a second connection, and assert that we don't + # queue a second callback. Note that we can't call + # _dispatch_conn_event() and assert no additional call to the handler + # here as above. This is because we haven't added an event, so it would + # block. We mock the helper method which queues an event for callback + # instead. + with mock.patch.object(h, '_queue_conn_event_handler') as mock_queue: + self.assertRaises(exception.HypervisorUnavailable, + h.get_connection) + mock_queue.assert_not_called() + + @mock.patch.object(host.Host, "_test_connection") + @mock.patch.object(host.Host, "_connect") + def test_conn_event_up_down(self, mock_conn, mock_test_conn): + handler = mock.MagicMock() + h = host.Host("qemu:///system", conn_event_handler=handler) + mock_conn.side_effect = (mock.MagicMock(), + fakelibvirt.libvirtError('test')) + mock_test_conn.return_value = False + + h.get_connection() + self.assertRaises(exception.HypervisorUnavailable, h.get_connection) + h._dispatch_conn_event() + h._dispatch_conn_event() + + handler.assert_has_calls([ + mock.call(True, None), + mock.call(False, StringMatcher()) + ]) + + @mock.patch.object(host.Host, "_connect") + def test_conn_event_thread(self, mock_conn): + event = eventlet.event.Event() + h = host.Host("qemu:///system", conn_event_handler=event.send) + h.initialize() + + h.get_connection() + event.wait() + # This test will timeout if it fails. Success is implicit in a + # timely return from wait(), indicating that the connection event + # handler was called. 
+ @mock.patch.object(fakelibvirt.virConnect, "getLibVersion") @mock.patch.object(fakelibvirt.virConnect, "getVersion") @mock.patch.object(fakelibvirt.virConnect, "getType") diff --git a/nova/tests/unit/virt/test_virt_drivers.py b/nova/tests/unit/virt/test_virt_drivers.py index 3c2f36e2bd..0be898b5dc 100644 --- a/nova/tests/unit/virt/test_virt_drivers.py +++ b/nova/tests/unit/virt/test_virt_drivers.py @@ -104,6 +104,10 @@ class _FakeDriverBackendTestCase(object): 'nova.virt.libvirt.driver.connector', fake_os_brick_connector)) + self.useFixture(fixtures.MonkeyPatch( + 'nova.virt.libvirt.host.Host._conn_event_thread', + lambda *args: None)) + self.flags(rescue_image_id="2", rescue_kernel_id="3", rescue_ramdisk_id=None, diff --git a/nova/virt/libvirt/host.py b/nova/virt/libvirt/host.py index 1e737902c1..5ccdc20855 100644 --- a/nova/virt/libvirt/host.py +++ b/nova/virt/libvirt/host.py @@ -86,7 +86,9 @@ class Host(object): self._uri = uri self._read_only = read_only + self._initial_connection = True self._conn_event_handler = conn_event_handler + self._conn_event_handler_queue = six.moves.queue.Queue() self._lifecycle_event_handler = lifecycle_event_handler self._skip_list_all_domains = False self._caps = None @@ -130,6 +132,26 @@ class Host(object): while True: self._dispatch_events() + def _conn_event_thread(self): + """Dispatches async connection events""" + # NOTE(mdbooth): This thread doesn't need to jump through the same + # hoops as _dispatch_thread because it doesn't interact directly + # with the libvirt native thread. + while True: + self._dispatch_conn_event() + + def _dispatch_conn_event(self): + # NOTE(mdbooth): Splitting out this loop looks redundant, but it + # means we can easily dispatch events synchronously from tests and + # it isn't completely awful. 
+ handler = self._conn_event_handler_queue.get() + try: + handler() + except Exception: + LOG.exception(_LE('Exception handling connection event')) + finally: + self._conn_event_handler_queue.task_done() + @staticmethod def _event_lifecycle_callback(conn, dom, event, detail, opaque): """Receives lifecycle events from libvirt. @@ -261,8 +283,7 @@ class Host(object): reason = str(last_close_event['reason']) msg = _("Connection to libvirt lost: %s") % reason self._wrapped_conn = None - if self._conn_event_handler is not None: - self._conn_event_handler(False, msg) + self._queue_conn_event_handler(False, msg) def _event_emit_delayed(self, event): """Emit events - possibly delayed.""" @@ -344,21 +365,9 @@ class Host(object): def _get_new_connection(self): # call with _wrapped_conn_lock held LOG.debug('Connecting to libvirt: %s', self._uri) - wrapped_conn = None - try: - wrapped_conn = self._connect(self._uri, self._read_only) - finally: - # Enabling the compute service, in case it was disabled - # since the connection was successful. 
- disable_reason = None - if not wrapped_conn: - disable_reason = 'Failed to connect to libvirt' - - if self._conn_event_handler is not None: - self._conn_event_handler(bool(wrapped_conn), disable_reason) - - self._wrapped_conn = wrapped_conn + # This will raise an exception on failure + wrapped_conn = self._connect(self._uri, self._read_only) try: LOG.debug("Registering for lifecycle events %s", self) @@ -390,14 +399,47 @@ class Host(object): return wrapped_conn + def _queue_conn_event_handler(self, *args, **kwargs): + if self._conn_event_handler is None: + return + + def handler(): + return self._conn_event_handler(*args, **kwargs) + + self._conn_event_handler_queue.put(handler) + def _get_connection(self): # multiple concurrent connections are protected by _wrapped_conn_lock with self._wrapped_conn_lock: - wrapped_conn = self._wrapped_conn - if not wrapped_conn or not self._test_connection(wrapped_conn): - wrapped_conn = self._get_new_connection() + # Drop the existing connection if it is not usable + if (self._wrapped_conn is not None and + not self._test_connection(self._wrapped_conn)): + self._wrapped_conn = None + # Connection was previously up, and went down + self._queue_conn_event_handler( + False, _('Connection to libvirt lost')) - return wrapped_conn + if self._wrapped_conn is None: + try: + # This will raise if it fails to get a connection + self._wrapped_conn = self._get_new_connection() + except Exception as ex: + with excutils.save_and_reraise_exception(): + # If we previously had a connection and it went down, + # we generated a down event for that above. + # We also want to generate a down event for an initial + # failure, which won't be handled above. 
+ if self._initial_connection: + self._queue_conn_event_handler( + False, + _('Failed to connect to libvirt: %(msg)s') % + {'msg': ex}) + finally: + self._initial_connection = False + + self._queue_conn_event_handler(True, None) + + return self._wrapped_conn def get_connection(self): """Returns a connection to the hypervisor @@ -433,6 +475,10 @@ class Host(object): libvirt.registerErrorHandler(self._libvirt_error_handler, None) libvirt.virEventRegisterDefaultImpl() self._init_events() + + LOG.debug("Starting connection event dispatch thread") + utils.spawn(self._conn_event_thread) + self._initialized = True def _version_check(self, lv_ver=None, hv_ver=None, hv_type=None,