Merge "Add new default roles in servers policies"

This commit is contained in:
Zuul
2020-04-16 17:10:46 +00:00
committed by Gerrit Code Review
3 changed files with 330 additions and 195 deletions
+30 -24
View File
@@ -26,7 +26,7 @@ CROSS_CELL_RESIZE = 'compute:servers:resize:cross_cell'
rules = [
policy.DocumentedRuleDefault(
name=SERVERS % 'index',
check_str=RULE_AOO,
check_str=base.PROJECT_READER_OR_SYSTEM_READER,
description="List all servers",
operations=[
{
@@ -37,7 +37,7 @@ rules = [
scope_types=['system', 'project']),
policy.DocumentedRuleDefault(
name=SERVERS % 'detail',
check_str=RULE_AOO,
check_str=base.PROJECT_READER_OR_SYSTEM_READER,
description="List all servers with detailed information",
operations=[
{
@@ -48,7 +48,7 @@ rules = [
scope_types=['system', 'project']),
policy.DocumentedRuleDefault(
name=SERVERS % 'index:get_all_tenants',
check_str=base.RULE_ADMIN_API,
check_str=base.SYSTEM_READER,
description="List all servers for all projects",
operations=[
{
@@ -59,7 +59,7 @@ rules = [
scope_types=['system']),
policy.DocumentedRuleDefault(
name=SERVERS % 'detail:get_all_tenants',
check_str=base.RULE_ADMIN_API,
check_str=base.SYSTEM_READER,
description="List all servers with detailed information for "
" all projects",
operations=[
@@ -71,7 +71,7 @@ rules = [
scope_types=['system']),
policy.DocumentedRuleDefault(
name=SERVERS % 'allow_all_filters',
check_str=base.RULE_ADMIN_API,
check_str=base.SYSTEM_READER,
description="Allow all filters when listing servers",
operations=[
{
@@ -86,7 +86,7 @@ rules = [
scope_types=['system']),
policy.DocumentedRuleDefault(
name=SERVERS % 'show',
check_str=RULE_AOO,
check_str=base.PROJECT_READER_OR_SYSTEM_READER,
description="Show a server",
operations=[
{
@@ -156,7 +156,7 @@ allow everyone.
]),
policy.DocumentedRuleDefault(
name=SERVERS % 'create',
check_str=RULE_AOO,
check_str=base.PROJECT_MEMBER,
description="Create a server",
operations=[
{
@@ -167,6 +167,12 @@ allow everyone.
scope_types=['project']),
policy.DocumentedRuleDefault(
name=SERVERS % 'create:forced_host',
# TODO(gmann): Do we need PROJECT_ADMIN for this?
# PROJECT_ADMIN is not used in policies yet and this
# can be first one. This policy is checked after 'create' policy
# which is PROJECT_MEMBER so making this as SYSTEM_ADMIN
# does not make sense as system scoped role cannot
# pass the 'create' policy. opinion ?
check_str=base.RULE_ADMIN_API,
description="""
Create a server on the specified host and/or node.
@@ -181,7 +187,7 @@ host and/or node by bypassing the scheduler filters unlike the
'path': '/servers'
}
],
scope_types=['project']),
scope_types=['system', 'project']),
policy.DocumentedRuleDefault(
REQUESTED_DESTINATION,
base.RULE_ADMIN_API,
@@ -201,7 +207,7 @@ validated by the scheduler filters unlike the
]),
policy.DocumentedRuleDefault(
name=SERVERS % 'create:attach_volume',
check_str=RULE_AOO,
check_str=base.PROJECT_MEMBER,
description="Create a server with the requested volume attached to it",
operations=[
{
@@ -212,7 +218,7 @@ validated by the scheduler filters unlike the
scope_types=['project']),
policy.DocumentedRuleDefault(
name=SERVERS % 'create:attach_network',
check_str=RULE_AOO,
check_str=base.PROJECT_MEMBER,
description="Create a server with the requested network attached "
" to it",
operations=[
@@ -224,7 +230,7 @@ validated by the scheduler filters unlike the
scope_types=['project']),
policy.DocumentedRuleDefault(
name=SERVERS % 'create:trusted_certs',
check_str=RULE_AOO,
check_str=base.PROJECT_MEMBER,
description="Create a server with trusted image certificate IDs",
operations=[
{
@@ -275,7 +281,7 @@ https://bugs.launchpad.net/nova/+bug/1739646 for details.
]),
policy.DocumentedRuleDefault(
name=SERVERS % 'delete',
check_str=RULE_AOO,
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
description="Delete a server",
operations=[
{
@@ -286,7 +292,7 @@ https://bugs.launchpad.net/nova/+bug/1739646 for details.
scope_types=['system', 'project']),
policy.DocumentedRuleDefault(
name=SERVERS % 'update',
check_str=RULE_AOO,
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
description="Update a server",
operations=[
{
@@ -297,7 +303,7 @@ https://bugs.launchpad.net/nova/+bug/1739646 for details.
scope_types=['system', 'project']),
policy.DocumentedRuleDefault(
name=SERVERS % 'confirm_resize',
check_str=RULE_AOO,
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
description="Confirm a server resize",
operations=[
{
@@ -308,7 +314,7 @@ https://bugs.launchpad.net/nova/+bug/1739646 for details.
scope_types=['system', 'project']),
policy.DocumentedRuleDefault(
name=SERVERS % 'revert_resize',
check_str=RULE_AOO,
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
description="Revert a server resize",
operations=[
{
@@ -319,7 +325,7 @@ https://bugs.launchpad.net/nova/+bug/1739646 for details.
scope_types=['system', 'project']),
policy.DocumentedRuleDefault(
name=SERVERS % 'reboot',
check_str=RULE_AOO,
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
description="Reboot a server",
operations=[
{
@@ -330,7 +336,7 @@ https://bugs.launchpad.net/nova/+bug/1739646 for details.
scope_types=['system', 'project']),
policy.DocumentedRuleDefault(
name=SERVERS % 'resize',
check_str=RULE_AOO,
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
description="Resize a server",
operations=[
{
@@ -354,7 +360,7 @@ https://bugs.launchpad.net/nova/+bug/1739646 for details.
]),
policy.DocumentedRuleDefault(
name=SERVERS % 'rebuild',
check_str=RULE_AOO,
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
description="Rebuild a server",
operations=[
{
@@ -365,7 +371,7 @@ https://bugs.launchpad.net/nova/+bug/1739646 for details.
scope_types=['system', 'project']),
policy.DocumentedRuleDefault(
name=SERVERS % 'rebuild:trusted_certs',
check_str=RULE_AOO,
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
description="Rebuild a server with trusted image certificate IDs",
operations=[
{
@@ -376,7 +382,7 @@ https://bugs.launchpad.net/nova/+bug/1739646 for details.
scope_types=['system', 'project']),
policy.DocumentedRuleDefault(
name=SERVERS % 'create_image',
check_str=RULE_AOO,
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
description="Create an image from a server",
operations=[
{
@@ -387,7 +393,7 @@ https://bugs.launchpad.net/nova/+bug/1739646 for details.
scope_types=['system', 'project']),
policy.DocumentedRuleDefault(
name=SERVERS % 'create_image:allow_volume_backed',
check_str=RULE_AOO,
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
description="Create an image from a volume backed server",
operations=[
{
@@ -398,7 +404,7 @@ https://bugs.launchpad.net/nova/+bug/1739646 for details.
scope_types=['system', 'project']),
policy.DocumentedRuleDefault(
name=SERVERS % 'start',
check_str=RULE_AOO,
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
description="Start a server",
operations=[
{
@@ -409,7 +415,7 @@ https://bugs.launchpad.net/nova/+bug/1739646 for details.
scope_types=['system', 'project']),
policy.DocumentedRuleDefault(
name=SERVERS % 'stop',
check_str=RULE_AOO,
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
description="Stop a server",
operations=[
{
@@ -420,7 +426,7 @@ https://bugs.launchpad.net/nova/+bug/1739646 for details.
scope_types=['system', 'project']),
policy.DocumentedRuleDefault(
name=SERVERS % 'trigger_crash_dump',
check_str=RULE_AOO,
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
description="Trigger crash dump in a server",
operations=[
{
+22
View File
@@ -17,6 +17,28 @@ policy_data = """
{
"context_is_admin": "role:admin or role:administrator",
"os_compute_api:servers:create": "",
"os_compute_api:servers:create:attach_volume": "",
"os_compute_api:servers:create:attach_network": "",
"os_compute_api:servers:create:trusted_certs": "",
"os_compute_api:servers:create_image": "",
"os_compute_api:servers:create_image:allow_volume_backed": "",
"os_compute_api:servers:update": "",
"os_compute_api:servers:index": "",
"os_compute_api:servers:index:get_all_tenants": "",
"os_compute_api:servers:delete": "",
"os_compute_api:servers:detail": "",
"os_compute_api:servers:detail:get_all_tenants": "",
"os_compute_api:servers:show": "",
"os_compute_api:servers:rebuild": "",
"os_compute_api:servers:rebuild:trusted_certs": "",
"os_compute_api:servers:reboot": "",
"os_compute_api:servers:resize": "",
"os_compute_api:servers:revert_resize": "",
"os_compute_api:servers:confirm_resize": "",
"os_compute_api:servers:start": "",
"os_compute_api:servers:stop": "",
"os_compute_api:servers:trigger_crash_dump": "",
"os_compute_api:servers:show:host_status": "",
"os_compute_api:servers:show": "",
"os_compute_api:servers:show:host_status:unknown-only": "",
+278 -171
View File
@@ -37,6 +37,10 @@ class ServersPolicyTest(base.BasePolicyTest):
def setUp(self):
super(ServersPolicyTest, self).setUp()
self.controller = servers.ServersController()
self.rule_trusted_certs = policies.SERVERS % 'create:trusted_certs'
self.rule_attach_network = policies.SERVERS % 'create:attach_network'
self.rule_attach_volume = policies.SERVERS % 'create:attach_volume'
self.req = fakes.HTTPRequest.blank('')
user_id = self.req.environ['nova.context'].user_id
@@ -75,7 +79,6 @@ class ServersPolicyTest(base.BasePolicyTest):
self, security_groups=[{'name': 'default'}])
self.mock_get_all = self.useFixture(fixtures.MockPatchObject(
self.controller.compute_api, 'get_all')).mock
self.body = {
'server': {
'name': 'server_test',
@@ -84,18 +87,33 @@ class ServersPolicyTest(base.BasePolicyTest):
},
}
# Check that admin or and owner is able to get and delete
# the server.
# Check that admin or and owner is able to update, delete
# or perform server action.
self.admin_or_owner_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.project_foo_context]
# Check that non-admin/owner is not able to get and delete
# the server.
# Check that non-admin/owner is not able to update, delete
# or perform server action.
self.admin_or_owner_unauthorized_contexts = [
self.system_member_context, self.system_reader_context,
self.system_foo_context,
self.other_project_member_context
self.other_project_member_context,
self.other_project_reader_context
]
# Check that system reader or owner is able to get
# the server.
self.system_reader_or_owner_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.system_member_context,
self.system_reader_context, self.project_foo_context
]
self.system_reader_or_owner_unauthorized_contexts = [
self.system_foo_context,
self.other_project_member_context,
self.other_project_reader_context
]
# Check that everyone is able to list their own server.
@@ -105,30 +123,44 @@ class ServersPolicyTest(base.BasePolicyTest):
self.project_reader_context, self.project_foo_context,
self.system_member_context, self.system_reader_context,
self.system_foo_context,
self.other_project_member_context]
self.other_project_member_context,
self.other_project_reader_context]
self.everyone_unauthorized_contexts = [
]
# Check that admin is able to list the server
# for all projects.
# Check that admin is able to create server with host request.
self.admin_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context]
# Check that non-admin is not able to list the server
# for all projects.
# Check that non-admin is not able to create server with host request.
self.admin_unauthorized_contexts = [
self.system_member_context, self.system_reader_context,
self.system_foo_context, self.project_member_context,
self.project_reader_context, self.project_foo_context,
self.other_project_member_context
self.other_project_member_context,
self.other_project_reader_context
]
# Check that sustem reader is able to list the server
# for all projects.
self.system_reader_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context, self.system_member_context,
self.system_reader_context]
# Check that non-system reader is not able to list the server
# for all projects.
self.system_reader_unauthorized_contexts = [
self.system_foo_context, self.project_member_context,
self.project_reader_context, self.project_foo_context,
self.other_project_member_context,
self.other_project_reader_context
]
# Check that project member is able to create serve
self.project_member_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context, self.project_member_context,
self.system_member_context,
self.other_project_member_context,
self.system_member_context, self.system_reader_context,
self.other_project_member_context, self.system_foo_context,
self.project_reader_context, self.project_foo_context,
self.system_reader_context, self.system_foo_context]
self.other_project_reader_context]
# Check that non-project member is not able to create server
self.project_member_unauthorized_contexts = [
]
@@ -148,13 +180,19 @@ class ServersPolicyTest(base.BasePolicyTest):
self.mock_get_all.side_effect = fake_get_all
rule_name = policies.SERVERS % 'index'
self.common_policy_check(self.everyone_authorized_contexts,
self.everyone_unauthorized_contexts,
rule_name,
self.controller.index,
self.req)
self.common_policy_check(
self.everyone_authorized_contexts,
self.everyone_unauthorized_contexts,
rule_name,
self.controller.index,
self.req)
def test_index_all_project_server_policy(self):
# 'index' policy is checked before 'index:get_all_tenants' so
# we have to allow it for everyone otherwise it will
# fail for unauthorized contexts here.
rule = policies.SERVERS % 'index'
self.policy.set_rules({rule: "@"}, overwrite=False)
rule_name = policies.SERVERS % 'index:get_all_tenants'
req = fakes.HTTPRequest.blank('/servers?all_tenants')
@@ -168,8 +206,8 @@ class ServersPolicyTest(base.BasePolicyTest):
self.mock_get_all.side_effect = fake_get_all
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
self.common_policy_check(self.system_reader_authorized_contexts,
self.system_reader_unauthorized_contexts,
rule_name,
self.controller.index,
req)
@@ -190,13 +228,19 @@ class ServersPolicyTest(base.BasePolicyTest):
self.mock_get_all.side_effect = fake_get_all
rule_name = policies.SERVERS % 'detail'
self.common_policy_check(self.everyone_authorized_contexts,
self.everyone_unauthorized_contexts,
rule_name,
self.controller.detail,
self.req)
self.common_policy_check(
self.everyone_authorized_contexts,
self.everyone_unauthorized_contexts,
rule_name,
self.controller.detail,
self.req)
def test_detail_list_all_project_server_policy(self):
# 'detail' policy is checked before 'detail:get_all_tenants' so
# we have to allow it for everyone otherwise it will
# fail for unauthorized contexts here.
rule = policies.SERVERS % 'detail'
self.policy.set_rules({rule: "@"}, overwrite=False)
rule_name = policies.SERVERS % 'detail:get_all_tenants'
req = fakes.HTTPRequest.blank('/servers?all_tenants')
@@ -210,22 +254,27 @@ class ServersPolicyTest(base.BasePolicyTest):
self.mock_get_all.side_effect = fake_get_all
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
self.common_policy_check(self.system_reader_authorized_contexts,
self.system_reader_unauthorized_contexts,
rule_name,
self.controller.detail,
req)
def test_index_server_allow_all_filters_policy(self):
# 'index' policy is checked before 'allow_all_filters' so
# we have to allow it for everyone otherwise it will
# fail for unauthorized contexts here.
rule = policies.SERVERS % 'index'
self.policy.set_rules({rule: "@"}, overwrite=False)
def fake_get_all(context, search_opts=None,
limit=None, marker=None,
expected_attrs=None, sort_keys=None, sort_dirs=None,
cell_down_support=False, all_tenants=False):
self.assertIsNotNone(search_opts)
if context in self.admin_unauthorized_contexts:
if context in self.system_reader_unauthorized_contexts:
self.assertNotIn('host', search_opts)
if context in self.admin_authorized_contexts:
if context in self.system_reader_authorized_contexts:
self.assertIn('host', search_opts)
return objects.InstanceList(objects=self.servers)
@@ -234,19 +283,49 @@ class ServersPolicyTest(base.BasePolicyTest):
req = fakes.HTTPRequest.blank('/servers?host=1')
rule_name = policies.SERVERS % 'allow_all_filters'
self.common_policy_check(
self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
self.system_reader_authorized_contexts,
self.system_reader_unauthorized_contexts,
rule_name,
self.controller.index,
req, fatal=False)
def test_show_server_policy(self):
def test_detail_server_allow_all_filters_policy(self):
# 'detail' policy is checked before 'allow_all_filters' so
# we have to allow it for everyone otherwise it will
# fail for unauthorized contexts here.
rule = policies.SERVERS % 'detail'
self.policy.set_rules({rule: "@"}, overwrite=False)
def fake_get_all(context, search_opts=None,
limit=None, marker=None,
expected_attrs=None, sort_keys=None, sort_dirs=None,
cell_down_support=False, all_tenants=False):
self.assertIsNotNone(search_opts)
if context in self.system_reader_unauthorized_contexts:
self.assertNotIn('host', search_opts)
if context in self.system_reader_authorized_contexts:
self.assertIn('host', search_opts)
return objects.InstanceList(objects=self.servers)
self.mock_get_all.side_effect = fake_get_all
req = fakes.HTTPRequest.blank('/servers?host=1')
rule_name = policies.SERVERS % 'allow_all_filters'
self.common_policy_check(
self.system_reader_authorized_contexts,
self.system_reader_unauthorized_contexts,
rule_name,
self.controller.detail,
req, fatal=False)
@mock.patch('nova.objects.BlockDeviceMappingList.bdms_by_instance_uuid')
def test_show_server_policy(self, mock_bdm):
rule_name = policies.SERVERS % 'show'
self.common_policy_check(self.admin_or_owner_authorized_contexts,
self.admin_or_owner_unauthorized_contexts,
rule_name,
self.controller.show,
self.req, self.instance.uuid)
self.common_policy_check(
self.system_reader_or_owner_authorized_contexts,
self.system_reader_or_owner_unauthorized_contexts,
rule_name,
self.controller.show,
self.req, self.instance.uuid)
@mock.patch('nova.compute.api.API.create')
def test_create_server_policy(self, mock_create):
@@ -283,7 +362,6 @@ class ServersPolicyTest(base.BasePolicyTest):
rule = policies.SERVERS % 'create'
self.policy.set_rules({rule: "@"}, overwrite=False)
mock_create.return_value = ([self.instance], '')
rule_name = policies.SERVERS % 'create:attach_volume'
body = {
'server': {
'name': 'server_test',
@@ -294,7 +372,7 @@ class ServersPolicyTest(base.BasePolicyTest):
}
self.common_policy_check(self.project_member_authorized_contexts,
self.project_member_unauthorized_contexts,
rule_name,
self.rule_attach_volume,
self.controller.create,
self.req, body=body)
@@ -306,7 +384,6 @@ class ServersPolicyTest(base.BasePolicyTest):
rule = policies.SERVERS % 'create'
self.policy.set_rules({rule: "@"}, overwrite=False)
mock_create.return_value = ([self.instance], '')
rule_name = policies.SERVERS % 'create:attach_network'
body = {
'server': {
'name': 'server_test',
@@ -319,7 +396,7 @@ class ServersPolicyTest(base.BasePolicyTest):
}
self.common_policy_check(self.project_member_authorized_contexts,
self.project_member_unauthorized_contexts,
rule_name,
self.rule_attach_network,
self.controller.create,
self.req, body=body)
@@ -332,7 +409,6 @@ class ServersPolicyTest(base.BasePolicyTest):
self.policy.set_rules({rule: "@"}, overwrite=False)
req = fakes.HTTPRequest.blank('', version='2.63')
mock_create.return_value = ([self.instance], '')
rule_name = policies.SERVERS % 'create:trusted_certs'
body = {
'server': {
'name': 'server_test',
@@ -347,7 +423,7 @@ class ServersPolicyTest(base.BasePolicyTest):
}
self.common_policy_check(self.project_member_authorized_contexts,
self.project_member_unauthorized_contexts,
rule_name,
self.rule_trusted_certs,
self.controller.create,
req, body=body)
@@ -365,7 +441,8 @@ class ServersPolicyTest(base.BasePolicyTest):
req = fakes.HTTPRequest.blank('')
req.environ['nova.context'].user_id = 'other-user'
rule_name = policies.SERVERS % 'delete'
self.policy.set_rules({rule_name: "user_id:%(user_id)s"})
self.policy.set_rules({rule_name: "user_id:%(user_id)s"},
overwrite=False)
exc = self.assertRaises(
exception.PolicyNotAuthorized, self.controller.delete,
req, self.instance.uuid)
@@ -377,7 +454,8 @@ class ServersPolicyTest(base.BasePolicyTest):
def test_delete_server_overridden_policy_pass_with_same_user(
self, mock_delete):
rule_name = policies.SERVERS % 'delete'
self.policy.set_rules({rule_name: "user_id:%(user_id)s"})
self.policy.set_rules({rule_name: "user_id:%(user_id)s"},
overwrite=False)
self.controller.delete(self.req,
self.instance.uuid)
@@ -398,7 +476,8 @@ class ServersPolicyTest(base.BasePolicyTest):
req.environ['nova.context'].user_id = 'other-user'
rule_name = policies.SERVERS % 'update'
body = {'server': {'name': 'test'}}
self.policy.set_rules({rule_name: "user_id:%(user_id)s"})
self.policy.set_rules({rule_name: "user_id:%(user_id)s"},
overwrite=False)
exc = self.assertRaises(
exception.PolicyNotAuthorized, self.controller.update,
req, self.instance.uuid, body=body)
@@ -410,7 +489,8 @@ class ServersPolicyTest(base.BasePolicyTest):
def test_update_server_overridden_policy_pass_with_same_user(
self, mock_update):
rule_name = policies.SERVERS % 'update'
self.policy.set_rules({rule_name: "user_id:%(user_id)s"})
self.policy.set_rules({rule_name: "user_id:%(user_id)s"},
overwrite=False)
body = {'server': {'name': 'test'}}
self.controller.update(self.req,
self.instance.uuid, body=body)
@@ -467,7 +547,8 @@ class ServersPolicyTest(base.BasePolicyTest):
req.environ['nova.context'].user_id = 'other-user'
rule_name = policies.SERVERS % 'resize'
body = {'resize': {'flavorRef': 'f1'}}
self.policy.set_rules({rule_name: "user_id:%(user_id)s"})
self.policy.set_rules({rule_name: "user_id:%(user_id)s"},
overwrite=False)
exc = self.assertRaises(
exception.PolicyNotAuthorized, self.controller._action_resize,
req, self.instance.uuid, body=body)
@@ -482,7 +563,8 @@ class ServersPolicyTest(base.BasePolicyTest):
self, mock_resize, mock_port):
rule_name = policies.SERVERS % 'resize'
mock_port.return_value = False
self.policy.set_rules({rule_name: "user_id:%(user_id)s"})
self.policy.set_rules({rule_name: "user_id:%(user_id)s"},
overwrite=False)
body = {'resize': {'flavorRef': 'f1'}}
self.controller._action_resize(self.req,
self.instance.uuid, body=body)
@@ -515,7 +597,8 @@ class ServersPolicyTest(base.BasePolicyTest):
req.environ['nova.context'].user_id = 'other-user'
rule_name = policies.SERVERS % 'stop'
body = {'os-stop': 'null'}
self.policy.set_rules({rule_name: "user_id:%(user_id)s"})
self.policy.set_rules({rule_name: "user_id:%(user_id)s"},
overwrite=False)
exc = self.assertRaises(
exception.PolicyNotAuthorized, self.controller._stop_server,
req, self.instance.uuid, body=body)
@@ -527,7 +610,8 @@ class ServersPolicyTest(base.BasePolicyTest):
def test_stop_server_overridden_policy_pass_with_same_user(
self, mock_stop):
rule_name = policies.SERVERS % 'stop'
self.policy.set_rules({rule_name: "user_id:%(user_id)s"})
self.policy.set_rules({rule_name: "user_id:%(user_id)s"},
overwrite=False)
body = {'os-stop': 'null'}
self.controller._stop_server(self.req,
self.instance.uuid, body=body)
@@ -548,7 +632,8 @@ class ServersPolicyTest(base.BasePolicyTest):
req.environ['nova.context'].user_id = 'other-user'
rule_name = policies.SERVERS % 'rebuild'
body = {'rebuild': {"imageRef": uuids.fake_id}}
self.policy.set_rules({rule_name: "user_id:%(user_id)s"})
self.policy.set_rules({rule_name: "user_id:%(user_id)s"},
overwrite=False)
exc = self.assertRaises(
exception.PolicyNotAuthorized, self.controller._action_rebuild,
req, self.instance.uuid, body=body)
@@ -560,7 +645,8 @@ class ServersPolicyTest(base.BasePolicyTest):
def test_rebuild_server_overridden_policy_pass_with_same_user(
self, mock_rebuild):
rule_name = policies.SERVERS % 'rebuild'
self.policy.set_rules({rule_name: "user_id:%(user_id)s"})
self.policy.set_rules({rule_name: "user_id:%(user_id)s"},
overwrite=False)
body = {'rebuild': {"imageRef": uuids.fake_id}}
self.controller._action_rebuild(self.req,
self.instance.uuid, body=body)
@@ -600,7 +686,8 @@ class ServersPolicyTest(base.BasePolicyTest):
}
self.policy.set_rules(
{rule: "@",
rule_name: "user_id:%(user_id)s"})
rule_name: "user_id:%(user_id)s"},
overwrite=False)
exc = self.assertRaises(
exception.PolicyNotAuthorized, self.controller._action_rebuild,
req, self.instance.uuid, body=body)
@@ -622,7 +709,7 @@ class ServersPolicyTest(base.BasePolicyTest):
}
self.policy.set_rules(
{rule: "@",
rule_name: "user_id:%(user_id)s"})
rule_name: "user_id:%(user_id)s"}, overwrite=False)
self.controller._action_rebuild(req,
self.instance.uuid, body=body)
@@ -676,7 +763,8 @@ class ServersPolicyTest(base.BasePolicyTest):
req.environ['nova.context'].user_id = 'other-user'
rule_name = policies.SERVERS % 'trigger_crash_dump'
body = {'trigger_crash_dump': None}
self.policy.set_rules({rule_name: "user_id:%(user_id)s"})
self.policy.set_rules({rule_name: "user_id:%(user_id)s"},
overwrite=False)
exc = self.assertRaises(
exception.PolicyNotAuthorized,
self.controller._action_trigger_crash_dump,
@@ -690,7 +778,8 @@ class ServersPolicyTest(base.BasePolicyTest):
self, mock_crash):
req = fakes.HTTPRequest.blank('', version='2.17')
rule_name = policies.SERVERS % 'trigger_crash_dump'
self.policy.set_rules({rule_name: "user_id:%(user_id)s"})
self.policy.set_rules({rule_name: "user_id:%(user_id)s"},
overwrite=False)
body = {'trigger_crash_dump': None}
self.controller._action_trigger_crash_dump(req,
self.instance.uuid, body=body)
@@ -710,142 +799,160 @@ class ServersScopeTypePolicyTest(ServersPolicyTest):
super(ServersScopeTypePolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# Check that system admin is able to list the servers
# for all projects.
# These policy are project scoped only and 'create' policy is checked
# first so even we allow it for everyone the system scoped context
# cannot validate these as they fail on 'create' policy due to
# scope_type. So we need to set rule name as None to skip the policy
# error message assertion in base class. These rule name are only used
# for error message assertion.
self.rule_trusted_certs = None
self.rule_attach_network = None
self.rule_attach_volume = None
# Check that system admin is able to create server with host request.
self.admin_authorized_contexts = [
self.system_admin_context]
# Check that non-system/admin is not able to list the servers
# for all projects.
self.system_admin_context
]
# Check that non-system/admin is not able to create server with
# host request.
self.admin_unauthorized_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.project_admin_context, self.legacy_admin_context,
self.system_member_context, self.system_reader_context,
self.system_foo_context, self.project_member_context,
self.project_reader_context, self.project_foo_context,
self.other_project_member_context
self.other_project_member_context,
self.other_project_reader_context
]
# Check that system reader is able to list the server
# for all projects.
self.system_reader_authorized_contexts = [
self.system_admin_context, self.system_member_context,
self.system_reader_context]
# Check that non-system reader is not able to list the server
# for all projects.
self.system_reader_unauthorized_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.system_foo_context, self.project_member_context,
self.project_reader_context, self.project_foo_context,
self.other_project_member_context,
self.other_project_reader_context
]
# Check if project member can create the server.
self.project_member_authorized_contexts = [
self.legacy_admin_context,
self.project_admin_context, self.project_member_context,
self.other_project_member_context, self.project_reader_context,
self.project_foo_context
self.other_project_member_context,
self.project_reader_context, self.project_foo_context,
self.other_project_reader_context
]
# Check if non-project member cannot create the server.
self.project_member_unauthorized_contexts = [
self.system_admin_context, self.system_member_context,
self.system_reader_context, self.system_foo_context,
]
# Check if project admin can create the server with host.
self.project_admin_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context
]
# Check if non-project admin cannot create the server with host.
self.project_admin_unauthorized_contexts = [
self.project_member_context, self.other_project_member_context,
self.project_reader_context, self.project_foo_context,
self.system_admin_context, self.system_member_context,
self.system_reader_context, self.system_foo_context,
self.system_reader_context, self.system_foo_context
]
@mock.patch('nova.compute.api.API.create')
@mock.patch('nova.compute.api.API.parse_availability_zone')
def test_create_forced_host_server_policy(self, mock_az, mock_create):
# These policy are project scoped only and 'create' policy is checked
# before 'create:forced_host' so we need to allow 'create' policy
# for everyone. Also skip the error message assertion because for
# TODO(gmann): Uncomment this once we figure out the right defaults
# for 'create:forced_host'
# The 'create' policy is checked before 'create:forced_host' so
# we need to allow 'create' policy for everyone. Also skip the
# error message assertion because for
# system scoped unauth context 'create' policy fail and for project
# scoped unauth context 'create:forced_host' fail.
rule = policies.SERVERS % 'create'
self.policy.set_rules({rule: "@"}, overwrite=False)
mock_create.return_value = ([self.instance], '')
mock_az.return_value = ('test', 'host', None)
# rule = policies.SERVERS % 'create'
# self.policy.set_rules({rule: "@"})
# mock_create.return_value = ([self.instance], '')
# mock_az.return_value = ('test', 'host', None)
# rule_name = policies.SERVERS % 'create:forced_host'
self.common_policy_check(self.project_admin_authorized_contexts,
self.project_admin_unauthorized_contexts,
None,
self.controller.create,
self.req, body=self.body)
# self.common_policy_check(self.admin_authorized_contexts,
# self.admin_unauthorized_contexts,
# None,
# self.controller.create,
# self.req, body=self.body)
pass
@mock.patch('nova.compute.api.API.create')
def test_create_attach_volume_server_policy(self, mock_create):
# These policy are project scoped only and 'create' policy is checked
# before 'create:attach_volume' so even we allow it for everyone
# system scoped context cannot validate theese as they fail
# on 'create' policy itself.
# So sending the 'create' rule for policy error assertion.
rule = policies.SERVERS % 'create'
self.policy.set_rules({rule: "@"}, overwrite=False)
mock_create.return_value = ([self.instance], '')
# rule_name = policies.SERVERS % 'create:attach_volume'
body = {
'server': {
'name': 'server_test',
'imageRef': uuids.fake_id,
'flavorRef': uuids.fake_id,
'block_device_mapping': [{'device_name': 'foo'}],
},
}
self.common_policy_check(self.project_member_authorized_contexts,
self.project_member_unauthorized_contexts,
rule,
self.controller.create,
self.req, body=body)
@mock.patch('nova.compute.api.API.create')
def test_create_attach_network_server_policy(self, mock_create):
# These policy are project scoped only and 'create' policy is checked
# before 'create:attach_network' so even we allow it for everyone
# system scoped context cannot validate theese as they fail
# on 'create' policy itself.
# So sending the 'create' rule for policy error assertion.
rule = policies.SERVERS % 'create'
self.policy.set_rules({rule: "@"}, overwrite=False)
mock_create.return_value = ([self.instance], '')
# rule_name = policies.SERVERS % 'create:attach_network'
body = {
'server': {
'name': 'server_test',
'imageRef': uuids.fake_id,
'flavorRef': uuids.fake_id,
'networks': [{
'uuid': uuids.fake_id
}],
},
}
self.common_policy_check(self.project_member_authorized_contexts,
self.project_member_unauthorized_contexts,
rule,
self.controller.create,
self.req, body=body)
class ServersNoLegacyPolicyTest(ServersScopeTypePolicyTest):
"""Test Servers APIs policies with system scope enabled,
and no more deprecated rules that allow the legacy admin API to
access system_admin_or_owner APIs.
"""
without_deprecated_rules = True
@mock.patch('nova.compute.api.API.create')
def test_create_trusted_certs_server_policy(self, mock_create):
# These policy are project scoped only and 'create' policy is checked
# before 'create:trusted_certs' so even we allow it for everyone
# system scoped context cannot validate theese as they fail
# on 'create' policy itself.
# So sending the 'create' rule for policy error assertion.
rule = policies.SERVERS % 'create'
self.policy.set_rules({rule: "@"}, overwrite=False)
req = fakes.HTTPRequest.blank('', version='2.63')
mock_create.return_value = ([self.instance], '')
# rule_name = policies.SERVERS % 'create:trusted_certs'
body = {
'server': {
'name': 'server_test',
'imageRef': uuids.fake_id,
'flavorRef': uuids.fake_id,
'trusted_image_certificates': [uuids.fake_id],
'networks': [{
'uuid': uuids.fake_id
}],
def setUp(self):
super(ServersNoLegacyPolicyTest, self).setUp()
},
}
self.common_policy_check(self.project_member_authorized_contexts,
self.project_member_unauthorized_contexts,
rule,
self.controller.create,
req, body=body)
# Check that system admin or owner is able to update, delete
# or perform server action.
self.admin_or_owner_authorized_contexts = [
self.system_admin_context,
self.project_admin_context, self.project_member_context,
]
# Check that non-system and non-admin/owner is not able to update,
# delete or perform server action.
self.admin_or_owner_unauthorized_contexts = [
self.legacy_admin_context, self.system_member_context,
self.system_reader_context, self.project_reader_context,
self.project_foo_context,
self.system_foo_context, self.other_project_member_context,
self.other_project_reader_context]
# Check that system reader or projct owner is able to get
# server.
self.system_reader_or_owner_authorized_contexts = [
self.system_admin_context,
self.project_admin_context, self.system_member_context,
self.system_reader_context, self.project_reader_context,
self.project_member_context,
]
# Check that non-system reader nd non-admin/owner is not able to get
# server.
self.system_reader_or_owner_unauthorized_contexts = [
self.legacy_admin_context, self.project_foo_context,
self.system_foo_context, self.other_project_member_context,
self.other_project_reader_context
]
self.everyone_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context,
self.project_member_context, self.project_reader_context,
self.system_member_context, self.system_reader_context,
self.other_project_member_context
]
self.everyone_unauthorized_contexts = [
self.project_foo_context,
self.system_foo_context
]
# Check that sustem reader is able to list the server
# for all projects.
# self.system_reader_authorized_contexts = [
# self.system_admin_context,
# self.project_admin_context, self.system_member_context,
# self.system_reader_context]
# Check that non-system reader is not able to list the server
# for all projects.
# self.system_reader_unauthorized_contexts = [
# self.legacy_admin_context, self.system_foo_context,
# self.project_member_context,
# self.project_reader_context, self.project_foo_context,
# self.other_project_member_context,
# self.other_project_reader_context
# ]
# Check if project member can create the server.
self.project_member_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.project_member_context,
self.other_project_member_context
]
# Check if non-project member cannot create the server.
self.project_member_unauthorized_contexts = [
self.system_admin_context,
self.system_member_context, self.project_reader_context,
self.project_foo_context, self.other_project_reader_context,
self.system_reader_context, self.system_foo_context
]