Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Faster remote room joins: stream the un-partial-stating of events over replication. [rei:frrj/streams/unpsr] #14545

Merged
merged 7 commits into from
Dec 14, 2022
Prev Previous commit
Next Next commit
Add update fetch txn for unPS-event stream
  • Loading branch information
reivilibre committed Dec 6, 2022
commit 4f48193bd35b70e14d5b0e507eae85e1d8d2f2be
61 changes: 61 additions & 0 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
@@ -318,6 +318,67 @@ def get_un_partial_stated_events_token(self) -> int:
# readers up.
return self._un_partial_stated_events_stream_id_gen.get_current_token()

async def get_un_partial_stated_events_from_stream(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, Tuple[str, bool]]], int, bool]:
"""Get updates for the un-partial-stated events replication stream.

Args:
instance_name: The writer we want to fetch updates from. Unused
here since there is only ever one writer.
last_id: The token to fetch updates from. Exclusive.
current_id: The token to fetch updates up to. Inclusive.
limit: The requested limit for the number of rows to return. The
function may return more or fewer rows.

Returns:
A tuple consisting of: the updates, a token to use to fetch
subsequent updates, and whether we returned fewer rows than exists
between the requested tokens due to the limit.

The token returned can be used in a subsequent call to this
function to get further updatees.

The updates are a list of 2-tuples of stream ID and the row data
"""

if last_id == current_id:
return [], current_id, False

def get_un_partial_stated_events_from_stream_txn(
txn: LoggingTransaction,
) -> Tuple[List[Tuple[int, Tuple[str, bool]]], int, bool]:
sql = """
SELECT stream_id, event_id, rejection_status_changed
FROM un_partial_stated_event_stream
WHERE ? < stream_id AND stream_id <= ? AND instance_name = ?
ORDER BY stream_id ASC
LIMIT ?
"""
txn.execute(sql, (last_id, current_id, instance_name, limit))
updates = [
(
row[0],
(
row[1],
bool(row[2]),
),
)
for row in txn
]
limited = False
upto_token = current_id
if len(updates) >= limit:
upto_token = updates[-1][0]
limited = True

return updates, upto_token, limited

return await self.db_pool.runInteraction(
"get_un_partial_stated_events_from_stream",
get_un_partial_stated_events_from_stream_txn,
)

def process_replication_rows(
self,
stream_name: str,