diff --git a/nova/tests/unit/virt/libvirt/test_driver.py b/nova/tests/unit/virt/libvirt/test_driver.py index d8f67dbdf3..8ef70a069a 100644 --- a/nova/tests/unit/virt/libvirt/test_driver.py +++ b/nova/tests/unit/virt/libvirt/test_driver.py @@ -11096,6 +11096,7 @@ class LibvirtConnTestCase(test.NoDBTestCase): drvr.init_host("wibble") drvr.get_num_instances() + drvr._host._dispatch_conn_event() self.assertFalse(service_mock.disabled) self.assertIsNone(service_mock.disabled_reason) diff --git a/nova/tests/unit/virt/libvirt/test_host.py b/nova/tests/unit/virt/libvirt/test_host.py index a1627ac1c0..456560dc81 100644 --- a/nova/tests/unit/virt/libvirt/test_host.py +++ b/nova/tests/unit/virt/libvirt/test_host.py @@ -33,6 +33,11 @@ from nova.virt.libvirt import guest as libvirt_guest from nova.virt.libvirt import host +class StringMatcher(object): + def __eq__(self, other): + return isinstance(other, six.string_types) + + class FakeVirtDomain(object): def __init__(self, id=-1, name=None): @@ -301,6 +306,69 @@ class HostTestCase(test.NoDBTestCase): self.assertEqual(self.connect_calls, 1) self.assertEqual(self.register_calls, 1) + @mock.patch.object(host.Host, "_connect") + def test_conn_event(self, mock_conn): + handler = mock.MagicMock() + h = host.Host("qemu:///system", conn_event_handler=handler) + + h.get_connection() + h._dispatch_conn_event() + + handler.assert_called_once_with(True, None) + + @mock.patch.object(host.Host, "_connect") + def test_conn_event_fail(self, mock_conn): + handler = mock.MagicMock() + h = host.Host("qemu:///system", conn_event_handler=handler) + mock_conn.side_effect = fakelibvirt.libvirtError('test') + + self.assertRaises(exception.HypervisorUnavailable, h.get_connection) + h._dispatch_conn_event() + + handler.assert_called_once_with(False, StringMatcher()) + + # Attempt to get a second connection, and assert that we don't + # queue a second callback.
Note that we can't call + # _dispatch_conn_event() and assert no additional call to the handler + # here as above. This is because we haven't added an event, so it would + # block. We mock the helper method which queues an event for callback + # instead. + with mock.patch.object(h, '_queue_conn_event_handler') as mock_queue: + self.assertRaises(exception.HypervisorUnavailable, + h.get_connection) + mock_queue.assert_not_called() + + @mock.patch.object(host.Host, "_test_connection") + @mock.patch.object(host.Host, "_connect") + def test_conn_event_up_down(self, mock_conn, mock_test_conn): + handler = mock.MagicMock() + h = host.Host("qemu:///system", conn_event_handler=handler) + mock_conn.side_effect = (mock.MagicMock(), + fakelibvirt.libvirtError('test')) + mock_test_conn.return_value = False + + h.get_connection() + self.assertRaises(exception.HypervisorUnavailable, h.get_connection) + h._dispatch_conn_event() + h._dispatch_conn_event() + + handler.assert_has_calls([ + mock.call(True, None), + mock.call(False, StringMatcher()) + ]) + + @mock.patch.object(host.Host, "_connect") + def test_conn_event_thread(self, mock_conn): + event = eventlet.event.Event() + h = host.Host("qemu:///system", conn_event_handler=event.send) + h.initialize() + + h.get_connection() + event.wait() + # This test will time out if it fails. Success is implicit in a + # timely return from wait(), indicating that the connection event + # handler was called.
+ @mock.patch.object(fakelibvirt.virConnect, "getLibVersion") @mock.patch.object(fakelibvirt.virConnect, "getVersion") @mock.patch.object(fakelibvirt.virConnect, "getType") diff --git a/nova/tests/unit/virt/test_virt_drivers.py b/nova/tests/unit/virt/test_virt_drivers.py index d8d2883237..a071a85f27 100644 --- a/nova/tests/unit/virt/test_virt_drivers.py +++ b/nova/tests/unit/virt/test_virt_drivers.py @@ -104,6 +104,10 @@ class _FakeDriverBackendTestCase(object): 'nova.virt.libvirt.driver.connector', fake_os_brick_connector)) + self.useFixture(fixtures.MonkeyPatch( + 'nova.virt.libvirt.host.Host._conn_event_thread', + lambda *args: None)) + self.flags(rescue_image_id="2", rescue_kernel_id="3", rescue_ramdisk_id=None, diff --git a/nova/virt/libvirt/host.py b/nova/virt/libvirt/host.py index 1e737902c1..5ccdc20855 100644 --- a/nova/virt/libvirt/host.py +++ b/nova/virt/libvirt/host.py @@ -86,7 +86,9 @@ class Host(object): self._uri = uri self._read_only = read_only + self._initial_connection = True self._conn_event_handler = conn_event_handler + self._conn_event_handler_queue = six.moves.queue.Queue() self._lifecycle_event_handler = lifecycle_event_handler self._skip_list_all_domains = False self._caps = None @@ -130,6 +132,26 @@ class Host(object): while True: self._dispatch_events() + def _conn_event_thread(self): + """Dispatches async connection events""" + # NOTE(mdbooth): This thread doesn't need to jump through the same + # hoops as _dispatch_thread because it doesn't interact directly + # with the libvirt native thread. + while True: + self._dispatch_conn_event() + + def _dispatch_conn_event(self): + # NOTE(mdbooth): Splitting out this loop looks redundant, but it + # means we can easily dispatch events synchronously from tests and + # it isn't completely awful.
+ handler = self._conn_event_handler_queue.get() + try: + handler() + except Exception: + LOG.exception(_LE('Exception handling connection event')) + finally: + self._conn_event_handler_queue.task_done() + @staticmethod def _event_lifecycle_callback(conn, dom, event, detail, opaque): """Receives lifecycle events from libvirt. @@ -261,8 +283,7 @@ class Host(object): reason = str(last_close_event['reason']) msg = _("Connection to libvirt lost: %s") % reason self._wrapped_conn = None - if self._conn_event_handler is not None: - self._conn_event_handler(False, msg) + self._queue_conn_event_handler(False, msg) def _event_emit_delayed(self, event): """Emit events - possibly delayed.""" @@ -344,21 +365,9 @@ class Host(object): def _get_new_connection(self): # call with _wrapped_conn_lock held LOG.debug('Connecting to libvirt: %s', self._uri) - wrapped_conn = None - try: - wrapped_conn = self._connect(self._uri, self._read_only) - finally: - # Enabling the compute service, in case it was disabled - # since the connection was successful.
- disable_reason = None - if not wrapped_conn: - disable_reason = 'Failed to connect to libvirt' - - if self._conn_event_handler is not None: - self._conn_event_handler(bool(wrapped_conn), disable_reason) - - self._wrapped_conn = wrapped_conn + # This will raise an exception on failure + wrapped_conn = self._connect(self._uri, self._read_only) try: LOG.debug("Registering for lifecycle events %s", self) @@ -390,14 +399,47 @@ class Host(object): return wrapped_conn + def _queue_conn_event_handler(self, *args, **kwargs): + if self._conn_event_handler is None: + return + + def handler(): + return self._conn_event_handler(*args, **kwargs) + + self._conn_event_handler_queue.put(handler) + def _get_connection(self): # multiple concurrent connections are protected by _wrapped_conn_lock with self._wrapped_conn_lock: - wrapped_conn = self._wrapped_conn - if not wrapped_conn or not self._test_connection(wrapped_conn): - wrapped_conn = self._get_new_connection() + # Drop the existing connection if it is not usable + if (self._wrapped_conn is not None and + not self._test_connection(self._wrapped_conn)): + self._wrapped_conn = None + # Connection was previously up, and went down + self._queue_conn_event_handler( + False, _('Connection to libvirt lost')) - return wrapped_conn + if self._wrapped_conn is None: + try: + # This will raise if it fails to get a connection + self._wrapped_conn = self._get_new_connection() + except Exception as ex: + with excutils.save_and_reraise_exception(): + # If we previously had a connection and it went down, + # we generated a down event for that above. + # We also want to generate a down event for an initial + # failure, which won't be handled above.
+ if self._initial_connection: + self._queue_conn_event_handler( + False, + _('Failed to connect to libvirt: %(msg)s') % + {'msg': ex}) + finally: + self._initial_connection = False + + self._queue_conn_event_handler(True, None) + + return self._wrapped_conn def get_connection(self): """Returns a connection to the hypervisor @@ -433,6 +475,10 @@ class Host(object): libvirt.registerErrorHandler(self._libvirt_error_handler, None) libvirt.virEventRegisterDefaultImpl() self._init_events() + + LOG.debug("Starting connection event dispatch thread") + utils.spawn(self._conn_event_thread) + self._initialized = True def _version_check(self, lv_ver=None, hv_ver=None, hv_type=None,