Merge "Restructure host filtering to be easier to use."

This commit is contained in:
Jenkins
2011-10-13 00:17:49 +00:00
committed by Gerrit Code Review
12 changed files with 131 additions and 120 deletions
+2 -2
View File
@@ -271,7 +271,7 @@ class AbstractScheduler(driver.Scheduler):
# weigh the selected hosts.
# weighted_hosts = [{weight=weight, hostname=hostname,
# capabilities=capabs}, ...]
weighted_hosts = self.weigh_hosts(topic, request_spec, filtered_hosts)
weighted_hosts = self.weigh_hosts(request_spec, filtered_hosts)
# Next, tack on the host weights from the child zones
json_spec = json.dumps(request_spec)
all_zones = db.zone_get_all(context.elevated())
@@ -306,7 +306,7 @@ class AbstractScheduler(driver.Scheduler):
return [(host, services) for host, services in host_list
if basic_ram_filter(host, services, request_spec)]
def weigh_hosts(self, topic, request_spec, hosts):
def weigh_hosts(self, request_spec, hosts):
"""This version assigns a weight of 1 to all hosts, making selection
of any host basically a random event. Override this method in your
subclass to add logic to prefer one potential host over another.
+20 -7
View File
@@ -38,9 +38,20 @@ class BaseScheduler(abstract_scheduler.AbstractScheduler):
"""
def filter_hosts(self, topic, request_spec, hosts=None):
"""Filter the full host list (from the ZoneManager)"""
filter_name = request_spec.get('filter', None)
# Make sure that the requested filter is legitimate.
selected_filter = host_filter.choose_host_filter(filter_name)
filters = request_spec.get('filter')
if filters is None:
# Not specified; use the default
filters = FLAGS.default_host_filters
if not isinstance(filters, (list, tuple)):
filters = [filters]
if hosts is None:
# Get the full list (only considering 'compute' services)
all_hosts = self.zone_manager.service_states.iteritems()
hosts = [(host, services["compute"])
for host, services in all_hosts
if "compute" in services]
# Make sure that the requested filters are legitimate.
selected_filters = host_filter.choose_host_filters(filters)
# TODO(sandy): We're only using InstanceType-based specs
# currently. Later we'll need to snoop for more detailed
@@ -48,11 +59,13 @@ class BaseScheduler(abstract_scheduler.AbstractScheduler):
instance_type = request_spec.get("instance_type", None)
if instance_type is None:
# No way to select; return the specified hosts
return hosts or []
name, query = selected_filter.instance_type_to_filter(instance_type)
return selected_filter.filter_hosts(self.zone_manager, query)
return hosts
for selected_filter in selected_filters:
query = selected_filter.instance_type_to_filter(instance_type)
hosts = selected_filter.filter_hosts(hosts, query)
return hosts
def weigh_hosts(self, topic, request_spec, hosts):
def weigh_hosts(self, request_spec, hosts):
"""Derived classes may override this to provide more sophisticated
scheduling objectives
"""
+1 -6
View File
@@ -15,11 +15,6 @@
import nova.scheduler
from nova import flags
FLAGS = flags.FLAGS
flags.DEFINE_string('default_host_filter', 'AllHostsFilter',
'Which filter to use for filtering hosts')
class AbstractHostFilter(object):
@@ -28,7 +23,7 @@ class AbstractHostFilter(object):
"""Convert instance_type into a filter for most common use-case."""
raise NotImplementedError()
def filter_hosts(self, zone_manager, query):
def filter_hosts(self, host_list, query):
"""Return a list of hosts that fulfill the filter."""
raise NotImplementedError()
+5 -6
View File
@@ -15,7 +15,7 @@
import nova.scheduler
from nova.scheduler.filters import abstract_filter
import abstract_filter
class AllHostsFilter(abstract_filter.AbstractHostFilter):
@@ -24,9 +24,8 @@ class AllHostsFilter(abstract_filter.AbstractHostFilter):
"""Return anything to prevent base-class from raising
exception.
"""
return (self._full_name(), instance_type)
return instance_type
def filter_hosts(self, zone_manager, query):
"""Return a list of hosts from ZoneManager list."""
return [(host, services)
for host, services in zone_manager.service_states.iteritems()]
def filter_hosts(self, host_list, query):
"""Return the entire list of supplied hosts."""
return list(host_list)
+10 -4
View File
@@ -22,7 +22,7 @@ class InstanceTypeFilter(abstract_filter.AbstractHostFilter):
"""HostFilter hard-coded to work with InstanceType records."""
def instance_type_to_filter(self, instance_type):
"""Use instance_type to filter hosts."""
return (self._full_name(), instance_type)
return instance_type
def _satisfies_extra_specs(self, capabilities, instance_type):
"""Check that the capabilities provided by the compute service
@@ -40,14 +40,19 @@ class InstanceTypeFilter(abstract_filter.AbstractHostFilter):
return False
return True
def filter_hosts(self, zone_manager, query):
def filter_hosts(self, host_list, query):
"""Return a list of hosts that can create instance_type."""
instance_type = query
selected_hosts = []
for host, services in zone_manager.service_states.iteritems():
capabilities = services.get('compute', {})
for host, capabilities in host_list:
# In case the capabilities have not yet been extracted from
# the zone manager's services dict...
capabilities = capabilities.get("compute", capabilities)
if not capabilities:
continue
if not capabilities.get("enabled", True):
# Host is disabled
continue
host_ram_mb = capabilities['host_memory_free']
disk_bytes = capabilities['disk_available']
spec_ram = instance_type['memory_mb']
@@ -70,6 +75,7 @@ class InstanceTypeFilter(abstract_filter.AbstractHostFilter):
# 'host_other_config': {},
# 'host_ip_address': '192.168.1.109',
# 'host_cpu_info': {},
# 'enabled': True,
# 'disk_available': 32954957824,
# 'disk_total': 50394562560,
# 'disk_used': 17439604736,
+12 -8
View File
@@ -94,7 +94,7 @@ class JsonFilter(abstract_filter.AbstractHostFilter):
query = ['and',
['>=', '$compute.host_memory_free', required_ram],
['>=', '$compute.disk_available', required_disk]]
return (self._full_name(), json.dumps(query))
return json.dumps(query)
def _parse_string(self, string, host, services):
"""Strings prefixed with $ are capability lookups in the
@@ -112,7 +112,7 @@ class JsonFilter(abstract_filter.AbstractHostFilter):
return None
return services
def _process_filter(self, zone_manager, query, host, services):
def _process_filter(self, query, host, services):
"""Recursively parse the query structure."""
if not query:
return True
@@ -121,7 +121,7 @@ class JsonFilter(abstract_filter.AbstractHostFilter):
cooked_args = []
for arg in query[1:]:
if isinstance(arg, list):
arg = self._process_filter(zone_manager, arg, host, services)
arg = self._process_filter(arg, host, services)
elif isinstance(arg, basestring):
arg = self._parse_string(arg, host, services)
if arg is not None:
@@ -129,18 +129,22 @@ class JsonFilter(abstract_filter.AbstractHostFilter):
result = method(self, cooked_args)
return result
def filter_hosts(self, zone_manager, query):
def filter_hosts(self, host_list, query):
"""Return a list of hosts that can fulfill the requirements
specified in the query.
"""
expanded = json.loads(query)
filtered_hosts = []
for host, services in zone_manager.service_states.iteritems():
result = self._process_filter(zone_manager, expanded, host,
services)
for host, capabilities in host_list:
if not capabilities:
continue
if not capabilities.get("enabled", True):
# Host is disabled
continue
result = self._process_filter(expanded, host, capabilities)
if isinstance(result, list):
# If any succeeded, include the host
result = any(result)
if result:
filtered_hosts.append((host, services))
filtered_hosts.append((host, capabilities))
return filtered_hosts
+27 -14
View File
@@ -32,17 +32,16 @@ from nova import exception
from nova import flags
import nova.scheduler
# NOTE(Vek): Even though we don't use filters in here anywhere, we
# depend on default_host_filter being available in FLAGS,
# and that happens only when filters/abstract_filter.py is
# imported.
from nova.scheduler import filters
FLAGS = flags.FLAGS
flags.DEFINE_list('default_host_filters', ['AllHostsFilter'],
'Which filters to use for filtering hosts when not specified '
'in the request.')
def _get_filters():
def _get_filter_classes():
# Imported here to avoid circular imports
from nova.scheduler import filters
@@ -55,15 +54,29 @@ def _get_filters():
and get_itm(itm) is not filters.AbstractHostFilter]
def choose_host_filter(filter_name=None):
"""Since the caller may specify which filter to use we need
def choose_host_filters(filters=None):
"""Since the caller may specify which filters to use we need
to have an authoritative list of what is permissible. This
function checks the filter name against a predefined set
function checks the filter names against a predefined set
of acceptable filters.
"""
if not filter_name:
filter_name = FLAGS.default_host_filter
for filter_class in _get_filters():
if filter_class.__name__ == filter_name:
return filter_class()
raise exception.SchedulerHostFilterNotFound(filter_name=filter_name)
if not filters:
filters = FLAGS.default_host_filters
if not isinstance(filters, (list, tuple)):
filters = [filters]
good_filters = []
bad_filters = []
filter_classes = _get_filter_classes()
for filter_name in filters:
found_class = False
for cls in filter_classes:
if cls.__name__ == filter_name:
good_filters.append(cls())
found_class = True
break
if not found_class:
bad_filters.append(filter_name)
if bad_filters:
msg = ", ".join(bad_filters)
raise exception.SchedulerHostFilterNotFound(filter_name=msg)
return good_filters
+6 -3
View File
@@ -114,10 +114,13 @@ class LeastCostScheduler(base_scheduler.BaseScheduler):
self.cost_fns_cache = {}
super(LeastCostScheduler, self).__init__(*args, **kwargs)
def get_cost_fns(self, topic):
def get_cost_fns(self, topic=None):
"""Returns a list of tuples containing weights and cost functions to
use for weighing hosts
"""
if topic is None:
# Schedulers only support compute right now.
topic = "compute"
if topic in self.cost_fns_cache:
return self.cost_fns_cache[topic]
cost_fns = []
@@ -151,11 +154,11 @@ class LeastCostScheduler(base_scheduler.BaseScheduler):
self.cost_fns_cache[topic] = cost_fns
return cost_fns
def weigh_hosts(self, topic, request_spec, hosts):
def weigh_hosts(self, request_spec, hosts):
"""Returns a list of dictionaries of form:
[ {weight: weight, hostname: hostname, capabilities: capabs} ]
"""
cost_fns = self.get_cost_fns(topic)
cost_fns = self.get_cost_fns()
costs = weighted_sum(domain=hosts, weighted_fns=cost_fns)
weighted = []
@@ -455,8 +455,7 @@ class BaseSchedulerTestCase(test.TestCase):
# Call weigh_hosts()
num_instances = len(hostlist) * 2 + len(hostlist) / 2
instlist = sched.weigh_hosts('compute',
dict(num_instances=num_instances),
instlist = sched.weigh_hosts(dict(num_instances=num_instances),
hostlist)
# Should be enough entries to cover all instances
+41 -33
View File
@@ -18,10 +18,10 @@ Tests For Scheduler Host Filters.
import json
import nova
from nova import exception
from nova import test
from nova.scheduler import host_filter
from nova.scheduler import filters
class FakeZoneManager:
@@ -52,12 +52,13 @@ class HostFilterTestCase(test.TestCase):
'disk_total': 1000,
'disk_used': 0,
'host_uuid': 'xxx-%d' % multiplier,
'host_name-label': 'xs-%s' % multiplier}
'host_name-label': 'xs-%s' % multiplier,
'enabled': True}
def setUp(self):
super(HostFilterTestCase, self).setUp()
default_host_filter = 'AllHostsFilter'
self.flags(default_host_filter=default_host_filter)
default_host_filters = ['AllHostsFilter']
self.flags(default_host_filters=default_host_filters)
self.instance_type = dict(name='tiny',
memory_mb=50,
vcpus=10,
@@ -96,34 +97,41 @@ class HostFilterTestCase(test.TestCase):
host09['xpu_arch'] = 'fermi'
host09['xpu_info'] = 'Tesla 2150'
def _get_all_hosts(self):
return self.zone_manager.service_states.items()
def test_choose_filter(self):
# Test default filter ...
hf = host_filter.choose_host_filter()
hfs = host_filter.choose_host_filters()
hf = hfs[0]
self.assertEquals(hf._full_name().split(".")[-1], 'AllHostsFilter')
# Test valid filter ...
hf = host_filter.choose_host_filter('InstanceTypeFilter')
hfs = host_filter.choose_host_filters('InstanceTypeFilter')
hf = hfs[0]
self.assertEquals(hf._full_name().split(".")[-1], 'InstanceTypeFilter')
# Test invalid filter ...
try:
host_filter.choose_host_filter('does not exist')
host_filter.choose_host_filters('does not exist')
self.fail("Should not find host filter.")
except exception.SchedulerHostFilterNotFound:
pass
def test_all_host_filter(self):
hf = filters.AllHostsFilter()
hfs = host_filter.choose_host_filters('AllHostsFilter')
hf = hfs[0]
all_hosts = self._get_all_hosts()
cooked = hf.instance_type_to_filter(self.instance_type)
hosts = hf.filter_hosts(self.zone_manager, cooked)
hosts = hf.filter_hosts(all_hosts, cooked)
self.assertEquals(10, len(hosts))
for host, capabilities in hosts:
self.assertTrue(host.startswith('host'))
def test_instance_type_filter(self):
hf = filters.InstanceTypeFilter()
hf = nova.scheduler.filters.InstanceTypeFilter()
# filter all hosts that can support 50 ram and 500 disk
name, cooked = hf.instance_type_to_filter(self.instance_type)
self.assertEquals(name.split(".")[-1], 'InstanceTypeFilter')
hosts = hf.filter_hosts(self.zone_manager, cooked)
cooked = hf.instance_type_to_filter(self.instance_type)
all_hosts = self._get_all_hosts()
hosts = hf.filter_hosts(all_hosts, cooked)
self.assertEquals(6, len(hosts))
just_hosts = [host for host, caps in hosts]
just_hosts.sort()
@@ -131,21 +139,21 @@ class HostFilterTestCase(test.TestCase):
self.assertEquals('host10', just_hosts[5])
def test_instance_type_filter_extra_specs(self):
hf = filters.InstanceTypeFilter()
hf = nova.scheduler.filters.InstanceTypeFilter()
# filter all hosts that can support 50 ram and 500 disk
name, cooked = hf.instance_type_to_filter(self.gpu_instance_type)
self.assertEquals(name.split(".")[-1], 'InstanceTypeFilter')
hosts = hf.filter_hosts(self.zone_manager, cooked)
cooked = hf.instance_type_to_filter(self.gpu_instance_type)
all_hosts = self._get_all_hosts()
hosts = hf.filter_hosts(all_hosts, cooked)
self.assertEquals(1, len(hosts))
just_hosts = [host for host, caps in hosts]
self.assertEquals('host07', just_hosts[0])
def test_json_filter(self):
hf = filters.JsonFilter()
hf = nova.scheduler.filters.JsonFilter()
# filter all hosts that can support 50 ram and 500 disk
name, cooked = hf.instance_type_to_filter(self.instance_type)
self.assertEquals(name.split(".")[-1], 'JsonFilter')
hosts = hf.filter_hosts(self.zone_manager, cooked)
cooked = hf.instance_type_to_filter(self.instance_type)
all_hosts = self._get_all_hosts()
hosts = hf.filter_hosts(all_hosts, cooked)
self.assertEquals(6, len(hosts))
just_hosts = [host for host, caps in hosts]
just_hosts.sort()
@@ -165,7 +173,7 @@ class HostFilterTestCase(test.TestCase):
]
]
cooked = json.dumps(raw)
hosts = hf.filter_hosts(self.zone_manager, cooked)
hosts = hf.filter_hosts(all_hosts, cooked)
self.assertEquals(5, len(hosts))
just_hosts = [host for host, caps in hosts]
@@ -177,7 +185,7 @@ class HostFilterTestCase(test.TestCase):
['=', '$compute.host_memory_free', 30],
]
cooked = json.dumps(raw)
hosts = hf.filter_hosts(self.zone_manager, cooked)
hosts = hf.filter_hosts(all_hosts, cooked)
self.assertEquals(9, len(hosts))
just_hosts = [host for host, caps in hosts]
@@ -187,7 +195,7 @@ class HostFilterTestCase(test.TestCase):
raw = ['in', '$compute.host_memory_free', 20, 40, 60, 80, 100]
cooked = json.dumps(raw)
hosts = hf.filter_hosts(self.zone_manager, cooked)
hosts = hf.filter_hosts(all_hosts, cooked)
self.assertEquals(5, len(hosts))
just_hosts = [host for host, caps in hosts]
just_hosts.sort()
@@ -198,32 +206,32 @@ class HostFilterTestCase(test.TestCase):
raw = ['unknown command', ]
cooked = json.dumps(raw)
try:
hf.filter_hosts(self.zone_manager, cooked)
hf.filter_hosts(all_hosts, cooked)
self.fail("Should give KeyError")
except KeyError, e:
pass
self.assertTrue(hf.filter_hosts(self.zone_manager, json.dumps([])))
self.assertTrue(hf.filter_hosts(self.zone_manager, json.dumps({})))
self.assertTrue(hf.filter_hosts(self.zone_manager, json.dumps(
self.assertTrue(hf.filter_hosts(all_hosts, json.dumps([])))
self.assertTrue(hf.filter_hosts(all_hosts, json.dumps({})))
self.assertTrue(hf.filter_hosts(all_hosts, json.dumps(
['not', True, False, True, False],
)))
try:
hf.filter_hosts(self.zone_manager, json.dumps(
hf.filter_hosts(all_hosts, json.dumps(
'not', True, False, True, False,
))
self.fail("Should give KeyError")
except KeyError, e:
pass
self.assertFalse(hf.filter_hosts(self.zone_manager,
self.assertFalse(hf.filter_hosts(all_hosts,
json.dumps(['=', '$foo', 100])))
self.assertFalse(hf.filter_hosts(self.zone_manager,
self.assertFalse(hf.filter_hosts(all_hosts,
json.dumps(['=', '$.....', 100])))
self.assertFalse(hf.filter_hosts(self.zone_manager,
self.assertFalse(hf.filter_hosts(all_hosts,
json.dumps(
['>', ['and', ['or', ['not', ['<', ['>=', ['<=', ['in', ]]]]]]]])))
self.assertFalse(hf.filter_hosts(self.zone_manager,
self.assertFalse(hf.filter_hosts(all_hosts,
json.dumps(['=', {}, ['>', '$missing....foo']])))
@@ -82,7 +82,7 @@ class LeastCostSchedulerTestCase(test.TestCase):
super(LeastCostSchedulerTestCase, self).tearDown()
def assertWeights(self, expected, num, request_spec, hosts):
weighted = self.sched.weigh_hosts("compute", request_spec, hosts)
weighted = self.sched.weigh_hosts(request_spec, hosts)
self.assertDictListMatch(weighted, expected, approx_equal=True)
def test_no_hosts(self):
@@ -97,50 +97,20 @@ class LeastCostSchedulerTestCase(test.TestCase):
self.flags(least_cost_scheduler_cost_functions=[
'nova.scheduler.least_cost.noop_cost_fn'],
noop_cost_fn_weight=1)
num = 1
request_spec = {}
hosts = self.sched.filter_hosts(num, request_spec)
expected = [dict(weight=1, hostname=hostname)
for hostname, caps in hosts]
expected = [{"hostname": hostname, "weight": 1, "capabilities": caps}
for hostname, caps in hosts]
self.assertWeights(expected, num, request_spec, hosts)
def test_cost_fn_weights(self):
self.flags(least_cost_scheduler_cost_functions=[
'nova.scheduler.least_cost.noop_cost_fn'],
noop_cost_fn_weight=2)
num = 1
request_spec = {}
hosts = self.sched.filter_hosts(num, request_spec)
expected = [dict(weight=2, hostname=hostname)
for hostname, caps in hosts]
self.assertWeights(expected, num, request_spec, hosts)
def test_compute_fill_first_cost_fn(self):
self.flags(least_cost_scheduler_cost_functions=[
'nova.scheduler.least_cost.compute_fill_first_cost_fn'],
compute_fill_first_cost_fn_weight=1)
num = 1
instance_type = {'memory_mb': 1024}
request_spec = {'instance_type': instance_type}
svc_states = self.sched.zone_manager.service_states.iteritems()
all_hosts = [(host, services["compute"])
for host, services in svc_states
if "compute" in services]
hosts = self.sched.filter_hosts('compute', request_spec, all_hosts)
expected = []
for idx, (hostname, services) in enumerate(hosts):
caps = copy.deepcopy(services)
# Costs are normalized so over 10 hosts, each host with increasing
# free ram will cost 1/N more. Since the lowest cost host has some
# free ram, we add in the 1/N for the base_cost
weight = 0.1 + (0.1 * idx)
wtd_dict = dict(hostname=hostname, weight=weight,
capabilities=caps)
expected.append(wtd_dict)
expected = [{"hostname": hostname, "weight": 2, "capabilities": caps}
for hostname, caps in hosts]
self.assertWeights(expected, num, request_spec, hosts)
@@ -271,6 +271,7 @@ def cleanup(dct):
# avv["major"] = safe_int(dct.get("API-version-major", ""))
# avv["minor"] = safe_int(dct.get("API-version-minor", ""))
out["enabled"] = dct.get("enabled", True)
out["host_uuid"] = dct.get("uuid", None)
out["host_name-label"] = dct.get("name-label", "")
out["host_name-description"] = dct.get("name-description", "")