Add inserted historical messages to /backfill response

pull/10419/head
Eric Eastwood 2021-07-12 20:22:38 -05:00
parent baae5d86f5
commit c05e43bf31
3 changed files with 151 additions and 33 deletions


@@ -673,11 +673,11 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         )

     def get_oldest_events_with_depth_in_room_txn(self, txn, room_id):
-        sqlAsdf = "SELECT * FROM insertion_event_extremeties as i"
+        sqlAsdf = "SELECT * FROM insertion_event_edges as i"
         txn.execute(sqlAsdf)
         logger.info("wfeafewawafeawg %s", dict(txn))

-        sqlAsdf = "SELECT * FROM insertion_event_extremeties as i WHERE i.room_id = ?"
+        sqlAsdf = "SELECT * FROM insertion_event_edges as i WHERE i.room_id = ?"
         txn.execute(sqlAsdf, (room_id,))
         logger.info("awfeawefw %s", dict(txn))
@@ -688,7 +688,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             # " INNER JOIN event_backward_extremities as b"
             # " ON g.prev_event_id = b.event_id"
             # TODO
-            " INNER JOIN insertion_event_extremeties as i"
+            " INNER JOIN insertion_event_edges as i"
             " ON e.event_id = i.insertion_prev_event_id"
             " WHERE i.room_id = ?"
             " GROUP BY i.insertion_event_id"
@@ -703,7 +703,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             " INNER JOIN event_backward_extremities as b"
             " ON g.prev_event_id = b.event_id"
             # TODO
-            # " INNER JOIN insertion_event_extremeties as i"
+            # " INNER JOIN insertion_event_edges as i"
             # " ON g.event_id = i.insertion_prev_event_id"
             " WHERE b.room_id = ? AND g.is_state is ?"
             " GROUP BY b.event_id"
@@ -961,16 +961,50 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         # We want to make sure that we do a breadth-first, "depth" ordered
         # search.

+        # TODO
+        # Look for the prev_event_id connected to the given event_id
         query = (
             "SELECT depth, prev_event_id FROM event_edges"
+            # Get the depth of the prev_event_id from the events table
             " INNER JOIN events"
             " ON prev_event_id = events.event_id"
+            # Find an event which matches the given event_id
             " WHERE event_edges.event_id = ?"
             " AND event_edges.is_state = ?"
             " LIMIT ?"
         )

+        # Look for the "insertion" events connected to the given event_id
+        # TODO: Do we need to worry about selecting only from the given room_id? The other query above doesn't
+        connected_insertion_event_query = (
+            "SELECT e.depth, i.insertion_event_id FROM insertion_event_edges AS i"
+            # Get the depth of the insertion event from the events table
+            " INNER JOIN events AS e"
+            " ON e.event_id = i.insertion_event_id"
+            # Find an insertion event which points via prev_events to the given event_id
+            " WHERE i.insertion_prev_event_id = ?"
+            " LIMIT ?"
+        )
+
+        # Find any chunk connections of a given insertion event
+        # TODO: Do we need to worry about selecting only from the given room_id? The other query above doesn't
+        chunk_connection_query = (
+            "SELECT e.depth, c.event_id FROM insertion_events AS i"
+            # Find the chunk that connects to the given insertion event
+            " INNER JOIN chunk_edges AS c"
+            " ON i.next_chunk_id = c.chunk_id"
+            # Get the depth of the chunk start event from the events table
+            " INNER JOIN events AS e"
+            " ON e.event_id = c.event_id"
+            # Find an insertion event which matches the given event_id
+            " WHERE i.insertion_event_id = ?"
+            " LIMIT ?"
+        )
+
+        # In a PriorityQueue, the lowest valued entries are retrieved first.
+        # We're using depth as the priority in the queue.
+        # Depth is lowest at the oldest-in-time message and highest at the
+        # newest-in-time message. We add events to the queue with a negative depth so that
+        # we process the newest-in-time messages first going backwards in time.
         queue = PriorityQueue()

         for event_id in event_list:
@@ -996,9 +1030,36 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             event_results.add(event_id)

-            txn.execute(query, (event_id, False, limit - len(event_results)))
+            txn.execute(
+                connected_insertion_event_query, (event_id, limit - len(event_results))
+            )
+            connected_insertion_event_id_results = list(txn)
+            logger.info(
+                "connected_insertion_event_query %s",
+                connected_insertion_event_id_results,
+            )
+            for row in connected_insertion_event_id_results:
+                if row[1] not in event_results:
+                    queue.put((-row[0], row[1]))

-            for row in txn:
+                # Find any chunk connections for the given insertion event
+                txn.execute(
+                    chunk_connection_query, (row[1], limit - len(event_results))
+                )
+                chunk_start_event_id_results = list(txn)
+                logger.info(
+                    "chunk_start_event_id_results %s",
+                    chunk_start_event_id_results,
+                )
+                for row in chunk_start_event_id_results:
+                    if row[1] not in event_results:
+                        queue.put((-row[0], row[1]))
+
+            txn.execute(query, (event_id, False, limit - len(event_results)))
+            prev_event_id_results = list(txn)
+            logger.info("prev_event_ids %s", prev_event_id_results)
+            for row in prev_event_id_results:
                 if row[1] not in event_results:
                     queue.put((-row[0], row[1]))
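The comment block above captures the traversal trick: a PriorityQueue returns its smallest entry first, so queueing events by negative depth pops the newest-in-time (highest-depth) events first while walking backwards through the DAG. Below is a minimal, self-contained sketch of that walk over a toy in-memory graph; the event IDs and the `events` dict are invented for illustration, and the three SQL lookups (prev_events, connected insertion events, chunk starts) are collapsed into a single prev-event map.

# Minimal sketch of the depth-ordered backwards walk described above.
# Toy data and helper names are illustrative; Synapse does these lookups in SQL.
from queue import PriorityQueue, Empty

# event_id -> (depth, [prev_event_ids])
events = {
    "$e4": (4, ["$e3"]),
    "$e3": (3, ["$e2"]),
    "$e2": (2, ["$e1"]),
    "$e1": (1, []),
}

def get_backfill_events(event_list, limit):
    event_results = set()
    queue = PriorityQueue()

    for event_id in event_list:
        depth, _ = events[event_id]
        # Negative depth: PriorityQueue pops the smallest value first, so the
        # highest-depth (newest-in-time) event comes out of the queue first.
        queue.put((-depth, event_id))

    while not queue.empty() and len(event_results) < limit:
        try:
            _, event_id = queue.get_nowait()
        except Empty:
            break
        if event_id in event_results:
            continue
        event_results.add(event_id)

        _, prev_event_ids = events[event_id]
        for prev_event_id in prev_event_ids:
            if prev_event_id not in event_results:
                prev_depth, _ = events[prev_event_id]
                queue.put((-prev_depth, prev_event_id))

    return event_results

print(get_backfill_events(["$e4"], limit=3))  # {'$e4', '$e3', '$e2'}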


@@ -1506,6 +1506,7 @@ class PersistEventsStore:
         self._handle_insertion_event(txn, event)
         self._handle_marker_event(txn, event)
+        self._handle_chunk_id(txn, event)

         # Store the labels for this event.
         labels = event.content.get(EventContentFields.LABELS)
@@ -1773,10 +1774,27 @@ class PersistEventsStore:
         logger.info("_handle_insertion_event %s", event)

         next_chunk_id = event.content.get(EventContentFields.MSC2716_NEXT_CHUNK_ID)
         if next_chunk_id is None:
             # Invalid insertion event without next chunk ID
             return

+        # Keep track of the insertion event and the chunk ID
+        self.db_pool.simple_insert_txn(
+            txn,
+            table="insertion_events",
+            values={
+                "insertion_event_id": event.event_id,
+                "room_id": event.room_id,
+                "next_chunk_id": next_chunk_id,
+            },
+        )
+
+        # Insert an edge for every prev_event connection
         for prev_event_id in event.prev_events:
             self.db_pool.simple_insert_txn(
                 txn,
-                table="insertion_event_extremeties",
+                table="insertion_event_edges",
                 values={
                     "insertion_event_id": event.event_id,
                     "room_id": event.room_id,
@@ -1798,26 +1816,55 @@ class PersistEventsStore:
         logger.info("_handle_marker_event %s", event)

-        insertion_event_id = event.content.get(
-            EventContentFields.MSC2716_MARKER_INSERTION
-        )
-        insertion_prev_event_ids = event.content.get(
-            EventContentFields.MSC2716_MARKER_INSERTION_PREV_EVENTS
-        )
-        if not insertion_event_id or not insertion_prev_event_ids:
-            # Invalid marker event
-            return
+        # TODO: We should attempt to backfill the insertion event instead
+        # of trying to pack all of the info in the marker event. Otherwise,
+        # we need to pack in the insertion_prev_events and insertion_next_chunk_id.
+        # insertion_event_id = event.content.get(
+        #     EventContentFields.MSC2716_MARKER_INSERTION
+        # )
+        # insertion_prev_event_ids = event.content.get(
+        #     EventContentFields.MSC2716_MARKER_INSERTION_PREV_EVENTS
+        # )
+        # if not insertion_event_id or not insertion_prev_event_ids:
+        #     # Invalid marker event
+        #     return
+        # for prev_event_id in insertion_prev_event_ids:
+        #     self.db_pool.simple_insert_txn(
+        #         txn,
+        #         table="insertion_event_edges",
+        #         values={
+        #             "insertion_event_id": insertion_event_id,
+        #             "room_id": event.room_id,
+        #             "insertion_prev_event_id": prev_event_id,
+        #         },
+        #     )
+
+    def _handle_chunk_id(self, txn, event):
+        """Handles inserting the chunk connections between the event at the
+        start of a chunk and an insertion event
+
+        Args:
+            txn
+            event (EventBase)
+        """
+
+        chunk_id = event.content.get(EventContentFields.MSC2716_CHUNK_ID)
+        if chunk_id is None:
+            # No chunk connection to persist
+            return

-        for prev_event_id in insertion_prev_event_ids:
-            self.db_pool.simple_insert_txn(
-                txn,
-                table="insertion_event_extremeties",
-                values={
-                    "insertion_event_id": insertion_event_id,
-                    "room_id": event.room_id,
-                    "insertion_prev_event_id": prev_event_id,
-                },
-            )
+        logger.info("_handle_chunk_id %s %s", chunk_id, event)
+
+        # Keep track of the insertion event and the chunk ID
+        self.db_pool.simple_insert_txn(
+            txn,
+            table="chunk_edges",
+            values={
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "chunk_id": chunk_id,
+            },
+        )

     def _handle_redaction(self, txn, redacted_event_id):
         """Handles receiving a redaction and checking whether we need to remove


@@ -18,23 +18,33 @@
 -- and we hit an event which matches `insertion_prev_event_id`, it should backfill
 -- the "insertion" event and start navigating from there.

 CREATE TABLE IF NOT EXISTS insertion_events(
     insertion_event_id TEXT NOT NULL,
     room_id TEXT NOT NULL,
     next_chunk_id TEXT NOT NULL,
     UNIQUE (insertion_event_id, room_id, next_chunk_id)
 );

-CREATE TABLE IF NOT EXISTS insertion_event_extremeties(
+CREATE INDEX IF NOT EXISTS insertion_events_insertion_room_id ON insertion_events(room_id);
+CREATE INDEX IF NOT EXISTS insertion_events_insertion_event_id ON insertion_events(insertion_event_id);
+CREATE INDEX IF NOT EXISTS insertion_events_next_chunk_id ON insertion_events(next_chunk_id);
+
+CREATE TABLE IF NOT EXISTS insertion_event_edges(
     insertion_event_id TEXT NOT NULL,
     room_id TEXT NOT NULL,
     insertion_prev_event_id TEXT NOT NULL,
-    UNIQUE (insertion_event_id, room_id, room_id, insertion_prev_event_id)
+    UNIQUE (insertion_event_id, room_id, insertion_prev_event_id)
 );

-CREATE INDEX IF NOT EXISTS insertion_event_extremeties_insertion_room_id ON insertion_event_extremeties(room_id);
-CREATE INDEX IF NOT EXISTS insertion_event_extremeties_insertion_event_id ON insertion_event_extremeties(insertion_event_id);
-CREATE INDEX IF NOT EXISTS insertion_event_extremeties_insertion_prev_event_id ON insertion_event_extremeties(insertion_prev_event_id);
+CREATE INDEX IF NOT EXISTS insertion_event_edges_insertion_room_id ON insertion_event_edges(room_id);
+CREATE INDEX IF NOT EXISTS insertion_event_edges_insertion_event_id ON insertion_event_edges(insertion_event_id);
+CREATE INDEX IF NOT EXISTS insertion_event_edges_insertion_prev_event_id ON insertion_event_edges(insertion_prev_event_id);

-CREATE TABLE IF NOT EXISTS chunk_connections(
+CREATE TABLE IF NOT EXISTS chunk_edges(
     event_id TEXT NOT NULL,
     room_id TEXT NOT NULL,
     chunk_id TEXT NOT NULL,
     UNIQUE (event_id, room_id)
 );

-CREATE INDEX IF NOT EXISTS chunk_connections_insertion_chunk_id ON chunk_connections(chunk_id);
+CREATE INDEX IF NOT EXISTS chunk_edges_chunk_id ON chunk_edges(chunk_id);
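As a check on how the three tables cooperate, here is a small sqlite3 sketch (IDs invented, the `events`/depth joins and UNIQUE constraints omitted for brevity) that follows the same two hops as `connected_insertion_event_query` and `chunk_connection_query` above: from a live event to the insertion event that points at it, then through `next_chunk_id` to the event that starts the historical chunk.

# Exercise the schema above with sqlite3 (IDs invented for illustration).
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript(
    """
    CREATE TABLE insertion_events(
        insertion_event_id TEXT NOT NULL,
        room_id TEXT NOT NULL,
        next_chunk_id TEXT NOT NULL
    );
    CREATE TABLE insertion_event_edges(
        insertion_event_id TEXT NOT NULL,
        room_id TEXT NOT NULL,
        insertion_prev_event_id TEXT NOT NULL
    );
    CREATE TABLE chunk_edges(
        event_id TEXT NOT NULL,
        room_id TEXT NOT NULL,
        chunk_id TEXT NOT NULL
    );
    """
)

room = "!room:example.org"
db.execute("INSERT INTO insertion_events VALUES (?, ?, ?)", ("$ins", room, "chunk1"))
db.execute("INSERT INTO insertion_event_edges VALUES (?, ?, ?)", ("$ins", room, "$live"))
db.execute("INSERT INTO chunk_edges VALUES (?, ?, ?)", ("$start", room, "chunk1"))

# Hop 1: walking backwards past $live, find the insertion event pointing at it.
ins = db.execute(
    "SELECT insertion_event_id FROM insertion_event_edges"
    " WHERE insertion_prev_event_id = ?",
    ("$live",),
).fetchone()[0]

# Hop 2: follow the insertion event's next_chunk_id to the chunk start event.
start = db.execute(
    "SELECT c.event_id FROM insertion_events AS i"
    " INNER JOIN chunk_edges AS c ON i.next_chunk_id = c.chunk_id"
    " WHERE i.insertion_event_id = ?",
    (ins,),
).fetchone()[0]

print(ins, "->", start)  # $ins -> $start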