diff --git a/nova/db/main/api.py b/nova/db/main/api.py index 13b84c544d..1a97becabc 100644 --- a/nova/db/main/api.py +++ b/nova/db/main/api.py @@ -1960,49 +1960,6 @@ def _model_get_uuid_by_sort_filters(context, model, sort_keys, sort_dirs, return result -def _db_connection_type(db_connection): - """Returns a lowercase symbol for the db type. - - This is useful when we need to change what we are doing per DB - (like handling regexes). In a CellsV2 world it probably needs to - do something better than use the database configuration string. - """ - - db_string = db_connection.split(':')[0].split('+')[0] - return db_string.lower() - - -def _safe_regex_mysql(raw_string): - """Make regex safe to mysql. - - Certain items like '|' are interpreted raw by mysql REGEX. If you - search for a single | then you trigger an error because it's - expecting content on either side. - - For consistency sake we escape all '|'. This does mean we wouldn't - support something like foo|bar to match completely different - things, however, one can argue putting such complicated regex into - name search probably means you are doing this wrong. - """ - return raw_string.replace('|', '\\|') - - -def _get_regexp_ops(connection): - """Return safety filter and db opts for regex.""" - regexp_op_map = { - 'postgresql': '~', - 'mysql': 'REGEXP', - 'sqlite': 'REGEXP' - } - regex_safe_filters = { - 'mysql': _safe_regex_mysql - } - db_type = _db_connection_type(connection) - - return (regex_safe_filters.get(db_type, lambda x: x), - regexp_op_map.get(db_type, 'LIKE')) - - def _regex_instance_filter(query, filters): """Applies regular expression filtering to an Instance query. @@ -2013,7 +1970,8 @@ def _regex_instance_filter(query, filters): """ model = models.Instance - safe_regex_filter, db_regexp_op = _get_regexp_ops(CONF.database.connection) + safe_regex_filter, db_regexp_op = db_utils.get_regexp_ops( + CONF.database.connection) for filter_name in filters: try: column_attr = getattr(model, filter_name) @@ -2027,7 +1985,7 @@ def _regex_instance_filter(query, filters): filter_val = str(filter_val) if db_regexp_op == 'LIKE': query = query.filter(column_attr.op(db_regexp_op)( - u'%' + filter_val + u'%')) + '%' + filter_val + '%')) else: filter_val = safe_regex_filter(filter_val) query = query.filter(column_attr.op(db_regexp_op)( diff --git a/nova/db/utils.py b/nova/db/utils.py index 234845a359..e0a4baae18 100644 --- a/nova/db/utils.py +++ b/nova/db/utils.py @@ -10,6 +10,7 @@ # License for the specific language governing permissions and limitations # under the License. +import collections.abc import functools import inspect @@ -107,3 +108,49 @@ def process_sort_params( result_dirs.append(default_dir_value) return result_keys, result_dirs + + +def _db_connection_type(db_connection: str) -> str: + """Returns a lowercase symbol for the db type. + + This is useful when we need to change what we are doing per DB (like + handling regexes). In a CellsV2 world it probably needs to do something + better than use the database configuration string. + """ + db_string = db_connection.split(':')[0].split('+')[0] + return db_string.lower() + + +def _safe_regex_mysql(raw_string: str) -> str: + """Make regex safe to MySQL. + + Certain items like ``|`` are interpreted raw by mysql REGEX. If you search + for a single | then you trigger an error because it's expecting content on + either side. + + For consistency sake we escape all ``|``. This does mean we wouldn't + support something like foo|bar to match completely different things, + however, one can argue putting such complicated regex into name search + probably means you are doing this wrong. + """ + return raw_string.replace('|', '\\|') + + +def get_regexp_ops( + connection: str +) -> tuple[collections.abc.Callable[[str], str], str]: + """Return safety filter and db opts for regex.""" + regex_safe_filters = { + 'mysql': _safe_regex_mysql + } + regexp_op_map = { + 'postgresql': '~', + 'mysql': 'REGEXP', + 'sqlite': 'REGEXP' + } + db_type = _db_connection_type(connection) + + return ( + regex_safe_filters.get(db_type, lambda x: x), + regexp_op_map.get(db_type, 'LIKE'), + ) diff --git a/nova/tests/unit/db/main/test_api.py b/nova/tests/unit/db/main/test_api.py index 61927a6a46..9ca8eb44d1 100644 --- a/nova/tests/unit/db/main/test_api.py +++ b/nova/tests/unit/db/main/test_api.py @@ -329,8 +329,8 @@ def _create_aggregate_with_hosts(context=context.get_admin_context(), return result -@mock.patch.object(db, '_get_regexp_ops', - return_value=(lambda x: x, 'LIKE')) +@mock.patch.object( + db_utils, 'get_regexp_ops', return_value=(lambda x: x, 'LIKE')) class UnsupportedDbRegexpTestCase(DbTestCase): def test_instance_get_all_by_filters_paginate(self, mock_get_regexp): @@ -719,28 +719,6 @@ class SqlAlchemyDbApiNoDbTestCase(test.NoDBTestCase): db.convert_objects_related_datetimes(test1, *datetime_keys) self.assertEqual(test1, expected_dict) - def test_get_regexp_op_for_database_sqlite(self): - filter, op = db._get_regexp_ops('sqlite:///') - self.assertEqual('|', filter('|')) - self.assertEqual('REGEXP', op) - - def test_get_regexp_op_for_database_mysql(self): - filter, op = db._get_regexp_ops( - 'mysql+pymysql://root@localhost') - self.assertEqual('\\|', filter('|')) - self.assertEqual('REGEXP', op) - - def test_get_regexp_op_for_database_postgresql(self): - filter, op = db._get_regexp_ops( - 'postgresql://localhost') - self.assertEqual('|', filter('|')) - self.assertEqual('~', op) - - def test_get_regexp_op_for_database_unknown(self): - filter, op = db._get_regexp_ops('notdb:///') - self.assertEqual('|', filter('|')) - self.assertEqual('LIKE', op) - @mock.patch.object(db, 'context_manager') def test_get_engine(self, mock_ctxt_mgr): db.get_engine() @@ -769,22 +747,6 @@ class SqlAlchemyDbApiNoDbTestCase(test.NoDBTestCase): mock_get.assert_called_once_with(mock.sentinel.elevated, 'foo') ctxt.elevated.assert_called_once_with(read_deleted='yes') - def test_replace_sub_expression(self): - ret = db._safe_regex_mysql('|') - self.assertEqual('\\|', ret) - - ret = db._safe_regex_mysql('||') - self.assertEqual('\\|\\|', ret) - - ret = db._safe_regex_mysql('a||') - self.assertEqual('a\\|\\|', ret) - - ret = db._safe_regex_mysql('|a|') - self.assertEqual('\\|a\\|', ret) - - ret = db._safe_regex_mysql('||a') - self.assertEqual('\\|\\|a', ret) - class SqlAlchemyDbApiTestCase(DbTestCase): def test_instance_get_all_by_host(self): diff --git a/nova/tests/unit/db/test_utils.py b/nova/tests/unit/db/test_utils.py index 5f293723f8..aab7ce1913 100644 --- a/nova/tests/unit/db/test_utils.py +++ b/nova/tests/unit/db/test_utils.py @@ -121,3 +121,41 @@ class ProcessSortParamTestCase(test.TestCase): self.assertRaises( exception.InvalidInput, utils.process_sort_params, ['key'], dirs) + + +class TestGetRegexOps(test.TestCase): + def test_get_regexp_op_for_database_sqlite(self): + filter, op = utils.get_regexp_ops('sqlite:///') + self.assertEqual('|', filter('|')) + self.assertEqual('REGEXP', op) + + def test_get_regexp_op_for_database_mysql(self): + filter, op = utils.get_regexp_ops('mysql+pymysql://root@localhost') + self.assertEqual('\\|', filter('|')) + self.assertEqual('REGEXP', op) + + def test_get_regexp_op_for_database_postgresql(self): + filter, op = utils.get_regexp_ops('postgresql://localhost') + self.assertEqual('|', filter('|')) + self.assertEqual('~', op) + + def test_get_regexp_op_for_database_unknown(self): + filter, op = utils.get_regexp_ops('notdb:///') + self.assertEqual('|', filter('|')) + self.assertEqual('LIKE', op) + + def test_replace_sub_expression(self): + ret = utils._safe_regex_mysql('|') + self.assertEqual('\\|', ret) + + ret = utils._safe_regex_mysql('||') + self.assertEqual('\\|\\|', ret) + + ret = utils._safe_regex_mysql('a||') + self.assertEqual('a\\|\\|', ret) + + ret = utils._safe_regex_mysql('|a|') + self.assertEqual('\\|a\\|', ret) + + ret = utils._safe_regex_mysql('||a') + self.assertEqual('\\|\\|a', ret)