diff --git a/nova/tests/unit/test_utils.py b/nova/tests/unit/test_utils.py index 897cbebb3e..a2d7f6d873 100644 --- a/nova/tests/unit/test_utils.py +++ b/nova/tests/unit/test_utils.py @@ -1559,6 +1559,18 @@ class SpawnOnTestCase(test.NoDBTestCase): 'test_spawn_on_warns_on_full_executor.cell_worker', task) +class SpawnAfterTestCase(test.NoDBTestCase): + @mock.patch.object(time, "sleep") + def test_spawn_after_submits_work_after_delay(self, mock_sleep): + task = mock.MagicMock() + + future = utils.spawn_after(0.1, task, 13, foo='bar') + future.result() + + task.assert_called_once_with(13, foo='bar') + mock_sleep.assert_called_once_with(0.1) + + class ExecutorStatsTestCase(test.NoDBTestCase): def setUp(self): diff --git a/nova/tests/unit/virt/libvirt/test_host.py b/nova/tests/unit/virt/libvirt/test_host.py index 2dca412cb5..e575d9a57f 100644 --- a/nova/tests/unit/virt/libvirt/test_host.py +++ b/nova/tests/unit/virt/libvirt/test_host.py @@ -19,7 +19,6 @@ from unittest import mock import ddt import eventlet -from eventlet import greenthread from eventlet import tpool from lxml import etree from oslo_serialization import jsonutils @@ -119,10 +118,18 @@ class HostTestCase(test.NoDBTestCase): self.assertEqual(0, len(log_mock.method_calls), 'LOG should not be used in _connect_auth_cb.') - @mock.patch.object(greenthread, 'spawn_after') + @mock.patch.object(utils, 'spawn_after') def test_event_dispatch(self, mock_spawn_after): # Validate that the libvirt self-pipe for forwarding # events between threads is working sanely + + # Simulate that the dispatch thread runs. + # In threading mode we don't have such thread as we don't need it + # so for threading this is noop. + def run_dispatch(hostimpl): + if not utils.concurrency_mode_threading(): + hostimpl._event_handler._dispatch_events() + def handler(event): got_events.append(event) @@ -130,17 +137,15 @@ class HostTestCase(test.NoDBTestCase): lifecycle_event_handler=handler) got_events = [] - hostimpl._init_events_pipe() - event1 = event.LifecycleEvent( "cef19ce0-0ca2-11df-855d-b19fbce37686", event.EVENT_LIFECYCLE_STARTED) event2 = event.LifecycleEvent( "cef19ce0-0ca2-11df-855d-b19fbce37686", event.EVENT_LIFECYCLE_PAUSED) - hostimpl._queue_event(event1) - hostimpl._queue_event(event2) - hostimpl._dispatch_events() + hostimpl._event_handler._queue_event(event1) + hostimpl._event_handler._queue_event(event2) + run_dispatch(hostimpl) want_events = [event1, event2] self.assertEqual(want_events, got_events) @@ -152,9 +157,9 @@ class HostTestCase(test.NoDBTestCase): "cef19ce0-0ca2-11df-855d-b19fbce37686", event.EVENT_LIFECYCLE_STOPPED) - hostimpl._queue_event(event3) - hostimpl._queue_event(event4) - hostimpl._dispatch_events() + hostimpl._event_handler._queue_event(event3) + hostimpl._event_handler._queue_event(event4) + run_dispatch(hostimpl) want_events = [event1, event2, event3] self.assertEqual(want_events, got_events) @@ -163,21 +168,13 @@ class HostTestCase(test.NoDBTestCase): mock_spawn_after.assert_called_once_with( hostimpl._lifecycle_delay, hostimpl._event_emit, event4) - def test_event_lifecycle(self): - got_events = [] - - # Validate that libvirt events are correctly translated - # to Nova events - def spawn_after(seconds, func, *args, **kwargs): - got_events.append(args[0]) - return mock.Mock(spec=greenthread.GreenThread) - - greenthread.spawn_after = mock.Mock(side_effect=spawn_after) + @mock.patch('nova.virt.libvirt.host.Host._event_emit_delayed') + def test_event_lifecycle(self, mock_emit): hostimpl = host.Host("qemu:///system", lifecycle_event_handler=lambda e: None) + conn = hostimpl.get_connection() - hostimpl._init_events_pipe() fake_dom_xml = """ cef19ce0-0ca2-11df-855d-b19fbce37686 @@ -194,13 +191,18 @@ class HostTestCase(test.NoDBTestCase): hostimpl._event_lifecycle_callback( conn, dom, fakelibvirt.VIR_DOMAIN_EVENT_STOPPED, 0, hostimpl) - hostimpl._dispatch_events() - self.assertEqual(len(got_events), 1) - self.assertIsInstance(got_events[0], event.LifecycleEvent) - self.assertEqual(got_events[0].uuid, - "cef19ce0-0ca2-11df-855d-b19fbce37686") - self.assertEqual(got_events[0].transition, - event.EVENT_LIFECYCLE_STOPPED) + # Simulate that the dispatch thread runs. + # In threading mode we don't have such thread as we don't need it + if not utils.concurrency_mode_threading(): + hostimpl._event_handler._dispatch_events() + + mock_emit.assert_called_once() + args, _ = mock_emit.call_args + got_event = args[0] + self.assertIsInstance(got_event, event.LifecycleEvent) + self.assertEqual( + got_event.uuid, "cef19ce0-0ca2-11df-855d-b19fbce37686") + self.assertEqual(got_event.transition, event.EVENT_LIFECYCLE_STOPPED) def test_event_lifecycle_callback_suspended_postcopy(self): """Tests the suspended lifecycle event with libvirt with post-copy""" @@ -216,7 +218,7 @@ class HostTestCase(test.NoDBTestCase): conn, dom, fakelibvirt.VIR_DOMAIN_EVENT_SUSPENDED, detail=fakelibvirt.VIR_DOMAIN_EVENT_SUSPENDED_POSTCOPY, opaque=hostimpl) - expected_event = hostimpl._queue_event.call_args[0][0] + expected_event = hostimpl._event_handler._queue_event.call_args[0][0] self.assertEqual(event.EVENT_LIFECYCLE_POSTCOPY_STARTED, expected_event.transition) @@ -238,7 +240,7 @@ class HostTestCase(test.NoDBTestCase): conn, dom, fakelibvirt.VIR_DOMAIN_EVENT_SUSPENDED, detail=fakelibvirt.VIR_DOMAIN_EVENT_SUSPENDED_MIGRATED, opaque=hostimpl) - expected_event = hostimpl._queue_event.call_args[0][0] + expected_event = hostimpl._event_handler._queue_event.call_args[0][0] self.assertEqual(event.EVENT_LIFECYCLE_MIGRATION_COMPLETED, expected_event.transition) get_job_info.assert_called_once_with() @@ -265,7 +267,7 @@ class HostTestCase(test.NoDBTestCase): conn, dom, fakelibvirt.VIR_DOMAIN_EVENT_SUSPENDED, detail=fakelibvirt.VIR_DOMAIN_EVENT_SUSPENDED_MIGRATED, opaque=hostimpl) - expected_event = hostimpl._queue_event.call_args[0][0] + expected_event = hostimpl._event_handler._queue_event.call_args[0][0] self.assertEqual(event.EVENT_LIFECYCLE_PAUSED, expected_event.transition) get_job_info.assert_called_once_with() @@ -273,30 +275,30 @@ class HostTestCase(test.NoDBTestCase): test.MatchType(libvirt_guest.Guest), instance=None, logging_ok=False) - def test_event_emit_delayed_call_delayed(self): + @mock.patch.object(utils, 'spawn_after') + def test_event_emit_delayed_call_delayed(self, mock_spawn_after): ev = event.LifecycleEvent( "cef19ce0-0ca2-11df-855d-b19fbce37686", event.EVENT_LIFECYCLE_STOPPED) - spawn_after_mock = mock.Mock() - greenthread.spawn_after = spawn_after_mock hostimpl = host.Host( 'qemu:///system', lifecycle_event_handler=lambda e: None) hostimpl._event_emit_delayed(ev) - spawn_after_mock.assert_called_once_with( + mock_spawn_after.assert_called_once_with( 15, hostimpl._event_emit, ev) - @mock.patch.object(greenthread, 'spawn_after') + @mock.patch.object(utils, 'spawn_after') def test_event_emit_delayed_call_delayed_pending(self, spawn_after_mock): hostimpl = host.Host( 'qemu:///system', lifecycle_event_handler=lambda e: None) uuid = "cef19ce0-0ca2-11df-855d-b19fbce37686" - gt_mock = mock.Mock() - hostimpl._events_delayed[uuid] = gt_mock + ev = event.LifecycleEvent( uuid, event.EVENT_LIFECYCLE_STOPPED) hostimpl._event_emit_delayed(ev) - gt_mock.cancel.assert_called_once_with() + mock_future = spawn_after_mock.return_value + mock_future.add_done_callback.assert_called_once() self.assertTrue(spawn_after_mock.called) + self.assertIs(mock_future, hostimpl._events_delayed[uuid]) def test_event_delayed_cleanup(self): hostimpl = host.Host( @@ -321,7 +323,7 @@ class HostTestCase(test.NoDBTestCase): dom = fakelibvirt.Domain(conn, fake_dom_xml, running=True) host.Host._event_device_removed_callback( conn, dom, dev='virtio-1', opaque=hostimpl) - expected_event = hostimpl._queue_event.call_args[0][0] + expected_event = hostimpl._event_handler._queue_event.call_args[0][0] self.assertEqual( libvirtevent.DeviceRemovedEvent, type(expected_event)) self.assertEqual( @@ -339,7 +341,7 @@ class HostTestCase(test.NoDBTestCase): dom = fakelibvirt.Domain(conn, fake_dom_xml, running=True) host.Host._event_device_removal_failed_callback( conn, dom, dev='virtio-1', opaque=hostimpl) - expected_event = hostimpl._queue_event.call_args[0][0] + expected_event = hostimpl._event_handler._queue_event.call_args[0][0] self.assertEqual( libvirtevent.DeviceRemovalFailedEvent, type(expected_event)) self.assertEqual( diff --git a/nova/utils.py b/nova/utils.py index aeee85b30c..a17aba5438 100644 --- a/nova/utils.py +++ b/nova/utils.py @@ -577,6 +577,16 @@ def spawn(func, *args, **kwargs) -> futurist.Future: return spawn_on(_get_default_executor(), func, *args, **kwargs) +def spawn_after(seconds, func, *args, **kwargs) -> futurist.Future: + """Executing the function asynchronously after the given time.""" + + def delayed(*args, **kwargs): + time.sleep(seconds) + return func(*args, **kwargs) + + return spawn(delayed, *args, **kwargs) + + def _executor_is_full(executor): if concurrency_mode_threading(): # TODO(gibi): Move this whole logic to futurist ThreadPoolExecutor diff --git a/nova/virt/libvirt/host.py b/nova/virt/libvirt/host.py index 17aced16fa..1b24f8bbbd 100644 --- a/nova/virt/libvirt/host.py +++ b/nova/virt/libvirt/host.py @@ -39,9 +39,6 @@ import queue import threading import typing as ty -from eventlet import greenio -from eventlet import greenthread -from eventlet import patcher from lxml import etree from oslo_log import log as logging from oslo_serialization import jsonutils @@ -78,10 +75,6 @@ except ImportError: CONF = nova.conf.CONF LOG = logging.getLogger(__name__) -native_socket = patcher.original('socket') -native_threading = patcher.original("threading") -native_Queue = patcher.original("queue") - # This list is for libvirt hypervisor drivers that need special handling. # This is *not* the complete list of supported hypervisor drivers. HV_DRIVER_QEMU = "QEMU" @@ -121,6 +114,205 @@ def _get_loaders(): return _loaders +class LibvirtEventHandler: + def __init__(self, conn_event_handler=None, lifecycle_event_handler=None): + self._lifecycle_event_handler = lifecycle_event_handler + self._conn_event_handler = conn_event_handler + + def _queue_event(self, event): + raise NotImplementedError() + + def start(self): + raise NotImplementedError() + + @classmethod + def create(cls, conn_event_handler=None, lifecycle_event_handler=None): + if utils.concurrency_mode_threading(): + return _ThreadingLibvirtEventHandler( + conn_event_handler, lifecycle_event_handler) + else: + return _EventletLibvirtEventHandler( + conn_event_handler, lifecycle_event_handler) + + +class _EventletLibvirtEventHandler(LibvirtEventHandler): + def __init__(self, conn_event_handler=None, lifecycle_event_handler=None): + super().__init__(conn_event_handler, lifecycle_event_handler) + + from eventlet import greenio + from eventlet import patcher + + self.native_threading = patcher.original("threading") + self.native_queue = patcher.original("queue") + + self._event_thread = None + # This is a Queue between the native libvirt event thread + # and the main thread with the eventlet hub. This needs to be + # a native queue. + self._event_queue: queue.Queue[ + virtevent.InstanceEvent | Mapping[str, ty.Any] + ] = self.native_queue.Queue() + + # Create a self-pipe for the native thread to synchronize on. + # + # This code is taken from the eventlet tpool module, under terms + # of the Apache License v2.0. + rpipe, wpipe = os.pipe() + self._event_notify_send = greenio.GreenPipe(wpipe, 'wb', 0) + self._event_notify_recv = greenio.GreenPipe(rpipe, 'rb', 0) + + def start(self): + """Initializes the libvirt events subsystem. + + This requires running a native thread to provide the + libvirt event loop integration. This forwards events + to a green thread which does the actual dispatching. + """ + libvirt.virEventRegisterDefaultImpl() + + LOG.debug("Starting event thread") + self._event_thread = self.native_threading.Thread( + target=self._native_thread) + self._event_thread.daemon = True + self._event_thread.start() + + LOG.debug("Starting event dispatch greenthread") + utils.spawn(self._dispatch_thread) + + def _queue_event(self, event): + """Puts an event on the queue for dispatch. + + This method is called by the native event thread to + put events on the queue for later dispatch by the + green thread. Any use of logging APIs is forbidden. + """ + + if self._event_queue is None: + return + + # Queue the event... + self._event_queue.put(event) + + # ...then wakeup the green thread to dispatch it + c = ' '.encode() + self._event_notify_send.write(c) + self._event_notify_send.flush() + + def _native_thread(self): + """Receives async events coming in from libvirtd. + + This is a native thread which runs the default + libvirt event loop implementation. This processes + any incoming async events from libvirtd and queues + them for later dispatch. This thread is only + permitted to use libvirt python APIs, and the + driver.queue_event method. In particular any use + of logging is forbidden, since it will confuse + eventlet's greenthread integration + """ + + while True: + libvirt.virEventRunDefaultImpl() + + def _dispatch_thread(self): + """Dispatches async events coming in from libvirtd. + + This is a green thread which waits for events to + arrive from the libvirt event loop thread. This + then dispatches the events to the compute manager. + """ + + while True: + self._dispatch_events() + + def _dispatch_events(self): + """Wait for & dispatch events from native thread + + Blocks until native thread indicates some events + are ready. Then dispatches all queued events. + """ + + # Wait to be notified that there are some + # events pending + try: + _c = self._event_notify_recv.read(1) + assert _c + except ValueError: + return # will be raised when pipe is closed + + # Process as many events as possible without + # blocking + last_close_event = None + # required for mypy + if self._event_queue is None: + return + while not self._event_queue.empty(): + try: + event: virtevent.InstanceEvent | Mapping[str, ty.Any] = ( + self._event_queue.get(block=False)) + if issubclass(type(event), virtevent.InstanceEvent): + self._lifecycle_event_handler(event) + + elif 'conn' in event and 'reason' in event: + last_close_event = event + except self.native_queue.Empty: + pass + if last_close_event is None: + return + + conn = last_close_event['conn'] + reason = str(last_close_event['reason']) + msg = _("Connection to libvirt lost: %s") % reason + self._conn_event_handler(conn, False, msg) + + +class _ThreadingLibvirtEventHandler(LibvirtEventHandler): + def __init__(self, conn_event_handler=None, lifecycle_event_handler=None): + super().__init__(conn_event_handler, lifecycle_event_handler) + + self._event_thread = None + self._started = False + + def start(self): + """Initializes the libvirt events subsystem. + + This requires running a native thread that pools for libvirt events. + """ + libvirt.virEventRegisterDefaultImpl() + + LOG.debug("Starting event thread") + self._event_thread = threading.Thread(target=self._native_thread) + self._event_thread.daemon = True + self._event_thread.start() + + def _queue_event(self, event): + """Puts an event on the queue for dispatch. + + In native threading mode instead of queueing we directly dispatch on + the thread that receives the event as we don't have the requirement to + move the event handler to the main thread with the eventlet hub + """ + + if issubclass(type(event), virtevent.InstanceEvent): + self._lifecycle_event_handler(event) + elif 'conn' in event and 'reason' in event: + conn = event['conn'] + reason = str(event['reason']) + msg = _("Connection to libvirt lost: %s") % reason + self._conn_event_handler(conn, False, msg) + + def _native_thread(self): + """Receives async events coming in from libvirtd. + + This is a native thread which runs the default + libvirt event loop implementation. This processes + any incoming async events from libvirtd. + """ + + while True: + libvirt.virEventRunDefaultImpl() + + class Host(object): def __init__(self, uri, read_only=False, @@ -129,7 +321,14 @@ class Host(object): self._uri = uri self._read_only = read_only self._initial_connection = True + + self._event_handler = LibvirtEventHandler.create( + self._connection_closed, self._event_emit_delayed) + self._conn_event_handler = conn_event_handler + # This queue is just for the async handling of connection closed + # events. In eventlet mode this only pass events within the main + # native thread with the eventlet hub, so no proxying is needed. self._conn_event_handler_queue: queue.Queue[ Callable[[], None] ] = queue.Queue() @@ -141,9 +340,6 @@ class Host(object): self._wrapped_conn = None self._wrapped_conn_lock = threading.Lock() - self._event_queue: queue.Queue[ - virtevent.InstanceEvent | Mapping[str, ty.Any] - ] | None = None self._events_delayed = {} # Note(toabctl): During a reboot of a domain, STOPPED and @@ -195,38 +391,11 @@ class Host(object): # executing proxied calls in a native thread. return utils.tpool_wrap(obj, autowrap=self._libvirt_proxy_classes) - def _native_thread(self): - """Receives async events coming in from libvirtd. - - This is a native thread which runs the default - libvirt event loop implementation. This processes - any incoming async events from libvirtd and queues - them for later dispatch. This thread is only - permitted to use libvirt python APIs, and the - driver.queue_event method. In particular any use - of logging is forbidden, since it will confuse - eventlet's greenthread integration - """ - - while True: - libvirt.virEventRunDefaultImpl() - - def _dispatch_thread(self): - """Dispatches async events coming in from libvirtd. - - This is a green thread which waits for events to - arrive from the libvirt event loop thread. This - then dispatches the events to the compute manager. - """ - - while True: - self._dispatch_events() - def _conn_event_thread(self): """Dispatches async connection events""" # NOTE(mdbooth): This thread doesn't need to jump through the same - # hoops as _dispatch_thread because it doesn't interact directly - # with the libvirt native thread. + # hoops as the lifecycle event handling because it doesn't interact + # directly with the libvirt native thread. while True: self._dispatch_conn_event() @@ -248,12 +417,13 @@ class Host(object): NB: this method is executing in a native thread, not an eventlet coroutine. It can only invoke other libvirt - APIs, or use self._queue_event(). Any use of logging APIs - in particular is forbidden. + APIs, or use self._event_handler._queue_event(). Any use of logging + APIs in particular is forbidden. """ self = opaque uuid = dom.UUIDString() - self._queue_event(libvirtevent.DeviceRemovedEvent(uuid, dev)) + self._event_handler._queue_event( + libvirtevent.DeviceRemovedEvent(uuid, dev)) @staticmethod def _event_device_removal_failed_callback(conn, dom, dev, opaque): @@ -261,12 +431,13 @@ class Host(object): NB: this method is executing in a native thread, not an eventlet coroutine. It can only invoke other libvirt - APIs, or use self._queue_event(). Any use of logging APIs - in particular is forbidden. + APIs, or use self._event_handler._queue_event(). Any use of logging + APIs in particular is forbidden. """ self = opaque uuid = dom.UUIDString() - self._queue_event(libvirtevent.DeviceRemovalFailedEvent(uuid, dev)) + self._event_handler._queue_event( + libvirtevent.DeviceRemovalFailedEvent(uuid, dev)) @staticmethod def _event_lifecycle_callback(conn, dom, event, detail, opaque): @@ -274,8 +445,8 @@ class Host(object): NB: this method is executing in a native thread, not an eventlet coroutine. It can only invoke other libvirt - APIs, or use self._queue_event(). Any use of logging APIs - in particular is forbidden. + APIs, or use self._event_handler._queue_event(). Any use of logging + APIs in particular is forbidden. """ self = opaque @@ -316,11 +487,12 @@ class Host(object): transition = virtevent.EVENT_LIFECYCLE_RESUMED if transition is not None: - self._queue_event(virtevent.LifecycleEvent(uuid, transition)) + self._event_handler._queue_event( + virtevent.LifecycleEvent(uuid, transition)) def _close_callback(self, conn, reason, opaque): close_info = {'conn': conn, 'reason': reason} - self._queue_event(close_info) + self._event_handler._queue_event(close_info) @staticmethod def _test_connection(conn): @@ -359,77 +531,22 @@ class Host(object): flags = libvirt.VIR_CONNECT_RO return self._libvirt_proxy.openAuth(uri, auth, flags) - def _queue_event(self, event): - """Puts an event on the queue for dispatch. - - This method is called by the native event thread to - put events on the queue for later dispatch by the - green thread. Any use of logging APIs is forbidden. - """ - - if self._event_queue is None: - return - - # Queue the event... - self._event_queue.put(event) - - # ...then wakeup the green thread to dispatch it - c = ' '.encode() - self._event_notify_send.write(c) - self._event_notify_send.flush() - - def _dispatch_events(self): - """Wait for & dispatch events from native thread - - Blocks until native thread indicates some events - are ready. Then dispatches all queued events. - """ - - # Wait to be notified that there are some - # events pending - try: - _c = self._event_notify_recv.read(1) - assert _c - except ValueError: - return # will be raised when pipe is closed - - # Process as many events as possible without - # blocking - last_close_event = None - # required for mypy - if self._event_queue is None: - return - while not self._event_queue.empty(): - try: - event: virtevent.InstanceEvent | Mapping[str, ty.Any] = self._event_queue.get(block=False) # noqa: E501 - if issubclass(type(event), virtevent.InstanceEvent): - # call possibly with delay - self._event_emit_delayed(event) - - elif 'conn' in event and 'reason' in event: - last_close_event = event - except native_Queue.Empty: - pass - if last_close_event is None: - return - conn = last_close_event['conn'] + def _connection_closed(self, conn, *args, **kwargs): # get_new_connection may already have disabled the host, # in which case _wrapped_conn is None. with self._wrapped_conn_lock: if conn == self._wrapped_conn: - reason = str(last_close_event['reason']) - msg = _("Connection to libvirt lost: %s") % reason self._wrapped_conn = None - self._queue_conn_event_handler(False, msg) + + self._queue_conn_event_handler(*args, **kwargs) def _event_emit_delayed(self, event): """Emit events - possibly delayed.""" - def event_cleanup(gt, *args, **kwargs): + def event_cleanup(event): """Callback function for greenthread. Called to cleanup the _events_delayed dictionary when an event was called. """ - event = args[0] self._events_delayed.pop(event.uuid, None) # Cleanup possible delayed stop events. @@ -442,12 +559,12 @@ class Host(object): event.transition == virtevent.EVENT_LIFECYCLE_STOPPED): # Delay STOPPED event, as they may be followed by a STARTED # event in case the instance is rebooting - id_ = greenthread.spawn_after(self._lifecycle_delay, - self._event_emit, event) + id_ = utils.spawn_after( + self._lifecycle_delay, self._event_emit, event) self._events_delayed[event.uuid] = id_ # add callback to cleanup self._events_delayed dict after # event was called - id_.link(event_cleanup, event) + id_.add_done_callback(lambda _: event_cleanup(event)) else: self._event_emit(event) @@ -455,37 +572,14 @@ class Host(object): if self._lifecycle_event_handler is not None: self._lifecycle_event_handler(event) - def _init_events_pipe(self): - """Create a self-pipe for the native thread to synchronize on. - - This code is taken from the eventlet tpool module, under terms - of the Apache License v2.0. - """ - - self._event_queue = native_Queue.Queue() - rpipe, wpipe = os.pipe() - self._event_notify_send = greenio.GreenPipe(wpipe, 'wb', 0) - self._event_notify_recv = greenio.GreenPipe(rpipe, 'rb', 0) - def _init_events(self): """Initializes the libvirt events subsystem. - - This requires running a native thread to provide the - libvirt event loop integration. This forwards events - to a green thread which does the actual dispatching. """ + self._event_handler.start() - self._init_events_pipe() - - LOG.debug("Starting native event thread") - self._event_thread = native_threading.Thread( - target=self._native_thread) - self._event_thread.daemon = True - self._event_thread.start() - - LOG.debug("Starting green dispatch thread") - utils.spawn(self._dispatch_thread) - + # This thread is just for async connection closed event handling. + # In eventlet mode it only handles tasks within the main thread with + # the eventlet hub. LOG.debug("Starting connection event dispatch thread") utils.spawn(self._conn_event_thread) @@ -605,7 +699,6 @@ class Host(object): # connection is used for the first time. Otherwise, the # handler does not get registered. libvirt.registerErrorHandler(self._libvirt_error_handler, None) - libvirt.virEventRegisterDefaultImpl() self._init_events() self._initialized = True