Fix volume IO usage notifications been sent too often.

Each compute node was sending out volume IO usage notifications
for all the volumes in use in the system. This include volumes
hosted on the other compute nodes duplicating the events.

Fix this by getting the conductor to send out the IO usage
notification event immediately after it updates the volume usage
cache table, and only for the effected volume.

Fixes: bug 1182102

Change-Id: Id3216caa482110bdec2b7f5ea13050112e4da009
This commit is contained in:
Michael Kerrin
2013-05-21 07:58:27 +00:00
parent db56c590e8
commit 3cf4cb2534
6 changed files with 83 additions and 73 deletions
-15
View File
@@ -3724,19 +3724,6 @@ class ComputeManager(manager.SchedulerDependentManager):
usage['instance'],
last_refreshed=refreshed)
def _send_volume_usage_notifications(self, context, start_time):
"""Queries vol usage cache table and sends a vol usage notification."""
# We might have had a quick attach/detach that we missed in
# the last run of get_all_volume_usage and this one
# but detach stats will be recorded in db and returned from
# vol_get_usage_by_time
vol_usages = self.conductor_api.vol_get_usage_by_time(context,
start_time)
for vol_usage in vol_usages:
notifier.notify(context, 'volume.%s' % self.host, 'volume.usage',
notifier.INFO,
compute_utils.usage_volume_info(vol_usage))
@periodic_task.periodic_task
def _poll_volume_usage(self, context, start_time=None):
if CONF.volume_usage_poll_interval == 0:
@@ -3765,8 +3752,6 @@ class ComputeManager(manager.SchedulerDependentManager):
refreshed = timeutils.utcnow()
self._update_volume_usage_cache(context, vol_usages, refreshed)
self._send_volume_usage_notifications(context, start_time)
@periodic_task.periodic_task
def _report_driver_status(self, context):
curr_time = time.time()
+20 -5
View File
@@ -22,8 +22,10 @@ from nova import manager
from nova import network
from nova.network.security_group import openstack_driver
from nova import notifications
from nova.openstack.common.db.sqlalchemy import session as db_session
from nova.openstack.common import jsonutils
from nova.openstack.common import log as logging
from nova.openstack.common.notifier import api as notifier
from nova.openstack.common.rpc import common as rpc_common
from nova.openstack.common import timeutils
from nova import quota
@@ -322,6 +324,7 @@ class ConductorManager(manager.Manager):
result = self.db.instance_fault_create(context, values)
return jsonutils.to_primitive(result)
# NOTE(kerrin): This method can be removed in v2.0 of the RPC API.
def vol_get_usage_by_time(self, context, start_time):
result = self.db.vol_get_usage_by_time(context, start_time)
return jsonutils.to_primitive(result)
@@ -329,11 +332,23 @@ class ConductorManager(manager.Manager):
def vol_usage_update(self, context, vol_id, rd_req, rd_bytes, wr_req,
wr_bytes, instance, last_refreshed=None,
update_totals=False):
self.db.vol_usage_update(context, vol_id, rd_req, rd_bytes, wr_req,
wr_bytes, instance['uuid'],
instance['project_id'], instance['user_id'],
instance['availability_zone'],
last_refreshed, update_totals)
# The session object is needed here, as the vol_usage object returned
# needs to bound to it in order to refresh its data
session = db_session.get_session()
vol_usage = self.db.vol_usage_update(context, vol_id,
rd_req, rd_bytes,
wr_req, wr_bytes,
instance['uuid'],
instance['project_id'],
instance['user_id'],
instance['availability_zone'],
last_refreshed, update_totals,
session)
# We have just updated the database, so send the notification now
notifier.notify(context, 'conductor.%s' % self.host, 'volume.usage',
notifier.INFO,
compute_utils.usage_volume_info(vol_usage))
@rpc_common.client_exceptions(exception.ComputeHostNotFound,
exception.HostBinaryNotFound)
+4 -2
View File
@@ -1469,14 +1469,16 @@ def vol_get_usage_by_time(context, begin):
def vol_usage_update(context, id, rd_req, rd_bytes, wr_req, wr_bytes,
instance_id, project_id, user_id, availability_zone,
last_refreshed=None, update_totals=False):
last_refreshed=None, update_totals=False,
session=None):
"""Update cached volume usage for a volume
Creates new record if needed."""
return IMPL.vol_usage_update(context, id, rd_req, rd_bytes, wr_req,
wr_bytes, instance_id, project_id, user_id,
availability_zone,
last_refreshed=last_refreshed,
update_totals=update_totals)
update_totals=update_totals,
session=session)
###################
+3 -1
View File
@@ -4456,7 +4456,7 @@ def vol_usage_update(context, id, rd_req, rd_bytes, wr_req, wr_bytes,
current_usage['curr_write_bytes'] + wr_bytes)
current_usage.update(values)
return
return current_usage
vol_usage = models.VolumeUsage()
vol_usage.tot_last_refreshed = timeutils.utcnow()
@@ -4480,6 +4480,8 @@ def vol_usage_update(context, id, rd_req, rd_bytes, wr_req, wr_bytes,
vol_usage.save(session=session)
return vol_usage
####################
+18 -49
View File
@@ -386,8 +386,6 @@ class ComputeVolumeTestCase(BaseTestCase):
self.mox.StubOutWithMock(self.compute, '_get_host_volume_bdms')
self.mox.StubOutWithMock(utils, 'last_completed_audit_period')
self.mox.StubOutWithMock(self.compute.driver, 'get_all_volume_usage')
self.mox.StubOutWithMock(self.compute,
'_send_volume_usage_notifications')
self.mox.StubOutWithMock(time, 'time')
# Following methods will be called.
utils.last_completed_audit_period().AndReturn((0, 0))
@@ -405,8 +403,6 @@ class ComputeVolumeTestCase(BaseTestCase):
self.mox.StubOutWithMock(self.compute, '_get_host_volume_bdms')
self.mox.StubOutWithMock(utils, 'last_completed_audit_period')
self.mox.StubOutWithMock(self.compute.driver, 'get_all_volume_usage')
self.mox.StubOutWithMock(self.compute,
'_send_volume_usage_notifications')
# Following methods are called.
utils.last_completed_audit_period().AndReturn((0, 0))
self.compute._get_host_volume_bdms(ctxt, 'MockHost').AndReturn([])
@@ -425,8 +421,6 @@ class ComputeVolumeTestCase(BaseTestCase):
self.mox.StubOutWithMock(self.compute, '_get_host_volume_bdms')
self.mox.StubOutWithMock(timeutils, 'utcnow')
self.mox.StubOutWithMock(self.compute, '_update_volume_usage_cache')
self.mox.StubOutWithMock(self.compute,
'_send_volume_usage_notifications')
self.stubs.Set(self.compute.driver, 'get_all_volume_usage',
lambda x, y: [3, 4])
# All the mocks are called
@@ -434,7 +428,6 @@ class ComputeVolumeTestCase(BaseTestCase):
self.compute._get_host_volume_bdms(ctxt, 'MockHost').AndReturn([1, 2])
timeutils.utcnow().AndReturn(5)
self.compute._update_volume_usage_cache(ctxt, [3, 4], 5)
self.compute._send_volume_usage_notifications(ctxt, 20)
self.mox.ReplayAll()
CONF.volume_usage_poll_interval = 10
self.compute._last_vol_usage_poll = 0
@@ -444,48 +437,6 @@ class ComputeVolumeTestCase(BaseTestCase):
self.compute._last_vol_usage_poll)
self.mox.UnsetStubs()
def test_send_volume_usage_notifications(self):
ctxt = 'MockContext'
test_notifier.NOTIFICATIONS = []
self.compute.host = 'MockHost'
fake_usage = {'tot_last_refreshed': 20,
'curr_last_refreshed': 10,
'volume_id': 'fake_volume_id',
'instance_uuid': 'fake_instance_uuid',
'project_id': 'fake_project_id',
'user_id': 'fake_user_id',
'availability_zone': 'fake-az',
'tot_reads': 11,
'curr_reads': 22,
'tot_read_bytes': 33,
'curr_read_bytes': 44,
'tot_writes': 55,
'curr_writes': 66,
'tot_write_bytes': 77,
'curr_write_bytes': 88}
self.stubs.Set(self.compute.conductor_api,
'vol_get_usage_by_time',
lambda x, y:
[db.sqlalchemy.models.VolumeUsage(**fake_usage)])
self.stubs.Set(self.compute.conductor_api,
'instance_get_all_by_filters',
lambda x, y: [{'project_id': 'fake_project_id',
'user_id': 'fake_user_id'}])
self.compute._send_volume_usage_notifications(ctxt, 20)
self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
msg = test_notifier.NOTIFICATIONS[0]
payload = msg['payload']
self.assertEquals(payload['instance_id'], 'fake_instance_uuid')
self.assertEquals(payload['user_id'], 'fake_user_id')
self.assertEquals(payload['tenant_id'], 'fake_project_id')
self.assertEquals(payload['reads'], 33)
self.assertEquals(payload['read_bytes'], 77)
self.assertEquals(payload['writes'], 121)
self.assertEquals(payload['write_bytes'], 165)
self.assertEquals(payload['availability_zone'], 'fake-az')
def test_detach_volume_usage(self):
# Test that detach volume update the volume usage cache table correctly
instance = self._create_fake_instance()
@@ -526,8 +477,26 @@ class ComputeVolumeTestCase(BaseTestCase):
# total fields in the volume usage cache.
CONF.volume_usage_poll_interval = 10
self.compute._poll_volume_usage(self.context)
# Check that a volume.usage notification was sent
self.assertEqual(1, len(test_notifier.NOTIFICATIONS))
msg = test_notifier.NOTIFICATIONS[0]
self.compute.detach_volume(self.context, 1, instance)
# Check that a volume.usage notification was sent
self.assertEquals(2, len(test_notifier.NOTIFICATIONS))
msg = test_notifier.NOTIFICATIONS[1]
self.assertEquals('volume.usage', msg['event_type'])
payload = msg['payload']
self.assertEquals(instance['uuid'], payload['instance_id'])
self.assertEquals('fake', payload['user_id'])
self.assertEquals('fake', payload['tenant_id'])
self.assertEquals(1, payload['reads'])
self.assertEquals(30, payload['read_bytes'])
self.assertEquals(1, payload['writes'])
self.assertEquals(20, payload['write_bytes'])
self.assertEquals(None, payload['availability_zone'])
# Check the database for the
volume_usages = db.vol_get_usage_by_time(self.context, 0)
self.assertEqual(1, len(volume_usages))
+38 -1
View File
@@ -30,6 +30,8 @@ from nova.db.sqlalchemy import models
from nova import exception as exc
from nova import notifications
from nova.openstack.common import jsonutils
from nova.openstack.common.notifier import api as notifier_api
from nova.openstack.common.notifier import test_notifier
from nova.openstack.common.rpc import common as rpc_common
from nova.openstack.common import timeutils
from nova import quota
@@ -55,6 +57,11 @@ class _BaseTestCase(object):
self.project_id = 'fake'
self.context = FakeContext(self.user_id, self.project_id)
notifier_api._reset_drivers()
self.addCleanup(notifier_api._reset_drivers)
self.flags(notification_driver=[test_notifier.__name__])
test_notifier.NOTIFICATIONS = []
def stub_out_client_exceptions(self):
def passthru(exceptions, func, *args, **kwargs):
return func(*args, **kwargs)
@@ -368,20 +375,50 @@ class _BaseTestCase(object):
self.assertEqual(result, 'fake-usage')
def test_vol_usage_update(self):
# the vol_usage_update method sends the volume usage notifications
# as well as updating the database
self.mox.StubOutWithMock(db, 'vol_usage_update')
inst = self._create_fake_instance({
'project_id': 'fake-project_id',
'user_id': 'fake-user_id',
})
fake_usage = {'tot_last_refreshed': 20,
'curr_last_refreshed': 10,
'volume_id': 'fake-vol',
'instance_uuid': inst['uuid'],
'project_id': 'fake-project_id',
'user_id': 'fake-user_id',
'availability_zone': 'fake-az',
'tot_reads': 11,
'curr_reads': 22,
'tot_read_bytes': 33,
'curr_read_bytes': 44,
'tot_writes': 55,
'curr_writes': 66,
'tot_write_bytes': 77,
'curr_write_bytes': 88}
db.vol_usage_update(self.context, 'fake-vol', 'rd-req', 'rd-bytes',
'wr-req', 'wr-bytes', inst['uuid'],
'fake-project_id', 'fake-user_id', 'fake-az',
'fake-refr', 'fake-bool')
'fake-refr', 'fake-bool', mox.IgnoreArg()).\
AndReturn(fake_usage)
self.mox.ReplayAll()
self.conductor.vol_usage_update(self.context, 'fake-vol', 'rd-req',
'rd-bytes', 'wr-req', 'wr-bytes',
inst, 'fake-refr', 'fake-bool')
self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
msg = test_notifier.NOTIFICATIONS[0]
payload = msg['payload']
self.assertEquals(payload['instance_id'], inst['uuid'])
self.assertEquals(payload['user_id'], 'fake-user_id')
self.assertEquals(payload['tenant_id'], 'fake-project_id')
self.assertEquals(payload['reads'], 33)
self.assertEquals(payload['read_bytes'], 77)
self.assertEquals(payload['writes'], 121)
self.assertEquals(payload['write_bytes'], 165)
self.assertEquals(payload['availability_zone'], 'fake-az')
def test_compute_node_create(self):
self.mox.StubOutWithMock(db, 'compute_node_create')
db.compute_node_create(self.context, 'fake-values').AndReturn(