Merge "Fix quantum security group driver to accept none for from/to_port"
This commit is contained in:
@@ -79,15 +79,17 @@ class SecurityGroupAPI(security_group_base.SecurityGroupBase):
|
||||
nova_rule['id'] = rule['id']
|
||||
nova_rule['parent_group_id'] = rule['security_group_id']
|
||||
nova_rule['protocol'] = rule['protocol']
|
||||
if rule['port_range_min'] is None:
|
||||
nova_rule['from_port'] = -1
|
||||
if (nova_rule['protocol'] and rule.get('port_range_min') is None and
|
||||
rule.get('port_range_max') is None):
|
||||
if nova_rule['protocol'].upper() == 'ICMP':
|
||||
nova_rule['from_port'] = -1
|
||||
nova_rule['to_port'] = -1
|
||||
elif rule['protocol'].upper() in ['TCP', 'UDP']:
|
||||
nova_rule['from_port'] = 1
|
||||
nova_rule['to_port'] = 65535
|
||||
else:
|
||||
nova_rule['from_port'] = rule['port_range_min']
|
||||
|
||||
if rule['port_range_max'] is None:
|
||||
nova_rule['to_port'] = -1
|
||||
else:
|
||||
nova_rule['to_port'] = rule['port_range_max']
|
||||
nova_rule['from_port'] = rule.get('port_range_min')
|
||||
nova_rule['to_port'] = rule.get('port_range_max')
|
||||
nova_rule['group_id'] = rule['remote_group_id']
|
||||
nova_rule['cidr'] = rule['remote_ip_prefix']
|
||||
return nova_rule
|
||||
@@ -207,9 +209,9 @@ class SecurityGroupAPI(security_group_base.SecurityGroupBase):
|
||||
new_rule['remote_ip_prefix'] = rule.get('cidr')
|
||||
new_rule['security_group_id'] = rule.get('parent_group_id')
|
||||
new_rule['remote_group_id'] = rule.get('group_id')
|
||||
if rule['from_port'] != -1:
|
||||
if 'from_port' in rule and rule['from_port'] != -1:
|
||||
new_rule['port_range_min'] = rule['from_port']
|
||||
if rule['to_port'] != -1:
|
||||
if 'to_port' in rule and rule['to_port'] != -1:
|
||||
new_rule['port_range_max'] = rule['to_port']
|
||||
new_rules.append(new_rule)
|
||||
return {'security_group_rules': new_rules}
|
||||
|
||||
@@ -727,6 +727,46 @@ class TestSecurityGroupRules(test.TestCase):
|
||||
self.assertEquals(security_group_rule['from_port'], 81)
|
||||
self.assertEquals(security_group_rule['to_port'], 81)
|
||||
|
||||
def test_create_none_value_from_to_port(self):
|
||||
rule = {'parent_group_id': self.sg1['id'],
|
||||
'group_id': self.sg1['id']}
|
||||
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
|
||||
res_dict = self.controller.create(req, {'security_group_rule': rule})
|
||||
security_group_rule = res_dict['security_group_rule']
|
||||
self.assertEquals(security_group_rule['from_port'], None)
|
||||
self.assertEquals(security_group_rule['to_port'], None)
|
||||
self.assertEquals(security_group_rule['group']['name'], 'test')
|
||||
self.assertEquals(security_group_rule['parent_group_id'],
|
||||
self.sg1['id'])
|
||||
|
||||
def test_create_none_value_from_to_port_icmp(self):
|
||||
rule = {'parent_group_id': self.sg1['id'],
|
||||
'group_id': self.sg1['id'],
|
||||
'ip_protocol': 'ICMP'}
|
||||
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
|
||||
res_dict = self.controller.create(req, {'security_group_rule': rule})
|
||||
security_group_rule = res_dict['security_group_rule']
|
||||
self.assertEquals(security_group_rule['ip_protocol'], 'ICMP')
|
||||
self.assertEquals(security_group_rule['from_port'], -1)
|
||||
self.assertEquals(security_group_rule['to_port'], -1)
|
||||
self.assertEquals(security_group_rule['group']['name'], 'test')
|
||||
self.assertEquals(security_group_rule['parent_group_id'],
|
||||
self.sg1['id'])
|
||||
|
||||
def test_create_none_value_from_to_port_tcp(self):
|
||||
rule = {'parent_group_id': self.sg1['id'],
|
||||
'group_id': self.sg1['id'],
|
||||
'ip_protocol': 'TCP'}
|
||||
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
|
||||
res_dict = self.controller.create(req, {'security_group_rule': rule})
|
||||
security_group_rule = res_dict['security_group_rule']
|
||||
self.assertEquals(security_group_rule['ip_protocol'], 'TCP')
|
||||
self.assertEquals(security_group_rule['from_port'], 1)
|
||||
self.assertEquals(security_group_rule['to_port'], 65535)
|
||||
self.assertEquals(security_group_rule['group']['name'], 'test')
|
||||
self.assertEquals(security_group_rule['parent_group_id'],
|
||||
self.sg1['id'])
|
||||
|
||||
def test_create_by_invalid_cidr_json(self):
|
||||
rule = security_group_rule_template(
|
||||
ip_protocol="tcp",
|
||||
|
||||
Reference in New Issue
Block a user