Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Improve performance of remove_{hidden,deleted}_devices_from_device_inbox #11421

Merged
merged 19 commits into from
Nov 25, 2021
Merged
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11421.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve performance of various background database schema updates.
93 changes: 60 additions & 33 deletions synapse/storage/databases/main/deviceinbox.py
Original file line number Diff line number Diff line change
@@ -653,41 +653,67 @@ async def _remove_deleted_devices_from_device_inbox(

def _remove_deleted_devices_from_device_inbox_txn(
txn: LoggingTransaction,
) -> int:
"""stream_id is not unique
we need to use an inclusive `stream_id >= ?` clause,
since we might not have deleted all dead device messages for the stream_id
returned from the previous query

Then delete only rows matching the `(user_id, device_id, stream_id)` tuple,
) -> Tuple[int, bool]:
"""We delete only rows matching the `(user_id, device_id, stream_id)` tuple,
to avoid problems of deleting a large number of rows all at once
due to a single device having lots of device messages.
"""

last_stream_id = progress.get("stream_id", 0)

sql = """
SELECT device_id, user_id, stream_id
FROM device_inbox
WHERE
stream_id >= ?
AND (device_id, user_id) NOT IN (
SELECT device_id, user_id FROM devices
if "max_stream_id" in progress:
max_stream_id = progress["max_stream_id"]
else:
txn.execute("SELECT max(stream_id) FROM device_inbox")
# There's a type mismatch here between how we want to type the row and
# what fetchone says it returns, but we silence it because we know that
# res can't be None.
res: Tuple[Optional[int]] = txn.fetchone() # type: ignore[assignment]
if res[0] is None:
return 0, True
babolivier marked this conversation as resolved.
Show resolved Hide resolved
else:
max_stream_id = res[0]

start = progress.get("stream_id", 0)
stop = start + batch_size

if self.database_engine.supports_returning:
# If the database engine supports the RETURNING clause, use it and do
# everything in one go.
sql = """
DELETE FROM device_inbox
WHERE
stream_id >= ? AND stream_id < ?
AND (device_id, user_id) NOT IN (
SELECT device_id, user_id FROM devices
)
RETURNING device_id, user_id, stream_id
babolivier marked this conversation as resolved.
Show resolved Hide resolved
"""

txn.execute(sql, (start, stop))
num_deleted = txn.rowcount
rows = txn.fetchall()
else:
# Otherwise do the select and delete separately.
sql = """
SELECT device_id, user_id, stream_id
FROM device_inbox
WHERE
stream_id >= ? AND stream_id < ?
AND (device_id, user_id) NOT IN (
SELECT device_id, user_id FROM devices
)
ORDER BY stream_id
"""

txn.execute(sql, (start, stop))
rows = txn.fetchall()

num_deleted = 0
for row in rows:
num_deleted += self.db_pool.simple_delete_txn(
txn,
"device_inbox",
{"device_id": row[0], "user_id": row[1], "stream_id": row[2]},
)
ORDER BY stream_id
LIMIT ?
"""

txn.execute(sql, (last_stream_id, batch_size))
rows = txn.fetchall()

num_deleted = 0
for row in rows:
num_deleted += self.db_pool.simple_delete_txn(
txn,
"device_inbox",
{"device_id": row[0], "user_id": row[1], "stream_id": row[2]},
)

if rows:
# send more than stream_id to progress
@@ -701,18 +727,19 @@ def _remove_deleted_devices_from_device_inbox_txn(
"device_id": rows[-1][0],
"user_id": rows[-1][1],
"stream_id": rows[-1][2],
"max_stream_id": max_stream_id,
},
)

return num_deleted
return num_deleted, stop >= max_stream_id
babolivier marked this conversation as resolved.
Show resolved Hide resolved

number_deleted = await self.db_pool.runInteraction(
number_deleted, finished = await self.db_pool.runInteraction(
"_remove_deleted_devices_from_device_inbox",
_remove_deleted_devices_from_device_inbox_txn,
)

# The task is finished when no more lines are deleted.
if not number_deleted:
if finished:
await self.db_pool.updates._end_background_update(
self.REMOVE_DELETED_DEVICES
)