Libvirt event handling without eventlet

Our libvirt interface is not eventlet aware and not pure python. So
eventlet monkey patching is not enough. So the libvirt driver
implemented a native polling thread for libvirt and the queue + pipe
mechanism to push event from the native polling thread to the main
thread with the eventlet event loop.

We don't need all of these complications in native thread mode. There we
only need a single thread that poll libvirt for the events. The received
events can be executed directly on the polling thread as that is no
different from any other threads in the system now.

To make the change more understandable the event handling logic is moved
behind an abstraction that is implemented twice, once for eventlet with
the existing implementation just moved around, and once for native
threading with the simplified handling.

Change-Id: If479574cd91975810098afa8e3c220c7316a9431
Signed-off-by: Balazs Gibizer <gibi@redhat.com>
This commit is contained in:
Balazs Gibizer
2025-11-03 17:28:40 +01:00
parent 59a7093915
commit a89c1b44c5
4 changed files with 297 additions and 180 deletions
+12
View File
@@ -1559,6 +1559,18 @@ class SpawnOnTestCase(test.NoDBTestCase):
'test_spawn_on_warns_on_full_executor.cell_worker', task)
class SpawnAfterTestCase(test.NoDBTestCase):
@mock.patch.object(time, "sleep")
def test_spawn_after_submits_work_after_delay(self, mock_sleep):
task = mock.MagicMock()
future = utils.spawn_after(0.1, task, 13, foo='bar')
future.result()
task.assert_called_once_with(13, foo='bar')
mock_sleep.assert_called_once_with(0.1)
class ExecutorStatsTestCase(test.NoDBTestCase):
def setUp(self):
+43 -41
View File
@@ -19,7 +19,6 @@ from unittest import mock
import ddt
import eventlet
from eventlet import greenthread
from eventlet import tpool
from lxml import etree
from oslo_serialization import jsonutils
@@ -119,10 +118,18 @@ class HostTestCase(test.NoDBTestCase):
self.assertEqual(0, len(log_mock.method_calls),
'LOG should not be used in _connect_auth_cb.')
@mock.patch.object(greenthread, 'spawn_after')
@mock.patch.object(utils, 'spawn_after')
def test_event_dispatch(self, mock_spawn_after):
# Validate that the libvirt self-pipe for forwarding
# events between threads is working sanely
# Simulate that the dispatch thread runs.
# In threading mode we don't have such thread as we don't need it
# so for threading this is noop.
def run_dispatch(hostimpl):
if not utils.concurrency_mode_threading():
hostimpl._event_handler._dispatch_events()
def handler(event):
got_events.append(event)
@@ -130,17 +137,15 @@ class HostTestCase(test.NoDBTestCase):
lifecycle_event_handler=handler)
got_events = []
hostimpl._init_events_pipe()
event1 = event.LifecycleEvent(
"cef19ce0-0ca2-11df-855d-b19fbce37686",
event.EVENT_LIFECYCLE_STARTED)
event2 = event.LifecycleEvent(
"cef19ce0-0ca2-11df-855d-b19fbce37686",
event.EVENT_LIFECYCLE_PAUSED)
hostimpl._queue_event(event1)
hostimpl._queue_event(event2)
hostimpl._dispatch_events()
hostimpl._event_handler._queue_event(event1)
hostimpl._event_handler._queue_event(event2)
run_dispatch(hostimpl)
want_events = [event1, event2]
self.assertEqual(want_events, got_events)
@@ -152,9 +157,9 @@ class HostTestCase(test.NoDBTestCase):
"cef19ce0-0ca2-11df-855d-b19fbce37686",
event.EVENT_LIFECYCLE_STOPPED)
hostimpl._queue_event(event3)
hostimpl._queue_event(event4)
hostimpl._dispatch_events()
hostimpl._event_handler._queue_event(event3)
hostimpl._event_handler._queue_event(event4)
run_dispatch(hostimpl)
want_events = [event1, event2, event3]
self.assertEqual(want_events, got_events)
@@ -163,21 +168,13 @@ class HostTestCase(test.NoDBTestCase):
mock_spawn_after.assert_called_once_with(
hostimpl._lifecycle_delay, hostimpl._event_emit, event4)
def test_event_lifecycle(self):
got_events = []
# Validate that libvirt events are correctly translated
# to Nova events
def spawn_after(seconds, func, *args, **kwargs):
got_events.append(args[0])
return mock.Mock(spec=greenthread.GreenThread)
greenthread.spawn_after = mock.Mock(side_effect=spawn_after)
@mock.patch('nova.virt.libvirt.host.Host._event_emit_delayed')
def test_event_lifecycle(self, mock_emit):
hostimpl = host.Host("qemu:///system",
lifecycle_event_handler=lambda e: None)
conn = hostimpl.get_connection()
hostimpl._init_events_pipe()
fake_dom_xml = """
<domain type='kvm'>
<uuid>cef19ce0-0ca2-11df-855d-b19fbce37686</uuid>
@@ -194,13 +191,18 @@ class HostTestCase(test.NoDBTestCase):
hostimpl._event_lifecycle_callback(
conn, dom, fakelibvirt.VIR_DOMAIN_EVENT_STOPPED, 0, hostimpl)
hostimpl._dispatch_events()
self.assertEqual(len(got_events), 1)
self.assertIsInstance(got_events[0], event.LifecycleEvent)
self.assertEqual(got_events[0].uuid,
"cef19ce0-0ca2-11df-855d-b19fbce37686")
self.assertEqual(got_events[0].transition,
event.EVENT_LIFECYCLE_STOPPED)
# Simulate that the dispatch thread runs.
# In threading mode we don't have such thread as we don't need it
if not utils.concurrency_mode_threading():
hostimpl._event_handler._dispatch_events()
mock_emit.assert_called_once()
args, _ = mock_emit.call_args
got_event = args[0]
self.assertIsInstance(got_event, event.LifecycleEvent)
self.assertEqual(
got_event.uuid, "cef19ce0-0ca2-11df-855d-b19fbce37686")
self.assertEqual(got_event.transition, event.EVENT_LIFECYCLE_STOPPED)
def test_event_lifecycle_callback_suspended_postcopy(self):
"""Tests the suspended lifecycle event with libvirt with post-copy"""
@@ -216,7 +218,7 @@ class HostTestCase(test.NoDBTestCase):
conn, dom, fakelibvirt.VIR_DOMAIN_EVENT_SUSPENDED,
detail=fakelibvirt.VIR_DOMAIN_EVENT_SUSPENDED_POSTCOPY,
opaque=hostimpl)
expected_event = hostimpl._queue_event.call_args[0][0]
expected_event = hostimpl._event_handler._queue_event.call_args[0][0]
self.assertEqual(event.EVENT_LIFECYCLE_POSTCOPY_STARTED,
expected_event.transition)
@@ -238,7 +240,7 @@ class HostTestCase(test.NoDBTestCase):
conn, dom, fakelibvirt.VIR_DOMAIN_EVENT_SUSPENDED,
detail=fakelibvirt.VIR_DOMAIN_EVENT_SUSPENDED_MIGRATED,
opaque=hostimpl)
expected_event = hostimpl._queue_event.call_args[0][0]
expected_event = hostimpl._event_handler._queue_event.call_args[0][0]
self.assertEqual(event.EVENT_LIFECYCLE_MIGRATION_COMPLETED,
expected_event.transition)
get_job_info.assert_called_once_with()
@@ -265,7 +267,7 @@ class HostTestCase(test.NoDBTestCase):
conn, dom, fakelibvirt.VIR_DOMAIN_EVENT_SUSPENDED,
detail=fakelibvirt.VIR_DOMAIN_EVENT_SUSPENDED_MIGRATED,
opaque=hostimpl)
expected_event = hostimpl._queue_event.call_args[0][0]
expected_event = hostimpl._event_handler._queue_event.call_args[0][0]
self.assertEqual(event.EVENT_LIFECYCLE_PAUSED,
expected_event.transition)
get_job_info.assert_called_once_with()
@@ -273,30 +275,30 @@ class HostTestCase(test.NoDBTestCase):
test.MatchType(libvirt_guest.Guest), instance=None,
logging_ok=False)
def test_event_emit_delayed_call_delayed(self):
@mock.patch.object(utils, 'spawn_after')
def test_event_emit_delayed_call_delayed(self, mock_spawn_after):
ev = event.LifecycleEvent(
"cef19ce0-0ca2-11df-855d-b19fbce37686",
event.EVENT_LIFECYCLE_STOPPED)
spawn_after_mock = mock.Mock()
greenthread.spawn_after = spawn_after_mock
hostimpl = host.Host(
'qemu:///system', lifecycle_event_handler=lambda e: None)
hostimpl._event_emit_delayed(ev)
spawn_after_mock.assert_called_once_with(
mock_spawn_after.assert_called_once_with(
15, hostimpl._event_emit, ev)
@mock.patch.object(greenthread, 'spawn_after')
@mock.patch.object(utils, 'spawn_after')
def test_event_emit_delayed_call_delayed_pending(self, spawn_after_mock):
hostimpl = host.Host(
'qemu:///system', lifecycle_event_handler=lambda e: None)
uuid = "cef19ce0-0ca2-11df-855d-b19fbce37686"
gt_mock = mock.Mock()
hostimpl._events_delayed[uuid] = gt_mock
ev = event.LifecycleEvent(
uuid, event.EVENT_LIFECYCLE_STOPPED)
hostimpl._event_emit_delayed(ev)
gt_mock.cancel.assert_called_once_with()
mock_future = spawn_after_mock.return_value
mock_future.add_done_callback.assert_called_once()
self.assertTrue(spawn_after_mock.called)
self.assertIs(mock_future, hostimpl._events_delayed[uuid])
def test_event_delayed_cleanup(self):
hostimpl = host.Host(
@@ -321,7 +323,7 @@ class HostTestCase(test.NoDBTestCase):
dom = fakelibvirt.Domain(conn, fake_dom_xml, running=True)
host.Host._event_device_removed_callback(
conn, dom, dev='virtio-1', opaque=hostimpl)
expected_event = hostimpl._queue_event.call_args[0][0]
expected_event = hostimpl._event_handler._queue_event.call_args[0][0]
self.assertEqual(
libvirtevent.DeviceRemovedEvent, type(expected_event))
self.assertEqual(
@@ -339,7 +341,7 @@ class HostTestCase(test.NoDBTestCase):
dom = fakelibvirt.Domain(conn, fake_dom_xml, running=True)
host.Host._event_device_removal_failed_callback(
conn, dom, dev='virtio-1', opaque=hostimpl)
expected_event = hostimpl._queue_event.call_args[0][0]
expected_event = hostimpl._event_handler._queue_event.call_args[0][0]
self.assertEqual(
libvirtevent.DeviceRemovalFailedEvent, type(expected_event))
self.assertEqual(
+10
View File
@@ -577,6 +577,16 @@ def spawn(func, *args, **kwargs) -> futurist.Future:
return spawn_on(_get_default_executor(), func, *args, **kwargs)
def spawn_after(seconds, func, *args, **kwargs) -> futurist.Future:
"""Executing the function asynchronously after the given time."""
def delayed(*args, **kwargs):
time.sleep(seconds)
return func(*args, **kwargs)
return spawn(delayed, *args, **kwargs)
def _executor_is_full(executor):
if concurrency_mode_threading():
# TODO(gibi): Move this whole logic to futurist ThreadPoolExecutor
+232 -139
View File
@@ -39,9 +39,6 @@ import queue
import threading
import typing as ty
from eventlet import greenio
from eventlet import greenthread
from eventlet import patcher
from lxml import etree
from oslo_log import log as logging
from oslo_serialization import jsonutils
@@ -78,10 +75,6 @@ except ImportError:
CONF = nova.conf.CONF
LOG = logging.getLogger(__name__)
native_socket = patcher.original('socket')
native_threading = patcher.original("threading")
native_Queue = patcher.original("queue")
# This list is for libvirt hypervisor drivers that need special handling.
# This is *not* the complete list of supported hypervisor drivers.
HV_DRIVER_QEMU = "QEMU"
@@ -121,6 +114,205 @@ def _get_loaders():
return _loaders
class LibvirtEventHandler:
def __init__(self, conn_event_handler=None, lifecycle_event_handler=None):
self._lifecycle_event_handler = lifecycle_event_handler
self._conn_event_handler = conn_event_handler
def _queue_event(self, event):
raise NotImplementedError()
def start(self):
raise NotImplementedError()
@classmethod
def create(cls, conn_event_handler=None, lifecycle_event_handler=None):
if utils.concurrency_mode_threading():
return _ThreadingLibvirtEventHandler(
conn_event_handler, lifecycle_event_handler)
else:
return _EventletLibvirtEventHandler(
conn_event_handler, lifecycle_event_handler)
class _EventletLibvirtEventHandler(LibvirtEventHandler):
def __init__(self, conn_event_handler=None, lifecycle_event_handler=None):
super().__init__(conn_event_handler, lifecycle_event_handler)
from eventlet import greenio
from eventlet import patcher
self.native_threading = patcher.original("threading")
self.native_queue = patcher.original("queue")
self._event_thread = None
# This is a Queue between the native libvirt event thread
# and the main thread with the eventlet hub. This needs to be
# a native queue.
self._event_queue: queue.Queue[
virtevent.InstanceEvent | Mapping[str, ty.Any]
] = self.native_queue.Queue()
# Create a self-pipe for the native thread to synchronize on.
#
# This code is taken from the eventlet tpool module, under terms
# of the Apache License v2.0.
rpipe, wpipe = os.pipe()
self._event_notify_send = greenio.GreenPipe(wpipe, 'wb', 0)
self._event_notify_recv = greenio.GreenPipe(rpipe, 'rb', 0)
def start(self):
"""Initializes the libvirt events subsystem.
This requires running a native thread to provide the
libvirt event loop integration. This forwards events
to a green thread which does the actual dispatching.
"""
libvirt.virEventRegisterDefaultImpl()
LOG.debug("Starting event thread")
self._event_thread = self.native_threading.Thread(
target=self._native_thread)
self._event_thread.daemon = True
self._event_thread.start()
LOG.debug("Starting event dispatch greenthread")
utils.spawn(self._dispatch_thread)
def _queue_event(self, event):
"""Puts an event on the queue for dispatch.
This method is called by the native event thread to
put events on the queue for later dispatch by the
green thread. Any use of logging APIs is forbidden.
"""
if self._event_queue is None:
return
# Queue the event...
self._event_queue.put(event)
# ...then wakeup the green thread to dispatch it
c = ' '.encode()
self._event_notify_send.write(c)
self._event_notify_send.flush()
def _native_thread(self):
"""Receives async events coming in from libvirtd.
This is a native thread which runs the default
libvirt event loop implementation. This processes
any incoming async events from libvirtd and queues
them for later dispatch. This thread is only
permitted to use libvirt python APIs, and the
driver.queue_event method. In particular any use
of logging is forbidden, since it will confuse
eventlet's greenthread integration
"""
while True:
libvirt.virEventRunDefaultImpl()
def _dispatch_thread(self):
"""Dispatches async events coming in from libvirtd.
This is a green thread which waits for events to
arrive from the libvirt event loop thread. This
then dispatches the events to the compute manager.
"""
while True:
self._dispatch_events()
def _dispatch_events(self):
"""Wait for & dispatch events from native thread
Blocks until native thread indicates some events
are ready. Then dispatches all queued events.
"""
# Wait to be notified that there are some
# events pending
try:
_c = self._event_notify_recv.read(1)
assert _c
except ValueError:
return # will be raised when pipe is closed
# Process as many events as possible without
# blocking
last_close_event = None
# required for mypy
if self._event_queue is None:
return
while not self._event_queue.empty():
try:
event: virtevent.InstanceEvent | Mapping[str, ty.Any] = (
self._event_queue.get(block=False))
if issubclass(type(event), virtevent.InstanceEvent):
self._lifecycle_event_handler(event)
elif 'conn' in event and 'reason' in event:
last_close_event = event
except self.native_queue.Empty:
pass
if last_close_event is None:
return
conn = last_close_event['conn']
reason = str(last_close_event['reason'])
msg = _("Connection to libvirt lost: %s") % reason
self._conn_event_handler(conn, False, msg)
class _ThreadingLibvirtEventHandler(LibvirtEventHandler):
def __init__(self, conn_event_handler=None, lifecycle_event_handler=None):
super().__init__(conn_event_handler, lifecycle_event_handler)
self._event_thread = None
self._started = False
def start(self):
"""Initializes the libvirt events subsystem.
This requires running a native thread that pools for libvirt events.
"""
libvirt.virEventRegisterDefaultImpl()
LOG.debug("Starting event thread")
self._event_thread = threading.Thread(target=self._native_thread)
self._event_thread.daemon = True
self._event_thread.start()
def _queue_event(self, event):
"""Puts an event on the queue for dispatch.
In native threading mode instead of queueing we directly dispatch on
the thread that receives the event as we don't have the requirement to
move the event handler to the main thread with the eventlet hub
"""
if issubclass(type(event), virtevent.InstanceEvent):
self._lifecycle_event_handler(event)
elif 'conn' in event and 'reason' in event:
conn = event['conn']
reason = str(event['reason'])
msg = _("Connection to libvirt lost: %s") % reason
self._conn_event_handler(conn, False, msg)
def _native_thread(self):
"""Receives async events coming in from libvirtd.
This is a native thread which runs the default
libvirt event loop implementation. This processes
any incoming async events from libvirtd.
"""
while True:
libvirt.virEventRunDefaultImpl()
class Host(object):
def __init__(self, uri, read_only=False,
@@ -129,7 +321,14 @@ class Host(object):
self._uri = uri
self._read_only = read_only
self._initial_connection = True
self._event_handler = LibvirtEventHandler.create(
self._connection_closed, self._event_emit_delayed)
self._conn_event_handler = conn_event_handler
# This queue is just for the async handling of connection closed
# events. In eventlet mode this only pass events within the main
# native thread with the eventlet hub, so no proxying is needed.
self._conn_event_handler_queue: queue.Queue[
Callable[[], None]
] = queue.Queue()
@@ -141,9 +340,6 @@ class Host(object):
self._wrapped_conn = None
self._wrapped_conn_lock = threading.Lock()
self._event_queue: queue.Queue[
virtevent.InstanceEvent | Mapping[str, ty.Any]
] | None = None
self._events_delayed = {}
# Note(toabctl): During a reboot of a domain, STOPPED and
@@ -195,38 +391,11 @@ class Host(object):
# executing proxied calls in a native thread.
return utils.tpool_wrap(obj, autowrap=self._libvirt_proxy_classes)
def _native_thread(self):
"""Receives async events coming in from libvirtd.
This is a native thread which runs the default
libvirt event loop implementation. This processes
any incoming async events from libvirtd and queues
them for later dispatch. This thread is only
permitted to use libvirt python APIs, and the
driver.queue_event method. In particular any use
of logging is forbidden, since it will confuse
eventlet's greenthread integration
"""
while True:
libvirt.virEventRunDefaultImpl()
def _dispatch_thread(self):
"""Dispatches async events coming in from libvirtd.
This is a green thread which waits for events to
arrive from the libvirt event loop thread. This
then dispatches the events to the compute manager.
"""
while True:
self._dispatch_events()
def _conn_event_thread(self):
"""Dispatches async connection events"""
# NOTE(mdbooth): This thread doesn't need to jump through the same
# hoops as _dispatch_thread because it doesn't interact directly
# with the libvirt native thread.
# hoops as the lifecycle event handling because it doesn't interact
# directly with the libvirt native thread.
while True:
self._dispatch_conn_event()
@@ -248,12 +417,13 @@ class Host(object):
NB: this method is executing in a native thread, not
an eventlet coroutine. It can only invoke other libvirt
APIs, or use self._queue_event(). Any use of logging APIs
in particular is forbidden.
APIs, or use self._event_handler._queue_event(). Any use of logging
APIs in particular is forbidden.
"""
self = opaque
uuid = dom.UUIDString()
self._queue_event(libvirtevent.DeviceRemovedEvent(uuid, dev))
self._event_handler._queue_event(
libvirtevent.DeviceRemovedEvent(uuid, dev))
@staticmethod
def _event_device_removal_failed_callback(conn, dom, dev, opaque):
@@ -261,12 +431,13 @@ class Host(object):
NB: this method is executing in a native thread, not
an eventlet coroutine. It can only invoke other libvirt
APIs, or use self._queue_event(). Any use of logging APIs
in particular is forbidden.
APIs, or use self._event_handler._queue_event(). Any use of logging
APIs in particular is forbidden.
"""
self = opaque
uuid = dom.UUIDString()
self._queue_event(libvirtevent.DeviceRemovalFailedEvent(uuid, dev))
self._event_handler._queue_event(
libvirtevent.DeviceRemovalFailedEvent(uuid, dev))
@staticmethod
def _event_lifecycle_callback(conn, dom, event, detail, opaque):
@@ -274,8 +445,8 @@ class Host(object):
NB: this method is executing in a native thread, not
an eventlet coroutine. It can only invoke other libvirt
APIs, or use self._queue_event(). Any use of logging APIs
in particular is forbidden.
APIs, or use self._event_handler._queue_event(). Any use of logging
APIs in particular is forbidden.
"""
self = opaque
@@ -316,11 +487,12 @@ class Host(object):
transition = virtevent.EVENT_LIFECYCLE_RESUMED
if transition is not None:
self._queue_event(virtevent.LifecycleEvent(uuid, transition))
self._event_handler._queue_event(
virtevent.LifecycleEvent(uuid, transition))
def _close_callback(self, conn, reason, opaque):
close_info = {'conn': conn, 'reason': reason}
self._queue_event(close_info)
self._event_handler._queue_event(close_info)
@staticmethod
def _test_connection(conn):
@@ -359,77 +531,22 @@ class Host(object):
flags = libvirt.VIR_CONNECT_RO
return self._libvirt_proxy.openAuth(uri, auth, flags)
def _queue_event(self, event):
"""Puts an event on the queue for dispatch.
This method is called by the native event thread to
put events on the queue for later dispatch by the
green thread. Any use of logging APIs is forbidden.
"""
if self._event_queue is None:
return
# Queue the event...
self._event_queue.put(event)
# ...then wakeup the green thread to dispatch it
c = ' '.encode()
self._event_notify_send.write(c)
self._event_notify_send.flush()
def _dispatch_events(self):
"""Wait for & dispatch events from native thread
Blocks until native thread indicates some events
are ready. Then dispatches all queued events.
"""
# Wait to be notified that there are some
# events pending
try:
_c = self._event_notify_recv.read(1)
assert _c
except ValueError:
return # will be raised when pipe is closed
# Process as many events as possible without
# blocking
last_close_event = None
# required for mypy
if self._event_queue is None:
return
while not self._event_queue.empty():
try:
event: virtevent.InstanceEvent | Mapping[str, ty.Any] = self._event_queue.get(block=False) # noqa: E501
if issubclass(type(event), virtevent.InstanceEvent):
# call possibly with delay
self._event_emit_delayed(event)
elif 'conn' in event and 'reason' in event:
last_close_event = event
except native_Queue.Empty:
pass
if last_close_event is None:
return
conn = last_close_event['conn']
def _connection_closed(self, conn, *args, **kwargs):
# get_new_connection may already have disabled the host,
# in which case _wrapped_conn is None.
with self._wrapped_conn_lock:
if conn == self._wrapped_conn:
reason = str(last_close_event['reason'])
msg = _("Connection to libvirt lost: %s") % reason
self._wrapped_conn = None
self._queue_conn_event_handler(False, msg)
self._queue_conn_event_handler(*args, **kwargs)
def _event_emit_delayed(self, event):
"""Emit events - possibly delayed."""
def event_cleanup(gt, *args, **kwargs):
def event_cleanup(event):
"""Callback function for greenthread. Called
to cleanup the _events_delayed dictionary when an event
was called.
"""
event = args[0]
self._events_delayed.pop(event.uuid, None)
# Cleanup possible delayed stop events.
@@ -442,12 +559,12 @@ class Host(object):
event.transition == virtevent.EVENT_LIFECYCLE_STOPPED):
# Delay STOPPED event, as they may be followed by a STARTED
# event in case the instance is rebooting
id_ = greenthread.spawn_after(self._lifecycle_delay,
self._event_emit, event)
id_ = utils.spawn_after(
self._lifecycle_delay, self._event_emit, event)
self._events_delayed[event.uuid] = id_
# add callback to cleanup self._events_delayed dict after
# event was called
id_.link(event_cleanup, event)
id_.add_done_callback(lambda _: event_cleanup(event))
else:
self._event_emit(event)
@@ -455,37 +572,14 @@ class Host(object):
if self._lifecycle_event_handler is not None:
self._lifecycle_event_handler(event)
def _init_events_pipe(self):
"""Create a self-pipe for the native thread to synchronize on.
This code is taken from the eventlet tpool module, under terms
of the Apache License v2.0.
"""
self._event_queue = native_Queue.Queue()
rpipe, wpipe = os.pipe()
self._event_notify_send = greenio.GreenPipe(wpipe, 'wb', 0)
self._event_notify_recv = greenio.GreenPipe(rpipe, 'rb', 0)
def _init_events(self):
"""Initializes the libvirt events subsystem.
This requires running a native thread to provide the
libvirt event loop integration. This forwards events
to a green thread which does the actual dispatching.
"""
self._event_handler.start()
self._init_events_pipe()
LOG.debug("Starting native event thread")
self._event_thread = native_threading.Thread(
target=self._native_thread)
self._event_thread.daemon = True
self._event_thread.start()
LOG.debug("Starting green dispatch thread")
utils.spawn(self._dispatch_thread)
# This thread is just for async connection closed event handling.
# In eventlet mode it only handles tasks within the main thread with
# the eventlet hub.
LOG.debug("Starting connection event dispatch thread")
utils.spawn(self._conn_event_thread)
@@ -605,7 +699,6 @@ class Host(object):
# connection is used for the first time. Otherwise, the
# handler does not get registered.
libvirt.registerErrorHandler(self._libvirt_error_handler, None)
libvirt.virEventRegisterDefaultImpl()
self._init_events()
self._initialized = True