Scope transaction IDs to rooms
parent
9503e172f3
commit
46d49198b2
|
@ -711,7 +711,10 @@ class EventCreationHandler:
|
|||
with (await self.limiter.queue(event_dict["room_id"])):
|
||||
if txn_id and requester.access_token_id:
|
||||
existing_event_id = await self.store.get_event_id_from_transaction_id(
|
||||
requester.user.to_string(), requester.access_token_id, txn_id,
|
||||
event_dict["room_id"],
|
||||
requester.user.to_string(),
|
||||
requester.access_token_id,
|
||||
txn_id,
|
||||
)
|
||||
if existing_event_id:
|
||||
event = await self.store.get_event(existing_event_id)
|
||||
|
|
|
@ -176,7 +176,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
|||
# do it up front for efficiency.)
|
||||
if txn_id and requester.access_token_id:
|
||||
existing_event_id = await self.store.get_event_id_from_transaction_id(
|
||||
requester.user.to_string(), requester.access_token_id, txn_id,
|
||||
room_id, requester.user.to_string(), requester.access_token_id, txn_id,
|
||||
)
|
||||
if existing_event_id:
|
||||
event_pos = await self.store.get_position_for_event(existing_event_id)
|
||||
|
|
|
@ -423,6 +423,7 @@ class PersistEventsStore:
|
|||
to_insert.append(
|
||||
{
|
||||
"event_id": event.event_id,
|
||||
"room_id": event.room_id,
|
||||
"user_id": event.sender,
|
||||
"token_id": token_id,
|
||||
"txn_id": txn_id,
|
||||
|
|
|
@ -1314,14 +1314,19 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
)
|
||||
|
||||
async def get_event_id_from_transaction_id(
|
||||
self, user_id: str, token_id: int, txn_id: str
|
||||
self, room_id: str, user_id: str, token_id: int, txn_id: str
|
||||
) -> Optional[str]:
|
||||
"""Look up if we have already persisted an event for the transaction ID,
|
||||
returning the event ID if so.
|
||||
"""
|
||||
return await self.db_pool.simple_select_one_onecol(
|
||||
table="event_txn_id",
|
||||
keyvalues={"user_id": user_id, "token_id": token_id, "txn_id": txn_id},
|
||||
keyvalues={
|
||||
"room_id": room_id,
|
||||
"user_id": user_id,
|
||||
"token_id": token_id,
|
||||
"txn_id": txn_id,
|
||||
},
|
||||
retcol="event_id",
|
||||
allow_none=True,
|
||||
desc="get_event_id_from_transaction_id",
|
||||
|
@ -1342,7 +1347,7 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
txn_id = getattr(event.internal_metadata, "txn_id", None)
|
||||
if token_id and txn_id:
|
||||
existing = await self.get_event_id_from_transaction_id(
|
||||
event.sender, token_id, txn_id
|
||||
event.room_id, event.sender, token_id, txn_id
|
||||
)
|
||||
if existing:
|
||||
mapping[event.event_id] = existing
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
-- events or access token we don't want to try and de-duplicate the event.
|
||||
CREATE TABLE IF NOT EXISTS event_txn_id (
|
||||
event_id TEXT NOT NULL,
|
||||
room_id TEXT NOT NULL,
|
||||
user_id TEXT NOT NULL,
|
||||
token_id BIGINT NOT NULL,
|
||||
txn_id TEXT NOT NULL,
|
||||
|
@ -35,5 +36,5 @@ CREATE TABLE IF NOT EXISTS event_txn_id (
|
|||
);
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS event_txn_id_event_id ON event_txn_id(event_id);
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS event_txn_id_txn_id ON event_txn_id(user_id, token_id, txn_id);
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS event_txn_id_txn_id ON event_txn_id(room_id, user_id, token_id, txn_id);
|
||||
CREATE INDEX IF NOT EXISTS event_txn_id_ts ON event_txn_id(inserted_ts);
|
||||
|
|
Loading…
Reference in New Issue