Merge "Return 409 on creating/importing same name keypair"

This commit is contained in:
Jenkins
2013-04-01 21:55:14 +00:00
committed by Gerrit Code Review
7 changed files with 175 additions and 17 deletions
-8
View File
@@ -2718,13 +2718,6 @@ class KeypairAPI(base.Base):
msg = _('Keypair name must be between 1 and 255 characters long')
raise exception.InvalidKeypair(explanation=msg)
# NOTE: check for existing keypairs of same name
try:
self.db.key_pair_get(context, user_id, key_name)
raise exception.KeyPairExists(key_name=key_name)
except exception.NotFound:
pass
def import_key_pair(self, context, user_id, key_name, public_key):
"""Import a key pair using an existing public key."""
self._validate_keypair_name(context, user_id, key_name)
@@ -2767,7 +2760,6 @@ class KeypairAPI(base.Base):
'public_key': public_key,
'private_key': private_key}
self.db.key_pair_create(context, keypair)
return keypair
def delete_key_pair(self, context, user_id, key_name):
+8 -5
View File
@@ -1953,10 +1953,13 @@ def instance_info_cache_delete(context, instance_uuid):
@require_context
def key_pair_create(context, values):
    """Create a key pair from *values*.

    :param context: request context (checked by @require_context)
    :param values: dict of KeyPair column values; must contain 'name'
    :returns: the persisted KeyPair model object
    :raises KeyPairExists: when a keypair with the same unique key
        already exists — the DB unique constraint reports the conflict
        as a DBDuplicateEntry, which is translated here so callers can
        return 409 instead of 500.
    """
    # NOTE: the diff rendering of this hunk showed both the pre-change
    # and post-change bodies; only the duplicate-safe version is kept.
    try:
        key_pair_ref = models.KeyPair()
        key_pair_ref.update(values)
        key_pair_ref.save()
        return key_pair_ref
    except db_exc.DBDuplicateEntry:
        raise exception.KeyPairExists(key_name=values['name'])
@require_context
@@ -1965,7 +1968,7 @@ def key_pair_destroy(context, user_id, name):
result = model_query(context, models.KeyPair).\
filter_by(user_id=user_id).\
filter_by(name=name).\
delete()
soft_delete()
if not result:
raise exception.KeypairNotFound(user_id=user_id, name=name)
@@ -0,0 +1,60 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from migrate.changeset import UniqueConstraint
from sqlalchemy import Index, MetaData, Table
from nova.db.sqlalchemy import utils
from nova.openstack.common import log as logging
LOG = logging.getLogger(__name__)
OLD_IDX_NAME = 'key_pair_user_id_name_idx'
UC_NAME = 'key_pairs_uniq_name_and_user_id'
TABLE_NAME = 'key_pairs'
UC_COLUMNS = ['user_id', 'name', 'deleted']
def upgrade(migrate_engine):
    """Add a unique constraint on (user_id, name, deleted) to key_pairs."""
    meta = MetaData(bind=migrate_engine)
    key_pairs = Table(TABLE_NAME, meta, autoload=True)

    # Purge pre-existing duplicate rows first so the constraint can be
    # created on the remaining unique entries.
    utils.drop_old_duplicate_entries_from_table(migrate_engine,
                                                TABLE_NAME, True,
                                                *UC_COLUMNS)

    # The new UniqueConstraint makes the old composite index redundant,
    # so drop it.
    stale_idx = None
    for idx in key_pairs.indexes:
        if idx.name == OLD_IDX_NAME:
            idx.drop()
            stale_idx = idx

    # Index.drop() in sqlalchemy-migrate issues DROP INDEX against the
    # DB but does NOT remove the Index object from the table metadata,
    # which can make later constraint create/drop calls on this table
    # fail. Keep the metadata consistent by removing it ourselves.
    if stale_idx is not None:
        key_pairs.indexes.remove(stale_idx)

    UniqueConstraint(*UC_COLUMNS, table=key_pairs, name=UC_NAME).create()
def downgrade(migrate_engine):
    """Drop the unique constraint and restore the old composite index."""
    utils.drop_unique_constraint(migrate_engine, TABLE_NAME, UC_NAME,
                                 *UC_COLUMNS)
    meta = MetaData(bind=migrate_engine)
    key_pairs = Table(TABLE_NAME, meta, autoload=True)
    # Recreate the pre-migration (user_id, name) index.
    old_index = Index(OLD_IDX_NAME, key_pairs.c.user_id, key_pairs.c.name)
    old_index.create(migrate_engine)
@@ -40,7 +40,7 @@ def db_key_pair_get_all_by_user(self, user_id):
def db_key_pair_create(self, keypair):
pass
return keypair
def db_key_pair_destroy(context, user_id, name):
@@ -48,8 +48,8 @@ def db_key_pair_destroy(context, user_id, name):
raise Exception()
def db_key_pair_get(context, user_id, name):
pass
def db_key_pair_create_duplicate(context, keypair):
    """Stub for db.key_pair_create that simulates a name collision."""
    name = keypair.get('name', '')
    raise exception.KeyPairExists(key_name=name)
class KeypairsTest(test.TestCase):
@@ -205,7 +205,7 @@ class KeypairsTest(test.TestCase):
self.assertEqual(res.status_int, 413)
def test_keypair_create_duplicate(self):
self.stubs.Set(db, "key_pair_get", db_key_pair_get)
self.stubs.Set(db, "key_pair_create", db_key_pair_create_duplicate)
body = {'keypair': {'name': 'create_duplicate'}}
req = webob.Request.blank('/v2/fake/os-keypairs')
req.method = 'POST'
+3
View File
@@ -7315,6 +7315,9 @@ class KeypairAPITestCase(BaseTestCase):
self.ctxt, self.ctxt.user_id, '* BAD CHARACTERS! *')
def test_create_keypair_already_exists(self):
def db_key_pair_create_duplicate(context, keypair):
raise exception.KeyPairExists(key_name=keypair.get('name', ''))
self.stubs.Set(db, "key_pair_create", db_key_pair_create_duplicate)
self.assertRaises(exception.KeyPairExists,
self.keypair_api.create_key_pair,
self.ctxt, self.ctxt.user_id,
+61
View File
@@ -1066,6 +1066,67 @@ class DbApiTestCase(DbTestCase):
_compare(bw_usages[2], expected_bw_usages[2])
timeutils.clear_time_override()
def test_key_pair_create(self):
    """key_pair_create persists and returns every supplied value."""
    ctxt = context.get_admin_context()
    values = {'name': 'test_keypair', 'public_key': 'test-public-key',
              'user_id': 'test_user_id', 'fingerprint': 'test_fingerprint'}
    keypair = db.key_pair_create(ctxt, values)
    self.assertIsNotNone(keypair)
    # items() (not the Python-2-only iteritems()) keeps this forward
    # compatible; behavior is identical for this assertion loop.
    for name, value in values.items():
        self.assertEqual(keypair.get(name), value)
def test_key_pair_create_with_duplicate_name(self):
    """A second create with the same name must raise KeyPairExists."""
    ctxt = context.get_admin_context()
    values = {'name': 'test_keypair', 'public_key': 'test-public-key',
              'user_id': 'test_user_id', 'fingerprint': 'test_fingerprint'}
    # First create succeeds (return value unused — the original bound it
    # to an unused local); the duplicate must then be rejected.
    db.key_pair_create(ctxt, values)
    self.assertRaises(exception.KeyPairExists,
                      db.key_pair_create, ctxt, values)
def test_admin_get_deleted_keypair(self):
    """A soft-deleted keypair is readable only with read_deleted='yes'."""
    ctxt = context.get_admin_context()
    values = {'name': 'test_keypair', 'public_key': 'test-public-key',
              'user_id': 'test_user_id', 'fingerprint': 'test_fingerprint'}
    keypair = db.key_pair_create(ctxt, values)
    db.key_pair_destroy(ctxt, keypair['user_id'], keypair['name'])

    # Default context (read_deleted='no') must not see the row.
    self.assertRaises(exception.KeypairNotFound, db.key_pair_get, ctxt,
                      keypair['user_id'], keypair['name'])

    # An elevated context with read_deleted='yes' exposes it again.
    ctxt = ctxt.elevated(read_deleted='yes')
    db_keypair = db.key_pair_get(ctxt, keypair['user_id'],
                                 keypair['name'])
    self.assertEqual(db_keypair['name'], keypair['name'])
    # Soft delete records the row's id in the 'deleted' column.
    self.assertEqual(db_keypair['deleted'], keypair['id'])
def test_admin_get_all_keypairs_including_deleted(self):
    """Admin with read_deleted='yes' sees deleted and live keypairs."""
    ctxt = context.get_admin_context()
    keypair1_values = {'name': 'test_keypair1',
                       'public_key': 'test-public-key1',
                       'user_id': 'test_user_id',
                       'fingerprint': 'test_fingerprint1'}
    keypair2_values = {'name': 'test_keypair2',
                       'public_key': 'test-public-key2',
                       'user_id': 'test_user_id',
                       'fingerprint': 'test_fingerprint2'}
    keypair1 = db.key_pair_create(ctxt, keypair1_values)
    keypair2 = db.key_pair_create(ctxt, keypair2_values)
    db.key_pair_destroy(ctxt, keypair1['user_id'], keypair1['name'])
    db.key_pair_destroy(ctxt, keypair2['user_id'], keypair2['name'])

    # Default context returns only non-deleted keypairs — none remain.
    result = db.key_pair_get_all_by_user(ctxt, keypair1['user_id'])
    self.assertEqual(result, [])

    ctxt = ctxt.elevated(read_deleted='yes')
    # Elevated context returns deleted and non-deleted keypairs.
    db_keypairs = db.key_pair_get_all_by_user(ctxt, keypair1['user_id'])
    expected_deleted_ids = [keypair1['id'], keypair2['id']]
    expected_keypair_names = [keypair1['name'], keypair2['name']]
    for keypair in db_keypairs:
        # assertIn (unittest 2.7+) reports the actual and expected
        # values on failure, unlike assertTrue(x in y).
        self.assertIn(keypair['name'], expected_keypair_names)
        self.assertIn(keypair['deleted'], expected_deleted_ids)
def _get_fake_aggr_values():
return {'name': 'fake_aggregate'}
+39
View File
@@ -1151,6 +1151,45 @@ class TestNovaMigrations(BaseMigrationTestCase, CommonTestsMixIn):
fetchall()
self.assertEqual(1, len(uc_name2_rows))
# migration 173, add unique constraint to keypairs
def _pre_upgrade_173(self, engine):
    """Seed key_pairs with duplicate rows that migration 173 must purge.

    Inserts three rows sharing (name, user_id, deleted) so the
    migration's duplicate-cleanup step has something to remove.
    """
    # Only three timestamps are needed; the original allocated seven
    # (range(0, 7)) and used just the first three.
    created_at = [datetime.datetime.now() for _ in range(3)]
    fake_keypairs = [dict(name='key1', user_id='1a',
                          created_at=created_at[i],
                          deleted=0)
                     for i in range(3)]
    keypairs = get_table(engine, 'key_pairs')
    engine.execute(keypairs.insert(), fake_keypairs)
    return fake_keypairs
def _check_173(self, engine, data):
    """Verify migration 173 enforces uniqueness on key_pairs."""
    keypairs = get_table(engine, 'key_pairs')

    # Unique constraints are not listed in table.constraints for any db,
    # so probe behaviorally instead: inserting the same keypair twice
    # must now trip the constraint.
    insert = keypairs.insert()
    duplicate_keypair = dict(name='key4', user_id='4a',
                             created_at=datetime.datetime.now(),
                             deleted=0)
    insert.execute(duplicate_keypair)
    # The second, identical insert must violate the new constraint.
    self.assertRaises(sqlalchemy.exc.IntegrityError, insert.execute,
                      duplicate_keypair)

    # Count surviving unique (non-soft-deleted) rows: the deleted column
    # equals the row id only for soft-deleted entries.
    rows = (keypairs.select().
            where(keypairs.c.deleted != keypairs.c.id).
            execute().
            fetchall())
    self.assertEqual(len(rows), 2)
class TestBaremetalMigrations(BaseMigrationTestCase, CommonTestsMixIn):
"""Test sqlalchemy-migrate migrations."""