Merge "Replace eventlet sleep with time.sleep"

This commit is contained in:
Zuul
2025-04-28 10:17:55 +00:00
committed by Gerrit Code Review
6 changed files with 51 additions and 30 deletions
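
The pattern applied throughout this change: greenthread.sleep(interval) becomes time.sleep(interval), and the yield-only greenthread.sleep(0) / eventlet.sleep(0) call sites become utils.cooperative_yield(). The sketch below is illustrative only and not part of the commit (wait_for_volume and is_ready are made-up names); it shows why the swap preserves behaviour: with eventlet monkey patching active, time.sleep is already replaced by eventlet's cooperative sleep, and in native-threading mode it simply blocks the calling thread.

# Illustrative sketch only; not part of this commit.
import time

import eventlet
eventlet.monkey_patch()  # normally done once, first thing at service startup

def wait_for_volume(is_ready, interval=1, attempts=3):
    """Retry loop in the style of the call sites touched by this change."""
    for attempt in range(1, attempts + 1):
        if is_ready():
            return attempt
        # Under monkey patching this yields the greenthread; without it,
        # it blocks the native thread. Either way the call site no longer
        # needs an eventlet import.
        time.sleep(interval)
    return 0
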
+3 -3
@@ -40,7 +40,6 @@ import typing as ty
 from cinderclient import exceptions as cinder_exception
 from cursive import exception as cursive_exception
 import eventlet.event
-from eventlet import greenthread
 import eventlet.semaphore
 import eventlet.timeout
 import futurist
@@ -1824,7 +1823,8 @@ class ComputeManager(manager.Manager):
                     {'vol_id': vol_id,
                      'vol_status': volume_status})
                 break
-            greenthread.sleep(CONF.block_device_allocate_retries_interval)
+            time.sleep(
+                CONF.block_device_allocate_retries_interval)
         raise exception.VolumeNotCreated(volume_id=vol_id,
                                          seconds=int(time.time() - start),
                                          attempts=attempt,
@@ -10849,7 +10849,7 @@ class ComputeManager(manager.Manager):
"""Updates the volume usage cache table with a list of stats.""" """Updates the volume usage cache table with a list of stats."""
for usage in vol_usages: for usage in vol_usages:
# Allow switching of greenthreads between queries. # Allow switching of greenthreads between queries.
greenthread.sleep(0) utils.cooperative_yield()
vol_usage = objects.VolumeUsage(context) vol_usage = objects.VolumeUsage(context)
vol_usage.volume_id = usage['volume'] vol_usage.volume_id = usage['volume']
vol_usage.instance_uuid = usage['instance'].uuid vol_usage.instance_uuid = usage['instance'].uuid
+3 -2
@@ -16,10 +16,11 @@
 import re
-from eventlet import greenthread
 from oslo_concurrency import processutils
 from oslo_log import log as logging
+from nova import utils
 LOG = logging.getLogger(__name__)
 _fake_execute_repliers = []
@@ -96,7 +97,7 @@ def fake_execute(*cmd_parts, **kwargs):
"stderr='%(stderr)s'", {'stdout': reply[0], 'stderr': reply[1]}) "stderr='%(stderr)s'", {'stdout': reply[0], 'stderr': reply[1]})
# Replicate the sleep call in the real function # Replicate the sleep call in the real function
greenthread.sleep(0) utils.cooperative_yield()
return reply return reply
+27 -16
@@ -121,6 +121,13 @@ from nova.virt.libvirt.volume import volume as volume_drivers
 CONF = nova.conf.CONF

+_real_sleep = time.sleep
+
+
+def sleep_zero(*args, **kwargs):
+    return _real_sleep(0)
+
+
 _fake_network_info = fake_network.fake_get_instance_nw_info

 # TODO(sean-k-mooney): move the rest of the static data to fake_libvirt_data
@@ -565,7 +572,7 @@ class CacheConcurrencyTestCase(test.NoDBTestCase):
                              'name').cache,
                              _concurrency, 'fname', None,
                              signal=sig1, wait=wait1, done=done1)
-        eventlet.sleep(0)
+        utils.cooperative_yield()
         # Thread 1 should run before thread 2.
         sig1.wait()
@@ -578,13 +585,13 @@ class CacheConcurrencyTestCase(test.NoDBTestCase):
                              signal=sig2, wait=wait2, done=done2)
         wait2.send()
-        eventlet.sleep(0)
+        utils.cooperative_yield()
         try:
             self.assertFalse(done2.ready())
         finally:
             wait1.send()
             done1.wait()
-        eventlet.sleep(0)
+        utils.cooperative_yield()
         self.assertTrue(done2.ready())
         # Wait on greenthreads to assert they didn't raise exceptions
         # during execution
@@ -603,7 +610,7 @@ class CacheConcurrencyTestCase(test.NoDBTestCase):
                              'name').cache,
                              _concurrency, 'fname2', None,
                              signal=sig1, wait=wait1, done=done1)
-        eventlet.sleep(0)
+        utils.cooperative_yield()
         # Thread 1 should run before thread 2.
         sig1.wait()
@@ -614,20 +621,20 @@ class CacheConcurrencyTestCase(test.NoDBTestCase):
                              'name').cache,
                              _concurrency, 'fname1', None,
                              signal=sig2, wait=wait2, done=done2)
-        eventlet.sleep(0)
+        utils.cooperative_yield()
         # Wait for thread 2 to start.
         sig2.wait()
         wait2.send()
         tries = 0
         while not done2.ready() and tries < 10:
-            eventlet.sleep(0)
+            utils.cooperative_yield()
             tries += 1
         try:
             self.assertTrue(done2.ready())
         finally:
             wait1.send()
-        eventlet.sleep(0)
+        utils.cooperative_yield()
         # Wait on greenthreads to assert they didn't raise exceptions
         # during execution
         thr1.wait()
@@ -14117,7 +14124,7 @@ class LibvirtConnTestCase(test.NoDBTestCase,
     @mock.patch.object(libvirt_guest.Guest, "migrate_start_postcopy")
     @mock.patch.object(time, "time")
     @mock.patch.object(time, "sleep",
-                       side_effect=lambda x: eventlet.sleep(0))
+                       side_effect=lambda x: sleep_zero())
     @mock.patch.object(host.Host, "get_connection")
     @mock.patch.object(libvirt_guest.Guest, "get_job_info")
     @mock.patch.object(objects.Instance, "save")
@@ -15606,8 +15613,8 @@ class LibvirtConnTestCase(test.NoDBTestCase,
     def test_pre_live_migration_volume_backed_encrypted(self):
         self._test_pre_live_migration_volume_backed(encrypted_volumes=True)

-    @mock.patch.object(eventlet.greenthread, 'sleep',
-                       side_effect=eventlet.sleep(0))
+    @mock.patch.object(time, "sleep",
+                       side_effect=lambda x: sleep_zero())
     @mock.patch.object(libvirt_driver.LibvirtDriver, 'plug_vifs',
                        side_effect=processutils.ProcessExecutionError)
     def test_pre_live_migration_plug_vifs_retry_fails(self, mock_plug,
@@ -15631,11 +15638,13 @@ class LibvirtConnTestCase(test.NoDBTestCase,
         mock_plug.assert_has_calls([mock.call(instance, [])] * 3)
         self.assertEqual(3, mock_plug.call_count)
         # Called 'live_migration_retry_count - 1' times
-        mock_sleep.assert_has_calls([mock.call(1)] * 2)
-        self.assertEqual(2, mock_sleep.call_count)
+        calls_with_1 = [
+            call for call in mock_sleep.call_args_list if call == mock.call(1)]
+        # Assert there are exactly two such calls
+        assert len(calls_with_1) == 2

-    @mock.patch.object(eventlet.greenthread, 'sleep',
-                       side_effect=eventlet.sleep(0))
+    @mock.patch.object(time, "sleep",
+                       side_effect=lambda x: sleep_zero())
     @mock.patch.object(libvirt_driver.LibvirtDriver, 'plug_vifs')
     def test_pre_live_migration_plug_vifs_retry_works(self, mock_plug,
                                                       mock_sleep):
@@ -15659,8 +15668,10 @@ class LibvirtConnTestCase(test.NoDBTestCase,
         mock_plug.assert_has_calls([mock.call(instance, [])] * 3)
         self.assertEqual(3, mock_plug.call_count)
         # Called 2 times because the third 'plug_vifs' call is successful.
-        mock_sleep.assert_has_calls([mock.call(1)] * 2)
-        self.assertEqual(2, mock_sleep.call_count)
+        calls_with_1 = [
+            call for call in mock_sleep.call_args_list if call == mock.call(1)]
+        # Assert there are exactly two such calls
+        assert len(calls_with_1) == 2

     def test_pre_live_migration_plug_vifs_with_dest_port_bindings(self):
         """Tests that we use the LibvirtLiveMigrateData.vifs destination host
+4 -3
@@ -35,6 +35,7 @@ from nova import test
 from nova.tests import fixtures as nova_fixtures
 from nova.tests.fixtures import libvirt as fakelibvirt
 from nova.tests.fixtures import libvirt_data as fake_libvirt_data
+from nova import utils
 from nova.virt import event
 from nova.virt.libvirt import config as vconfig
 from nova.virt.libvirt import event as libvirtevent
@@ -350,7 +351,7 @@ class HostTestCase(test.NoDBTestCase):
         def connect_with_block(*a, **k):
             # enough to allow another connect to run
-            eventlet.sleep(0)
+            utils.cooperative_yield()
             self.connect_calls += 1
             return fakelibvirt.openAuth("qemu:///system",
                                         [[], lambda: 1, None], 0)
@@ -378,7 +379,7 @@ class HostTestCase(test.NoDBTestCase):
         def connect_with_block(*a, **k):
             # enough to allow another connect to run
-            eventlet.sleep(0)
+            utils.cooperative_yield()
             self.connect_calls += 1
             return fakelibvirt.openAuth("qemu:///system",
                                         [[], lambda: 1, None], 0)
@@ -397,7 +398,7 @@ class HostTestCase(test.NoDBTestCase):
         thr2 = eventlet.spawn(get_conn_currency, self.host)
         # let threads run
-        eventlet.sleep(0)
+        utils.cooperative_yield()
         thr1.wait()
         thr2.wait()
+9
@@ -27,6 +27,7 @@ import random
 import re
 import shutil
 import tempfile
+import time

 import eventlet
 from eventlet import tpool
@@ -81,6 +82,14 @@ _SERVICE_TYPES = service_types.ServiceTypes()
 DEFAULT_GREEN_POOL = None

+
+# TODO(ksambor) Make this a no-op in threading mode once a
+# threading-compatible service is available. Also, remove all
+# cooperative_yield calls after dropping Eventlet support.
+def cooperative_yield():
+    time.sleep(0)
+
+
 def _get_default_green_pool():
     global DEFAULT_GREEN_POOL
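
As a usage note: cooperative_yield() is a drop-in replacement for the old greenthread.sleep(0) / eventlet.sleep(0) call sites, letting other greenthreads run between iterations of a long loop while staying harmless in threading mode. The sketch below is illustrative only and not Nova code; every name other than utils.cooperative_yield is made up for the example.

# Illustrative use of the new helper; sum_disk_sizes and get_size are
# hypothetical names, not part of this change.
from nova import utils

def sum_disk_sizes(paths, get_size):
    total = 0
    for path in paths:
        # Yield to other greenthreads (effectively a no-op under native
        # threads) between potentially slow per-item operations.
        utils.cooperative_yield()
        total += get_size(path)
    return total
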
+5 -6
@@ -49,7 +49,6 @@ import uuid
 from castellan import key_manager
 from copy import deepcopy
 import eventlet
-from eventlet import greenthread
 from eventlet import tpool
 from lxml import etree
 from os_brick import encryptors
@@ -4114,7 +4113,7 @@ class LibvirtDriver(driver.ComputeDriver):
LOG.info("Instance may have been rebooted during soft " LOG.info("Instance may have been rebooted during soft "
"reboot, so return now.", instance=instance) "reboot, so return now.", instance=instance)
return True return True
greenthread.sleep(1) time.sleep(1)
return False return False
def _hard_reboot(self, context, instance, network_info, share_info, def _hard_reboot(self, context, instance, network_info, share_info,
@@ -8401,7 +8400,7 @@ class LibvirtDriver(driver.ComputeDriver):
             except libvirt.libvirtError:
                 total += 1
             # NOTE(gtt116): give other tasks a chance.
-            greenthread.sleep(0)
+            utils.cooperative_yield()
         return total

     def _get_supported_vgpu_types(self):
@@ -11516,7 +11515,7 @@ class LibvirtDriver(driver.ComputeDriver):
                           '%(max_retry)d.',
                           {'cnt': cnt, 'max_retry': max_retry},
                           instance=instance)
-                greenthread.sleep(1)
+                time.sleep(1)

     def pre_live_migration(self, context, instance, block_device_info,
                            network_info, disk_info, migrate_data):
@@ -11832,7 +11831,7 @@ class LibvirtDriver(driver.ComputeDriver):
             # Only use announce_pause after the first attempt to avoid
             # pausing before calling announce_self for the first attempt
             if current_attempt != 1:
-                greenthread.sleep(announce_pause)
+                time.sleep(announce_pause)

             LOG.info('Sending announce-self command to QEMU monitor. '
                      'Attempt %(current_attempt)s of %(max_attempts)s',
@@ -12097,7 +12096,7 @@ class LibvirtDriver(driver.ComputeDriver):
                           {'i_name': guest.name, 'thing': thing, 'error': e})
             # NOTE(gtt116): give other tasks a chance.
-            greenthread.sleep(0)
+            utils.cooperative_yield()
         return disk_over_committed_size

     def get_available_nodes(self, refresh=False):