Add CLI for V2 image create, update, and upload
Provides command line support for image-create, image-update, and image-upload using the Glance V2 API. This includes building help text for create and update based on the image jsonschema as fetched from the server. Also fixes bug caused by default warlock patch generation not matching what Glance expects when updating a core property which had not originally been set when the image was created. Related to bp glance-client-v2 Change-Id: I841f9e3d05802f4b794cb6f4849abe03ff0324d9
This commit is contained in:
@@ -18,6 +18,11 @@ import os
|
||||
import sys
|
||||
import uuid
|
||||
|
||||
if os.name == 'nt':
|
||||
import msvcrt
|
||||
else:
|
||||
msvcrt = None
|
||||
|
||||
import prettytable
|
||||
|
||||
from glanceclient import exc
|
||||
@@ -35,6 +40,61 @@ def arg(*args, **kwargs):
|
||||
return _decorator
|
||||
|
||||
|
||||
def schema_args(schema_getter, omit=[]):
|
||||
typemap = {
|
||||
'string': str,
|
||||
'integer': int,
|
||||
'boolean': string_to_bool,
|
||||
'array': list
|
||||
}
|
||||
|
||||
def _decorator(func):
|
||||
schema = schema_getter()
|
||||
if schema is None:
|
||||
param = '<unavailable>'
|
||||
kwargs = {
|
||||
'help': ("Please run with connection parameters set to "
|
||||
"retrieve the schema for generating help for this "
|
||||
"command")
|
||||
}
|
||||
func.__dict__.setdefault('arguments', []).insert(0, ((param, ),
|
||||
kwargs))
|
||||
else:
|
||||
properties = schema.get('properties', {})
|
||||
for name, property in properties.iteritems():
|
||||
if name in omit:
|
||||
continue
|
||||
param = '--' + name.replace('_', '-')
|
||||
kwargs = {}
|
||||
|
||||
type_str = property.get('type', 'string')
|
||||
if type_str == 'array':
|
||||
items = property.get('items')
|
||||
kwargs['type'] = typemap.get(items.get('type'))
|
||||
kwargs['nargs'] = '+'
|
||||
else:
|
||||
kwargs['type'] = typemap.get(type_str)
|
||||
|
||||
if type_str == 'boolean':
|
||||
kwargs['metavar'] = '[True|False]'
|
||||
else:
|
||||
kwargs['metavar'] = '<%s>' % name.upper()
|
||||
|
||||
description = property.get('description', "")
|
||||
if 'enum' in property:
|
||||
if len(description):
|
||||
description += " "
|
||||
description += ("Valid values: " +
|
||||
', '.join(property.get('enum')))
|
||||
kwargs['help'] = description
|
||||
|
||||
func.__dict__.setdefault('arguments',
|
||||
[]).insert(0, ((param, ), kwargs))
|
||||
return func
|
||||
|
||||
return _decorator
|
||||
|
||||
|
||||
def pretty_choice_list(l):
|
||||
return ', '.join("'%s'" % i for i in l)
|
||||
|
||||
@@ -224,3 +284,29 @@ def get_file_size(file_obj):
|
||||
return
|
||||
else:
|
||||
raise
|
||||
|
||||
|
||||
def get_data_file(args):
|
||||
if args.file:
|
||||
return open(args.file, 'rb')
|
||||
else:
|
||||
# distinguish cases where:
|
||||
# (1) stdin is not valid (as in cron jobs):
|
||||
# glance ... <&-
|
||||
# (2) image data is provided through standard input:
|
||||
# glance ... < /tmp/file or cat /tmp/file | glance ...
|
||||
# (3) no image data provided:
|
||||
# glance ...
|
||||
try:
|
||||
os.fstat(0)
|
||||
except OSError:
|
||||
# (1) stdin is not valid (closed...)
|
||||
return None
|
||||
if not sys.stdin.isatty():
|
||||
# (2) image data is provided through standard input
|
||||
if msvcrt:
|
||||
msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)
|
||||
return sys.stdin
|
||||
else:
|
||||
# (3) no image data provided
|
||||
return None
|
||||
|
||||
+80
-33
@@ -18,7 +18,10 @@ Command-line interface to the OpenStack Images API.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from os.path import expanduser
|
||||
import re
|
||||
import sys
|
||||
|
||||
@@ -62,6 +65,13 @@ class OpenStackImagesShell(object):
|
||||
default=False, action="store_true",
|
||||
help="Print more verbose output")
|
||||
|
||||
parser.add_argument('--get-schema',
|
||||
default=False, action="store_true",
|
||||
dest='get_schema',
|
||||
help='Force retrieving the schema used to generate'
|
||||
' portions of the help text rather than using'
|
||||
' a cached copy. Ignored with api version 1')
|
||||
|
||||
parser.add_argument('-k', '--insecure',
|
||||
default=False,
|
||||
action='store_true',
|
||||
@@ -89,6 +99,7 @@ class OpenStackImagesShell(object):
|
||||
'verify the remote server\'s certificate. '
|
||||
'Without this option glance looks for the '
|
||||
'default system CA certificates.')
|
||||
|
||||
parser.add_argument('--ca-file',
|
||||
dest='os_cacert',
|
||||
help='DEPRECATED! Use --os-cacert.')
|
||||
@@ -356,37 +367,12 @@ class OpenStackImagesShell(object):
|
||||
else:
|
||||
return None
|
||||
|
||||
def main(self, argv):
|
||||
# Parse args once to find version
|
||||
parser = self.get_base_parser()
|
||||
(options, args) = parser.parse_known_args(argv)
|
||||
|
||||
# build available subcommands based on version
|
||||
api_version = options.os_image_api_version
|
||||
subcommand_parser = self.get_subcommand_parser(api_version)
|
||||
self.parser = subcommand_parser
|
||||
|
||||
# Handle top-level --help/-h before attempting to parse
|
||||
# a command off the command line
|
||||
if options.help or not argv:
|
||||
self.do_help(options)
|
||||
return 0
|
||||
|
||||
# Parse args again and call whatever callback was selected
|
||||
args = subcommand_parser.parse_args(argv)
|
||||
|
||||
# Short-circuit and deal with help command right away.
|
||||
if args.func == self.do_help:
|
||||
self.do_help(args)
|
||||
return 0
|
||||
|
||||
LOG = logging.getLogger('glanceclient')
|
||||
LOG.addHandler(logging.StreamHandler())
|
||||
LOG.setLevel(logging.DEBUG if args.debug else logging.INFO)
|
||||
|
||||
def _get_endpoint_and_token(self, args, force_auth=False):
|
||||
image_url = self._get_image_url(args)
|
||||
auth_reqd = (utils.is_authentication_required(args.func) and
|
||||
not (args.os_auth_token and image_url))
|
||||
auth_token = args.os_auth_token
|
||||
|
||||
auth_reqd = force_auth or (utils.is_authentication_required(args.func)
|
||||
and not (auth_token and image_url))
|
||||
|
||||
if not auth_reqd:
|
||||
endpoint = image_url
|
||||
@@ -426,8 +412,14 @@ class OpenStackImagesShell(object):
|
||||
_ksclient = self._get_ksclient(**kwargs)
|
||||
token = args.os_auth_token or _ksclient.auth_token
|
||||
|
||||
endpoint = args.os_image_url or \
|
||||
self._get_endpoint(_ksclient, **kwargs)
|
||||
endpoint = args.os_image_url or self._get_endpoint(_ksclient,
|
||||
**kwargs)
|
||||
|
||||
return endpoint, token
|
||||
|
||||
def _get_versioned_client(self, api_version, args, force_auth=False):
|
||||
endpoint, token = self._get_endpoint_and_token(args,
|
||||
force_auth=force_auth)
|
||||
|
||||
kwargs = {
|
||||
'token': token,
|
||||
@@ -438,8 +430,63 @@ class OpenStackImagesShell(object):
|
||||
'key_file': args.key_file,
|
||||
'ssl_compression': args.ssl_compression
|
||||
}
|
||||
|
||||
client = glanceclient.Client(api_version, endpoint, **kwargs)
|
||||
return client
|
||||
|
||||
def _cache_schema(self, options, home_dir='~/.glanceclient'):
|
||||
homedir = expanduser(home_dir)
|
||||
if not os.path.exists(homedir):
|
||||
os.makedirs(homedir)
|
||||
|
||||
schema_file_path = homedir + os.sep + "image_schema.json"
|
||||
|
||||
if (not os.path.exists(schema_file_path)) or options.get_schema:
|
||||
try:
|
||||
client = self._get_versioned_client('2', options,
|
||||
force_auth=True)
|
||||
schema = client.schemas.get("image")
|
||||
|
||||
with file(schema_file_path, 'w') as f:
|
||||
f.write(json.dumps(schema.raw()))
|
||||
except Exception as e:
|
||||
#NOTE(esheffield) do nothing here, we'll get a message later
|
||||
#if the schema is missing
|
||||
pass
|
||||
|
||||
def main(self, argv):
|
||||
# Parse args once to find version
|
||||
parser = self.get_base_parser()
|
||||
(options, args) = parser.parse_known_args(argv)
|
||||
|
||||
# build available subcommands based on version
|
||||
api_version = options.os_image_api_version
|
||||
|
||||
if api_version == '2':
|
||||
self._cache_schema(options)
|
||||
|
||||
subcommand_parser = self.get_subcommand_parser(api_version)
|
||||
self.parser = subcommand_parser
|
||||
|
||||
# Handle top-level --help/-h before attempting to parse
|
||||
# a command off the command line
|
||||
if options.help or not argv:
|
||||
self.do_help(options)
|
||||
return 0
|
||||
|
||||
# Parse args again and call whatever callback was selected
|
||||
args = subcommand_parser.parse_args(argv)
|
||||
|
||||
# Short-circuit and deal with help command right away.
|
||||
if args.func == self.do_help:
|
||||
self.do_help(args)
|
||||
return 0
|
||||
|
||||
LOG = logging.getLogger('glanceclient')
|
||||
LOG.addHandler(logging.StreamHandler())
|
||||
LOG.setLevel(logging.DEBUG if args.debug else logging.INFO)
|
||||
|
||||
client = self._get_versioned_client(api_version, args,
|
||||
force_auth=False)
|
||||
|
||||
try:
|
||||
args.func(client, args)
|
||||
|
||||
@@ -15,14 +15,8 @@
|
||||
|
||||
import argparse
|
||||
import copy
|
||||
import os
|
||||
import sys
|
||||
|
||||
if os.name == 'nt':
|
||||
import msvcrt
|
||||
else:
|
||||
msvcrt = None
|
||||
|
||||
from glanceclient import exc
|
||||
from glanceclient.common import utils
|
||||
from glanceclient.common import progressbar
|
||||
@@ -125,30 +119,7 @@ def _image_show(image, human_readable=False):
|
||||
|
||||
def _set_data_field(fields, args):
|
||||
if 'location' not in fields and 'copy_from' not in fields:
|
||||
if args.file:
|
||||
fields['data'] = open(args.file, 'rb')
|
||||
else:
|
||||
# distinguish cases where:
|
||||
# (1) stdin is not valid (as in cron jobs):
|
||||
# glance ... <&-
|
||||
# (2) image data is provided through standard input:
|
||||
# glance ... < /tmp/file or cat /tmp/file | glance ...
|
||||
# (3) no image data provided:
|
||||
# glance ...
|
||||
try:
|
||||
os.fstat(0)
|
||||
except OSError:
|
||||
# (1) stdin is not valid (closed...)
|
||||
fields['data'] = None
|
||||
return
|
||||
if not sys.stdin.isatty():
|
||||
# (2) image data is provided through standard input
|
||||
if msvcrt:
|
||||
msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)
|
||||
fields['data'] = sys.stdin
|
||||
else:
|
||||
# (3) no image data provided
|
||||
fields['data'] = None
|
||||
fields['data'] = utils.get_data_file(args)
|
||||
|
||||
|
||||
@utils.arg('image', metavar='<IMAGE>', help='Name or ID of image to describe.')
|
||||
|
||||
@@ -44,8 +44,8 @@ class Client(object):
|
||||
|
||||
def _get_image_model(self):
|
||||
schema = self.schemas.get('image')
|
||||
return warlock.model_factory(schema.raw())
|
||||
return warlock.model_factory(schema.raw(), schemas.SchemaBasedModel)
|
||||
|
||||
def _get_member_model(self):
|
||||
schema = self.schemas.get('member')
|
||||
return warlock.model_factory(schema.raw())
|
||||
return warlock.model_factory(schema.raw(), schemas.SchemaBasedModel)
|
||||
|
||||
@@ -14,6 +14,31 @@
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
import jsonpatch
|
||||
import warlock.model as warlock
|
||||
|
||||
|
||||
class SchemaBasedModel(warlock.Model):
|
||||
"""Glance specific subclass of the warlock Model
|
||||
|
||||
This implementation alters the function of the patch property
|
||||
to take into account the schema's core properties. With this version
|
||||
undefined properties which are core will generated 'replace'
|
||||
operations rather than 'add' since this is what the Glance API
|
||||
expects.
|
||||
"""
|
||||
|
||||
@warlock.Model.patch.getter
|
||||
def patch(self):
|
||||
"""Return a jsonpatch object representing the delta."""
|
||||
original = copy.deepcopy(self.__dict__['__original__'])
|
||||
new = dict(self)
|
||||
if self.__dict__['schema']:
|
||||
for prop in self.schema['properties']:
|
||||
if prop not in original and prop in new:
|
||||
original[prop] = None
|
||||
|
||||
return jsonpatch.make_patch(original, dict(self)).to_string()
|
||||
|
||||
|
||||
class SchemaProperty(object):
|
||||
@@ -40,6 +65,12 @@ class Schema(object):
|
||||
raw_properties = raw_schema['properties']
|
||||
self.properties = translate_schema_properties(raw_properties)
|
||||
|
||||
def is_core_property(self, property_name):
|
||||
for prop in self.properties:
|
||||
if property_name == prop.name:
|
||||
return True
|
||||
return False
|
||||
|
||||
def raw(self):
|
||||
return copy.deepcopy(self._raw_schema)
|
||||
|
||||
|
||||
@@ -16,6 +16,78 @@
|
||||
from glanceclient.common import progressbar
|
||||
from glanceclient.common import utils
|
||||
from glanceclient import exc
|
||||
import json
|
||||
import os
|
||||
from os.path import expanduser
|
||||
|
||||
IMAGE_SCHEMA = None
|
||||
|
||||
|
||||
def get_image_schema():
|
||||
global IMAGE_SCHEMA
|
||||
if IMAGE_SCHEMA is None:
|
||||
schema_path = expanduser("~/.glanceclient/image_schema.json")
|
||||
if os.path.exists(schema_path) and os.path.isfile(schema_path):
|
||||
with file(schema_path, "r") as f:
|
||||
schema_raw = f.read()
|
||||
IMAGE_SCHEMA = json.loads(schema_raw)
|
||||
return IMAGE_SCHEMA
|
||||
|
||||
|
||||
@utils.schema_args(get_image_schema)
|
||||
@utils.arg('--property', metavar="<key=value>", action='append',
|
||||
default=[], help=('Arbitrary property to associate with image.'
|
||||
' May be used multiple times.'))
|
||||
def do_image_create(gc, args):
|
||||
"""Create a new image."""
|
||||
schema = gc.schemas.get("image")
|
||||
_args = [(x[0].replace('-', '_'), x[1]) for x in vars(args).items()]
|
||||
fields = dict(filter(lambda x: x[1] is not None and
|
||||
(x[0] == 'property' or
|
||||
schema.is_core_property(x[0])),
|
||||
_args))
|
||||
|
||||
raw_properties = fields.pop('property', [])
|
||||
for datum in raw_properties:
|
||||
key, value = datum.split('=', 1)
|
||||
fields[key] = value
|
||||
|
||||
image = gc.images.create(**fields)
|
||||
ignore = ['self', 'access', 'file', 'schema']
|
||||
image = dict([item for item in image.iteritems()
|
||||
if item[0] not in ignore])
|
||||
utils.print_dict(image)
|
||||
|
||||
|
||||
@utils.arg('id', metavar='<IMAGE_ID>', help='ID of image to update.')
|
||||
@utils.schema_args(get_image_schema, omit=['id'])
|
||||
@utils.arg('--property', metavar="<key=value>", action='append',
|
||||
default=[], help=('Arbitrary property to associate with image.'
|
||||
' May be used multiple times.'))
|
||||
@utils.arg('--remove-property', metavar="key", action='append', default=[],
|
||||
help="Name of arbitrary property to remove from the image")
|
||||
def do_image_update(gc, args):
|
||||
"""Update an existing image."""
|
||||
schema = gc.schemas.get("image")
|
||||
_args = [(x[0].replace('-', '_'), x[1]) for x in vars(args).items()]
|
||||
fields = dict(filter(lambda x: x[1] is not None and
|
||||
(x[0] in ['property', 'remove_property'] or
|
||||
schema.is_core_property(x[0])),
|
||||
_args))
|
||||
|
||||
raw_properties = fields.pop('property', [])
|
||||
for datum in raw_properties:
|
||||
key, value = datum.split('=', 1)
|
||||
fields[key] = value
|
||||
|
||||
remove_properties = fields.pop('remove_property', None)
|
||||
|
||||
image_id = fields.pop('id')
|
||||
image = gc.images.update(image_id, remove_properties, **fields)
|
||||
ignore = ['self', 'access', 'file', 'schema']
|
||||
image = dict([item for item in image.iteritems()
|
||||
if item[0] not in ignore])
|
||||
utils.print_dict(image)
|
||||
|
||||
|
||||
@utils.arg('--page-size', metavar='<SIZE>', default=None, type=int,
|
||||
@@ -138,6 +210,18 @@ def do_image_download(gc, args):
|
||||
utils.save_image(body, args.file)
|
||||
|
||||
|
||||
@utils.arg('--file', metavar='<FILE>',
|
||||
help=('Local file that contains disk image to be uploaded'
|
||||
' during creation. Alternatively, images can be passed'
|
||||
' to the client via stdin.'))
|
||||
@utils.arg('id', metavar='<IMAGE_ID>',
|
||||
help='ID of image to upload data to.')
|
||||
def do_image_upload(gc, args):
|
||||
"""Upload data for a specific image."""
|
||||
image_data = utils.get_data_file(args)
|
||||
gc.images.upload(args.id, image_data)
|
||||
|
||||
|
||||
@utils.arg('id', metavar='<IMAGE_ID>', help='ID of image to delete.')
|
||||
def do_image_delete(gc, args):
|
||||
"""Delete specified image."""
|
||||
|
||||
@@ -17,9 +17,17 @@
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
import mock
|
||||
|
||||
from glanceclient import exc
|
||||
from glanceclient import shell as openstack_shell
|
||||
|
||||
#NOTE (esheffield) Used for the schema caching tests
|
||||
from glanceclient.v2 import schemas as schemas
|
||||
import json
|
||||
|
||||
from tests import utils
|
||||
|
||||
DEFAULT_IMAGE_URL = 'http://127.0.0.1:5000/'
|
||||
@@ -91,3 +99,102 @@ class ShellTest(utils.TestCase):
|
||||
test_shell = openstack_shell.OpenStackImagesShell()
|
||||
targeted_image_url = test_shell._get_image_url(fake_args)
|
||||
self.assertEqual(expected_image_url, targeted_image_url)
|
||||
|
||||
|
||||
class ShellCacheSchemaTest(utils.TestCase):
|
||||
def setUp(self):
|
||||
super(ShellCacheSchemaTest, self).setUp()
|
||||
self.shell = openstack_shell.OpenStackImagesShell()
|
||||
self._mock_client_setup()
|
||||
|
||||
def _mock_client_setup(self):
|
||||
self.schema_dict = {
|
||||
'name': 'image',
|
||||
'properties': {
|
||||
'name': {'type': 'string', 'description': 'Name of image'},
|
||||
},
|
||||
}
|
||||
|
||||
self.client = mock.Mock()
|
||||
self.client.schemas.get.return_value = schemas.Schema(self.schema_dict)
|
||||
|
||||
def _make_args(self, args):
|
||||
class Args():
|
||||
def __init__(self, entries):
|
||||
self.__dict__.update(entries)
|
||||
|
||||
return Args(args)
|
||||
|
||||
def _write_file(self, path, text):
|
||||
with file(path, 'w') as f:
|
||||
f.write(text)
|
||||
|
||||
def _read_file(self, path):
|
||||
with file(path, 'r') as f:
|
||||
text = f.read()
|
||||
|
||||
return text
|
||||
|
||||
def test_cache_schema_gets_when_not_exists(self):
|
||||
cache_dir = tempfile.gettempdir()
|
||||
|
||||
cache_file = cache_dir + '/image_schema.json'
|
||||
|
||||
if os.path.exists(cache_file):
|
||||
os.remove(cache_file)
|
||||
|
||||
options = {
|
||||
'get_schema': False
|
||||
}
|
||||
|
||||
with mock.patch.object(self.shell, '_get_versioned_client')\
|
||||
as mocked_get_client:
|
||||
mocked_get_client.return_value = self.client
|
||||
self.shell._cache_schema(self._make_args(options),
|
||||
home_dir=cache_dir)
|
||||
|
||||
self.assertTrue(os.path.exists(cache_file))
|
||||
|
||||
def test_cache_schema_gets_when_forced(self):
|
||||
cache_dir = tempfile.gettempdir()
|
||||
|
||||
cache_file = cache_dir + '/image_schema.json'
|
||||
|
||||
dummy_schema = 'my dummy schema'
|
||||
self._write_file(cache_file, dummy_schema)
|
||||
|
||||
options = {
|
||||
'get_schema': True
|
||||
}
|
||||
|
||||
with mock.patch.object(self.shell, '_get_versioned_client') \
|
||||
as mocked_get_client:
|
||||
mocked_get_client.return_value = self.client
|
||||
self.shell._cache_schema(self._make_args(options),
|
||||
home_dir=cache_dir)
|
||||
|
||||
self.assertTrue(os.path.exists(cache_file))
|
||||
text = self._read_file(cache_file)
|
||||
self.assertEquals(text, json.dumps(self.schema_dict))
|
||||
|
||||
def test_cache_schema_leaves_when_present_not_forced(self):
|
||||
cache_dir = tempfile.gettempdir()
|
||||
|
||||
cache_file = cache_dir + '/image_schema.json'
|
||||
|
||||
dummy_schema = 'my dummy schema'
|
||||
self._write_file(cache_file, dummy_schema)
|
||||
|
||||
options = {
|
||||
'get_schema': False
|
||||
}
|
||||
|
||||
with mock.patch.object(self.shell, '_get_versioned_client') \
|
||||
as mocked_get_client:
|
||||
mocked_get_client.return_value = self.client
|
||||
self.shell._cache_schema(self._make_args(options),
|
||||
home_dir=cache_dir)
|
||||
|
||||
self.assertTrue(os.path.exists(cache_file))
|
||||
text = self._read_file(cache_file)
|
||||
self.assertEquals(text, dummy_schema)
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
# under the License.
|
||||
|
||||
import testtools
|
||||
import warlock
|
||||
|
||||
from glanceclient.v2 import schemas
|
||||
from tests import utils
|
||||
@@ -43,6 +44,15 @@ fixtures = {
|
||||
}
|
||||
|
||||
|
||||
_SCHEMA = schemas.Schema({
|
||||
'name': 'image',
|
||||
'properties': {
|
||||
'name': {'type': 'string'},
|
||||
'color': {'type': 'string'},
|
||||
},
|
||||
})
|
||||
|
||||
|
||||
class TestSchemaProperty(testtools.TestCase):
|
||||
def test_property_minimum(self):
|
||||
prop = schemas.SchemaProperty('size')
|
||||
@@ -83,3 +93,73 @@ class TestController(testtools.TestCase):
|
||||
schema = self.controller.get('image')
|
||||
self.assertEqual(schema.name, 'image')
|
||||
self.assertEqual([p.name for p in schema.properties], ['name'])
|
||||
|
||||
|
||||
class TestSchemaBasedModel(testtools.TestCase):
|
||||
def setUp(self):
|
||||
super(TestSchemaBasedModel, self).setUp()
|
||||
self.model = warlock.model_factory(_SCHEMA.raw(),
|
||||
schemas.SchemaBasedModel)
|
||||
|
||||
def test_patch_should_replace_missing_core_properties(self):
|
||||
obj = {
|
||||
'name': 'fred'
|
||||
}
|
||||
|
||||
original = self.model(obj)
|
||||
original['color'] = 'red'
|
||||
|
||||
patch = original.patch
|
||||
expected = '[{"path": "/color", "value": "red", "op": "replace"}]'
|
||||
self.assertEqual(patch, expected)
|
||||
|
||||
def test_patch_should_add_extra_properties(self):
|
||||
obj = {
|
||||
'name': 'fred',
|
||||
}
|
||||
|
||||
original = self.model(obj)
|
||||
original['weight'] = '10'
|
||||
|
||||
patch = original.patch
|
||||
expected = '[{"path": "/weight", "value": "10", "op": "add"}]'
|
||||
self.assertEqual(patch, expected)
|
||||
|
||||
def test_patch_should_replace_extra_properties(self):
|
||||
obj = {
|
||||
'name': 'fred',
|
||||
'weight': '10'
|
||||
}
|
||||
|
||||
original = self.model(obj)
|
||||
original['weight'] = '22'
|
||||
|
||||
patch = original.patch
|
||||
expected = '[{"path": "/weight", "value": "22", "op": "replace"}]'
|
||||
self.assertEqual(patch, expected)
|
||||
|
||||
def test_patch_should_remove_extra_properties(self):
|
||||
obj = {
|
||||
'name': 'fred',
|
||||
'weight': '10'
|
||||
}
|
||||
|
||||
original = self.model(obj)
|
||||
del original['weight']
|
||||
|
||||
patch = original.patch
|
||||
expected = '[{"path": "/weight", "op": "remove"}]'
|
||||
self.assertEqual(patch, expected)
|
||||
|
||||
def test_patch_should_remove_core_properties(self):
|
||||
obj = {
|
||||
'name': 'fred',
|
||||
'color': 'red'
|
||||
}
|
||||
|
||||
original = self.model(obj)
|
||||
del original['color']
|
||||
|
||||
patch = original.patch
|
||||
expected = '[{"path": "/color", "op": "remove"}]'
|
||||
self.assertEqual(patch, expected)
|
||||
|
||||
@@ -102,6 +102,111 @@ class ShellV2Test(testtools.TestCase):
|
||||
mocked_list.assert_called_once_with('pass')
|
||||
utils.print_dict.assert_called_once_with({'id': 'pass'})
|
||||
|
||||
def test_do_image_create_no_user_props(self):
|
||||
args = self._make_args({'name': 'IMG-01', 'disk_format': 'vhd',
|
||||
'container_format': 'bare'})
|
||||
with mock.patch.object(self.gc.images, 'create') as mocked_create:
|
||||
ignore_fields = ['self', 'access', 'file', 'schema']
|
||||
expect_image = dict([(field, field) for field in ignore_fields])
|
||||
expect_image['id'] = 'pass'
|
||||
expect_image['name'] = 'IMG-01'
|
||||
expect_image['disk_format'] = 'vhd'
|
||||
expect_image['container_format'] = 'bare'
|
||||
mocked_create.return_value = expect_image
|
||||
|
||||
test_shell.do_image_create(self.gc, args)
|
||||
|
||||
mocked_create.assert_called_once_with(name='IMG-01',
|
||||
disk_format='vhd',
|
||||
container_format='bare')
|
||||
utils.print_dict.assert_called_once_with({
|
||||
'id': 'pass', 'name': 'IMG-01', 'disk_format': 'vhd',
|
||||
'container_format': 'bare'})
|
||||
|
||||
def test_do_image_create_with_user_props(self):
|
||||
args = self._make_args({'name': 'IMG-01',
|
||||
'property': ['myprop=myval']})
|
||||
with mock.patch.object(self.gc.images, 'create') as mocked_create:
|
||||
ignore_fields = ['self', 'access', 'file', 'schema']
|
||||
expect_image = dict([(field, field) for field in ignore_fields])
|
||||
expect_image['id'] = 'pass'
|
||||
expect_image['name'] = 'IMG-01'
|
||||
expect_image['myprop'] = 'myval'
|
||||
mocked_create.return_value = expect_image
|
||||
|
||||
test_shell.do_image_create(self.gc, args)
|
||||
|
||||
mocked_create.assert_called_once_with(name='IMG-01',
|
||||
myprop='myval')
|
||||
utils.print_dict.assert_called_once_with({
|
||||
'id': 'pass', 'name': 'IMG-01', 'myprop': 'myval'})
|
||||
|
||||
def test_do_image_update_no_user_props(self):
|
||||
args = self._make_args({'id': 'pass', 'name': 'IMG-01',
|
||||
'disk_format': 'vhd',
|
||||
'container_format': 'bare'})
|
||||
with mock.patch.object(self.gc.images, 'update') as mocked_update:
|
||||
ignore_fields = ['self', 'access', 'file', 'schema']
|
||||
expect_image = dict([(field, field) for field in ignore_fields])
|
||||
expect_image['id'] = 'pass'
|
||||
expect_image['name'] = 'IMG-01'
|
||||
expect_image['disk_format'] = 'vhd'
|
||||
expect_image['container_format'] = 'bare'
|
||||
mocked_update.return_value = expect_image
|
||||
|
||||
test_shell.do_image_update(self.gc, args)
|
||||
|
||||
mocked_update.assert_called_once_with('pass',
|
||||
None,
|
||||
name='IMG-01',
|
||||
disk_format='vhd',
|
||||
container_format='bare')
|
||||
utils.print_dict.assert_called_once_with({
|
||||
'id': 'pass', 'name': 'IMG-01', 'disk_format': 'vhd',
|
||||
'container_format': 'bare'})
|
||||
|
||||
def test_do_image_update_with_user_props(self):
|
||||
args = self._make_args({'id': 'pass', 'name': 'IMG-01',
|
||||
'property': ['myprop=myval']})
|
||||
with mock.patch.object(self.gc.images, 'update') as mocked_update:
|
||||
ignore_fields = ['self', 'access', 'file', 'schema']
|
||||
expect_image = dict([(field, field) for field in ignore_fields])
|
||||
expect_image['id'] = 'pass'
|
||||
expect_image['name'] = 'IMG-01'
|
||||
expect_image['myprop'] = 'myval'
|
||||
mocked_update.return_value = expect_image
|
||||
|
||||
test_shell.do_image_update(self.gc, args)
|
||||
|
||||
mocked_update.assert_called_once_with('pass',
|
||||
None,
|
||||
name='IMG-01',
|
||||
myprop='myval')
|
||||
utils.print_dict.assert_called_once_with({
|
||||
'id': 'pass', 'name': 'IMG-01', 'myprop': 'myval'})
|
||||
|
||||
def test_do_image_update_with_remove_props(self):
|
||||
args = self._make_args({'id': 'pass', 'name': 'IMG-01',
|
||||
'disk_format': 'vhd',
|
||||
'remove-property': ['container_format']})
|
||||
with mock.patch.object(self.gc.images, 'update') as mocked_update:
|
||||
ignore_fields = ['self', 'access', 'file', 'schema']
|
||||
expect_image = dict([(field, field) for field in ignore_fields])
|
||||
expect_image['id'] = 'pass'
|
||||
expect_image['name'] = 'IMG-01'
|
||||
expect_image['disk_format'] = 'vhd'
|
||||
|
||||
mocked_update.return_value = expect_image
|
||||
|
||||
test_shell.do_image_update(self.gc, args)
|
||||
|
||||
mocked_update.assert_called_once_with('pass',
|
||||
['container_format'],
|
||||
name='IMG-01',
|
||||
disk_format='vhd')
|
||||
utils.print_dict.assert_called_once_with({
|
||||
'id': 'pass', 'name': 'IMG-01', 'disk_format': 'vhd'})
|
||||
|
||||
def test_do_explain(self):
|
||||
input = {
|
||||
'page_size': 18,
|
||||
@@ -115,6 +220,15 @@ class ShellV2Test(testtools.TestCase):
|
||||
|
||||
self.gc.schemas.get.assert_called_once_with('test')
|
||||
|
||||
def test_image_upload(self):
|
||||
args = self._make_args({'id': 'IMG-01', 'file': 'test'})
|
||||
|
||||
with mock.patch.object(self.gc.images, 'upload') as mocked_upload:
|
||||
utils.get_data_file = mock.Mock(return_value='testfile')
|
||||
mocked_upload.return_value = None
|
||||
test_shell.do_image_upload(self.gc, args)
|
||||
mocked_upload.assert_called_once_with('IMG-01', 'testfile')
|
||||
|
||||
def test_image_download(self):
|
||||
args = self._make_args(
|
||||
{'id': 'pass', 'file': 'test', 'progress': False})
|
||||
|
||||
Reference in New Issue
Block a user