Merge "db: Don't rely on autocommit behavior"

This commit is contained in:
Zuul
2022-04-22 17:29:12 +00:00
committed by Gerrit Code Review
4 changed files with 309 additions and 215 deletions
+18 -10
View File
@@ -612,9 +612,9 @@ def _compute_node_select(context, filters=None, limit=None, marker=None):
def _compute_node_fetchall(context, filters=None, limit=None, marker=None):
select = _compute_node_select(context, filters, limit=limit, marker=marker)
engine = get_engine(context=context)
conn = engine.connect()
results = conn.execute(select).fetchall()
with engine.connect() as conn, conn.begin():
results = conn.execute(select).fetchall()
# Callers expect dict-like objects, not SQLAlchemy RowProxy objects...
results = [dict(r._mapping) for r in results]
@@ -983,9 +983,9 @@ def compute_node_statistics(context):
).label('disk_available_least'),
]
select = sql.select(*agg_cols).select_from(j)
conn = engine.connect()
results = conn.execute(select).fetchone()
with engine.connect() as conn, conn.begin():
results = conn.execute(select).fetchone()
# Build a dict of the info--making no assumptions about result
fields = ('count', 'vcpus', 'memory_mb', 'local_gb', 'vcpus_used',
@@ -993,7 +993,6 @@ def compute_node_statistics(context):
'current_workload', 'running_vms', 'disk_available_least')
results = {field: int(results[idx] or 0)
for idx, field in enumerate(fields)}
conn.close()
return results
@@ -4293,7 +4292,8 @@ def _get_fk_stmts(metadata, conn, table, column, records):
select = sql.select(fk.column).where(
sql.and_(fk.parent == fk.column, column.in_(records))
)
rows = conn.execute(select).fetchall()
with conn.begin():
rows = conn.execute(select).fetchall()
p_records = [r[0] for r in rows]
# Then, select rows in the child table that correspond to the
# parent table records that were passed in.
@@ -4308,7 +4308,8 @@ def _get_fk_stmts(metadata, conn, table, column, records):
fk_select = sql.select(fk_column).where(
sql.and_(fk.parent == fk.column, fk.column.in_(p_records))
)
fk_rows = conn.execute(fk_select).fetchall()
with conn.begin():
fk_rows = conn.execute(fk_select).fetchall()
fk_records = [r[0] for r in fk_rows]
if fk_records:
# If we found any records in the child table, create shadow
@@ -4395,7 +4396,8 @@ def _archive_deleted_rows_for_table(
select = select.where(table.c.updated_at < before)
select = select.order_by(column).limit(max_rows)
rows = conn.execute(select).fetchall()
with conn.begin():
rows = conn.execute(select).fetchall()
records = [r[0] for r in rows]
# We will archive deleted rows for this table and also generate insert and
@@ -4431,7 +4433,8 @@ def _archive_deleted_rows_for_table(
query_select = sql.select(table.c.uuid).where(
table.c.id.in_(records)
)
rows = conn.execute(query_select).fetchall()
with conn.begin():
rows = conn.execute(query_select).fetchall()
deleted_instance_uuids = [r[0] for r in rows]
try:
@@ -4453,6 +4456,8 @@ def _archive_deleted_rows_for_table(
"%(tablename)s: %(error)s",
{'tablename': tablename, 'error': str(ex)})
conn.close()
return rows_archived, deleted_instance_uuids, extras
@@ -4575,7 +4580,8 @@ def purge_shadow_tables(context, before_date, status_fn=None):
else:
delete = table.delete()
deleted = conn.execute(delete)
with conn.begin():
deleted = conn.execute(delete)
if deleted.rowcount > 0:
status_fn(_('Deleted %(rows)i rows from %(table)s based on '
'timestamp column %(col)s') % {
@@ -4584,6 +4590,8 @@ def purge_shadow_tables(context, before_date, status_fn=None):
'col': col is None and '(n/a)' or col.name})
total_deleted += deleted.rowcount
conn.close()
return total_deleted
-9
View File
@@ -870,15 +870,6 @@ class WarningsFixture(fixtures.Fixture):
category=sqla_exc.SADeprecationWarning,
)
# ...but filter everything out until we get around to fixing them
# TODO(stephenfin): Fix all of these
warnings.filterwarnings(
'ignore',
module='nova',
message=r'The current statement is being autocommitted .*',
category=sqla_exc.SADeprecationWarning)
self.addCleanup(self._reset_warning_filters)
def _reset_warning_filters(self):
+283 -190
View File
@@ -5663,7 +5663,6 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
super(ArchiveTestCase, self).setUp()
self.engine = db.get_engine()
self.metadata = sa.MetaData()
self.conn = self.engine.connect()
self.instance_id_mappings = models.InstanceIdMapping.__table__
self.shadow_instance_id_mappings = sqlalchemyutils.get_table(
self.engine, "shadow_instance_id_mappings")
@@ -5696,12 +5695,13 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
"""
metadata = sa.MetaData()
metadata.reflect(bind=self.engine)
for table in metadata.tables:
if table.startswith("shadow_") and table not in exceptions:
rows = self.conn.exec_driver_sql(
"SELECT * FROM %s" % table
).fetchall()
self.assertEqual(rows, [], "Table %s not empty" % table)
with self.engine.connect() as conn, conn.begin():
for table in metadata.tables:
if table.startswith("shadow_") and table not in exceptions:
rows = conn.exec_driver_sql(
"SELECT * FROM %s" % table
).fetchall()
self.assertEqual(rows, [], "Table %s not empty" % table)
def test_shadow_tables(self):
"""Validate shadow table schema.
@@ -5754,57 +5754,72 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
self._assert_shadow_tables_empty_except()
def test_archive_deleted_rows(self):
# Add 6 rows to table
for uuidstr in self.uuidstrs:
ins_stmt = self.instance_id_mappings.insert().values(uuid=uuidstr)
self.conn.execute(ins_stmt)
with self.engine.connect() as conn, conn.begin():
# Add 6 rows to table
for uuidstr in self.uuidstrs:
ins_stmt = self.instance_id_mappings.insert().values(
uuid=uuidstr,
)
conn.execute(ins_stmt)
# Set 4 to deleted
update_statement = self.instance_id_mappings.update().\
where(self.instance_id_mappings.c.uuid.in_(self.uuidstrs[:4]))\
.values(deleted=1, deleted_at=timeutils.utcnow())
self.conn.execute(update_statement)
with self.engine.connect() as conn, conn.begin():
update_statement = self.instance_id_mappings.update().where(
self.instance_id_mappings.c.uuid.in_(self.uuidstrs[:4])
).values(deleted=1, deleted_at=timeutils.utcnow())
conn.execute(update_statement)
qiim = sql.select(self.instance_id_mappings).where(
self.instance_id_mappings.c.uuid.in_(self.uuidstrs)
)
rows = self.conn.execute(qiim).fetchall()
# Verify we have 6 in main
self.assertEqual(len(rows), 6)
qsiim = sql.select(self.shadow_instance_id_mappings).where(
self.shadow_instance_id_mappings.c.uuid.in_(self.uuidstrs)
)
rows = self.conn.execute(qsiim).fetchall()
# Verify we have 0 in shadow
self.assertEqual(len(rows), 0)
# Archive 2 rows
with self.engine.connect() as conn, conn.begin():
# Verify we have 6 in main
rows = conn.execute(qiim).fetchall()
self.assertEqual(len(rows), 6)
# Verify we have 0 in shadow
rows = conn.execute(qsiim).fetchall()
self.assertEqual(len(rows), 0)
results = db.archive_deleted_rows(max_rows=2)
expected = dict(instance_id_mappings=2)
self._assertEqualObjects(expected, results[0])
rows = self.conn.execute(qiim).fetchall()
# Verify we have 4 left in main
self.assertEqual(len(rows), 4)
rows = self.conn.execute(qsiim).fetchall()
# Verify we have 2 in shadow
self.assertEqual(len(rows), 2)
with self.engine.connect() as conn, conn.begin():
# Archive 2 rows and verify we have 4 left in main
rows = conn.execute(qiim).fetchall()
self.assertEqual(len(rows), 4)
# Verify we have 2 in shadow
rows = conn.execute(qsiim).fetchall()
self.assertEqual(len(rows), 2)
# Archive 2 more rows
results = db.archive_deleted_rows(max_rows=2)
expected = dict(instance_id_mappings=2)
self._assertEqualObjects(expected, results[0])
rows = self.conn.execute(qiim).fetchall()
# Verify we have 2 left in main
self.assertEqual(len(rows), 2)
rows = self.conn.execute(qsiim).fetchall()
# Verify we have 4 in shadow
self.assertEqual(len(rows), 4)
with self.engine.connect() as conn, conn.begin():
# Verify we have 2 left in main
rows = conn.execute(qiim).fetchall()
self.assertEqual(len(rows), 2)
# Verify we have 4 in shadow
rows = conn.execute(qsiim).fetchall()
self.assertEqual(len(rows), 4)
# Try to archive more, but there are no deleted rows left.
results = db.archive_deleted_rows(max_rows=2)
expected = dict()
self._assertEqualObjects(expected, results[0])
rows = self.conn.execute(qiim).fetchall()
# Verify we still have 2 left in main
self.assertEqual(len(rows), 2)
rows = self.conn.execute(qsiim).fetchall()
# Verify we still have 4 in shadow
self.assertEqual(len(rows), 4)
with self.engine.connect() as conn, conn.begin():
# Verify we still have 2 left in main
rows = conn.execute(qiim).fetchall()
self.assertEqual(len(rows), 2)
# Verify we still have 4 in shadow
rows = conn.execute(qsiim).fetchall()
self.assertEqual(len(rows), 4)
# Ensure only deleted rows were deleted
self._assert_shadow_tables_empty_except(
@@ -5814,34 +5829,45 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
# Add 6 rows to table
for uuidstr in self.uuidstrs:
ins_stmt = self.instances.insert().values(uuid=uuidstr)
self.conn.execute(ins_stmt)
ins_stmt = self.instance_actions.insert().\
values(instance_uuid=uuidstr)
result = self.conn.execute(ins_stmt)
with self.engine.connect() as conn, conn.begin():
conn.execute(ins_stmt)
ins_stmt = self.instance_actions.insert().values(
instance_uuid=uuidstr,
)
with self.engine.connect() as conn, conn.begin():
result = conn.execute(ins_stmt)
instance_action_uuid = result.inserted_primary_key[0]
ins_stmt = self.instance_actions_events.insert().\
values(action_id=instance_action_uuid)
self.conn.execute(ins_stmt)
ins_stmt = self.instance_actions_events.insert().values(
action_id=instance_action_uuid,
)
with self.engine.connect() as conn, conn.begin():
conn.execute(ins_stmt)
# Set 1 to deleted before 2017-01-01
deleted_at = timeutils.parse_strtime('2017-01-01T00:00:00.0')
update_statement = self.instances.update().\
where(self.instances.c.uuid.in_(self.uuidstrs[0:1]))\
.values(deleted=1, deleted_at=deleted_at)
self.conn.execute(update_statement)
update_statement = self.instances.update().where(
self.instances.c.uuid.in_(self.uuidstrs[0:1])
).values(deleted=1, deleted_at=deleted_at)
with self.engine.connect() as conn, conn.begin():
conn.execute(update_statement)
# Set 1 to deleted before 2017-01-02
deleted_at = timeutils.parse_strtime('2017-01-02T00:00:00.0')
update_statement = self.instances.update().\
where(self.instances.c.uuid.in_(self.uuidstrs[1:2]))\
.values(deleted=1, deleted_at=deleted_at)
self.conn.execute(update_statement)
update_statement = self.instances.update().where(
self.instances.c.uuid.in_(self.uuidstrs[1:2])
).values(deleted=1, deleted_at=deleted_at)
with self.engine.connect() as conn, conn.begin():
conn.execute(update_statement)
# Set 2 to deleted now
update_statement = self.instances.update().\
where(self.instances.c.uuid.in_(self.uuidstrs[2:4]))\
.values(deleted=1, deleted_at=timeutils.utcnow())
self.conn.execute(update_statement)
update_statement = self.instances.update().where(
self.instances.c.uuid.in_(self.uuidstrs[2:4])
).values(deleted=1, deleted_at=timeutils.utcnow())
with self.engine.connect() as conn, conn.begin():
conn.execute(update_statement)
qiim = sql.select(self.instances).where(
self. instances.c.uuid.in_(self.uuidstrs)
)
@@ -5849,9 +5875,11 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
self.shadow_instances.c.uuid.in_(self.uuidstrs)
)
# Verify we have 6 in main
rows = self.conn.execute(qiim).fetchall()
self.assertEqual(len(rows), 6)
with self.engine.connect() as conn, conn.begin():
# Verify we have 6 in main
rows = conn.execute(qiim).fetchall()
self.assertEqual(len(rows), 6)
# Make sure 'before' comparison is for < not <=, nothing deleted
before_date = dateutil_parser.parse('2017-01-01', fuzzy=True)
_, uuids, _ = db.archive_deleted_rows(max_rows=1, before=before_date)
@@ -5885,22 +5913,25 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
expected = {}
self._assertEqualObjects(expected, results[0])
# Verify we have 4 left in main
rows = self.conn.execute(qiim).fetchall()
self.assertEqual(len(rows), 4)
# Verify we have 2 in shadow
rows = self.conn.execute(qsiim).fetchall()
self.assertEqual(len(rows), 2)
with self.engine.connect() as conn, conn.begin():
# Verify we have 4 left in main
rows = conn.execute(qiim).fetchall()
self.assertEqual(len(rows), 4)
# Verify we have 2 in shadow
rows = conn.execute(qsiim).fetchall()
self.assertEqual(len(rows), 2)
# Archive everything else, make sure default operation without
# before argument didn't break
results = db.archive_deleted_rows(max_rows=1000)
# Verify we have 2 left in main
rows = self.conn.execute(qiim).fetchall()
self.assertEqual(len(rows), 2)
# Verify we have 4 in shadow
rows = self.conn.execute(qsiim).fetchall()
self.assertEqual(len(rows), 4)
with self.engine.connect() as conn, conn.begin():
# Verify we have 2 left in main
rows = conn.execute(qiim).fetchall()
self.assertEqual(len(rows), 2)
# Verify we have 4 in shadow
rows = conn.execute(qsiim).fetchall()
self.assertEqual(len(rows), 4)
def test_archive_deleted_rows_for_every_uuid_table(self):
tablenames = []
@@ -5928,94 +5959,117 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
for uuidstr in self.uuidstrs:
ins_stmt = main_table.insert().values(uuid=uuidstr)
try:
self.conn.execute(ins_stmt)
with self.engine.connect() as conn, conn.begin():
conn.execute(ins_stmt)
except (db_exc.DBError, sqla_exc.OperationalError):
# This table has constraints that require a table-specific
# insert, so skip it.
return 2
# Set 4 to deleted
update_statement = main_table.update().\
where(main_table.c.uuid.in_(self.uuidstrs[:4]))\
.values(deleted=1, deleted_at=timeutils.utcnow())
self.conn.execute(update_statement)
update_statement = main_table.update().where(
main_table.c.uuid.in_(self.uuidstrs[:4])
).values(deleted=1, deleted_at=timeutils.utcnow())
with self.engine.connect() as conn, conn.begin():
conn.execute(update_statement)
qmt = sql.select(main_table).where(
main_table.c.uuid.in_(self.uuidstrs)
)
rows = self.conn.execute(qmt).fetchall()
# Verify we have 6 in main
self.assertEqual(len(rows), 6)
qst = sql.select(shadow_table).where(
shadow_table.c.uuid.in_(self.uuidstrs)
)
rows = self.conn.execute(qst).fetchall()
# Verify we have 0 in shadow
self.assertEqual(len(rows), 0)
with self.engine.connect() as conn, conn.begin():
# Verify we have 6 in main
rows = conn.execute(qmt).fetchall()
self.assertEqual(len(rows), 6)
# Verify we have 0 in shadow
rows = conn.execute(qst).fetchall()
self.assertEqual(len(rows), 0)
# Archive 2 rows
db._archive_deleted_rows_for_table(
self.metadata, self.engine, tablename, max_rows=2, before=None,
task_log=False,
)
# Verify we have 4 left in main
rows = self.conn.execute(qmt).fetchall()
self.assertEqual(len(rows), 4)
# Verify we have 2 in shadow
rows = self.conn.execute(qst).fetchall()
self.assertEqual(len(rows), 2)
with self.engine.connect() as conn, conn.begin():
# Verify we have 4 left in main
rows = conn.execute(qmt).fetchall()
self.assertEqual(len(rows), 4)
# Verify we have 2 in shadow
rows = conn.execute(qst).fetchall()
self.assertEqual(len(rows), 2)
# Archive 2 more rows
db._archive_deleted_rows_for_table(
self.metadata, self.engine, tablename, max_rows=2, before=None,
task_log=False,
)
# Verify we have 2 left in main
rows = self.conn.execute(qmt).fetchall()
self.assertEqual(len(rows), 2)
# Verify we have 4 in shadow
rows = self.conn.execute(qst).fetchall()
self.assertEqual(len(rows), 4)
with self.engine.connect() as conn, conn.begin():
# Verify we have 2 left in main
rows = conn.execute(qmt).fetchall()
self.assertEqual(len(rows), 2)
# Verify we have 4 in shadow
rows = conn.execute(qst).fetchall()
self.assertEqual(len(rows), 4)
# Try to archive more, but there are no deleted rows left.
db._archive_deleted_rows_for_table(
self.metadata, self.engine, tablename, max_rows=2, before=None,
task_log=False,
)
# Verify we still have 2 left in main
rows = self.conn.execute(qmt).fetchall()
self.assertEqual(len(rows), 2)
# Verify we still have 4 in shadow
rows = self.conn.execute(qst).fetchall()
self.assertEqual(len(rows), 4)
with self.engine.connect() as conn, conn.begin():
# Verify we still have 2 left in main
rows = conn.execute(qmt).fetchall()
self.assertEqual(len(rows), 2)
# Verify we still have 4 in shadow
rows = conn.execute(qst).fetchall()
self.assertEqual(len(rows), 4)
return 0
def test_archive_deleted_rows_shadow_insertions_equals_deletions(self):
# Add 2 rows to table
for uuidstr in self.uuidstrs[:2]:
ins_stmt = self.instance_id_mappings.insert().values(uuid=uuidstr)
self.conn.execute(ins_stmt)
with self.engine.connect() as conn, conn.begin():
conn.execute(ins_stmt)
# Set both to deleted
update_statement = self.instance_id_mappings.update().\
where(self.instance_id_mappings.c.uuid.in_(self.uuidstrs[:2]))\
.values(deleted=1)
self.conn.execute(update_statement)
update_statement = self.instance_id_mappings.update().where(
self.instance_id_mappings.c.uuid.in_(self.uuidstrs[:2])
).values(deleted=1)
with self.engine.connect() as conn, conn.begin():
conn.execute(update_statement)
qiim = sql.select(self.instance_id_mappings).where(
self. instance_id_mappings.c.uuid.in_(self.uuidstrs[:2])
)
rows = self.conn.execute(qiim).fetchall()
# Verify we have 2 in main
self.assertEqual(len(rows), 2)
qsiim = sql.select(self.shadow_instance_id_mappings).where(
self.shadow_instance_id_mappings.c.uuid.in_(self.uuidstrs[:2])
)
shadow_rows = self.conn.execute(qsiim).fetchall()
# Verify we have 0 in shadow
self.assertEqual(len(shadow_rows), 0)
with self.engine.connect() as conn, conn.begin():
# Verify we have 2 in main
rows = conn.execute(qiim).fetchall()
self.assertEqual(len(rows), 2)
# Verify we have 0 in shadow
rows = conn.execute(qsiim).fetchall()
self.assertEqual(len(rows), 0)
# Archive the rows
db.archive_deleted_rows(max_rows=2)
main_rows = self.conn.execute(qiim).fetchall()
shadow_rows = self.conn.execute(qsiim).fetchall()
# Verify the insertions into shadow is same as deletions from main
self.assertEqual(len(shadow_rows), len(rows) - len(main_rows))
with self.engine.connect() as conn, conn.begin():
# Verify we now have 0 in main
rows = conn.execute(qiim).fetchall()
self.assertEqual(len(rows), 0)
# Verify we now have 2 in shadow
rows = conn.execute(qsiim).fetchall()
self.assertEqual(len(rows), 2)
def test_archive_deleted_rows_for_migrations(self):
# migrations.instance_uuid depends on instances.uuid
@@ -6025,13 +6079,18 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
instance_uuid = uuidsentinel.instance
ins_stmt = self.instances.insert().values(
uuid=instance_uuid,
deleted=1,
deleted_at=timeutils.utcnow())
self.conn.execute(ins_stmt)
ins_stmt = self.migrations.insert().values(instance_uuid=instance_uuid,
deleted=0)
self.conn.execute(ins_stmt)
uuid=instance_uuid,
deleted=1,
deleted_at=timeutils.utcnow(),
)
with self.engine.connect() as conn, conn.begin():
conn.execute(ins_stmt)
ins_stmt = self.migrations.insert().values(
instance_uuid=instance_uuid, deleted=0,
)
with self.engine.connect() as conn, conn.begin():
conn.execute(ins_stmt)
# Archiving instances should result in migrations related to the
# instances also being archived.
num = db._archive_deleted_rows_for_table(
@@ -6047,70 +6106,86 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
# Add 6 rows to each table
for uuidstr in self.uuidstrs:
ins_stmt = self.instance_id_mappings.insert().values(uuid=uuidstr)
self.conn.execute(ins_stmt)
with self.engine.connect() as conn, conn.begin():
conn.execute(ins_stmt)
ins_stmt2 = self.instances.insert().values(uuid=uuidstr)
self.conn.execute(ins_stmt2)
with self.engine.connect() as conn, conn.begin():
conn.execute(ins_stmt2)
# Set 4 of each to deleted
update_statement = self.instance_id_mappings.update().\
where(self.instance_id_mappings.c.uuid.in_(self.uuidstrs[:4]))\
.values(deleted=1, deleted_at=timeutils.utcnow())
self.conn.execute(update_statement)
with self.engine.connect() as conn, conn.begin():
conn.execute(update_statement)
update_statement2 = self.instances.update().\
where(self.instances.c.uuid.in_(self.uuidstrs[:4]))\
.values(deleted=1, deleted_at=timeutils.utcnow())
self.conn.execute(update_statement2)
# Verify we have 6 in each main table
with self.engine.connect() as conn, conn.begin():
conn.execute(update_statement2)
qiim = sql.select(self.instance_id_mappings).where(
self.instance_id_mappings.c.uuid.in_(self.uuidstrs)
)
rows = self.conn.execute(qiim).fetchall()
self.assertEqual(len(rows), 6)
qi = sql.select(self.instances).where(
self.instances.c.uuid.in_(self.uuidstrs)
)
rows = self.conn.execute(qi).fetchall()
self.assertEqual(len(rows), 6)
# Verify we have 0 in each shadow table
qsiim = sql.select(self.shadow_instance_id_mappings).where(
self.shadow_instance_id_mappings.c.uuid.in_(self.uuidstrs)
)
rows = self.conn.execute(qsiim).fetchall()
self.assertEqual(len(rows), 0)
qsi = sql.select(self.shadow_instances).where(
self.shadow_instances.c.uuid.in_(self.uuidstrs)
)
rows = self.conn.execute(qsi).fetchall()
self.assertEqual(len(rows), 0)
with self.engine.connect() as conn, conn.begin():
# Verify we have 6 in each main table
rows = conn.execute(qiim).fetchall()
self.assertEqual(len(rows), 6)
rows = conn.execute(qi).fetchall()
self.assertEqual(len(rows), 6)
# Verify we have 0 in each shadow table
rows = conn.execute(qsiim).fetchall()
self.assertEqual(len(rows), 0)
rows = conn.execute(qsi).fetchall()
self.assertEqual(len(rows), 0)
# Archive 7 rows, which should be 4 in one table and 3 in the other.
db.archive_deleted_rows(max_rows=7)
# Verify we have 5 left in the two main tables combined
iim_rows = self.conn.execute(qiim).fetchall()
i_rows = self.conn.execute(qi).fetchall()
self.assertEqual(len(iim_rows) + len(i_rows), 5)
# Verify we have 7 in the two shadow tables combined.
siim_rows = self.conn.execute(qsiim).fetchall()
si_rows = self.conn.execute(qsi).fetchall()
self.assertEqual(len(siim_rows) + len(si_rows), 7)
with self.engine.connect() as conn, conn.begin():
# Verify we have 5 left in the two main tables combined
iim_rows = conn.execute(qiim).fetchall()
i_rows = conn.execute(qi).fetchall()
self.assertEqual(len(iim_rows) + len(i_rows), 5)
# Verify we have 7 in the two shadow tables combined.
siim_rows = conn.execute(qsiim).fetchall()
si_rows = conn.execute(qsi).fetchall()
self.assertEqual(len(siim_rows) + len(si_rows), 7)
# Archive the remaining deleted rows.
db.archive_deleted_rows(max_rows=1)
# Verify we have 4 total left in both main tables.
iim_rows = self.conn.execute(qiim).fetchall()
i_rows = self.conn.execute(qi).fetchall()
self.assertEqual(len(iim_rows) + len(i_rows), 4)
# Verify we have 8 in shadow
siim_rows = self.conn.execute(qsiim).fetchall()
si_rows = self.conn.execute(qsi).fetchall()
self.assertEqual(len(siim_rows) + len(si_rows), 8)
with self.engine.connect() as conn, conn.begin():
# Verify we have 4 total left in both main tables.
iim_rows = conn.execute(qiim).fetchall()
i_rows = conn.execute(qi).fetchall()
self.assertEqual(len(iim_rows) + len(i_rows), 4)
# Verify we have 8 in shadow
siim_rows = conn.execute(qsiim).fetchall()
si_rows = conn.execute(qsi).fetchall()
self.assertEqual(len(siim_rows) + len(si_rows), 8)
# Try to archive more, but there are no deleted rows left.
db.archive_deleted_rows(max_rows=500)
# Verify we have 4 total left in both main tables.
iim_rows = self.conn.execute(qiim).fetchall()
i_rows = self.conn.execute(qi).fetchall()
self.assertEqual(len(iim_rows) + len(i_rows), 4)
# Verify we have 8 in shadow
siim_rows = self.conn.execute(qsiim).fetchall()
si_rows = self.conn.execute(qsi).fetchall()
self.assertEqual(len(siim_rows) + len(si_rows), 8)
with self.engine.connect() as conn, conn.begin():
# Verify we have 4 total left in both main tables.
iim_rows = conn.execute(qiim).fetchall()
i_rows = conn.execute(qi).fetchall()
self.assertEqual(len(iim_rows) + len(i_rows), 4)
# Verify we have 8 in shadow
siim_rows = conn.execute(qsiim).fetchall()
si_rows = conn.execute(qsi).fetchall()
self.assertEqual(len(siim_rows) + len(si_rows), 8)
self._assert_shadow_tables_empty_except(
'shadow_instances',
'shadow_instance_id_mappings'
@@ -6122,34 +6197,47 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
ins_stmt = self.task_log.insert().values(
id=i, task_name='instance_usage_audit', state='DONE',
host='host', message='message')
self.conn.execute(ins_stmt)
with self.engine.connect() as conn, conn.begin():
conn.execute(ins_stmt)
# Set 1 to updated before 2017-01-01
updated_at = timeutils.parse_strtime('2017-01-01T00:00:00.0')
update_statement = self.task_log.update().where(
self.task_log.c.id == 1).values(updated_at=updated_at)
self.conn.execute(update_statement)
self.task_log.c.id == 1
).values(updated_at=updated_at)
with self.engine.connect() as conn, conn.begin():
conn.execute(update_statement)
# Set 1 to updated before 2017-01-02
updated_at = timeutils.parse_strtime('2017-01-02T00:00:00.0')
update_statement = self.task_log.update().where(
self.task_log.c.id == 2).values(updated_at=updated_at)
self.conn.execute(update_statement)
self.task_log.c.id == 2
).values(updated_at=updated_at)
with self.engine.connect() as conn, conn.begin():
conn.execute(update_statement)
# Set 2 to updated now
update_statement = self.task_log.update().where(
self.task_log.c.id.in_(range(3, 5))).values(
updated_at=timeutils.utcnow())
self.conn.execute(update_statement)
# Verify we have 6 in main
self.task_log.c.id.in_(range(3, 5))
).values(updated_at=timeutils.utcnow())
with self.engine.connect() as conn, conn.begin():
conn.execute(update_statement)
qtl = sql.select(self.task_log).where(
self.task_log.c.id.in_(range(1, 7))
)
rows = self.conn.execute(qtl).fetchall()
self.assertEqual(len(rows), 6)
# Verify we have 0 in shadow
qstl = sql.select(self.shadow_task_log).where(
self.shadow_task_log.c.id.in_(range(1, 7))
)
rows = self.conn.execute(qstl).fetchall()
self.assertEqual(len(rows), 0)
with self.engine.connect() as conn, conn.begin():
# Verify we have 6 in main
rows = conn.execute(qtl).fetchall()
self.assertEqual(len(rows), 6)
# Verify we have 0 in shadow
rows = conn.execute(qstl).fetchall()
self.assertEqual(len(rows), 0)
# Make sure 'before' comparison is for < not <=
before_date = dateutil_parser.parse('2017-01-01', fuzzy=True)
_, _, rows = db.archive_deleted_rows(
@@ -6171,22 +6259,27 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
results = db.archive_deleted_rows(max_rows=2, task_log=True)
expected = dict(task_log=2)
self._assertEqualObjects(expected, results[0])
# Verify we have 2 left in main
rows = self.conn.execute(qtl).fetchall()
self.assertEqual(len(rows), 2)
# Verify we have 4 in shadow
rows = self.conn.execute(qstl).fetchall()
self.assertEqual(len(rows), 4)
with self.engine.connect() as conn, conn.begin():
# Verify we have 2 left in main
rows = conn.execute(qtl).fetchall()
self.assertEqual(len(rows), 2)
# Verify we have 4 in shadow
rows = conn.execute(qstl).fetchall()
self.assertEqual(len(rows), 4)
# Archive the rest
results = db.archive_deleted_rows(max_rows=100, task_log=True)
expected = dict(task_log=2)
self._assertEqualObjects(expected, results[0])
# Verify we have 0 left in main
rows = self.conn.execute(qtl).fetchall()
self.assertEqual(len(rows), 0)
# Verify we have 6 in shadow
rows = self.conn.execute(qstl).fetchall()
self.assertEqual(len(rows), 6)
with self.engine.connect() as conn, conn.begin():
# Verify we have 0 left in main
rows = conn.execute(qtl).fetchall()
self.assertEqual(len(rows), 0)
# Verify we have 6 in shadow
rows = conn.execute(qstl).fetchall()
self.assertEqual(len(rows), 6)
class PciDeviceDBApiTestCase(test.TestCase, ModelsObjectComparatorMixin):
+8 -6
View File
@@ -2015,12 +2015,14 @@ class TestInstanceListObject(test_objects._LocalTest,
# manually here.
engine = db.get_engine()
table = sql_models.Instance.__table__
with engine.connect() as conn:
update = table.insert().values(user_id=self.context.user_id,
project_id=self.context.project_id,
uuid=uuids.nullinst,
host='foo',
hidden=None)
with engine.connect() as conn, conn.begin():
update = table.insert().values(
user_id=self.context.user_id,
project_id=self.context.project_id,
uuid=uuids.nullinst,
host='foo',
hidden=None,
)
conn.execute(update)
insts = objects.InstanceList.get_by_filters(self.context,