diff --git a/nova/tests/unit/api/openstack/compute/test_lock_server.py b/nova/tests/unit/api/openstack/compute/test_lock_server.py index 29075d1aa4..c5bed64cc5 100644 --- a/nova/tests/unit/api/openstack/compute/test_lock_server.py +++ b/nova/tests/unit/api/openstack/compute/test_lock_server.py @@ -19,18 +19,14 @@ import six from nova.api.openstack import api_version_request from nova.api.openstack import common from nova.api.openstack.compute import lock_server as lock_server_v21 -from nova import context from nova import exception -from nova import test from nova.tests.unit.api.openstack.compute import admin_only_action_common -from nova.tests.unit.api.openstack import fakes from nova.tests.unit import fake_instance class LockServerTestsV21(admin_only_action_common.CommonTests): lock_server = lock_server_v21 controller_name = 'LockServerController' - authorization_error = exception.PolicyNotAuthorized _api_version = '2.1' def setUp(self): @@ -52,44 +48,6 @@ class LockServerTestsV21(admin_only_action_common.CommonTests): self._test_actions_with_non_existed_instance(['_lock', '_unlock'], body_map=body_map) - def test_unlock_not_authorized(self): - instance = self._stub_instance_get() - - body = {} - with mock.patch.object( - self.compute_api, 'unlock', - side_effect=exception.PolicyNotAuthorized( - action='unlock')) as mock_unlock: - self.assertRaises(self.authorization_error, - self.controller._unlock, - self.req, instance.uuid, body) - mock_unlock.assert_called_once_with(self.context, instance) - self.mock_get.assert_called_once_with(self.context, instance.uuid, - expected_attrs=None, - cell_down_support=False) - - @mock.patch.object(common, 'get_instance') - def test_unlock_override_not_authorized_with_non_admin_user( - self, mock_get_instance): - instance = fake_instance.fake_instance_obj(self.context) - instance.locked_by = "owner" - mock_get_instance.return_value = instance - self.assertRaises(self.authorization_error, - self.controller._unlock, self.req, - instance.uuid, - {'unlock': None}) - - @mock.patch.object(common, 'get_instance') - def test_unlock_override_with_admin_user(self, mock_get_instance): - admin_req = fakes.HTTPRequest.blank('', use_admin_context=True) - admin_ctxt = admin_req.environ['nova.context'] - instance = fake_instance.fake_instance_obj(admin_ctxt) - instance.locked_by = "owner" - mock_get_instance.return_value = instance - with mock.patch.object(self.compute_api, 'unlock') as mock_unlock: - self.controller._unlock(admin_req, instance.uuid, {'unlock': None}) - mock_unlock.assert_called_once_with(admin_ctxt, instance) - @mock.patch.object(common, 'get_instance') def test_unlock_with_any_body(self, get_instance_mock): instance = fake_instance.fake_instance_obj( @@ -167,96 +125,3 @@ class LockServerTestsV273(LockServerTestsV21): exp = self.assertRaises(exception.ValidationError, self.controller._lock, self.req, instance.uuid, body=body) self.assertIn("('blah' was unexpected)", six.text_type(exp)) - - -class LockServerPolicyEnforcementV21(test.NoDBTestCase): - - def setUp(self): - super(LockServerPolicyEnforcementV21, self).setUp() - self.controller = lock_server_v21.LockServerController() - self.req = fakes.HTTPRequest.blank('') - - @mock.patch('nova.api.openstack.common.get_instance') - def test_lock_policy_failed_with_other_project(self, get_instance_mock): - get_instance_mock.return_value = fake_instance.fake_instance_obj( - self.req.environ['nova.context'], - project_id=self.req.environ['nova.context'].project_id) - rule_name = "os_compute_api:os-lock-server:lock" - self.policy.set_rules({rule_name: "project_id:%(project_id)s"}) - # Change the project_id in request context. - self.req.environ['nova.context'].project_id = 'other-project' - exc = self.assertRaises( - exception.PolicyNotAuthorized, - self.controller._lock, self.req, - fakes.FAKE_UUID, - body={'lock': {}}) - self.assertEqual( - "Policy doesn't allow %s to be performed." % rule_name, - exc.format_message()) - - @mock.patch('nova.api.openstack.common.get_instance') - def test_lock_overridden_policy_failed_with_other_user_in_same_project( - self, get_instance_mock): - get_instance_mock.return_value = ( - fake_instance.fake_instance_obj(self.req.environ['nova.context'])) - rule_name = "os_compute_api:os-lock-server:lock" - self.policy.set_rules({rule_name: "user_id:%(user_id)s"}) - # Change the user_id in request context. - self.req.environ['nova.context'].user_id = 'other-user' - exc = self.assertRaises(exception.PolicyNotAuthorized, - self.controller._lock, self.req, - fakes.FAKE_UUID, body={'lock': {}}) - self.assertEqual( - "Policy doesn't allow %s to be performed." % rule_name, - exc.format_message()) - - @mock.patch('nova.compute.api.API.lock') - @mock.patch('nova.api.openstack.common.get_instance') - def test_lock_overridden_policy_pass_with_same_user(self, - get_instance_mock, - lock_mock): - instance = fake_instance.fake_instance_obj( - self.req.environ['nova.context'], - user_id=self.req.environ['nova.context'].user_id) - get_instance_mock.return_value = instance - rule_name = "os_compute_api:os-lock-server:lock" - self.policy.set_rules({rule_name: "user_id:%(user_id)s"}) - self.controller._lock(self.req, fakes.FAKE_UUID, body={'lock': {}}) - lock_mock.assert_called_once_with(self.req.environ['nova.context'], - instance, reason=None) - - @mock.patch('nova.api.openstack.common.get_instance') - def test_unlock_policy_failed(self, get_instance_mock): - instance = fake_instance.fake_instance_obj( - self.req.environ['nova.context'], - user_id=self.req.environ['nova.context'].user_id) - get_instance_mock.return_value = instance - rule_name = "os_compute_api:os-lock-server:unlock" - self.policy.set_rules({rule_name: "project:non_fake"}) - exc = self.assertRaises( - exception.PolicyNotAuthorized, - self.controller._unlock, self.req, - fakes.FAKE_UUID, - body={'unlock': {}}) - self.assertEqual( - "Policy doesn't allow %s to be performed." % rule_name, - exc.format_message()) - - @mock.patch.object(common, 'get_instance') - def test_unlock_policy_failed_with_unlock_override(self, - get_instance_mock): - ctxt = context.RequestContext('fake', 'fake') - instance = fake_instance.fake_instance_obj(ctxt) - instance.locked_by = "fake" - get_instance_mock.return_value = instance - rule_name = ("os_compute_api:os-lock-server:" - "unlock:unlock_override") - rules = {"os_compute_api:os-lock-server:unlock": "@", - rule_name: "project:non_fake"} - self.policy.set_rules(rules) - exc = self.assertRaises( - exception.PolicyNotAuthorized, self.controller._unlock, - self.req, fakes.FAKE_UUID, body={'unlock': {}}) - self.assertEqual( - "Policy doesn't allow %s to be performed." % rule_name, - exc.format_message()) diff --git a/nova/tests/unit/policies/test_lock_server.py b/nova/tests/unit/policies/test_lock_server.py new file mode 100644 index 0000000000..576fdf92eb --- /dev/null +++ b/nova/tests/unit/policies/test_lock_server.py @@ -0,0 +1,146 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import fixtures +import mock +from nova.policies import lock_server as ls_policies +from oslo_utils.fixture import uuidsentinel as uuids +from oslo_utils import timeutils + +from nova.api.openstack.compute import lock_server +from nova.compute import vm_states +from nova import exception +from nova.tests.unit.api.openstack import fakes +from nova.tests.unit import fake_instance +from nova.tests.unit.policies import base + + +class LockServerPolicyTest(base.BasePolicyTest): + """Test Lock server APIs policies with all possible context. + This class defines the set of context with different roles + which are allowed and not allowed to pass the policy checks. + With those set of context, it will call the API operation and + verify the expected behaviour. + """ + + def setUp(self): + super(LockServerPolicyTest, self).setUp() + self.controller = lock_server.LockServerController() + self.req = fakes.HTTPRequest.blank('') + user_id = self.req.environ['nova.context'].user_id + self.mock_get = self.useFixture( + fixtures.MockPatch('nova.api.openstack.common.get_instance')).mock + uuid = uuids.fake_id + self.instance = fake_instance.fake_instance_obj( + self.project_member_context, + id=1, uuid=uuid, project_id=self.project_id, + user_id=user_id, vm_state=vm_states.ACTIVE, + task_state=None, launched_at=timeutils.utcnow()) + self.mock_get.return_value = self.instance + + # Check that admin or and server owner is able to lock/unlock + # the sevrer + self.admin_or_owner_authorized_contexts = [ + self.legacy_admin_context, self.system_admin_context, + self.project_admin_context, self.project_member_context, + self.project_reader_context, self.project_foo_context] + # Check that non-admin/owner is not able to lock/unlock + # the server + self.admin_or_owner_unauthorized_contexts = [ + self.system_member_context, self.system_reader_context, + self.system_foo_context, + self.other_project_member_context + ] + # Check that admin is able to unlock the server which is + # locked by other + self.admin_authorized_contexts = [ + self.legacy_admin_context, self.system_admin_context, + self.project_admin_context] + # Check that non-admin is not able to unlock the server + # which is locked by other + self.admin_unauthorized_contexts = [ + self.system_member_context, self.system_reader_context, + self.system_foo_context, self.project_member_context, + self.project_reader_context, self.project_foo_context, + self.other_project_member_context + ] + + @mock.patch('nova.compute.api.API.lock') + def test_lock_server_policy(self, mock_lock): + rule_name = ls_policies.POLICY_ROOT % 'lock' + self.common_policy_check(self.admin_or_owner_authorized_contexts, + self.admin_or_owner_unauthorized_contexts, + rule_name, + self.controller._lock, + self.req, self.instance.uuid, + body={'lock': {}}) + + @mock.patch('nova.compute.api.API.unlock') + def test_unlock_server_policy(self, mock_unlock): + rule_name = ls_policies.POLICY_ROOT % 'unlock' + self.common_policy_check(self.admin_or_owner_authorized_contexts, + self.admin_or_owner_unauthorized_contexts, + rule_name, + self.controller._unlock, + self.req, self.instance.uuid, + body={'unlock': {}}) + + @mock.patch('nova.compute.api.API.unlock') + @mock.patch('nova.compute.api.API.is_expected_locked_by') + def test_unlock_override_server_policy(self, mock_expected, mock_unlock): + mock_expected.return_value = False + rule = ls_policies.POLICY_ROOT % 'unlock' + self.policy.set_rules({rule: "@"}, overwrite=False) + rule_name = ls_policies.POLICY_ROOT % 'unlock:unlock_override' + self.common_policy_check(self.admin_authorized_contexts, + self.admin_unauthorized_contexts, + rule_name, + self.controller._unlock, + self.req, self.instance.uuid, + body={'unlock': {}}) + + def test_lock_server_policy_failed_with_other_user(self): + # Change the user_id in request context. + req = fakes.HTTPRequest.blank('') + req.environ['nova.context'].user_id = 'other-user' + rule_name = ls_policies.POLICY_ROOT % 'lock' + self.policy.set_rules({rule_name: "user_id:%(user_id)s"}) + exc = self.assertRaises( + exception.PolicyNotAuthorized, self.controller._lock, + req, fakes.FAKE_UUID, body={'lock': {}}) + self.assertEqual( + "Policy doesn't allow %s to be performed." % rule_name, + exc.format_message()) + + @mock.patch('nova.compute.api.API.lock') + def test_lock_sevrer_overridden_policy_pass_with_same_user( + self, mock_lock): + rule_name = ls_policies.POLICY_ROOT % 'lock' + self.policy.set_rules({rule_name: "user_id:%(user_id)s"}) + self.controller._lock(self.req, + fakes.FAKE_UUID, + body={'lock': {}}) + + +class LockServerScopeTypePolicyTest(LockServerPolicyTest): + """Test Lock Server APIs policies with system scope enabled. + This class set the nova.conf [oslo_policy] enforce_scope to True + so that we can switch on the scope checking on oslo policy side. + It defines the set of context with scoped token + which are allowed and not allowed to pass the policy checks. + With those set of context, it will run the API operation and + verify the expected behaviour. + """ + + def setUp(self): + super(LockServerScopeTypePolicyTest, self).setUp() + self.flags(enforce_scope=True, group="oslo_policy")