From 95c74a26d40883faa053febc72657a8b8e6c5bd3 Mon Sep 17 00:00:00 2001 From: Ghanshyam Mann Date: Tue, 14 Apr 2020 14:10:52 -0500 Subject: [PATCH] Add test coverage of existing remaining servers policies Current tests do not have good test coverage of existing policies. Either tests for policies do not exist or if they exist then they do not cover the actual negative and positive testing. For Example, if any policy with default rule as admin only then test should verify: - policy check pass with context having admin or server owner - policy check fail with context having non-admin and not server owner As discussed in policy-defaults-refresh, to change the policies with new default roles and scope_type, we need to have the enough testing coverage of existing policy behavior. When we will add the scope_type in policies or new default roles, then these test coverage will be extended to adopt the new changes and also make sure we do not break the existing behavior. This commit covers the testing coverage of existing remaining server policies. Partial implement blueprint policy-defaults-refresh Change-Id: Ibfc1917d35ab9e7657b13ec32582891f06d88176 --- nova/tests/unit/policies/test_servers.py | 180 +++++++++++++++++++++++ 1 file changed, 180 insertions(+) diff --git a/nova/tests/unit/policies/test_servers.py b/nova/tests/unit/policies/test_servers.py index 10be313028..2e261956db 100644 --- a/nova/tests/unit/policies/test_servers.py +++ b/nova/tests/unit/policies/test_servers.py @@ -14,10 +14,14 @@ import fixtures import mock from oslo_utils.fixture import uuidsentinel as uuids +from oslo_utils import timeutils +from nova.api.openstack.compute import migrate_server from nova.api.openstack.compute import servers +from nova.compute import api as compute from nova.compute import vm_states from nova import exception +from nova.network import neutron from nova import objects from nova.objects import fields from nova.objects.instance_group import InstanceGroup @@ -40,9 +44,11 @@ class ServersPolicyTest(base.BasePolicyTest): def setUp(self): super(ServersPolicyTest, self).setUp() self.controller = servers.ServersController() + self.m_controller = migrate_server.MigrateServerController() self.rule_trusted_certs = policies.SERVERS % 'create:trusted_certs' self.rule_attach_network = policies.SERVERS % 'create:attach_network' self.rule_attach_volume = policies.SERVERS % 'create:attach_volume' + self.rule_requested_destination = policies.REQUESTED_DESTINATION self.req = fakes.HTTPRequest.blank('') user_id = self.req.environ['nova.context'].user_id @@ -181,6 +187,44 @@ class ServersPolicyTest(base.BasePolicyTest): # Check that non-project member is not able to create server self.project_member_unauthorized_contexts = [ ] + # Check that project admin is able to create server with requested + # destination. + self.project_admin_authorized_contexts = [ + self.legacy_admin_context, self.system_admin_context, + self.project_admin_context] + # Check that non-project admin is not able to create server with + # requested destination + self.project_admin_unauthorized_contexts = [ + self.system_member_context, self.system_reader_context, + self.system_foo_context, self.project_member_context, + self.project_reader_context, self.project_foo_context, + self.other_project_member_context, + self.other_project_reader_context + ] + # Check that no one is able to resize cross cell. + self.cross_cell_authorized_contexts = [] + self.cross_cell_unauthorized_contexts = [ + self.legacy_admin_context, self.system_admin_context, + self.project_admin_context, self.project_member_context, + self.project_reader_context, self.project_foo_context, + self.system_member_context, self.system_reader_context, + self.system_foo_context, + self.other_project_member_context, + self.other_project_reader_context] + # Check that admin is able to access the zero disk flavor + # and external network policies. + self.zero_disk_external_net_authorized_contexts = [ + self.legacy_admin_context, self.system_admin_context, + self.project_admin_context] + # Check that non-admin is not able to caccess the zero disk flavor + # and external network policies. + self.zero_disk_external_net_unauthorized_contexts = [ + self.system_member_context, self.system_reader_context, + self.system_foo_context, self.project_member_context, + self.project_reader_context, self.project_foo_context, + self.other_project_member_context, + self.other_project_reader_context + ] # Check that admin is able to get server extended attributes # or host status. self.server_attr_admin_authorized_contexts = [ @@ -1136,6 +1180,126 @@ class ServersPolicyTest(base.BasePolicyTest): for resp in unauthorize_res: self.assertNotIn('host_status', resp['server']) + @mock.patch('nova.compute.api.API.create') + def test_create_requested_destination_server_policy(self, + mock_create): + # 'create' policy is checked before 'create:requested_destination' so + # we have to allow it for everyone otherwise it will + # fail for unauthorized contexts here. + rule = policies.SERVERS % 'create' + self.policy.set_rules({rule: "@"}, overwrite=False) + req = fakes.HTTPRequest.blank('', version='2.74') + + def fake_create(context, *args, **kwargs): + for attr in ['requested_host', 'requested_hypervisor_hostname']: + if context in self.project_admin_authorized_contexts: + self.assertIn(attr, kwargs) + if context in self.project_admin_unauthorized_contexts: + self.assertNotIn(attr, kwargs) + return ([self.instance], '') + mock_create.side_effect = fake_create + + body = { + 'server': { + 'name': 'server_test', + 'imageRef': uuids.fake_id, + 'flavorRef': uuids.fake_id, + 'networks': [{ + 'uuid': uuids.fake_id + }], + 'host': 'fake', + 'hypervisor_hostname': 'fake' + }, + } + + self.common_policy_check(self.project_admin_authorized_contexts, + self.project_admin_unauthorized_contexts, + self.rule_requested_destination, + self.controller.create, + req, body=body) + + @mock.patch('nova.compute.api.API._check_requested_networks') + @mock.patch('nova.compute.api.API._allow_resize_to_same_host') + @mock.patch('nova.objects.RequestSpec.get_by_instance_uuid') + @mock.patch('nova.objects.Instance.save') + @mock.patch('nova.api.openstack.common.get_instance') + @mock.patch('nova.api.openstack.common.' + 'instance_has_port_with_resource_request') + @mock.patch('nova.conductor.ComputeTaskAPI.resize_instance') + def test_cross_cell_resize_server_policy(self, + mock_resize, mock_port, mock_get, mock_save, mock_rs, + mock_allow, m_net): + self.stub_out('nova.compute.api.API.get_instance_host_status', + lambda x, y: "UP") + + # 'migrate' policy is checked before 'resize:cross_cell' so + # we have to allow it for everyone otherwise it will + # fail for unauthorized contexts here. + rule = 'os_compute_api:os-migrate-server:migrate' + self.policy.set_rules({rule: "@"}, overwrite=False) + rule_name = policies.CROSS_CELL_RESIZE + mock_port.return_value = False + req = fakes.HTTPRequest.blank('', version='2.56') + + def fake_get(*args, **kwargs): + return fake_instance.fake_instance_obj( + self.project_member_context, + id=1, uuid=uuids.fake_id, project_id=self.project_id, + user_id='fake-user', vm_state=vm_states.ACTIVE, + launched_at=timeutils.utcnow()) + + mock_get.side_effect = fake_get + + def fake_validate(context, instance, + host_name, allow_cross_cell_resize): + if context in self.cross_cell_authorized_contexts: + self.assertTrue(allow_cross_cell_resize) + if context in self.cross_cell_unauthorized_contexts: + self.assertFalse(allow_cross_cell_resize) + return objects.ComputeNode(host=1, hypervisor_hostname=2) + + self.stub_out( + 'nova.compute.api.API._validate_host_for_cold_migrate', + fake_validate) + + self.common_policy_check(self.cross_cell_authorized_contexts, + self.cross_cell_unauthorized_contexts, + rule_name, + self.m_controller._migrate, + req, self.instance.uuid, + body={'migrate': {'host': 'fake'}}, + fatal=False) + + def test_network_attach_external_network_policy(self): + # NOTE(gmann): Testing policy 'network:attach_external_network' + # which raise different error then PolicyNotAuthorized + # if not allowed. + neutron_api = neutron.API() + for context in self.zero_disk_external_net_authorized_contexts: + neutron_api._check_external_network_attach(context, + [{'id': 1, 'router:external': 'ext'}]) + for context in self.zero_disk_external_net_unauthorized_contexts: + self.assertRaises(exception.ExternalNetworkAttachForbidden, + neutron_api._check_external_network_attach, + context, [{'id': 1, 'router:external': 'ext'}]) + + def test_zero_disk_flavor_policy(self): + # NOTE(gmann): Testing policy 'create:zero_disk_flavor' + # which raise different error then PolicyNotAuthorized + # if not allowed. + image = {'id': uuids.image_id, 'status': 'foo'} + flavor = objects.Flavor( + vcpus=1, memory_mb=512, root_gb=0, extra_specs={'hw:pmu': "true"}) + compute_api = compute.API() + for context in self.zero_disk_external_net_authorized_contexts: + compute_api._validate_flavor_image_nostatus(context, + image, flavor, None) + for context in self.zero_disk_external_net_unauthorized_contexts: + self.assertRaises( + exception.BootFromVolumeRequiredForZeroDiskFlavor, + compute_api._validate_flavor_image_nostatus, + context, image, flavor, None) + class ServersScopeTypePolicyTest(ServersPolicyTest): """Test Servers APIs policies with system scope enabled. @@ -1160,6 +1324,7 @@ class ServersScopeTypePolicyTest(ServersPolicyTest): self.rule_trusted_certs = None self.rule_attach_network = None self.rule_attach_volume = None + self.rule_requested_destination = None # Check that system admin is able to create server with host request # and get server extended attributes or host status. @@ -1205,6 +1370,21 @@ class ServersScopeTypePolicyTest(ServersPolicyTest): self.system_reader_context, self.system_foo_context ] + # Check that project admin is able to create server with requested + # destination. + self.project_admin_authorized_contexts = [ + self.legacy_admin_context, self.project_admin_context] + # Check that non-project admin is not able to create server with + # requested destination + self.project_admin_unauthorized_contexts = [ + self.system_admin_context, + self.system_member_context, self.system_reader_context, + self.system_foo_context, self.project_member_context, + self.project_reader_context, self.project_foo_context, + self.other_project_member_context, + self.other_project_reader_context + ] + @mock.patch('nova.compute.api.API.create') @mock.patch('nova.compute.api.API.parse_availability_zone') def test_create_forced_host_server_policy(self, mock_az, mock_create):