Make historical messages available to federated servers

Part of MSC2716: https://github.com/matrix-org/matrix-doc/pull/2716

Follow-up to https://github.com/matrix-org/synapse/pull/9247
pull/10419/head
Eric Eastwood 2021-06-24 01:32:24 -05:00
parent 33701dc116
commit d2e2aa7bf4
4 changed files with 78 additions and 0 deletions

View File

@ -1052,6 +1052,7 @@ class FederationHandler(BaseHandler):
with (await self._room_backfill.queue(room_id)):
return await self._maybe_backfill_inner(room_id, current_depth, limit)
# Todo
async def _maybe_backfill_inner(
self, room_id: str, current_depth: int, limit: int
) -> bool:

View File

@ -663,6 +663,8 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
" ON g.event_id = e.event_id"
" INNER JOIN event_backward_extremities as b"
" ON g.prev_event_id = b.event_id"
" INNER JOIN insertion_event_extremeties as i"
" ON g.event_id = i.insertion_prev_event_id"
" WHERE b.room_id = ? AND g.is_state is ?"
" GROUP BY b.event_id"
)

View File

@ -1504,6 +1504,8 @@ class PersistEventsStore:
self._handle_event_relations(txn, event)
self._handle_marker_event(txn, event)
# Store the labels for this event.
labels = event.content.get(EventContentFields.LABELS)
if labels:
@ -1756,6 +1758,39 @@ class PersistEventsStore:
if rel_type == RelationTypes.REPLACE:
txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,))
def _handle_marker_event(self, txn, event):
"""Handles inserting insertion extremeties during peristence of marker events
Args:
txn
event (EventBase)
"""
if event.type != EventTypes.MSC2716_MARKER:
# Not a marker event
return
insertion_event_id = event.content.get(
EventContentFields.MSC2716_MARKER_INSERTION
)
insertion_prev_event_ids = event.content.get(
EventContentFields.MSC2716_MARKER_INSERTION_PREV_EVENTS
)
if not insertion_event_id or not insertion_prev_event_ids:
# Invalid marker event
return
for prev_event_id in insertion_prev_event_ids:
self.db_pool.simple_insert_txn(
txn,
table="insertion_event_extremeties",
values={
"insertion_event_id": insertion_event_id,
"room_id": event.room_id,
"insertion_prev_event_id": prev_event_id,
},
)
def _handle_redaction(self, txn, redacted_event_id):
"""Handles receiving a redaction and checking whether we need to remove
any redacted relations from the database.

View File

@ -0,0 +1,40 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Add a table that keeps track of "insertion" events back in the history
-- when we get a "marker" event over the "live" timeline. When navigating the DAG
-- and we hit an event which matches `insertion_prev_event_id`, it should backfill
-- the "insertion" event and start navigating from there.
CREATE TABLE IF NOT EXISTS insertion_event_extremeties(
insertion_event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
insertion_prev_event_id TEXT NOT NULL,
UNIQUE (insertion_event_id, room_id, room_id, insertion_prev_event_id)
);
CREATE INDEX IF NOT EXISTS insertion_event_extremeties_insertion_room_id ON insertion_event_extremeties(room_id);
CREATE INDEX IF NOT EXISTS insertion_event_extremeties_insertion_event_id ON insertion_event_extremeties(insertion_event_id);
CREATE INDEX IF NOT EXISTS insertion_event_extremeties_insertion_prev_event_id ON insertion_event_extremeties(insertion_prev_event_id);
CREATE TABLE IF NOT EXISTS chunk_connections(
event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
chunk_id TEXT NOT NULL,
UNIQUE (event_id, room_id)
);
CREATE INDEX IF NOT EXISTS chunk_connections_insertion_chunk_id ON chunk_connections(chunk_id);