Merge "libvirt: Call host connection callbacks asynchronously"
This commit is contained in:
@@ -11096,6 +11096,7 @@ class LibvirtConnTestCase(test.NoDBTestCase):
|
||||
|
||||
drvr.init_host("wibble")
|
||||
drvr.get_num_instances()
|
||||
drvr._host._dispatch_conn_event()
|
||||
self.assertFalse(service_mock.disabled)
|
||||
self.assertIsNone(service_mock.disabled_reason)
|
||||
|
||||
|
||||
@@ -33,6 +33,11 @@ from nova.virt.libvirt import guest as libvirt_guest
|
||||
from nova.virt.libvirt import host
|
||||
|
||||
|
||||
class StringMatcher(object):
|
||||
def __eq__(self, other):
|
||||
return isinstance(other, six.string_types)
|
||||
|
||||
|
||||
class FakeVirtDomain(object):
|
||||
|
||||
def __init__(self, id=-1, name=None):
|
||||
@@ -301,6 +306,69 @@ class HostTestCase(test.NoDBTestCase):
|
||||
self.assertEqual(self.connect_calls, 1)
|
||||
self.assertEqual(self.register_calls, 1)
|
||||
|
||||
@mock.patch.object(host.Host, "_connect")
|
||||
def test_conn_event(self, mock_conn):
|
||||
handler = mock.MagicMock()
|
||||
h = host.Host("qemu:///system", conn_event_handler=handler)
|
||||
|
||||
h.get_connection()
|
||||
h._dispatch_conn_event()
|
||||
|
||||
handler.assert_called_once_with(True, None)
|
||||
|
||||
@mock.patch.object(host.Host, "_connect")
|
||||
def test_conn_event_fail(self, mock_conn):
|
||||
handler = mock.MagicMock()
|
||||
h = host.Host("qemu:///system", conn_event_handler=handler)
|
||||
mock_conn.side_effect = fakelibvirt.libvirtError('test')
|
||||
|
||||
self.assertRaises(exception.HypervisorUnavailable, h.get_connection)
|
||||
h._dispatch_conn_event()
|
||||
|
||||
handler.assert_called_once_with(False, StringMatcher())
|
||||
|
||||
# Attempt to get a second connection, and assert that we don't add
|
||||
# queue a second callback. Note that we can't call
|
||||
# _dispatch_conn_event() and assert no additional call to the handler
|
||||
# here as above. This is because we haven't added an event, so it would
|
||||
# block. We mock the helper method which queues an event for callback
|
||||
# instead.
|
||||
with mock.patch.object(h, '_queue_conn_event_handler') as mock_queue:
|
||||
self.assertRaises(exception.HypervisorUnavailable,
|
||||
h.get_connection)
|
||||
mock_queue.assert_not_called()
|
||||
|
||||
@mock.patch.object(host.Host, "_test_connection")
|
||||
@mock.patch.object(host.Host, "_connect")
|
||||
def test_conn_event_up_down(self, mock_conn, mock_test_conn):
|
||||
handler = mock.MagicMock()
|
||||
h = host.Host("qemu:///system", conn_event_handler=handler)
|
||||
mock_conn.side_effect = (mock.MagicMock(),
|
||||
fakelibvirt.libvirtError('test'))
|
||||
mock_test_conn.return_value = False
|
||||
|
||||
h.get_connection()
|
||||
self.assertRaises(exception.HypervisorUnavailable, h.get_connection)
|
||||
h._dispatch_conn_event()
|
||||
h._dispatch_conn_event()
|
||||
|
||||
handler.assert_has_calls([
|
||||
mock.call(True, None),
|
||||
mock.call(False, StringMatcher())
|
||||
])
|
||||
|
||||
@mock.patch.object(host.Host, "_connect")
|
||||
def test_conn_event_thread(self, mock_conn):
|
||||
event = eventlet.event.Event()
|
||||
h = host.Host("qemu:///system", conn_event_handler=event.send)
|
||||
h.initialize()
|
||||
|
||||
h.get_connection()
|
||||
event.wait()
|
||||
# This test will timeout if it fails. Success is implicit in a
|
||||
# timely return from wait(), indicating that the connection event
|
||||
# handler was called.
|
||||
|
||||
@mock.patch.object(fakelibvirt.virConnect, "getLibVersion")
|
||||
@mock.patch.object(fakelibvirt.virConnect, "getVersion")
|
||||
@mock.patch.object(fakelibvirt.virConnect, "getType")
|
||||
|
||||
@@ -104,6 +104,10 @@ class _FakeDriverBackendTestCase(object):
|
||||
'nova.virt.libvirt.driver.connector',
|
||||
fake_os_brick_connector))
|
||||
|
||||
self.useFixture(fixtures.MonkeyPatch(
|
||||
'nova.virt.libvirt.host.Host._conn_event_thread',
|
||||
lambda *args: None))
|
||||
|
||||
self.flags(rescue_image_id="2",
|
||||
rescue_kernel_id="3",
|
||||
rescue_ramdisk_id=None,
|
||||
|
||||
+66
-20
@@ -86,7 +86,9 @@ class Host(object):
|
||||
|
||||
self._uri = uri
|
||||
self._read_only = read_only
|
||||
self._initial_connection = True
|
||||
self._conn_event_handler = conn_event_handler
|
||||
self._conn_event_handler_queue = six.moves.queue.Queue()
|
||||
self._lifecycle_event_handler = lifecycle_event_handler
|
||||
self._skip_list_all_domains = False
|
||||
self._caps = None
|
||||
@@ -130,6 +132,26 @@ class Host(object):
|
||||
while True:
|
||||
self._dispatch_events()
|
||||
|
||||
def _conn_event_thread(self):
|
||||
"""Dispatches async connection events"""
|
||||
# NOTE(mdbooth): This thread doesn't need to jump through the same
|
||||
# hoops as _dispatch_thread because it doesn't interact directly
|
||||
# with the libvirt native thread.
|
||||
while True:
|
||||
self._dispatch_conn_event()
|
||||
|
||||
def _dispatch_conn_event(self):
|
||||
# NOTE(mdbooth): Splitting out this loop looks redundant, but it
|
||||
# means we can easily dispatch events synchronously from tests and
|
||||
# it isn't completely awful.
|
||||
handler = self._conn_event_handler_queue.get()
|
||||
try:
|
||||
handler()
|
||||
except Exception:
|
||||
LOG.exception(_LE('Exception handling connection event'))
|
||||
finally:
|
||||
self._conn_event_handler_queue.task_done()
|
||||
|
||||
@staticmethod
|
||||
def _event_lifecycle_callback(conn, dom, event, detail, opaque):
|
||||
"""Receives lifecycle events from libvirt.
|
||||
@@ -261,8 +283,7 @@ class Host(object):
|
||||
reason = str(last_close_event['reason'])
|
||||
msg = _("Connection to libvirt lost: %s") % reason
|
||||
self._wrapped_conn = None
|
||||
if self._conn_event_handler is not None:
|
||||
self._conn_event_handler(False, msg)
|
||||
self._queue_conn_event_handler(False, msg)
|
||||
|
||||
def _event_emit_delayed(self, event):
|
||||
"""Emit events - possibly delayed."""
|
||||
@@ -344,21 +365,9 @@ class Host(object):
|
||||
def _get_new_connection(self):
|
||||
# call with _wrapped_conn_lock held
|
||||
LOG.debug('Connecting to libvirt: %s', self._uri)
|
||||
wrapped_conn = None
|
||||
|
||||
try:
|
||||
wrapped_conn = self._connect(self._uri, self._read_only)
|
||||
finally:
|
||||
# Enabling the compute service, in case it was disabled
|
||||
# since the connection was successful.
|
||||
disable_reason = None
|
||||
if not wrapped_conn:
|
||||
disable_reason = 'Failed to connect to libvirt'
|
||||
|
||||
if self._conn_event_handler is not None:
|
||||
self._conn_event_handler(bool(wrapped_conn), disable_reason)
|
||||
|
||||
self._wrapped_conn = wrapped_conn
|
||||
# This will raise an exception on failure
|
||||
wrapped_conn = self._connect(self._uri, self._read_only)
|
||||
|
||||
try:
|
||||
LOG.debug("Registering for lifecycle events %s", self)
|
||||
@@ -390,14 +399,47 @@ class Host(object):
|
||||
|
||||
return wrapped_conn
|
||||
|
||||
def _queue_conn_event_handler(self, *args, **kwargs):
|
||||
if self._conn_event_handler is None:
|
||||
return
|
||||
|
||||
def handler():
|
||||
return self._conn_event_handler(*args, **kwargs)
|
||||
|
||||
self._conn_event_handler_queue.put(handler)
|
||||
|
||||
def _get_connection(self):
|
||||
# multiple concurrent connections are protected by _wrapped_conn_lock
|
||||
with self._wrapped_conn_lock:
|
||||
wrapped_conn = self._wrapped_conn
|
||||
if not wrapped_conn or not self._test_connection(wrapped_conn):
|
||||
wrapped_conn = self._get_new_connection()
|
||||
# Drop the existing connection if it is not usable
|
||||
if (self._wrapped_conn is not None and
|
||||
not self._test_connection(self._wrapped_conn)):
|
||||
self._wrapped_conn = None
|
||||
# Connection was previously up, and went down
|
||||
self._queue_conn_event_handler(
|
||||
False, _('Connection to libvirt lost'))
|
||||
|
||||
return wrapped_conn
|
||||
if self._wrapped_conn is None:
|
||||
try:
|
||||
# This will raise if it fails to get a connection
|
||||
self._wrapped_conn = self._get_new_connection()
|
||||
except Exception as ex:
|
||||
with excutils.save_and_reraise_exception():
|
||||
# If we previously had a connection and it went down,
|
||||
# we generated a down event for that above.
|
||||
# We also want to generate a down event for an initial
|
||||
# failure, which won't be handled above.
|
||||
if self._initial_connection:
|
||||
self._queue_conn_event_handler(
|
||||
False,
|
||||
_('Failed to connect to libvirt: %(msg)s') %
|
||||
{'msg': ex})
|
||||
finally:
|
||||
self._initial_connection = False
|
||||
|
||||
self._queue_conn_event_handler(True, None)
|
||||
|
||||
return self._wrapped_conn
|
||||
|
||||
def get_connection(self):
|
||||
"""Returns a connection to the hypervisor
|
||||
@@ -433,6 +475,10 @@ class Host(object):
|
||||
libvirt.registerErrorHandler(self._libvirt_error_handler, None)
|
||||
libvirt.virEventRegisterDefaultImpl()
|
||||
self._init_events()
|
||||
|
||||
LOG.debug("Starting connection event dispatch thread")
|
||||
utils.spawn(self._conn_event_thread)
|
||||
|
||||
self._initialized = True
|
||||
|
||||
def _version_check(self, lv_ver=None, hv_ver=None, hv_type=None,
|
||||
|
||||
Reference in New Issue
Block a user