Correctly deduplicate events with same transaction ID.

Fixes #3365.
pull/8476/head
Erik Johnston 2020-10-06 16:04:45 +01:00
parent 9de6e9e249
commit 29d4b885ed
6 changed files with 169 additions and 4 deletions

View File

@@ -752,6 +752,14 @@ class EventCreationHandler:
        # extremities to pile up, which in turn leads to state resolution
        # taking longer.
        with (await self.limiter.queue(event_dict["room_id"])):
            if txn_id and requester.access_token_id:
                existing_event_id = await self.store.get_event_id_from_transaction_id(
                    requester.user.to_string(), requester.access_token_id, txn_id,
                )
                if existing_event_id:
                    event = await self.store.get_event(existing_event_id)
                    return event, event.internal_metadata.stream_ordering

            event, context = await self.create_event(
                requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id
            )
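
With this check in place, a client that retries a send after a network failure gets the original event back instead of creating a duplicate. A minimal sketch of the client-visible behaviour over the Matrix client-server API (the homeserver URL, room ID, and access token below are placeholders, not part of this change):

import requests

HOMESERVER = "https://example.org"  # placeholder homeserver
ROOM_ID = "!room:example.org"       # placeholder room
TXN_ID = "m1602000000.1"            # client-chosen transaction ID
HEADERS = {"Authorization": "Bearer <access_token>"}

def send_message(body: str) -> str:
    # PUT /_matrix/client/r0/rooms/{roomId}/send/{eventType}/{txnId}
    url = (
        f"{HOMESERVER}/_matrix/client/r0/rooms/{ROOM_ID}"
        f"/send/m.room.message/{TXN_ID}"
    )
    resp = requests.put(url, json={"msgtype": "m.text", "body": body}, headers=HEADERS)
    resp.raise_for_status()
    return resp.json()["event_id"]

first = send_message("hello")
retry = send_message("hello")  # e.g. resent after a timeout
assert first == retry          # same transaction ID, same event ID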

View File

@@ -171,6 +171,14 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
        if requester.is_guest:
            content["kind"] = "guest"

        if txn_id and requester.access_token_id:
            existing_event_id = await self.store.get_event_id_from_transaction_id(
                requester.user.to_string(), requester.access_token_id, txn_id,
            )
            if existing_event_id:
                event_pos = await self.store.get_position_for_event(existing_event_id)
                return existing_event_id, event_pos.stream

        event, context = await self.event_creation_handler.create_event(
            requester,
            {
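
The membership path returns only the existing event ID and its stream position rather than refetching the full event. The guard itself has the same shape in both handlers; a hypothetical helper (not part of this commit) capturing the shared check:

from typing import Optional

async def get_deduplicated_event_id(store, requester, txn_id: Optional[str]) -> Optional[str]:
    """Hypothetical refactoring sketch: return the event ID already persisted
    for this (user, access token, transaction ID) triple, or None."""
    if not (txn_id and requester.access_token_id):
        return None
    return await store.get_event_id_from_transaction_id(
        requester.user.to_string(), requester.access_token_id, txn_id,
    )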

View File

@@ -367,6 +367,8 @@ class PersistEventsStore:
            self._store_event_txn(txn, events_and_contexts=events_and_contexts)

            self._persist_transaction_ids_txn(txn, events_and_contexts)

            # Insert into event_to_state_groups.
            self._store_event_state_mappings_txn(txn, events_and_contexts)
@@ -411,6 +413,34 @@ class PersistEventsStore:
            # room_memberships, where applicable.
            self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)

    def _persist_transaction_ids_txn(
        self,
        txn: LoggingTransaction,
        events_and_contexts: List[Tuple[EventBase, EventContext]],
    ):
        """Persist the mapping from transaction IDs to event IDs (if defined).
        """

        to_insert = []
        for event, _ in events_and_contexts:
            token_id = getattr(event.internal_metadata, "token_id", None)
            txn_id = getattr(event.internal_metadata, "txn_id", None)
            if token_id and txn_id:
                to_insert.append(
                    {
                        "event_id": event.event_id,
                        "user_id": event.sender,
                        "token_id": token_id,
                        "txn_id": txn_id,
                        "inserted_ts": self._clock.time_msec(),
                    }
                )

        if to_insert:
            self.db_pool.simple_insert_many_txn(
                txn, table="event_txn_id", values=to_insert,
            )

    def _update_current_state_txn(
        self,
        txn: LoggingTransaction,
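
simple_insert_many_txn batches every mapping from one persistence transaction into a single statement. Roughly what that amounts to, as a sketch (Synapse's real helper lives in its database layer; this is not its implementation):

from typing import Any, Dict, List

def insert_many_sketch(txn, table: str, values: List[Dict[str, Any]]) -> None:
    """Sketch of a batched insert: all row dicts share the same columns,
    so one executemany over a single INSERT statement suffices."""
    if not values:
        return
    keys = sorted(values[0])
    sql = "INSERT INTO %s (%s) VALUES (%s)" % (
        table, ", ".join(keys), ", ".join("?" for _ in keys),
    )
    txn.executemany(sql, [tuple(row[k] for k in keys) for row in values])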

View File

@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import logging
import threading
@@ -130,6 +129,15 @@ class EventsWorkerStore(SQLBaseStore):
            db_conn, "events", "stream_ordering", step=-1
        )

        if not hs.config.worker.worker_app:
            # We periodically clean out old transaction ID mappings
            self._clock.looping_call(
                run_as_background_process,
                5 * 60 * 1000,
                "_cleanup_old_transaction_ids",
                self._cleanup_old_transaction_ids,
            )

        self._get_event_cache = Cache(
            "*getEvent*",
            keylen=3,
@@ -1287,3 +1295,55 @@ class EventsWorkerStore(SQLBaseStore):
        return await self.db_pool.runInteraction(
            desc="get_next_event_to_expire", func=get_next_event_to_expire_txn
        )

    async def get_event_id_from_transaction_id(
        self, user_id: str, token_id: int, txn_id: str
    ) -> Optional[str]:
        """Look up if we have already persisted an event for the transaction ID,
        returning the event ID if so.
        """
        return await self.db_pool.simple_select_one_onecol(
            table="event_txn_id",
            keyvalues={"user_id": user_id, "token_id": token_id, "txn_id": txn_id},
            retcol="event_id",
            allow_none=True,
            desc="get_event_id_from_transaction_id",
        )

    async def get_already_persisted_events(
        self, events: Iterable[EventBase]
    ) -> Dict[str, str]:
        """Look up if we have already persisted an event for the transaction ID,
        returning a mapping from event ID in the given list to the event ID of
        an existing event.
        """

        mapping = {}

        for event in events:
            token_id = getattr(event.internal_metadata, "token_id", None)
            txn_id = getattr(event.internal_metadata, "txn_id", None)

            if token_id and txn_id:
                existing = await self.get_event_id_from_transaction_id(
                    event.sender, token_id, txn_id
                )
                if existing:
                    mapping[event.event_id] = existing

        return mapping

    async def _cleanup_old_transaction_ids(self):
        """Cleans out transaction id mappings older than 24hrs.
        """

        def _cleanup_old_transaction_ids_txn(txn):
            sql = """
                DELETE FROM event_txn_id
                WHERE inserted_ts < ?
            """
            one_day_ago = self._clock.time_msec() - 24 * 60 * 60 * 1000
            txn.execute(sql, (one_day_ago,))

        return await self.db_pool.runInteraction(
            "_cleanup_old_transaction_ids", _cleanup_old_transaction_ids_txn,
        )
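
The looping_call above reschedules the cleanup every five minutes, wrapped in run_as_background_process so it is tracked as a background job, and registered only on the main process so workers don't race to delete the same rows. A rough asyncio equivalent of the loop (a sketch, not Synapse's clock machinery; `db` is an assumed async database handle):

import asyncio
import time

ONE_DAY_MS = 24 * 60 * 60 * 1000

async def cleanup_loop(db, interval_s: float = 5 * 60) -> None:
    # Delete mappings older than 24 hours, then sleep and repeat.
    while True:
        cutoff = int(time.time() * 1000) - ONE_DAY_MS
        await db.execute("DELETE FROM event_txn_id WHERE inserted_ts < ?", (cutoff,))
        await asyncio.sleep(interval_s)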

View File

@@ -0,0 +1,29 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- A map of recent events persisted with transaction IDs. Used to deduplicate
-- send event requests with the same transaction ID.
CREATE TABLE event_txn_id (
    event_id TEXT NOT NULL,
    user_id TEXT NOT NULL,
    token_id BIGINT NOT NULL,
    txn_id TEXT NOT NULL,
    inserted_ts BIGINT NOT NULL
);
CREATE UNIQUE INDEX event_txn_id_event_id ON event_txn_id(event_id);
CREATE UNIQUE INDEX event_txn_id_txn_id ON event_txn_id(user_id, token_id, txn_id);
CREATE INDEX event_txn_id_ts ON event_txn_id(inserted_ts);
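
The unique index on (user_id, token_id, txn_id) is what makes the mapping trustworthy: at most one event can ever be recorded per transaction ID. A quick demonstration against an in-memory SQLite database (a sketch; in production this runs on whatever database Synapse is configured with):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE event_txn_id (event_id TEXT NOT NULL, user_id TEXT NOT NULL,"
    " token_id BIGINT NOT NULL, txn_id TEXT NOT NULL, inserted_ts BIGINT NOT NULL)"
)
conn.execute(
    "CREATE UNIQUE INDEX event_txn_id_txn_id ON event_txn_id(user_id, token_id, txn_id)"
)

conn.execute(
    "INSERT INTO event_txn_id VALUES (?, ?, ?, ?, ?)",
    ("$first:example.org", "@alice:example.org", 1, "txn1", 0),
)
try:
    conn.execute(
        "INSERT INTO event_txn_id VALUES (?, ?, ?, ?, ?)",
        ("$second:example.org", "@alice:example.org", 1, "txn1", 0),
    )
except sqlite3.IntegrityError:
    print("second event for the same (user, token, txn_id) is rejected")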

View File

@@ -245,7 +245,10 @@ class EventsPersistenceStorage:
        self._maybe_start_persisting(event.room_id)

        await make_deferred_yieldable(deferred)
        replaced_events = await make_deferred_yieldable(deferred)
        replaced_event = replaced_events.get(event.event_id)
        if replaced_event:
            event = await self.main_store.get_event(replaced_event)

        event_stream_id = event.internal_metadata.stream_ordering
@@ -265,12 +268,37 @@ class EventsPersistenceStorage:
        self,
        events_and_contexts: List[Tuple[EventBase, EventContext]],
        backfilled: bool = False,
    ):
    ) -> Dict[str, str]:
        """Calculates the change to current state and forward extremities, and
        persists the given events with those updates.

        Returns:
            A dictionary of event ID to event ID we didn't persist as we already
            had another event persisted with the same TXN ID.
        """
        replaced_events = {}  # type: Dict[str, str]

        if not events_and_contexts:
            return
            return replaced_events

        # Check if any of the events have a transaction ID that has already been
        # persisted, and if so we don't persist them again.
        #
        # We should have checked this a long time before we get here, but it's
        # possible that different send event requests race in such a way that
        # they both pass the earlier checks.
        replaced_events = await self.main_store.get_already_persisted_events(
            (event for event, _ in events_and_contexts)
        )

        if replaced_events:
            events_and_contexts = [
                (e, ctx)
                for e, ctx in events_and_contexts
                if e.event_id not in replaced_events
            ]

            if not events_and_contexts:
                return replaced_events

        chunks = [
            events_and_contexts[x : x + 100]
@@ -439,6 +467,8 @@ class EventsPersistenceStorage:
        await self._handle_potentially_left_users(potentially_left_users)

        return replaced_events

    async def _calculate_new_extremities(
        self,
        room_id: str,
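
The comment in _persist_events explains why this second, persist-time check exists: two requests carrying the same transaction ID can both pass the handler-level check before either has written its row. A toy asyncio model of that race (a sketch, not Synapse code):

import asyncio

persisted = {}  # (user_id, token_id, txn_id) -> event_id

async def send(txn_key, event_id):
    if txn_key in persisted:           # handler-level check: both callers
        return persisted[txn_key]      # see nothing persisted yet...
    await asyncio.sleep(0)             # ...and yield before persisting
    # persist-time check: only the first writer's event ID survives
    return persisted.setdefault(txn_key, event_id)

async def main():
    key = ("@alice:example.org", 1, "txn1")
    a, b = await asyncio.gather(send(key, "$event_a"), send(key, "$event_b"))
    assert a == b == "$event_a"  # the duplicate was mapped to the first event

asyncio.run(main())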