db: Move regex helpers to utils
So that we can use them for API DB methods, which are found in nova.objects instead of nova.db. Change-Id: Ifb15ee90ac6a6400b7268ed80f727080e98c4cdf Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
This commit is contained in:
+3
-45
@@ -1960,49 +1960,6 @@ def _model_get_uuid_by_sort_filters(context, model, sort_keys, sort_dirs,
|
|||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
def _db_connection_type(db_connection):
|
|
||||||
"""Returns a lowercase symbol for the db type.
|
|
||||||
|
|
||||||
This is useful when we need to change what we are doing per DB
|
|
||||||
(like handling regexes). In a CellsV2 world it probably needs to
|
|
||||||
do something better than use the database configuration string.
|
|
||||||
"""
|
|
||||||
|
|
||||||
db_string = db_connection.split(':')[0].split('+')[0]
|
|
||||||
return db_string.lower()
|
|
||||||
|
|
||||||
|
|
||||||
def _safe_regex_mysql(raw_string):
|
|
||||||
"""Make regex safe to mysql.
|
|
||||||
|
|
||||||
Certain items like '|' are interpreted raw by mysql REGEX. If you
|
|
||||||
search for a single | then you trigger an error because it's
|
|
||||||
expecting content on either side.
|
|
||||||
|
|
||||||
For consistency sake we escape all '|'. This does mean we wouldn't
|
|
||||||
support something like foo|bar to match completely different
|
|
||||||
things, however, one can argue putting such complicated regex into
|
|
||||||
name search probably means you are doing this wrong.
|
|
||||||
"""
|
|
||||||
return raw_string.replace('|', '\\|')
|
|
||||||
|
|
||||||
|
|
||||||
def _get_regexp_ops(connection):
|
|
||||||
"""Return safety filter and db opts for regex."""
|
|
||||||
regexp_op_map = {
|
|
||||||
'postgresql': '~',
|
|
||||||
'mysql': 'REGEXP',
|
|
||||||
'sqlite': 'REGEXP'
|
|
||||||
}
|
|
||||||
regex_safe_filters = {
|
|
||||||
'mysql': _safe_regex_mysql
|
|
||||||
}
|
|
||||||
db_type = _db_connection_type(connection)
|
|
||||||
|
|
||||||
return (regex_safe_filters.get(db_type, lambda x: x),
|
|
||||||
regexp_op_map.get(db_type, 'LIKE'))
|
|
||||||
|
|
||||||
|
|
||||||
def _regex_instance_filter(query, filters):
|
def _regex_instance_filter(query, filters):
|
||||||
"""Applies regular expression filtering to an Instance query.
|
"""Applies regular expression filtering to an Instance query.
|
||||||
|
|
||||||
@@ -2013,7 +1970,8 @@ def _regex_instance_filter(query, filters):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
model = models.Instance
|
model = models.Instance
|
||||||
safe_regex_filter, db_regexp_op = _get_regexp_ops(CONF.database.connection)
|
safe_regex_filter, db_regexp_op = db_utils.get_regexp_ops(
|
||||||
|
CONF.database.connection)
|
||||||
for filter_name in filters:
|
for filter_name in filters:
|
||||||
try:
|
try:
|
||||||
column_attr = getattr(model, filter_name)
|
column_attr = getattr(model, filter_name)
|
||||||
@@ -2027,7 +1985,7 @@ def _regex_instance_filter(query, filters):
|
|||||||
filter_val = str(filter_val)
|
filter_val = str(filter_val)
|
||||||
if db_regexp_op == 'LIKE':
|
if db_regexp_op == 'LIKE':
|
||||||
query = query.filter(column_attr.op(db_regexp_op)(
|
query = query.filter(column_attr.op(db_regexp_op)(
|
||||||
u'%' + filter_val + u'%'))
|
'%' + filter_val + '%'))
|
||||||
else:
|
else:
|
||||||
filter_val = safe_regex_filter(filter_val)
|
filter_val = safe_regex_filter(filter_val)
|
||||||
query = query.filter(column_attr.op(db_regexp_op)(
|
query = query.filter(column_attr.op(db_regexp_op)(
|
||||||
|
|||||||
@@ -10,6 +10,7 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
import collections.abc
|
||||||
import functools
|
import functools
|
||||||
import inspect
|
import inspect
|
||||||
|
|
||||||
@@ -107,3 +108,49 @@ def process_sort_params(
|
|||||||
result_dirs.append(default_dir_value)
|
result_dirs.append(default_dir_value)
|
||||||
|
|
||||||
return result_keys, result_dirs
|
return result_keys, result_dirs
|
||||||
|
|
||||||
|
|
||||||
|
def _db_connection_type(db_connection: str) -> str:
|
||||||
|
"""Returns a lowercase symbol for the db type.
|
||||||
|
|
||||||
|
This is useful when we need to change what we are doing per DB (like
|
||||||
|
handling regexes). In a CellsV2 world it probably needs to do something
|
||||||
|
better than use the database configuration string.
|
||||||
|
"""
|
||||||
|
db_string = db_connection.split(':')[0].split('+')[0]
|
||||||
|
return db_string.lower()
|
||||||
|
|
||||||
|
|
||||||
|
def _safe_regex_mysql(raw_string: str) -> str:
|
||||||
|
"""Make regex safe to MySQL.
|
||||||
|
|
||||||
|
Certain items like ``|`` are interpreted raw by mysql REGEX. If you search
|
||||||
|
for a single | then you trigger an error because it's expecting content on
|
||||||
|
either side.
|
||||||
|
|
||||||
|
For consistency sake we escape all ``|``. This does mean we wouldn't
|
||||||
|
support something like foo|bar to match completely different things,
|
||||||
|
however, one can argue putting such complicated regex into name search
|
||||||
|
probably means you are doing this wrong.
|
||||||
|
"""
|
||||||
|
return raw_string.replace('|', '\\|')
|
||||||
|
|
||||||
|
|
||||||
|
def get_regexp_ops(
|
||||||
|
connection: str
|
||||||
|
) -> tuple[collections.abc.Callable[[str], str], str]:
|
||||||
|
"""Return safety filter and db opts for regex."""
|
||||||
|
regex_safe_filters = {
|
||||||
|
'mysql': _safe_regex_mysql
|
||||||
|
}
|
||||||
|
regexp_op_map = {
|
||||||
|
'postgresql': '~',
|
||||||
|
'mysql': 'REGEXP',
|
||||||
|
'sqlite': 'REGEXP'
|
||||||
|
}
|
||||||
|
db_type = _db_connection_type(connection)
|
||||||
|
|
||||||
|
return (
|
||||||
|
regex_safe_filters.get(db_type, lambda x: x),
|
||||||
|
regexp_op_map.get(db_type, 'LIKE'),
|
||||||
|
)
|
||||||
|
|||||||
@@ -329,8 +329,8 @@ def _create_aggregate_with_hosts(context=context.get_admin_context(),
|
|||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
@mock.patch.object(db, '_get_regexp_ops',
|
@mock.patch.object(
|
||||||
return_value=(lambda x: x, 'LIKE'))
|
db_utils, 'get_regexp_ops', return_value=(lambda x: x, 'LIKE'))
|
||||||
class UnsupportedDbRegexpTestCase(DbTestCase):
|
class UnsupportedDbRegexpTestCase(DbTestCase):
|
||||||
|
|
||||||
def test_instance_get_all_by_filters_paginate(self, mock_get_regexp):
|
def test_instance_get_all_by_filters_paginate(self, mock_get_regexp):
|
||||||
@@ -719,28 +719,6 @@ class SqlAlchemyDbApiNoDbTestCase(test.NoDBTestCase):
|
|||||||
db.convert_objects_related_datetimes(test1, *datetime_keys)
|
db.convert_objects_related_datetimes(test1, *datetime_keys)
|
||||||
self.assertEqual(test1, expected_dict)
|
self.assertEqual(test1, expected_dict)
|
||||||
|
|
||||||
def test_get_regexp_op_for_database_sqlite(self):
|
|
||||||
filter, op = db._get_regexp_ops('sqlite:///')
|
|
||||||
self.assertEqual('|', filter('|'))
|
|
||||||
self.assertEqual('REGEXP', op)
|
|
||||||
|
|
||||||
def test_get_regexp_op_for_database_mysql(self):
|
|
||||||
filter, op = db._get_regexp_ops(
|
|
||||||
'mysql+pymysql://root@localhost')
|
|
||||||
self.assertEqual('\\|', filter('|'))
|
|
||||||
self.assertEqual('REGEXP', op)
|
|
||||||
|
|
||||||
def test_get_regexp_op_for_database_postgresql(self):
|
|
||||||
filter, op = db._get_regexp_ops(
|
|
||||||
'postgresql://localhost')
|
|
||||||
self.assertEqual('|', filter('|'))
|
|
||||||
self.assertEqual('~', op)
|
|
||||||
|
|
||||||
def test_get_regexp_op_for_database_unknown(self):
|
|
||||||
filter, op = db._get_regexp_ops('notdb:///')
|
|
||||||
self.assertEqual('|', filter('|'))
|
|
||||||
self.assertEqual('LIKE', op)
|
|
||||||
|
|
||||||
@mock.patch.object(db, 'context_manager')
|
@mock.patch.object(db, 'context_manager')
|
||||||
def test_get_engine(self, mock_ctxt_mgr):
|
def test_get_engine(self, mock_ctxt_mgr):
|
||||||
db.get_engine()
|
db.get_engine()
|
||||||
@@ -769,22 +747,6 @@ class SqlAlchemyDbApiNoDbTestCase(test.NoDBTestCase):
|
|||||||
mock_get.assert_called_once_with(mock.sentinel.elevated, 'foo')
|
mock_get.assert_called_once_with(mock.sentinel.elevated, 'foo')
|
||||||
ctxt.elevated.assert_called_once_with(read_deleted='yes')
|
ctxt.elevated.assert_called_once_with(read_deleted='yes')
|
||||||
|
|
||||||
def test_replace_sub_expression(self):
|
|
||||||
ret = db._safe_regex_mysql('|')
|
|
||||||
self.assertEqual('\\|', ret)
|
|
||||||
|
|
||||||
ret = db._safe_regex_mysql('||')
|
|
||||||
self.assertEqual('\\|\\|', ret)
|
|
||||||
|
|
||||||
ret = db._safe_regex_mysql('a||')
|
|
||||||
self.assertEqual('a\\|\\|', ret)
|
|
||||||
|
|
||||||
ret = db._safe_regex_mysql('|a|')
|
|
||||||
self.assertEqual('\\|a\\|', ret)
|
|
||||||
|
|
||||||
ret = db._safe_regex_mysql('||a')
|
|
||||||
self.assertEqual('\\|\\|a', ret)
|
|
||||||
|
|
||||||
|
|
||||||
class SqlAlchemyDbApiTestCase(DbTestCase):
|
class SqlAlchemyDbApiTestCase(DbTestCase):
|
||||||
def test_instance_get_all_by_host(self):
|
def test_instance_get_all_by_host(self):
|
||||||
|
|||||||
@@ -121,3 +121,41 @@ class ProcessSortParamTestCase(test.TestCase):
|
|||||||
self.assertRaises(
|
self.assertRaises(
|
||||||
exception.InvalidInput,
|
exception.InvalidInput,
|
||||||
utils.process_sort_params, ['key'], dirs)
|
utils.process_sort_params, ['key'], dirs)
|
||||||
|
|
||||||
|
|
||||||
|
class TestGetRegexOps(test.TestCase):
|
||||||
|
def test_get_regexp_op_for_database_sqlite(self):
|
||||||
|
filter, op = utils.get_regexp_ops('sqlite:///')
|
||||||
|
self.assertEqual('|', filter('|'))
|
||||||
|
self.assertEqual('REGEXP', op)
|
||||||
|
|
||||||
|
def test_get_regexp_op_for_database_mysql(self):
|
||||||
|
filter, op = utils.get_regexp_ops('mysql+pymysql://root@localhost')
|
||||||
|
self.assertEqual('\\|', filter('|'))
|
||||||
|
self.assertEqual('REGEXP', op)
|
||||||
|
|
||||||
|
def test_get_regexp_op_for_database_postgresql(self):
|
||||||
|
filter, op = utils.get_regexp_ops('postgresql://localhost')
|
||||||
|
self.assertEqual('|', filter('|'))
|
||||||
|
self.assertEqual('~', op)
|
||||||
|
|
||||||
|
def test_get_regexp_op_for_database_unknown(self):
|
||||||
|
filter, op = utils.get_regexp_ops('notdb:///')
|
||||||
|
self.assertEqual('|', filter('|'))
|
||||||
|
self.assertEqual('LIKE', op)
|
||||||
|
|
||||||
|
def test_replace_sub_expression(self):
|
||||||
|
ret = utils._safe_regex_mysql('|')
|
||||||
|
self.assertEqual('\\|', ret)
|
||||||
|
|
||||||
|
ret = utils._safe_regex_mysql('||')
|
||||||
|
self.assertEqual('\\|\\|', ret)
|
||||||
|
|
||||||
|
ret = utils._safe_regex_mysql('a||')
|
||||||
|
self.assertEqual('a\\|\\|', ret)
|
||||||
|
|
||||||
|
ret = utils._safe_regex_mysql('|a|')
|
||||||
|
self.assertEqual('\\|a\\|', ret)
|
||||||
|
|
||||||
|
ret = utils._safe_regex_mysql('||a')
|
||||||
|
self.assertEqual('\\|\\|a', ret)
|
||||||
|
|||||||
Reference in New Issue
Block a user