From a89c1b44c56e04223f61925305b0f48f3791c7d8 Mon Sep 17 00:00:00 2001 From: Balazs Gibizer Date: Mon, 3 Nov 2025 17:28:40 +0100 Subject: [PATCH] Libvirt event handling without eventlet Our libvirt interface is not eventlet aware and not pure python. So eventlet monkey patching is not enough. So the libvirt driver implemented a native polling thread for libvirt and the queue + pipe mechanism to push event from the native polling thread to the main thread with the eventlet event loop. We don't need all of these complications in native thread mode. There we only need a single thread that poll libvirt for the events. The received events can be executed directly on the polling thread as that is no different from any other threads in the system now. To make the change more understandable the event handling logic is moved behind an abstraction that is implemented twice, once for eventlet with the existing implementation just moved around, and once for native threading with the simplified handling. Change-Id: If479574cd91975810098afa8e3c220c7316a9431 Signed-off-by: Balazs Gibizer --- nova/tests/unit/test_utils.py | 12 + nova/tests/unit/virt/libvirt/test_host.py | 84 ++--- nova/utils.py | 10 + nova/virt/libvirt/host.py | 371 ++++++++++++++-------- 4 files changed, 297 insertions(+), 180 deletions(-) diff --git a/nova/tests/unit/test_utils.py b/nova/tests/unit/test_utils.py index 897cbebb3e..a2d7f6d873 100644 --- a/nova/tests/unit/test_utils.py +++ b/nova/tests/unit/test_utils.py @@ -1559,6 +1559,18 @@ class SpawnOnTestCase(test.NoDBTestCase): 'test_spawn_on_warns_on_full_executor.cell_worker', task) +class SpawnAfterTestCase(test.NoDBTestCase): + @mock.patch.object(time, "sleep") + def test_spawn_after_submits_work_after_delay(self, mock_sleep): + task = mock.MagicMock() + + future = utils.spawn_after(0.1, task, 13, foo='bar') + future.result() + + task.assert_called_once_with(13, foo='bar') + mock_sleep.assert_called_once_with(0.1) + + class ExecutorStatsTestCase(test.NoDBTestCase): def setUp(self): diff --git a/nova/tests/unit/virt/libvirt/test_host.py b/nova/tests/unit/virt/libvirt/test_host.py index 2dca412cb5..e575d9a57f 100644 --- a/nova/tests/unit/virt/libvirt/test_host.py +++ b/nova/tests/unit/virt/libvirt/test_host.py @@ -19,7 +19,6 @@ from unittest import mock import ddt import eventlet -from eventlet import greenthread from eventlet import tpool from lxml import etree from oslo_serialization import jsonutils @@ -119,10 +118,18 @@ class HostTestCase(test.NoDBTestCase): self.assertEqual(0, len(log_mock.method_calls), 'LOG should not be used in _connect_auth_cb.') - @mock.patch.object(greenthread, 'spawn_after') + @mock.patch.object(utils, 'spawn_after') def test_event_dispatch(self, mock_spawn_after): # Validate that the libvirt self-pipe for forwarding # events between threads is working sanely + + # Simulate that the dispatch thread runs. + # In threading mode we don't have such thread as we don't need it + # so for threading this is noop. + def run_dispatch(hostimpl): + if not utils.concurrency_mode_threading(): + hostimpl._event_handler._dispatch_events() + def handler(event): got_events.append(event) @@ -130,17 +137,15 @@ class HostTestCase(test.NoDBTestCase): lifecycle_event_handler=handler) got_events = [] - hostimpl._init_events_pipe() - event1 = event.LifecycleEvent( "cef19ce0-0ca2-11df-855d-b19fbce37686", event.EVENT_LIFECYCLE_STARTED) event2 = event.LifecycleEvent( "cef19ce0-0ca2-11df-855d-b19fbce37686", event.EVENT_LIFECYCLE_PAUSED) - hostimpl._queue_event(event1) - hostimpl._queue_event(event2) - hostimpl._dispatch_events() + hostimpl._event_handler._queue_event(event1) + hostimpl._event_handler._queue_event(event2) + run_dispatch(hostimpl) want_events = [event1, event2] self.assertEqual(want_events, got_events) @@ -152,9 +157,9 @@ class HostTestCase(test.NoDBTestCase): "cef19ce0-0ca2-11df-855d-b19fbce37686", event.EVENT_LIFECYCLE_STOPPED) - hostimpl._queue_event(event3) - hostimpl._queue_event(event4) - hostimpl._dispatch_events() + hostimpl._event_handler._queue_event(event3) + hostimpl._event_handler._queue_event(event4) + run_dispatch(hostimpl) want_events = [event1, event2, event3] self.assertEqual(want_events, got_events) @@ -163,21 +168,13 @@ class HostTestCase(test.NoDBTestCase): mock_spawn_after.assert_called_once_with( hostimpl._lifecycle_delay, hostimpl._event_emit, event4) - def test_event_lifecycle(self): - got_events = [] - - # Validate that libvirt events are correctly translated - # to Nova events - def spawn_after(seconds, func, *args, **kwargs): - got_events.append(args[0]) - return mock.Mock(spec=greenthread.GreenThread) - - greenthread.spawn_after = mock.Mock(side_effect=spawn_after) + @mock.patch('nova.virt.libvirt.host.Host._event_emit_delayed') + def test_event_lifecycle(self, mock_emit): hostimpl = host.Host("qemu:///system", lifecycle_event_handler=lambda e: None) + conn = hostimpl.get_connection() - hostimpl._init_events_pipe() fake_dom_xml = """ cef19ce0-0ca2-11df-855d-b19fbce37686 @@ -194,13 +191,18 @@ class HostTestCase(test.NoDBTestCase): hostimpl._event_lifecycle_callback( conn, dom, fakelibvirt.VIR_DOMAIN_EVENT_STOPPED, 0, hostimpl) - hostimpl._dispatch_events() - self.assertEqual(len(got_events), 1) - self.assertIsInstance(got_events[0], event.LifecycleEvent) - self.assertEqual(got_events[0].uuid, - "cef19ce0-0ca2-11df-855d-b19fbce37686") - self.assertEqual(got_events[0].transition, - event.EVENT_LIFECYCLE_STOPPED) + # Simulate that the dispatch thread runs. + # In threading mode we don't have such thread as we don't need it + if not utils.concurrency_mode_threading(): + hostimpl._event_handler._dispatch_events() + + mock_emit.assert_called_once() + args, _ = mock_emit.call_args + got_event = args[0] + self.assertIsInstance(got_event, event.LifecycleEvent) + self.assertEqual( + got_event.uuid, "cef19ce0-0ca2-11df-855d-b19fbce37686") + self.assertEqual(got_event.transition, event.EVENT_LIFECYCLE_STOPPED) def test_event_lifecycle_callback_suspended_postcopy(self): """Tests the suspended lifecycle event with libvirt with post-copy""" @@ -216,7 +218,7 @@ class HostTestCase(test.NoDBTestCase): conn, dom, fakelibvirt.VIR_DOMAIN_EVENT_SUSPENDED, detail=fakelibvirt.VIR_DOMAIN_EVENT_SUSPENDED_POSTCOPY, opaque=hostimpl) - expected_event = hostimpl._queue_event.call_args[0][0] + expected_event = hostimpl._event_handler._queue_event.call_args[0][0] self.assertEqual(event.EVENT_LIFECYCLE_POSTCOPY_STARTED, expected_event.transition) @@ -238,7 +240,7 @@ class HostTestCase(test.NoDBTestCase): conn, dom, fakelibvirt.VIR_DOMAIN_EVENT_SUSPENDED, detail=fakelibvirt.VIR_DOMAIN_EVENT_SUSPENDED_MIGRATED, opaque=hostimpl) - expected_event = hostimpl._queue_event.call_args[0][0] + expected_event = hostimpl._event_handler._queue_event.call_args[0][0] self.assertEqual(event.EVENT_LIFECYCLE_MIGRATION_COMPLETED, expected_event.transition) get_job_info.assert_called_once_with() @@ -265,7 +267,7 @@ class HostTestCase(test.NoDBTestCase): conn, dom, fakelibvirt.VIR_DOMAIN_EVENT_SUSPENDED, detail=fakelibvirt.VIR_DOMAIN_EVENT_SUSPENDED_MIGRATED, opaque=hostimpl) - expected_event = hostimpl._queue_event.call_args[0][0] + expected_event = hostimpl._event_handler._queue_event.call_args[0][0] self.assertEqual(event.EVENT_LIFECYCLE_PAUSED, expected_event.transition) get_job_info.assert_called_once_with() @@ -273,30 +275,30 @@ class HostTestCase(test.NoDBTestCase): test.MatchType(libvirt_guest.Guest), instance=None, logging_ok=False) - def test_event_emit_delayed_call_delayed(self): + @mock.patch.object(utils, 'spawn_after') + def test_event_emit_delayed_call_delayed(self, mock_spawn_after): ev = event.LifecycleEvent( "cef19ce0-0ca2-11df-855d-b19fbce37686", event.EVENT_LIFECYCLE_STOPPED) - spawn_after_mock = mock.Mock() - greenthread.spawn_after = spawn_after_mock hostimpl = host.Host( 'qemu:///system', lifecycle_event_handler=lambda e: None) hostimpl._event_emit_delayed(ev) - spawn_after_mock.assert_called_once_with( + mock_spawn_after.assert_called_once_with( 15, hostimpl._event_emit, ev) - @mock.patch.object(greenthread, 'spawn_after') + @mock.patch.object(utils, 'spawn_after') def test_event_emit_delayed_call_delayed_pending(self, spawn_after_mock): hostimpl = host.Host( 'qemu:///system', lifecycle_event_handler=lambda e: None) uuid = "cef19ce0-0ca2-11df-855d-b19fbce37686" - gt_mock = mock.Mock() - hostimpl._events_delayed[uuid] = gt_mock + ev = event.LifecycleEvent( uuid, event.EVENT_LIFECYCLE_STOPPED) hostimpl._event_emit_delayed(ev) - gt_mock.cancel.assert_called_once_with() + mock_future = spawn_after_mock.return_value + mock_future.add_done_callback.assert_called_once() self.assertTrue(spawn_after_mock.called) + self.assertIs(mock_future, hostimpl._events_delayed[uuid]) def test_event_delayed_cleanup(self): hostimpl = host.Host( @@ -321,7 +323,7 @@ class HostTestCase(test.NoDBTestCase): dom = fakelibvirt.Domain(conn, fake_dom_xml, running=True) host.Host._event_device_removed_callback( conn, dom, dev='virtio-1', opaque=hostimpl) - expected_event = hostimpl._queue_event.call_args[0][0] + expected_event = hostimpl._event_handler._queue_event.call_args[0][0] self.assertEqual( libvirtevent.DeviceRemovedEvent, type(expected_event)) self.assertEqual( @@ -339,7 +341,7 @@ class HostTestCase(test.NoDBTestCase): dom = fakelibvirt.Domain(conn, fake_dom_xml, running=True) host.Host._event_device_removal_failed_callback( conn, dom, dev='virtio-1', opaque=hostimpl) - expected_event = hostimpl._queue_event.call_args[0][0] + expected_event = hostimpl._event_handler._queue_event.call_args[0][0] self.assertEqual( libvirtevent.DeviceRemovalFailedEvent, type(expected_event)) self.assertEqual( diff --git a/nova/utils.py b/nova/utils.py index aeee85b30c..a17aba5438 100644 --- a/nova/utils.py +++ b/nova/utils.py @@ -577,6 +577,16 @@ def spawn(func, *args, **kwargs) -> futurist.Future: return spawn_on(_get_default_executor(), func, *args, **kwargs) +def spawn_after(seconds, func, *args, **kwargs) -> futurist.Future: + """Executing the function asynchronously after the given time.""" + + def delayed(*args, **kwargs): + time.sleep(seconds) + return func(*args, **kwargs) + + return spawn(delayed, *args, **kwargs) + + def _executor_is_full(executor): if concurrency_mode_threading(): # TODO(gibi): Move this whole logic to futurist ThreadPoolExecutor diff --git a/nova/virt/libvirt/host.py b/nova/virt/libvirt/host.py index 17aced16fa..1b24f8bbbd 100644 --- a/nova/virt/libvirt/host.py +++ b/nova/virt/libvirt/host.py @@ -39,9 +39,6 @@ import queue import threading import typing as ty -from eventlet import greenio -from eventlet import greenthread -from eventlet import patcher from lxml import etree from oslo_log import log as logging from oslo_serialization import jsonutils @@ -78,10 +75,6 @@ except ImportError: CONF = nova.conf.CONF LOG = logging.getLogger(__name__) -native_socket = patcher.original('socket') -native_threading = patcher.original("threading") -native_Queue = patcher.original("queue") - # This list is for libvirt hypervisor drivers that need special handling. # This is *not* the complete list of supported hypervisor drivers. HV_DRIVER_QEMU = "QEMU" @@ -121,6 +114,205 @@ def _get_loaders(): return _loaders +class LibvirtEventHandler: + def __init__(self, conn_event_handler=None, lifecycle_event_handler=None): + self._lifecycle_event_handler = lifecycle_event_handler + self._conn_event_handler = conn_event_handler + + def _queue_event(self, event): + raise NotImplementedError() + + def start(self): + raise NotImplementedError() + + @classmethod + def create(cls, conn_event_handler=None, lifecycle_event_handler=None): + if utils.concurrency_mode_threading(): + return _ThreadingLibvirtEventHandler( + conn_event_handler, lifecycle_event_handler) + else: + return _EventletLibvirtEventHandler( + conn_event_handler, lifecycle_event_handler) + + +class _EventletLibvirtEventHandler(LibvirtEventHandler): + def __init__(self, conn_event_handler=None, lifecycle_event_handler=None): + super().__init__(conn_event_handler, lifecycle_event_handler) + + from eventlet import greenio + from eventlet import patcher + + self.native_threading = patcher.original("threading") + self.native_queue = patcher.original("queue") + + self._event_thread = None + # This is a Queue between the native libvirt event thread + # and the main thread with the eventlet hub. This needs to be + # a native queue. + self._event_queue: queue.Queue[ + virtevent.InstanceEvent | Mapping[str, ty.Any] + ] = self.native_queue.Queue() + + # Create a self-pipe for the native thread to synchronize on. + # + # This code is taken from the eventlet tpool module, under terms + # of the Apache License v2.0. + rpipe, wpipe = os.pipe() + self._event_notify_send = greenio.GreenPipe(wpipe, 'wb', 0) + self._event_notify_recv = greenio.GreenPipe(rpipe, 'rb', 0) + + def start(self): + """Initializes the libvirt events subsystem. + + This requires running a native thread to provide the + libvirt event loop integration. This forwards events + to a green thread which does the actual dispatching. + """ + libvirt.virEventRegisterDefaultImpl() + + LOG.debug("Starting event thread") + self._event_thread = self.native_threading.Thread( + target=self._native_thread) + self._event_thread.daemon = True + self._event_thread.start() + + LOG.debug("Starting event dispatch greenthread") + utils.spawn(self._dispatch_thread) + + def _queue_event(self, event): + """Puts an event on the queue for dispatch. + + This method is called by the native event thread to + put events on the queue for later dispatch by the + green thread. Any use of logging APIs is forbidden. + """ + + if self._event_queue is None: + return + + # Queue the event... + self._event_queue.put(event) + + # ...then wakeup the green thread to dispatch it + c = ' '.encode() + self._event_notify_send.write(c) + self._event_notify_send.flush() + + def _native_thread(self): + """Receives async events coming in from libvirtd. + + This is a native thread which runs the default + libvirt event loop implementation. This processes + any incoming async events from libvirtd and queues + them for later dispatch. This thread is only + permitted to use libvirt python APIs, and the + driver.queue_event method. In particular any use + of logging is forbidden, since it will confuse + eventlet's greenthread integration + """ + + while True: + libvirt.virEventRunDefaultImpl() + + def _dispatch_thread(self): + """Dispatches async events coming in from libvirtd. + + This is a green thread which waits for events to + arrive from the libvirt event loop thread. This + then dispatches the events to the compute manager. + """ + + while True: + self._dispatch_events() + + def _dispatch_events(self): + """Wait for & dispatch events from native thread + + Blocks until native thread indicates some events + are ready. Then dispatches all queued events. + """ + + # Wait to be notified that there are some + # events pending + try: + _c = self._event_notify_recv.read(1) + assert _c + except ValueError: + return # will be raised when pipe is closed + + # Process as many events as possible without + # blocking + last_close_event = None + # required for mypy + if self._event_queue is None: + return + while not self._event_queue.empty(): + try: + event: virtevent.InstanceEvent | Mapping[str, ty.Any] = ( + self._event_queue.get(block=False)) + if issubclass(type(event), virtevent.InstanceEvent): + self._lifecycle_event_handler(event) + + elif 'conn' in event and 'reason' in event: + last_close_event = event + except self.native_queue.Empty: + pass + if last_close_event is None: + return + + conn = last_close_event['conn'] + reason = str(last_close_event['reason']) + msg = _("Connection to libvirt lost: %s") % reason + self._conn_event_handler(conn, False, msg) + + +class _ThreadingLibvirtEventHandler(LibvirtEventHandler): + def __init__(self, conn_event_handler=None, lifecycle_event_handler=None): + super().__init__(conn_event_handler, lifecycle_event_handler) + + self._event_thread = None + self._started = False + + def start(self): + """Initializes the libvirt events subsystem. + + This requires running a native thread that pools for libvirt events. + """ + libvirt.virEventRegisterDefaultImpl() + + LOG.debug("Starting event thread") + self._event_thread = threading.Thread(target=self._native_thread) + self._event_thread.daemon = True + self._event_thread.start() + + def _queue_event(self, event): + """Puts an event on the queue for dispatch. + + In native threading mode instead of queueing we directly dispatch on + the thread that receives the event as we don't have the requirement to + move the event handler to the main thread with the eventlet hub + """ + + if issubclass(type(event), virtevent.InstanceEvent): + self._lifecycle_event_handler(event) + elif 'conn' in event and 'reason' in event: + conn = event['conn'] + reason = str(event['reason']) + msg = _("Connection to libvirt lost: %s") % reason + self._conn_event_handler(conn, False, msg) + + def _native_thread(self): + """Receives async events coming in from libvirtd. + + This is a native thread which runs the default + libvirt event loop implementation. This processes + any incoming async events from libvirtd. + """ + + while True: + libvirt.virEventRunDefaultImpl() + + class Host(object): def __init__(self, uri, read_only=False, @@ -129,7 +321,14 @@ class Host(object): self._uri = uri self._read_only = read_only self._initial_connection = True + + self._event_handler = LibvirtEventHandler.create( + self._connection_closed, self._event_emit_delayed) + self._conn_event_handler = conn_event_handler + # This queue is just for the async handling of connection closed + # events. In eventlet mode this only pass events within the main + # native thread with the eventlet hub, so no proxying is needed. self._conn_event_handler_queue: queue.Queue[ Callable[[], None] ] = queue.Queue() @@ -141,9 +340,6 @@ class Host(object): self._wrapped_conn = None self._wrapped_conn_lock = threading.Lock() - self._event_queue: queue.Queue[ - virtevent.InstanceEvent | Mapping[str, ty.Any] - ] | None = None self._events_delayed = {} # Note(toabctl): During a reboot of a domain, STOPPED and @@ -195,38 +391,11 @@ class Host(object): # executing proxied calls in a native thread. return utils.tpool_wrap(obj, autowrap=self._libvirt_proxy_classes) - def _native_thread(self): - """Receives async events coming in from libvirtd. - - This is a native thread which runs the default - libvirt event loop implementation. This processes - any incoming async events from libvirtd and queues - them for later dispatch. This thread is only - permitted to use libvirt python APIs, and the - driver.queue_event method. In particular any use - of logging is forbidden, since it will confuse - eventlet's greenthread integration - """ - - while True: - libvirt.virEventRunDefaultImpl() - - def _dispatch_thread(self): - """Dispatches async events coming in from libvirtd. - - This is a green thread which waits for events to - arrive from the libvirt event loop thread. This - then dispatches the events to the compute manager. - """ - - while True: - self._dispatch_events() - def _conn_event_thread(self): """Dispatches async connection events""" # NOTE(mdbooth): This thread doesn't need to jump through the same - # hoops as _dispatch_thread because it doesn't interact directly - # with the libvirt native thread. + # hoops as the lifecycle event handling because it doesn't interact + # directly with the libvirt native thread. while True: self._dispatch_conn_event() @@ -248,12 +417,13 @@ class Host(object): NB: this method is executing in a native thread, not an eventlet coroutine. It can only invoke other libvirt - APIs, or use self._queue_event(). Any use of logging APIs - in particular is forbidden. + APIs, or use self._event_handler._queue_event(). Any use of logging + APIs in particular is forbidden. """ self = opaque uuid = dom.UUIDString() - self._queue_event(libvirtevent.DeviceRemovedEvent(uuid, dev)) + self._event_handler._queue_event( + libvirtevent.DeviceRemovedEvent(uuid, dev)) @staticmethod def _event_device_removal_failed_callback(conn, dom, dev, opaque): @@ -261,12 +431,13 @@ class Host(object): NB: this method is executing in a native thread, not an eventlet coroutine. It can only invoke other libvirt - APIs, or use self._queue_event(). Any use of logging APIs - in particular is forbidden. + APIs, or use self._event_handler._queue_event(). Any use of logging + APIs in particular is forbidden. """ self = opaque uuid = dom.UUIDString() - self._queue_event(libvirtevent.DeviceRemovalFailedEvent(uuid, dev)) + self._event_handler._queue_event( + libvirtevent.DeviceRemovalFailedEvent(uuid, dev)) @staticmethod def _event_lifecycle_callback(conn, dom, event, detail, opaque): @@ -274,8 +445,8 @@ class Host(object): NB: this method is executing in a native thread, not an eventlet coroutine. It can only invoke other libvirt - APIs, or use self._queue_event(). Any use of logging APIs - in particular is forbidden. + APIs, or use self._event_handler._queue_event(). Any use of logging + APIs in particular is forbidden. """ self = opaque @@ -316,11 +487,12 @@ class Host(object): transition = virtevent.EVENT_LIFECYCLE_RESUMED if transition is not None: - self._queue_event(virtevent.LifecycleEvent(uuid, transition)) + self._event_handler._queue_event( + virtevent.LifecycleEvent(uuid, transition)) def _close_callback(self, conn, reason, opaque): close_info = {'conn': conn, 'reason': reason} - self._queue_event(close_info) + self._event_handler._queue_event(close_info) @staticmethod def _test_connection(conn): @@ -359,77 +531,22 @@ class Host(object): flags = libvirt.VIR_CONNECT_RO return self._libvirt_proxy.openAuth(uri, auth, flags) - def _queue_event(self, event): - """Puts an event on the queue for dispatch. - - This method is called by the native event thread to - put events on the queue for later dispatch by the - green thread. Any use of logging APIs is forbidden. - """ - - if self._event_queue is None: - return - - # Queue the event... - self._event_queue.put(event) - - # ...then wakeup the green thread to dispatch it - c = ' '.encode() - self._event_notify_send.write(c) - self._event_notify_send.flush() - - def _dispatch_events(self): - """Wait for & dispatch events from native thread - - Blocks until native thread indicates some events - are ready. Then dispatches all queued events. - """ - - # Wait to be notified that there are some - # events pending - try: - _c = self._event_notify_recv.read(1) - assert _c - except ValueError: - return # will be raised when pipe is closed - - # Process as many events as possible without - # blocking - last_close_event = None - # required for mypy - if self._event_queue is None: - return - while not self._event_queue.empty(): - try: - event: virtevent.InstanceEvent | Mapping[str, ty.Any] = self._event_queue.get(block=False) # noqa: E501 - if issubclass(type(event), virtevent.InstanceEvent): - # call possibly with delay - self._event_emit_delayed(event) - - elif 'conn' in event and 'reason' in event: - last_close_event = event - except native_Queue.Empty: - pass - if last_close_event is None: - return - conn = last_close_event['conn'] + def _connection_closed(self, conn, *args, **kwargs): # get_new_connection may already have disabled the host, # in which case _wrapped_conn is None. with self._wrapped_conn_lock: if conn == self._wrapped_conn: - reason = str(last_close_event['reason']) - msg = _("Connection to libvirt lost: %s") % reason self._wrapped_conn = None - self._queue_conn_event_handler(False, msg) + + self._queue_conn_event_handler(*args, **kwargs) def _event_emit_delayed(self, event): """Emit events - possibly delayed.""" - def event_cleanup(gt, *args, **kwargs): + def event_cleanup(event): """Callback function for greenthread. Called to cleanup the _events_delayed dictionary when an event was called. """ - event = args[0] self._events_delayed.pop(event.uuid, None) # Cleanup possible delayed stop events. @@ -442,12 +559,12 @@ class Host(object): event.transition == virtevent.EVENT_LIFECYCLE_STOPPED): # Delay STOPPED event, as they may be followed by a STARTED # event in case the instance is rebooting - id_ = greenthread.spawn_after(self._lifecycle_delay, - self._event_emit, event) + id_ = utils.spawn_after( + self._lifecycle_delay, self._event_emit, event) self._events_delayed[event.uuid] = id_ # add callback to cleanup self._events_delayed dict after # event was called - id_.link(event_cleanup, event) + id_.add_done_callback(lambda _: event_cleanup(event)) else: self._event_emit(event) @@ -455,37 +572,14 @@ class Host(object): if self._lifecycle_event_handler is not None: self._lifecycle_event_handler(event) - def _init_events_pipe(self): - """Create a self-pipe for the native thread to synchronize on. - - This code is taken from the eventlet tpool module, under terms - of the Apache License v2.0. - """ - - self._event_queue = native_Queue.Queue() - rpipe, wpipe = os.pipe() - self._event_notify_send = greenio.GreenPipe(wpipe, 'wb', 0) - self._event_notify_recv = greenio.GreenPipe(rpipe, 'rb', 0) - def _init_events(self): """Initializes the libvirt events subsystem. - - This requires running a native thread to provide the - libvirt event loop integration. This forwards events - to a green thread which does the actual dispatching. """ + self._event_handler.start() - self._init_events_pipe() - - LOG.debug("Starting native event thread") - self._event_thread = native_threading.Thread( - target=self._native_thread) - self._event_thread.daemon = True - self._event_thread.start() - - LOG.debug("Starting green dispatch thread") - utils.spawn(self._dispatch_thread) - + # This thread is just for async connection closed event handling. + # In eventlet mode it only handles tasks within the main thread with + # the eventlet hub. LOG.debug("Starting connection event dispatch thread") utils.spawn(self._conn_event_thread) @@ -605,7 +699,6 @@ class Host(object): # connection is used for the first time. Otherwise, the # handler does not get registered. libvirt.registerErrorHandler(self._libvirt_error_handler, None) - libvirt.virEventRegisterDefaultImpl() self._init_events() self._initialized = True