Merge "Libvirt event handling without eventlet"

This commit is contained in:
Zuul
2026-02-05 03:14:05 +00:00
committed by Gerrit Code Review
4 changed files with 297 additions and 180 deletions
+12
View File
@@ -1559,6 +1559,18 @@ class SpawnOnTestCase(test.NoDBTestCase):
'test_spawn_on_warns_on_full_executor.cell_worker', task) 'test_spawn_on_warns_on_full_executor.cell_worker', task)
class SpawnAfterTestCase(test.NoDBTestCase):
@mock.patch.object(time, "sleep")
def test_spawn_after_submits_work_after_delay(self, mock_sleep):
task = mock.MagicMock()
future = utils.spawn_after(0.1, task, 13, foo='bar')
future.result()
task.assert_called_once_with(13, foo='bar')
mock_sleep.assert_called_once_with(0.1)
class ExecutorStatsTestCase(test.NoDBTestCase): class ExecutorStatsTestCase(test.NoDBTestCase):
def setUp(self): def setUp(self):
+43 -41
View File
@@ -19,7 +19,6 @@ from unittest import mock
import ddt import ddt
import eventlet import eventlet
from eventlet import greenthread
from eventlet import tpool from eventlet import tpool
from lxml import etree from lxml import etree
from oslo_serialization import jsonutils from oslo_serialization import jsonutils
@@ -119,10 +118,18 @@ class HostTestCase(test.NoDBTestCase):
self.assertEqual(0, len(log_mock.method_calls), self.assertEqual(0, len(log_mock.method_calls),
'LOG should not be used in _connect_auth_cb.') 'LOG should not be used in _connect_auth_cb.')
@mock.patch.object(greenthread, 'spawn_after') @mock.patch.object(utils, 'spawn_after')
def test_event_dispatch(self, mock_spawn_after): def test_event_dispatch(self, mock_spawn_after):
# Validate that the libvirt self-pipe for forwarding # Validate that the libvirt self-pipe for forwarding
# events between threads is working sanely # events between threads is working sanely
# Simulate that the dispatch thread runs.
# In threading mode we don't have such thread as we don't need it
# so for threading this is noop.
def run_dispatch(hostimpl):
if not utils.concurrency_mode_threading():
hostimpl._event_handler._dispatch_events()
def handler(event): def handler(event):
got_events.append(event) got_events.append(event)
@@ -130,17 +137,15 @@ class HostTestCase(test.NoDBTestCase):
lifecycle_event_handler=handler) lifecycle_event_handler=handler)
got_events = [] got_events = []
hostimpl._init_events_pipe()
event1 = event.LifecycleEvent( event1 = event.LifecycleEvent(
"cef19ce0-0ca2-11df-855d-b19fbce37686", "cef19ce0-0ca2-11df-855d-b19fbce37686",
event.EVENT_LIFECYCLE_STARTED) event.EVENT_LIFECYCLE_STARTED)
event2 = event.LifecycleEvent( event2 = event.LifecycleEvent(
"cef19ce0-0ca2-11df-855d-b19fbce37686", "cef19ce0-0ca2-11df-855d-b19fbce37686",
event.EVENT_LIFECYCLE_PAUSED) event.EVENT_LIFECYCLE_PAUSED)
hostimpl._queue_event(event1) hostimpl._event_handler._queue_event(event1)
hostimpl._queue_event(event2) hostimpl._event_handler._queue_event(event2)
hostimpl._dispatch_events() run_dispatch(hostimpl)
want_events = [event1, event2] want_events = [event1, event2]
self.assertEqual(want_events, got_events) self.assertEqual(want_events, got_events)
@@ -152,9 +157,9 @@ class HostTestCase(test.NoDBTestCase):
"cef19ce0-0ca2-11df-855d-b19fbce37686", "cef19ce0-0ca2-11df-855d-b19fbce37686",
event.EVENT_LIFECYCLE_STOPPED) event.EVENT_LIFECYCLE_STOPPED)
hostimpl._queue_event(event3) hostimpl._event_handler._queue_event(event3)
hostimpl._queue_event(event4) hostimpl._event_handler._queue_event(event4)
hostimpl._dispatch_events() run_dispatch(hostimpl)
want_events = [event1, event2, event3] want_events = [event1, event2, event3]
self.assertEqual(want_events, got_events) self.assertEqual(want_events, got_events)
@@ -163,21 +168,13 @@ class HostTestCase(test.NoDBTestCase):
mock_spawn_after.assert_called_once_with( mock_spawn_after.assert_called_once_with(
hostimpl._lifecycle_delay, hostimpl._event_emit, event4) hostimpl._lifecycle_delay, hostimpl._event_emit, event4)
def test_event_lifecycle(self): @mock.patch('nova.virt.libvirt.host.Host._event_emit_delayed')
got_events = [] def test_event_lifecycle(self, mock_emit):
# Validate that libvirt events are correctly translated
# to Nova events
def spawn_after(seconds, func, *args, **kwargs):
got_events.append(args[0])
return mock.Mock(spec=greenthread.GreenThread)
greenthread.spawn_after = mock.Mock(side_effect=spawn_after)
hostimpl = host.Host("qemu:///system", hostimpl = host.Host("qemu:///system",
lifecycle_event_handler=lambda e: None) lifecycle_event_handler=lambda e: None)
conn = hostimpl.get_connection() conn = hostimpl.get_connection()
hostimpl._init_events_pipe()
fake_dom_xml = """ fake_dom_xml = """
<domain type='kvm'> <domain type='kvm'>
<uuid>cef19ce0-0ca2-11df-855d-b19fbce37686</uuid> <uuid>cef19ce0-0ca2-11df-855d-b19fbce37686</uuid>
@@ -194,13 +191,18 @@ class HostTestCase(test.NoDBTestCase):
hostimpl._event_lifecycle_callback( hostimpl._event_lifecycle_callback(
conn, dom, fakelibvirt.VIR_DOMAIN_EVENT_STOPPED, 0, hostimpl) conn, dom, fakelibvirt.VIR_DOMAIN_EVENT_STOPPED, 0, hostimpl)
hostimpl._dispatch_events() # Simulate that the dispatch thread runs.
self.assertEqual(len(got_events), 1) # In threading mode we don't have such thread as we don't need it
self.assertIsInstance(got_events[0], event.LifecycleEvent) if not utils.concurrency_mode_threading():
self.assertEqual(got_events[0].uuid, hostimpl._event_handler._dispatch_events()
"cef19ce0-0ca2-11df-855d-b19fbce37686")
self.assertEqual(got_events[0].transition, mock_emit.assert_called_once()
event.EVENT_LIFECYCLE_STOPPED) args, _ = mock_emit.call_args
got_event = args[0]
self.assertIsInstance(got_event, event.LifecycleEvent)
self.assertEqual(
got_event.uuid, "cef19ce0-0ca2-11df-855d-b19fbce37686")
self.assertEqual(got_event.transition, event.EVENT_LIFECYCLE_STOPPED)
def test_event_lifecycle_callback_suspended_postcopy(self): def test_event_lifecycle_callback_suspended_postcopy(self):
"""Tests the suspended lifecycle event with libvirt with post-copy""" """Tests the suspended lifecycle event with libvirt with post-copy"""
@@ -216,7 +218,7 @@ class HostTestCase(test.NoDBTestCase):
conn, dom, fakelibvirt.VIR_DOMAIN_EVENT_SUSPENDED, conn, dom, fakelibvirt.VIR_DOMAIN_EVENT_SUSPENDED,
detail=fakelibvirt.VIR_DOMAIN_EVENT_SUSPENDED_POSTCOPY, detail=fakelibvirt.VIR_DOMAIN_EVENT_SUSPENDED_POSTCOPY,
opaque=hostimpl) opaque=hostimpl)
expected_event = hostimpl._queue_event.call_args[0][0] expected_event = hostimpl._event_handler._queue_event.call_args[0][0]
self.assertEqual(event.EVENT_LIFECYCLE_POSTCOPY_STARTED, self.assertEqual(event.EVENT_LIFECYCLE_POSTCOPY_STARTED,
expected_event.transition) expected_event.transition)
@@ -238,7 +240,7 @@ class HostTestCase(test.NoDBTestCase):
conn, dom, fakelibvirt.VIR_DOMAIN_EVENT_SUSPENDED, conn, dom, fakelibvirt.VIR_DOMAIN_EVENT_SUSPENDED,
detail=fakelibvirt.VIR_DOMAIN_EVENT_SUSPENDED_MIGRATED, detail=fakelibvirt.VIR_DOMAIN_EVENT_SUSPENDED_MIGRATED,
opaque=hostimpl) opaque=hostimpl)
expected_event = hostimpl._queue_event.call_args[0][0] expected_event = hostimpl._event_handler._queue_event.call_args[0][0]
self.assertEqual(event.EVENT_LIFECYCLE_MIGRATION_COMPLETED, self.assertEqual(event.EVENT_LIFECYCLE_MIGRATION_COMPLETED,
expected_event.transition) expected_event.transition)
get_job_info.assert_called_once_with() get_job_info.assert_called_once_with()
@@ -265,7 +267,7 @@ class HostTestCase(test.NoDBTestCase):
conn, dom, fakelibvirt.VIR_DOMAIN_EVENT_SUSPENDED, conn, dom, fakelibvirt.VIR_DOMAIN_EVENT_SUSPENDED,
detail=fakelibvirt.VIR_DOMAIN_EVENT_SUSPENDED_MIGRATED, detail=fakelibvirt.VIR_DOMAIN_EVENT_SUSPENDED_MIGRATED,
opaque=hostimpl) opaque=hostimpl)
expected_event = hostimpl._queue_event.call_args[0][0] expected_event = hostimpl._event_handler._queue_event.call_args[0][0]
self.assertEqual(event.EVENT_LIFECYCLE_PAUSED, self.assertEqual(event.EVENT_LIFECYCLE_PAUSED,
expected_event.transition) expected_event.transition)
get_job_info.assert_called_once_with() get_job_info.assert_called_once_with()
@@ -273,30 +275,30 @@ class HostTestCase(test.NoDBTestCase):
test.MatchType(libvirt_guest.Guest), instance=None, test.MatchType(libvirt_guest.Guest), instance=None,
logging_ok=False) logging_ok=False)
def test_event_emit_delayed_call_delayed(self): @mock.patch.object(utils, 'spawn_after')
def test_event_emit_delayed_call_delayed(self, mock_spawn_after):
ev = event.LifecycleEvent( ev = event.LifecycleEvent(
"cef19ce0-0ca2-11df-855d-b19fbce37686", "cef19ce0-0ca2-11df-855d-b19fbce37686",
event.EVENT_LIFECYCLE_STOPPED) event.EVENT_LIFECYCLE_STOPPED)
spawn_after_mock = mock.Mock()
greenthread.spawn_after = spawn_after_mock
hostimpl = host.Host( hostimpl = host.Host(
'qemu:///system', lifecycle_event_handler=lambda e: None) 'qemu:///system', lifecycle_event_handler=lambda e: None)
hostimpl._event_emit_delayed(ev) hostimpl._event_emit_delayed(ev)
spawn_after_mock.assert_called_once_with( mock_spawn_after.assert_called_once_with(
15, hostimpl._event_emit, ev) 15, hostimpl._event_emit, ev)
@mock.patch.object(greenthread, 'spawn_after') @mock.patch.object(utils, 'spawn_after')
def test_event_emit_delayed_call_delayed_pending(self, spawn_after_mock): def test_event_emit_delayed_call_delayed_pending(self, spawn_after_mock):
hostimpl = host.Host( hostimpl = host.Host(
'qemu:///system', lifecycle_event_handler=lambda e: None) 'qemu:///system', lifecycle_event_handler=lambda e: None)
uuid = "cef19ce0-0ca2-11df-855d-b19fbce37686" uuid = "cef19ce0-0ca2-11df-855d-b19fbce37686"
gt_mock = mock.Mock()
hostimpl._events_delayed[uuid] = gt_mock
ev = event.LifecycleEvent( ev = event.LifecycleEvent(
uuid, event.EVENT_LIFECYCLE_STOPPED) uuid, event.EVENT_LIFECYCLE_STOPPED)
hostimpl._event_emit_delayed(ev) hostimpl._event_emit_delayed(ev)
gt_mock.cancel.assert_called_once_with() mock_future = spawn_after_mock.return_value
mock_future.add_done_callback.assert_called_once()
self.assertTrue(spawn_after_mock.called) self.assertTrue(spawn_after_mock.called)
self.assertIs(mock_future, hostimpl._events_delayed[uuid])
def test_event_delayed_cleanup(self): def test_event_delayed_cleanup(self):
hostimpl = host.Host( hostimpl = host.Host(
@@ -321,7 +323,7 @@ class HostTestCase(test.NoDBTestCase):
dom = fakelibvirt.Domain(conn, fake_dom_xml, running=True) dom = fakelibvirt.Domain(conn, fake_dom_xml, running=True)
host.Host._event_device_removed_callback( host.Host._event_device_removed_callback(
conn, dom, dev='virtio-1', opaque=hostimpl) conn, dom, dev='virtio-1', opaque=hostimpl)
expected_event = hostimpl._queue_event.call_args[0][0] expected_event = hostimpl._event_handler._queue_event.call_args[0][0]
self.assertEqual( self.assertEqual(
libvirtevent.DeviceRemovedEvent, type(expected_event)) libvirtevent.DeviceRemovedEvent, type(expected_event))
self.assertEqual( self.assertEqual(
@@ -339,7 +341,7 @@ class HostTestCase(test.NoDBTestCase):
dom = fakelibvirt.Domain(conn, fake_dom_xml, running=True) dom = fakelibvirt.Domain(conn, fake_dom_xml, running=True)
host.Host._event_device_removal_failed_callback( host.Host._event_device_removal_failed_callback(
conn, dom, dev='virtio-1', opaque=hostimpl) conn, dom, dev='virtio-1', opaque=hostimpl)
expected_event = hostimpl._queue_event.call_args[0][0] expected_event = hostimpl._event_handler._queue_event.call_args[0][0]
self.assertEqual( self.assertEqual(
libvirtevent.DeviceRemovalFailedEvent, type(expected_event)) libvirtevent.DeviceRemovalFailedEvent, type(expected_event))
self.assertEqual( self.assertEqual(
+10
View File
@@ -577,6 +577,16 @@ def spawn(func, *args, **kwargs) -> futurist.Future:
return spawn_on(_get_default_executor(), func, *args, **kwargs) return spawn_on(_get_default_executor(), func, *args, **kwargs)
def spawn_after(seconds, func, *args, **kwargs) -> futurist.Future:
"""Executing the function asynchronously after the given time."""
def delayed(*args, **kwargs):
time.sleep(seconds)
return func(*args, **kwargs)
return spawn(delayed, *args, **kwargs)
def _executor_is_full(executor): def _executor_is_full(executor):
if concurrency_mode_threading(): if concurrency_mode_threading():
# TODO(gibi): Move this whole logic to futurist ThreadPoolExecutor # TODO(gibi): Move this whole logic to futurist ThreadPoolExecutor
+232 -139
View File
@@ -39,9 +39,6 @@ import queue
import threading import threading
import typing as ty import typing as ty
from eventlet import greenio
from eventlet import greenthread
from eventlet import patcher
from lxml import etree from lxml import etree
from oslo_log import log as logging from oslo_log import log as logging
from oslo_serialization import jsonutils from oslo_serialization import jsonutils
@@ -78,10 +75,6 @@ except ImportError:
CONF = nova.conf.CONF CONF = nova.conf.CONF
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
native_socket = patcher.original('socket')
native_threading = patcher.original("threading")
native_Queue = patcher.original("queue")
# This list is for libvirt hypervisor drivers that need special handling. # This list is for libvirt hypervisor drivers that need special handling.
# This is *not* the complete list of supported hypervisor drivers. # This is *not* the complete list of supported hypervisor drivers.
HV_DRIVER_QEMU = "QEMU" HV_DRIVER_QEMU = "QEMU"
@@ -121,6 +114,205 @@ def _get_loaders():
return _loaders return _loaders
class LibvirtEventHandler:
def __init__(self, conn_event_handler=None, lifecycle_event_handler=None):
self._lifecycle_event_handler = lifecycle_event_handler
self._conn_event_handler = conn_event_handler
def _queue_event(self, event):
raise NotImplementedError()
def start(self):
raise NotImplementedError()
@classmethod
def create(cls, conn_event_handler=None, lifecycle_event_handler=None):
if utils.concurrency_mode_threading():
return _ThreadingLibvirtEventHandler(
conn_event_handler, lifecycle_event_handler)
else:
return _EventletLibvirtEventHandler(
conn_event_handler, lifecycle_event_handler)
class _EventletLibvirtEventHandler(LibvirtEventHandler):
def __init__(self, conn_event_handler=None, lifecycle_event_handler=None):
super().__init__(conn_event_handler, lifecycle_event_handler)
from eventlet import greenio
from eventlet import patcher
self.native_threading = patcher.original("threading")
self.native_queue = patcher.original("queue")
self._event_thread = None
# This is a Queue between the native libvirt event thread
# and the main thread with the eventlet hub. This needs to be
# a native queue.
self._event_queue: queue.Queue[
virtevent.InstanceEvent | Mapping[str, ty.Any]
] = self.native_queue.Queue()
# Create a self-pipe for the native thread to synchronize on.
#
# This code is taken from the eventlet tpool module, under terms
# of the Apache License v2.0.
rpipe, wpipe = os.pipe()
self._event_notify_send = greenio.GreenPipe(wpipe, 'wb', 0)
self._event_notify_recv = greenio.GreenPipe(rpipe, 'rb', 0)
def start(self):
"""Initializes the libvirt events subsystem.
This requires running a native thread to provide the
libvirt event loop integration. This forwards events
to a green thread which does the actual dispatching.
"""
libvirt.virEventRegisterDefaultImpl()
LOG.debug("Starting event thread")
self._event_thread = self.native_threading.Thread(
target=self._native_thread)
self._event_thread.daemon = True
self._event_thread.start()
LOG.debug("Starting event dispatch greenthread")
utils.spawn(self._dispatch_thread)
def _queue_event(self, event):
"""Puts an event on the queue for dispatch.
This method is called by the native event thread to
put events on the queue for later dispatch by the
green thread. Any use of logging APIs is forbidden.
"""
if self._event_queue is None:
return
# Queue the event...
self._event_queue.put(event)
# ...then wakeup the green thread to dispatch it
c = ' '.encode()
self._event_notify_send.write(c)
self._event_notify_send.flush()
def _native_thread(self):
"""Receives async events coming in from libvirtd.
This is a native thread which runs the default
libvirt event loop implementation. This processes
any incoming async events from libvirtd and queues
them for later dispatch. This thread is only
permitted to use libvirt python APIs, and the
driver.queue_event method. In particular any use
of logging is forbidden, since it will confuse
eventlet's greenthread integration
"""
while True:
libvirt.virEventRunDefaultImpl()
def _dispatch_thread(self):
"""Dispatches async events coming in from libvirtd.
This is a green thread which waits for events to
arrive from the libvirt event loop thread. This
then dispatches the events to the compute manager.
"""
while True:
self._dispatch_events()
def _dispatch_events(self):
"""Wait for & dispatch events from native thread
Blocks until native thread indicates some events
are ready. Then dispatches all queued events.
"""
# Wait to be notified that there are some
# events pending
try:
_c = self._event_notify_recv.read(1)
assert _c
except ValueError:
return # will be raised when pipe is closed
# Process as many events as possible without
# blocking
last_close_event = None
# required for mypy
if self._event_queue is None:
return
while not self._event_queue.empty():
try:
event: virtevent.InstanceEvent | Mapping[str, ty.Any] = (
self._event_queue.get(block=False))
if issubclass(type(event), virtevent.InstanceEvent):
self._lifecycle_event_handler(event)
elif 'conn' in event and 'reason' in event:
last_close_event = event
except self.native_queue.Empty:
pass
if last_close_event is None:
return
conn = last_close_event['conn']
reason = str(last_close_event['reason'])
msg = _("Connection to libvirt lost: %s") % reason
self._conn_event_handler(conn, False, msg)
class _ThreadingLibvirtEventHandler(LibvirtEventHandler):
def __init__(self, conn_event_handler=None, lifecycle_event_handler=None):
super().__init__(conn_event_handler, lifecycle_event_handler)
self._event_thread = None
self._started = False
def start(self):
"""Initializes the libvirt events subsystem.
This requires running a native thread that pools for libvirt events.
"""
libvirt.virEventRegisterDefaultImpl()
LOG.debug("Starting event thread")
self._event_thread = threading.Thread(target=self._native_thread)
self._event_thread.daemon = True
self._event_thread.start()
def _queue_event(self, event):
"""Puts an event on the queue for dispatch.
In native threading mode instead of queueing we directly dispatch on
the thread that receives the event as we don't have the requirement to
move the event handler to the main thread with the eventlet hub
"""
if issubclass(type(event), virtevent.InstanceEvent):
self._lifecycle_event_handler(event)
elif 'conn' in event and 'reason' in event:
conn = event['conn']
reason = str(event['reason'])
msg = _("Connection to libvirt lost: %s") % reason
self._conn_event_handler(conn, False, msg)
def _native_thread(self):
"""Receives async events coming in from libvirtd.
This is a native thread which runs the default
libvirt event loop implementation. This processes
any incoming async events from libvirtd.
"""
while True:
libvirt.virEventRunDefaultImpl()
class Host(object): class Host(object):
def __init__(self, uri, read_only=False, def __init__(self, uri, read_only=False,
@@ -129,7 +321,14 @@ class Host(object):
self._uri = uri self._uri = uri
self._read_only = read_only self._read_only = read_only
self._initial_connection = True self._initial_connection = True
self._event_handler = LibvirtEventHandler.create(
self._connection_closed, self._event_emit_delayed)
self._conn_event_handler = conn_event_handler self._conn_event_handler = conn_event_handler
# This queue is just for the async handling of connection closed
# events. In eventlet mode this only pass events within the main
# native thread with the eventlet hub, so no proxying is needed.
self._conn_event_handler_queue: queue.Queue[ self._conn_event_handler_queue: queue.Queue[
Callable[[], None] Callable[[], None]
] = queue.Queue() ] = queue.Queue()
@@ -141,9 +340,6 @@ class Host(object):
self._wrapped_conn = None self._wrapped_conn = None
self._wrapped_conn_lock = threading.Lock() self._wrapped_conn_lock = threading.Lock()
self._event_queue: queue.Queue[
virtevent.InstanceEvent | Mapping[str, ty.Any]
] | None = None
self._events_delayed = {} self._events_delayed = {}
# Note(toabctl): During a reboot of a domain, STOPPED and # Note(toabctl): During a reboot of a domain, STOPPED and
@@ -195,38 +391,11 @@ class Host(object):
# executing proxied calls in a native thread. # executing proxied calls in a native thread.
return utils.tpool_wrap(obj, autowrap=self._libvirt_proxy_classes) return utils.tpool_wrap(obj, autowrap=self._libvirt_proxy_classes)
def _native_thread(self):
"""Receives async events coming in from libvirtd.
This is a native thread which runs the default
libvirt event loop implementation. This processes
any incoming async events from libvirtd and queues
them for later dispatch. This thread is only
permitted to use libvirt python APIs, and the
driver.queue_event method. In particular any use
of logging is forbidden, since it will confuse
eventlet's greenthread integration
"""
while True:
libvirt.virEventRunDefaultImpl()
def _dispatch_thread(self):
"""Dispatches async events coming in from libvirtd.
This is a green thread which waits for events to
arrive from the libvirt event loop thread. This
then dispatches the events to the compute manager.
"""
while True:
self._dispatch_events()
def _conn_event_thread(self): def _conn_event_thread(self):
"""Dispatches async connection events""" """Dispatches async connection events"""
# NOTE(mdbooth): This thread doesn't need to jump through the same # NOTE(mdbooth): This thread doesn't need to jump through the same
# hoops as _dispatch_thread because it doesn't interact directly # hoops as the lifecycle event handling because it doesn't interact
# with the libvirt native thread. # directly with the libvirt native thread.
while True: while True:
self._dispatch_conn_event() self._dispatch_conn_event()
@@ -248,12 +417,13 @@ class Host(object):
NB: this method is executing in a native thread, not NB: this method is executing in a native thread, not
an eventlet coroutine. It can only invoke other libvirt an eventlet coroutine. It can only invoke other libvirt
APIs, or use self._queue_event(). Any use of logging APIs APIs, or use self._event_handler._queue_event(). Any use of logging
in particular is forbidden. APIs in particular is forbidden.
""" """
self = opaque self = opaque
uuid = dom.UUIDString() uuid = dom.UUIDString()
self._queue_event(libvirtevent.DeviceRemovedEvent(uuid, dev)) self._event_handler._queue_event(
libvirtevent.DeviceRemovedEvent(uuid, dev))
@staticmethod @staticmethod
def _event_device_removal_failed_callback(conn, dom, dev, opaque): def _event_device_removal_failed_callback(conn, dom, dev, opaque):
@@ -261,12 +431,13 @@ class Host(object):
NB: this method is executing in a native thread, not NB: this method is executing in a native thread, not
an eventlet coroutine. It can only invoke other libvirt an eventlet coroutine. It can only invoke other libvirt
APIs, or use self._queue_event(). Any use of logging APIs APIs, or use self._event_handler._queue_event(). Any use of logging
in particular is forbidden. APIs in particular is forbidden.
""" """
self = opaque self = opaque
uuid = dom.UUIDString() uuid = dom.UUIDString()
self._queue_event(libvirtevent.DeviceRemovalFailedEvent(uuid, dev)) self._event_handler._queue_event(
libvirtevent.DeviceRemovalFailedEvent(uuid, dev))
@staticmethod @staticmethod
def _event_lifecycle_callback(conn, dom, event, detail, opaque): def _event_lifecycle_callback(conn, dom, event, detail, opaque):
@@ -274,8 +445,8 @@ class Host(object):
NB: this method is executing in a native thread, not NB: this method is executing in a native thread, not
an eventlet coroutine. It can only invoke other libvirt an eventlet coroutine. It can only invoke other libvirt
APIs, or use self._queue_event(). Any use of logging APIs APIs, or use self._event_handler._queue_event(). Any use of logging
in particular is forbidden. APIs in particular is forbidden.
""" """
self = opaque self = opaque
@@ -316,11 +487,12 @@ class Host(object):
transition = virtevent.EVENT_LIFECYCLE_RESUMED transition = virtevent.EVENT_LIFECYCLE_RESUMED
if transition is not None: if transition is not None:
self._queue_event(virtevent.LifecycleEvent(uuid, transition)) self._event_handler._queue_event(
virtevent.LifecycleEvent(uuid, transition))
def _close_callback(self, conn, reason, opaque): def _close_callback(self, conn, reason, opaque):
close_info = {'conn': conn, 'reason': reason} close_info = {'conn': conn, 'reason': reason}
self._queue_event(close_info) self._event_handler._queue_event(close_info)
@staticmethod @staticmethod
def _test_connection(conn): def _test_connection(conn):
@@ -359,77 +531,22 @@ class Host(object):
flags = libvirt.VIR_CONNECT_RO flags = libvirt.VIR_CONNECT_RO
return self._libvirt_proxy.openAuth(uri, auth, flags) return self._libvirt_proxy.openAuth(uri, auth, flags)
def _queue_event(self, event): def _connection_closed(self, conn, *args, **kwargs):
"""Puts an event on the queue for dispatch.
This method is called by the native event thread to
put events on the queue for later dispatch by the
green thread. Any use of logging APIs is forbidden.
"""
if self._event_queue is None:
return
# Queue the event...
self._event_queue.put(event)
# ...then wakeup the green thread to dispatch it
c = ' '.encode()
self._event_notify_send.write(c)
self._event_notify_send.flush()
def _dispatch_events(self):
"""Wait for & dispatch events from native thread
Blocks until native thread indicates some events
are ready. Then dispatches all queued events.
"""
# Wait to be notified that there are some
# events pending
try:
_c = self._event_notify_recv.read(1)
assert _c
except ValueError:
return # will be raised when pipe is closed
# Process as many events as possible without
# blocking
last_close_event = None
# required for mypy
if self._event_queue is None:
return
while not self._event_queue.empty():
try:
event: virtevent.InstanceEvent | Mapping[str, ty.Any] = self._event_queue.get(block=False) # noqa: E501
if issubclass(type(event), virtevent.InstanceEvent):
# call possibly with delay
self._event_emit_delayed(event)
elif 'conn' in event and 'reason' in event:
last_close_event = event
except native_Queue.Empty:
pass
if last_close_event is None:
return
conn = last_close_event['conn']
# get_new_connection may already have disabled the host, # get_new_connection may already have disabled the host,
# in which case _wrapped_conn is None. # in which case _wrapped_conn is None.
with self._wrapped_conn_lock: with self._wrapped_conn_lock:
if conn == self._wrapped_conn: if conn == self._wrapped_conn:
reason = str(last_close_event['reason'])
msg = _("Connection to libvirt lost: %s") % reason
self._wrapped_conn = None self._wrapped_conn = None
self._queue_conn_event_handler(False, msg)
self._queue_conn_event_handler(*args, **kwargs)
def _event_emit_delayed(self, event): def _event_emit_delayed(self, event):
"""Emit events - possibly delayed.""" """Emit events - possibly delayed."""
def event_cleanup(gt, *args, **kwargs): def event_cleanup(event):
"""Callback function for greenthread. Called """Callback function for greenthread. Called
to cleanup the _events_delayed dictionary when an event to cleanup the _events_delayed dictionary when an event
was called. was called.
""" """
event = args[0]
self._events_delayed.pop(event.uuid, None) self._events_delayed.pop(event.uuid, None)
# Cleanup possible delayed stop events. # Cleanup possible delayed stop events.
@@ -442,12 +559,12 @@ class Host(object):
event.transition == virtevent.EVENT_LIFECYCLE_STOPPED): event.transition == virtevent.EVENT_LIFECYCLE_STOPPED):
# Delay STOPPED event, as they may be followed by a STARTED # Delay STOPPED event, as they may be followed by a STARTED
# event in case the instance is rebooting # event in case the instance is rebooting
id_ = greenthread.spawn_after(self._lifecycle_delay, id_ = utils.spawn_after(
self._event_emit, event) self._lifecycle_delay, self._event_emit, event)
self._events_delayed[event.uuid] = id_ self._events_delayed[event.uuid] = id_
# add callback to cleanup self._events_delayed dict after # add callback to cleanup self._events_delayed dict after
# event was called # event was called
id_.link(event_cleanup, event) id_.add_done_callback(lambda _: event_cleanup(event))
else: else:
self._event_emit(event) self._event_emit(event)
@@ -455,37 +572,14 @@ class Host(object):
if self._lifecycle_event_handler is not None: if self._lifecycle_event_handler is not None:
self._lifecycle_event_handler(event) self._lifecycle_event_handler(event)
def _init_events_pipe(self):
"""Create a self-pipe for the native thread to synchronize on.
This code is taken from the eventlet tpool module, under terms
of the Apache License v2.0.
"""
self._event_queue = native_Queue.Queue()
rpipe, wpipe = os.pipe()
self._event_notify_send = greenio.GreenPipe(wpipe, 'wb', 0)
self._event_notify_recv = greenio.GreenPipe(rpipe, 'rb', 0)
def _init_events(self): def _init_events(self):
"""Initializes the libvirt events subsystem. """Initializes the libvirt events subsystem.
This requires running a native thread to provide the
libvirt event loop integration. This forwards events
to a green thread which does the actual dispatching.
""" """
self._event_handler.start()
self._init_events_pipe() # This thread is just for async connection closed event handling.
# In eventlet mode it only handles tasks within the main thread with
LOG.debug("Starting native event thread") # the eventlet hub.
self._event_thread = native_threading.Thread(
target=self._native_thread)
self._event_thread.daemon = True
self._event_thread.start()
LOG.debug("Starting green dispatch thread")
utils.spawn(self._dispatch_thread)
LOG.debug("Starting connection event dispatch thread") LOG.debug("Starting connection event dispatch thread")
utils.spawn(self._conn_event_thread) utils.spawn(self._conn_event_thread)
@@ -605,7 +699,6 @@ class Host(object):
# connection is used for the first time. Otherwise, the # connection is used for the first time. Otherwise, the
# handler does not get registered. # handler does not get registered.
libvirt.registerErrorHandler(self._libvirt_error_handler, None) libvirt.registerErrorHandler(self._libvirt_error_handler, None)
libvirt.virEventRegisterDefaultImpl()
self._init_events() self._init_events()
self._initialized = True self._initialized = True