Fix volume IO usage notifications being sent too often.
Each compute node was sending out volume IO usage notifications for all the volumes in use in the system. This includes volumes hosted on the other compute nodes, duplicating the events. Fix this by getting the conductor to send out the IO usage notification event immediately after it updates the volume usage cache table, and only for the affected volume. Fixes: bug 1182102 Change-Id: Id3216caa482110bdec2b7f5ea13050112e4da009
This commit is contained in:
@@ -3724,19 +3724,6 @@ class ComputeManager(manager.SchedulerDependentManager):
|
||||
usage['instance'],
|
||||
last_refreshed=refreshed)
|
||||
|
||||
def _send_volume_usage_notifications(self, context, start_time):
|
||||
"""Queries vol usage cache table and sends a vol usage notification."""
|
||||
# We might have had a quick attach/detach that we missed in
|
||||
# the last run of get_all_volume_usage and this one
|
||||
# but detach stats will be recorded in db and returned from
|
||||
# vol_get_usage_by_time
|
||||
vol_usages = self.conductor_api.vol_get_usage_by_time(context,
|
||||
start_time)
|
||||
for vol_usage in vol_usages:
|
||||
notifier.notify(context, 'volume.%s' % self.host, 'volume.usage',
|
||||
notifier.INFO,
|
||||
compute_utils.usage_volume_info(vol_usage))
|
||||
|
||||
@periodic_task.periodic_task
|
||||
def _poll_volume_usage(self, context, start_time=None):
|
||||
if CONF.volume_usage_poll_interval == 0:
|
||||
@@ -3765,8 +3752,6 @@ class ComputeManager(manager.SchedulerDependentManager):
|
||||
refreshed = timeutils.utcnow()
|
||||
self._update_volume_usage_cache(context, vol_usages, refreshed)
|
||||
|
||||
self._send_volume_usage_notifications(context, start_time)
|
||||
|
||||
@periodic_task.periodic_task
|
||||
def _report_driver_status(self, context):
|
||||
curr_time = time.time()
|
||||
|
||||
@@ -22,8 +22,10 @@ from nova import manager
|
||||
from nova import network
|
||||
from nova.network.security_group import openstack_driver
|
||||
from nova import notifications
|
||||
from nova.openstack.common.db.sqlalchemy import session as db_session
|
||||
from nova.openstack.common import jsonutils
|
||||
from nova.openstack.common import log as logging
|
||||
from nova.openstack.common.notifier import api as notifier
|
||||
from nova.openstack.common.rpc import common as rpc_common
|
||||
from nova.openstack.common import timeutils
|
||||
from nova import quota
|
||||
@@ -322,6 +324,7 @@ class ConductorManager(manager.Manager):
|
||||
result = self.db.instance_fault_create(context, values)
|
||||
return jsonutils.to_primitive(result)
|
||||
|
||||
# NOTE(kerrin): This method can be removed in v2.0 of the RPC API.
|
||||
def vol_get_usage_by_time(self, context, start_time):
|
||||
result = self.db.vol_get_usage_by_time(context, start_time)
|
||||
return jsonutils.to_primitive(result)
|
||||
@@ -329,11 +332,23 @@ class ConductorManager(manager.Manager):
|
||||
def vol_usage_update(self, context, vol_id, rd_req, rd_bytes, wr_req,
|
||||
wr_bytes, instance, last_refreshed=None,
|
||||
update_totals=False):
|
||||
self.db.vol_usage_update(context, vol_id, rd_req, rd_bytes, wr_req,
|
||||
wr_bytes, instance['uuid'],
|
||||
instance['project_id'], instance['user_id'],
|
||||
instance['availability_zone'],
|
||||
last_refreshed, update_totals)
|
||||
# The session object is needed here, as the vol_usage object returned
|
||||
# needs to bound to it in order to refresh its data
|
||||
session = db_session.get_session()
|
||||
vol_usage = self.db.vol_usage_update(context, vol_id,
|
||||
rd_req, rd_bytes,
|
||||
wr_req, wr_bytes,
|
||||
instance['uuid'],
|
||||
instance['project_id'],
|
||||
instance['user_id'],
|
||||
instance['availability_zone'],
|
||||
last_refreshed, update_totals,
|
||||
session)
|
||||
|
||||
# We have just updated the database, so send the notification now
|
||||
notifier.notify(context, 'conductor.%s' % self.host, 'volume.usage',
|
||||
notifier.INFO,
|
||||
compute_utils.usage_volume_info(vol_usage))
|
||||
|
||||
@rpc_common.client_exceptions(exception.ComputeHostNotFound,
|
||||
exception.HostBinaryNotFound)
|
||||
|
||||
+4
-2
@@ -1469,14 +1469,16 @@ def vol_get_usage_by_time(context, begin):
|
||||
|
||||
def vol_usage_update(context, id, rd_req, rd_bytes, wr_req, wr_bytes,
|
||||
instance_id, project_id, user_id, availability_zone,
|
||||
last_refreshed=None, update_totals=False):
|
||||
last_refreshed=None, update_totals=False,
|
||||
session=None):
|
||||
"""Update cached volume usage for a volume
|
||||
Creates new record if needed."""
|
||||
return IMPL.vol_usage_update(context, id, rd_req, rd_bytes, wr_req,
|
||||
wr_bytes, instance_id, project_id, user_id,
|
||||
availability_zone,
|
||||
last_refreshed=last_refreshed,
|
||||
update_totals=update_totals)
|
||||
update_totals=update_totals,
|
||||
session=session)
|
||||
|
||||
|
||||
###################
|
||||
|
||||
@@ -4456,7 +4456,7 @@ def vol_usage_update(context, id, rd_req, rd_bytes, wr_req, wr_bytes,
|
||||
current_usage['curr_write_bytes'] + wr_bytes)
|
||||
|
||||
current_usage.update(values)
|
||||
return
|
||||
return current_usage
|
||||
|
||||
vol_usage = models.VolumeUsage()
|
||||
vol_usage.tot_last_refreshed = timeutils.utcnow()
|
||||
@@ -4480,6 +4480,8 @@ def vol_usage_update(context, id, rd_req, rd_bytes, wr_req, wr_bytes,
|
||||
|
||||
vol_usage.save(session=session)
|
||||
|
||||
return vol_usage
|
||||
|
||||
|
||||
####################
|
||||
|
||||
|
||||
@@ -386,8 +386,6 @@ class ComputeVolumeTestCase(BaseTestCase):
|
||||
self.mox.StubOutWithMock(self.compute, '_get_host_volume_bdms')
|
||||
self.mox.StubOutWithMock(utils, 'last_completed_audit_period')
|
||||
self.mox.StubOutWithMock(self.compute.driver, 'get_all_volume_usage')
|
||||
self.mox.StubOutWithMock(self.compute,
|
||||
'_send_volume_usage_notifications')
|
||||
self.mox.StubOutWithMock(time, 'time')
|
||||
# Following methods will be called.
|
||||
utils.last_completed_audit_period().AndReturn((0, 0))
|
||||
@@ -405,8 +403,6 @@ class ComputeVolumeTestCase(BaseTestCase):
|
||||
self.mox.StubOutWithMock(self.compute, '_get_host_volume_bdms')
|
||||
self.mox.StubOutWithMock(utils, 'last_completed_audit_period')
|
||||
self.mox.StubOutWithMock(self.compute.driver, 'get_all_volume_usage')
|
||||
self.mox.StubOutWithMock(self.compute,
|
||||
'_send_volume_usage_notifications')
|
||||
# Following methods are called.
|
||||
utils.last_completed_audit_period().AndReturn((0, 0))
|
||||
self.compute._get_host_volume_bdms(ctxt, 'MockHost').AndReturn([])
|
||||
@@ -425,8 +421,6 @@ class ComputeVolumeTestCase(BaseTestCase):
|
||||
self.mox.StubOutWithMock(self.compute, '_get_host_volume_bdms')
|
||||
self.mox.StubOutWithMock(timeutils, 'utcnow')
|
||||
self.mox.StubOutWithMock(self.compute, '_update_volume_usage_cache')
|
||||
self.mox.StubOutWithMock(self.compute,
|
||||
'_send_volume_usage_notifications')
|
||||
self.stubs.Set(self.compute.driver, 'get_all_volume_usage',
|
||||
lambda x, y: [3, 4])
|
||||
# All the mocks are called
|
||||
@@ -434,7 +428,6 @@ class ComputeVolumeTestCase(BaseTestCase):
|
||||
self.compute._get_host_volume_bdms(ctxt, 'MockHost').AndReturn([1, 2])
|
||||
timeutils.utcnow().AndReturn(5)
|
||||
self.compute._update_volume_usage_cache(ctxt, [3, 4], 5)
|
||||
self.compute._send_volume_usage_notifications(ctxt, 20)
|
||||
self.mox.ReplayAll()
|
||||
CONF.volume_usage_poll_interval = 10
|
||||
self.compute._last_vol_usage_poll = 0
|
||||
@@ -444,48 +437,6 @@ class ComputeVolumeTestCase(BaseTestCase):
|
||||
self.compute._last_vol_usage_poll)
|
||||
self.mox.UnsetStubs()
|
||||
|
||||
def test_send_volume_usage_notifications(self):
|
||||
ctxt = 'MockContext'
|
||||
test_notifier.NOTIFICATIONS = []
|
||||
self.compute.host = 'MockHost'
|
||||
fake_usage = {'tot_last_refreshed': 20,
|
||||
'curr_last_refreshed': 10,
|
||||
'volume_id': 'fake_volume_id',
|
||||
'instance_uuid': 'fake_instance_uuid',
|
||||
'project_id': 'fake_project_id',
|
||||
'user_id': 'fake_user_id',
|
||||
'availability_zone': 'fake-az',
|
||||
'tot_reads': 11,
|
||||
'curr_reads': 22,
|
||||
'tot_read_bytes': 33,
|
||||
'curr_read_bytes': 44,
|
||||
'tot_writes': 55,
|
||||
'curr_writes': 66,
|
||||
'tot_write_bytes': 77,
|
||||
'curr_write_bytes': 88}
|
||||
|
||||
self.stubs.Set(self.compute.conductor_api,
|
||||
'vol_get_usage_by_time',
|
||||
lambda x, y:
|
||||
[db.sqlalchemy.models.VolumeUsage(**fake_usage)])
|
||||
self.stubs.Set(self.compute.conductor_api,
|
||||
'instance_get_all_by_filters',
|
||||
lambda x, y: [{'project_id': 'fake_project_id',
|
||||
'user_id': 'fake_user_id'}])
|
||||
|
||||
self.compute._send_volume_usage_notifications(ctxt, 20)
|
||||
self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
|
||||
msg = test_notifier.NOTIFICATIONS[0]
|
||||
payload = msg['payload']
|
||||
self.assertEquals(payload['instance_id'], 'fake_instance_uuid')
|
||||
self.assertEquals(payload['user_id'], 'fake_user_id')
|
||||
self.assertEquals(payload['tenant_id'], 'fake_project_id')
|
||||
self.assertEquals(payload['reads'], 33)
|
||||
self.assertEquals(payload['read_bytes'], 77)
|
||||
self.assertEquals(payload['writes'], 121)
|
||||
self.assertEquals(payload['write_bytes'], 165)
|
||||
self.assertEquals(payload['availability_zone'], 'fake-az')
|
||||
|
||||
def test_detach_volume_usage(self):
|
||||
# Test that detach volume update the volume usage cache table correctly
|
||||
instance = self._create_fake_instance()
|
||||
@@ -526,8 +477,26 @@ class ComputeVolumeTestCase(BaseTestCase):
|
||||
# total fields in the volume usage cache.
|
||||
CONF.volume_usage_poll_interval = 10
|
||||
self.compute._poll_volume_usage(self.context)
|
||||
# Check that a volume.usage notification was sent
|
||||
self.assertEqual(1, len(test_notifier.NOTIFICATIONS))
|
||||
msg = test_notifier.NOTIFICATIONS[0]
|
||||
|
||||
self.compute.detach_volume(self.context, 1, instance)
|
||||
|
||||
# Check that a volume.usage notification was sent
|
||||
self.assertEquals(2, len(test_notifier.NOTIFICATIONS))
|
||||
msg = test_notifier.NOTIFICATIONS[1]
|
||||
self.assertEquals('volume.usage', msg['event_type'])
|
||||
payload = msg['payload']
|
||||
self.assertEquals(instance['uuid'], payload['instance_id'])
|
||||
self.assertEquals('fake', payload['user_id'])
|
||||
self.assertEquals('fake', payload['tenant_id'])
|
||||
self.assertEquals(1, payload['reads'])
|
||||
self.assertEquals(30, payload['read_bytes'])
|
||||
self.assertEquals(1, payload['writes'])
|
||||
self.assertEquals(20, payload['write_bytes'])
|
||||
self.assertEquals(None, payload['availability_zone'])
|
||||
|
||||
# Check the database for the
|
||||
volume_usages = db.vol_get_usage_by_time(self.context, 0)
|
||||
self.assertEqual(1, len(volume_usages))
|
||||
|
||||
@@ -30,6 +30,8 @@ from nova.db.sqlalchemy import models
|
||||
from nova import exception as exc
|
||||
from nova import notifications
|
||||
from nova.openstack.common import jsonutils
|
||||
from nova.openstack.common.notifier import api as notifier_api
|
||||
from nova.openstack.common.notifier import test_notifier
|
||||
from nova.openstack.common.rpc import common as rpc_common
|
||||
from nova.openstack.common import timeutils
|
||||
from nova import quota
|
||||
@@ -55,6 +57,11 @@ class _BaseTestCase(object):
|
||||
self.project_id = 'fake'
|
||||
self.context = FakeContext(self.user_id, self.project_id)
|
||||
|
||||
notifier_api._reset_drivers()
|
||||
self.addCleanup(notifier_api._reset_drivers)
|
||||
self.flags(notification_driver=[test_notifier.__name__])
|
||||
test_notifier.NOTIFICATIONS = []
|
||||
|
||||
def stub_out_client_exceptions(self):
|
||||
def passthru(exceptions, func, *args, **kwargs):
|
||||
return func(*args, **kwargs)
|
||||
@@ -368,20 +375,50 @@ class _BaseTestCase(object):
|
||||
self.assertEqual(result, 'fake-usage')
|
||||
|
||||
def test_vol_usage_update(self):
|
||||
# the vol_usage_update method sends the volume usage notifications
|
||||
# as well as updating the database
|
||||
self.mox.StubOutWithMock(db, 'vol_usage_update')
|
||||
inst = self._create_fake_instance({
|
||||
'project_id': 'fake-project_id',
|
||||
'user_id': 'fake-user_id',
|
||||
})
|
||||
fake_usage = {'tot_last_refreshed': 20,
|
||||
'curr_last_refreshed': 10,
|
||||
'volume_id': 'fake-vol',
|
||||
'instance_uuid': inst['uuid'],
|
||||
'project_id': 'fake-project_id',
|
||||
'user_id': 'fake-user_id',
|
||||
'availability_zone': 'fake-az',
|
||||
'tot_reads': 11,
|
||||
'curr_reads': 22,
|
||||
'tot_read_bytes': 33,
|
||||
'curr_read_bytes': 44,
|
||||
'tot_writes': 55,
|
||||
'curr_writes': 66,
|
||||
'tot_write_bytes': 77,
|
||||
'curr_write_bytes': 88}
|
||||
db.vol_usage_update(self.context, 'fake-vol', 'rd-req', 'rd-bytes',
|
||||
'wr-req', 'wr-bytes', inst['uuid'],
|
||||
'fake-project_id', 'fake-user_id', 'fake-az',
|
||||
'fake-refr', 'fake-bool')
|
||||
'fake-refr', 'fake-bool', mox.IgnoreArg()).\
|
||||
AndReturn(fake_usage)
|
||||
self.mox.ReplayAll()
|
||||
self.conductor.vol_usage_update(self.context, 'fake-vol', 'rd-req',
|
||||
'rd-bytes', 'wr-req', 'wr-bytes',
|
||||
inst, 'fake-refr', 'fake-bool')
|
||||
|
||||
self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
|
||||
msg = test_notifier.NOTIFICATIONS[0]
|
||||
payload = msg['payload']
|
||||
self.assertEquals(payload['instance_id'], inst['uuid'])
|
||||
self.assertEquals(payload['user_id'], 'fake-user_id')
|
||||
self.assertEquals(payload['tenant_id'], 'fake-project_id')
|
||||
self.assertEquals(payload['reads'], 33)
|
||||
self.assertEquals(payload['read_bytes'], 77)
|
||||
self.assertEquals(payload['writes'], 121)
|
||||
self.assertEquals(payload['write_bytes'], 165)
|
||||
self.assertEquals(payload['availability_zone'], 'fake-az')
|
||||
|
||||
def test_compute_node_create(self):
|
||||
self.mox.StubOutWithMock(db, 'compute_node_create')
|
||||
db.compute_node_create(self.context, 'fake-values').AndReturn(
|
||||
|
||||
Reference in New Issue
Block a user