diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index f2ff4e5a18..9da47a0b0e 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -686,7 +686,7 @@ class EventCreationHandler: ) return ( prev_event, - await self.store.get_stream_id_for_event(prev_event.event_id), + prev_event.internal_metadata.stream_ordering, ) return await self.handle_new_client_event( diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 97e63a1dfc..d656957942 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -285,6 +285,14 @@ class EventsPersistenceStorage: return event, pos, self.main_store.get_room_max_token() def _maybe_start_persisting(self, room_id: str): + """Pokes the `_event_persist_queue` to start handling new items in the + queue, if not already in progress. + + Causes the deferreds returned by `add_to_queue` to resolve with: a + dictionary of event ID to event ID we didn't persist as we already had + another event persisted with the same TXN ID. + """ + async def persisting_queue(item): with Measure(self._clock, "persist_events"): return await self._persist_events(