Filter through the potentially deduplicated events
parent
29d4b885ed
commit
9b68b6341e
|
@ -2966,17 +2966,20 @@ class FederationHandler(BaseHandler):
|
||||||
return result["max_stream_id"]
|
return result["max_stream_id"]
|
||||||
else:
|
else:
|
||||||
assert self.storage.persistence
|
assert self.storage.persistence
|
||||||
max_stream_token = await self.storage.persistence.persist_events(
|
|
||||||
|
# Note that this returns the events that wer persisted, which may not be
|
||||||
|
# the same as we passed in if some were deduplicated due transaction IDs.
|
||||||
|
events, max_stream_token = await self.storage.persistence.persist_events(
|
||||||
event_and_contexts, backfilled=backfilled
|
event_and_contexts, backfilled=backfilled
|
||||||
)
|
)
|
||||||
|
|
||||||
if self._ephemeral_messages_enabled:
|
if self._ephemeral_messages_enabled:
|
||||||
for (event, context) in event_and_contexts:
|
for event in events:
|
||||||
# If there's an expiry timestamp on the event, schedule its expiry.
|
# If there's an expiry timestamp on the event, schedule its expiry.
|
||||||
self._message_handler.maybe_schedule_expiry(event)
|
self._message_handler.maybe_schedule_expiry(event)
|
||||||
|
|
||||||
if not backfilled: # Never notify for backfilled events
|
if not backfilled: # Never notify for backfilled events
|
||||||
for event, _ in event_and_contexts:
|
for event in events:
|
||||||
await self._notify_persisted_event(event, max_stream_token)
|
await self._notify_persisted_event(event, max_stream_token)
|
||||||
|
|
||||||
return max_stream_token.stream
|
return max_stream_token.stream
|
||||||
|
|
|
@ -642,7 +642,7 @@ class EventCreationHandler:
|
||||||
context: EventContext,
|
context: EventContext,
|
||||||
ratelimit: bool = True,
|
ratelimit: bool = True,
|
||||||
ignore_shadow_ban: bool = False,
|
ignore_shadow_ban: bool = False,
|
||||||
) -> int:
|
) -> Tuple[EventBase, int]:
|
||||||
"""
|
"""
|
||||||
Persists and notifies local clients and federation of an event.
|
Persists and notifies local clients and federation of an event.
|
||||||
|
|
||||||
|
@ -654,8 +654,10 @@ class EventCreationHandler:
|
||||||
ignore_shadow_ban: True if shadow-banned users should be allowed to
|
ignore_shadow_ban: True if shadow-banned users should be allowed to
|
||||||
send this event.
|
send this event.
|
||||||
|
|
||||||
Return:
|
Returns:
|
||||||
The stream_id of the persisted event.
|
The event and stream_id of the persisted event. The returned event
|
||||||
|
may not match the given event if it was deduplicated due to an
|
||||||
|
existing event matching the transaction ID.
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
ShadowBanError if the requester has been shadow-banned.
|
ShadowBanError if the requester has been shadow-banned.
|
||||||
|
@ -682,7 +684,10 @@ class EventCreationHandler:
|
||||||
event.event_id,
|
event.event_id,
|
||||||
prev_event.event_id,
|
prev_event.event_id,
|
||||||
)
|
)
|
||||||
return await self.store.get_stream_id_for_event(prev_event.event_id)
|
return (
|
||||||
|
prev_event,
|
||||||
|
await self.store.get_stream_id_for_event(prev_event.event_id),
|
||||||
|
)
|
||||||
|
|
||||||
return await self.handle_new_client_event(
|
return await self.handle_new_client_event(
|
||||||
requester=requester, event=event, context=context, ratelimit=ratelimit
|
requester=requester, event=event, context=context, ratelimit=ratelimit
|
||||||
|
@ -738,6 +743,11 @@ class EventCreationHandler:
|
||||||
ignore_shadow_ban: True if shadow-banned users should be allowed to
|
ignore_shadow_ban: True if shadow-banned users should be allowed to
|
||||||
send this event.
|
send this event.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The event and stream_id of the persisted event. The returned event
|
||||||
|
may not match the given event if it was deduplicated due to an
|
||||||
|
existing event matching the transaction ID.
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
ShadowBanError if the requester has been shadow-banned.
|
ShadowBanError if the requester has been shadow-banned.
|
||||||
"""
|
"""
|
||||||
|
@ -770,7 +780,7 @@ class EventCreationHandler:
|
||||||
spam_error = "Spam is not permitted here"
|
spam_error = "Spam is not permitted here"
|
||||||
raise SynapseError(403, spam_error, Codes.FORBIDDEN)
|
raise SynapseError(403, spam_error, Codes.FORBIDDEN)
|
||||||
|
|
||||||
stream_id = await self.send_nonmember_event(
|
event, stream_id = await self.send_nonmember_event(
|
||||||
requester,
|
requester,
|
||||||
event,
|
event,
|
||||||
context,
|
context,
|
||||||
|
@ -851,7 +861,7 @@ class EventCreationHandler:
|
||||||
context: EventContext,
|
context: EventContext,
|
||||||
ratelimit: bool = True,
|
ratelimit: bool = True,
|
||||||
extra_users: List[UserID] = [],
|
extra_users: List[UserID] = [],
|
||||||
) -> int:
|
) -> Tuple[EventBase, int]:
|
||||||
"""Processes a new event. This includes checking auth, persisting it,
|
"""Processes a new event. This includes checking auth, persisting it,
|
||||||
notifying users, sending to remote servers, etc.
|
notifying users, sending to remote servers, etc.
|
||||||
|
|
||||||
|
@ -865,8 +875,10 @@ class EventCreationHandler:
|
||||||
ratelimit
|
ratelimit
|
||||||
extra_users: Any extra users to notify about event
|
extra_users: Any extra users to notify about event
|
||||||
|
|
||||||
Return:
|
Returns:
|
||||||
The stream_id of the persisted event.
|
The event and stream_id of the persisted event. The returned event
|
||||||
|
may not match the given event if it was deduplicated due to an
|
||||||
|
existing event matching the transaction ID.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if event.is_state() and (event.type, event.state_key) == (
|
if event.is_state() and (event.type, event.state_key) == (
|
||||||
|
@ -922,14 +934,17 @@ class EventCreationHandler:
|
||||||
extra_users=extra_users,
|
extra_users=extra_users,
|
||||||
)
|
)
|
||||||
stream_id = result["stream_id"]
|
stream_id = result["stream_id"]
|
||||||
|
event_id = result["event_id"]
|
||||||
|
if event_id != event.event_id:
|
||||||
|
event = await self.store.get_event(event_id)
|
||||||
event.internal_metadata.stream_ordering = stream_id
|
event.internal_metadata.stream_ordering = stream_id
|
||||||
return stream_id
|
return event, stream_id
|
||||||
|
|
||||||
stream_id = await self.persist_and_notify_client_event(
|
event, stream_id = await self.persist_and_notify_client_event(
|
||||||
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
|
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
|
||||||
)
|
)
|
||||||
|
|
||||||
return stream_id
|
return event, stream_id
|
||||||
except Exception:
|
except Exception:
|
||||||
# Ensure that we actually remove the entries in the push actions
|
# Ensure that we actually remove the entries in the push actions
|
||||||
# staging area, if we calculated them.
|
# staging area, if we calculated them.
|
||||||
|
@ -974,7 +989,7 @@ class EventCreationHandler:
|
||||||
context: EventContext,
|
context: EventContext,
|
||||||
ratelimit: bool = True,
|
ratelimit: bool = True,
|
||||||
extra_users: List[UserID] = [],
|
extra_users: List[UserID] = [],
|
||||||
) -> int:
|
) -> Tuple[EventBase, int]:
|
||||||
"""Called when we have fully built the event, have already
|
"""Called when we have fully built the event, have already
|
||||||
calculated the push actions for the event, and checked auth.
|
calculated the push actions for the event, and checked auth.
|
||||||
|
|
||||||
|
@ -1146,9 +1161,13 @@ class EventCreationHandler:
|
||||||
if prev_state_ids:
|
if prev_state_ids:
|
||||||
raise AuthError(403, "Changing the room create event is forbidden")
|
raise AuthError(403, "Changing the room create event is forbidden")
|
||||||
|
|
||||||
event_pos, max_stream_token = await self.storage.persistence.persist_event(
|
# Note that this returns the event that was persisted, which may not be
|
||||||
event, context=context
|
# the same as we passed in if it was deduplicated due transaction IDs.
|
||||||
)
|
(
|
||||||
|
event,
|
||||||
|
event_pos,
|
||||||
|
max_stream_token,
|
||||||
|
) = await self.storage.persistence.persist_event(event, context=context)
|
||||||
|
|
||||||
if self._ephemeral_events_enabled:
|
if self._ephemeral_events_enabled:
|
||||||
# If there's an expiry timestamp on the event, schedule its expiry.
|
# If there's an expiry timestamp on the event, schedule its expiry.
|
||||||
|
@ -1169,7 +1188,7 @@ class EventCreationHandler:
|
||||||
# matters as sometimes presence code can take a while.
|
# matters as sometimes presence code can take a while.
|
||||||
run_in_background(self._bump_active_time, requester.user)
|
run_in_background(self._bump_active_time, requester.user)
|
||||||
|
|
||||||
return event_pos.stream
|
return event, event_pos.stream
|
||||||
|
|
||||||
async def _bump_active_time(self, user: UserID) -> None:
|
async def _bump_active_time(self, user: UserID) -> None:
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -229,7 +229,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||||
retry_after_ms=int(1000 * (time_allowed - time_now_s))
|
retry_after_ms=int(1000 * (time_allowed - time_now_s))
|
||||||
)
|
)
|
||||||
|
|
||||||
stream_id = await self.event_creation_handler.handle_new_client_event(
|
event, stream_id = await self.event_creation_handler.handle_new_client_event(
|
||||||
requester, event, context, extra_users=[target], ratelimit=ratelimit,
|
requester, event, context, extra_users=[target], ratelimit=ratelimit,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -700,7 +700,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||||
if is_blocked:
|
if is_blocked:
|
||||||
raise SynapseError(403, "This room has been blocked on this server")
|
raise SynapseError(403, "This room has been blocked on this server")
|
||||||
|
|
||||||
await self.event_creation_handler.handle_new_client_event(
|
event, _ = await self.event_creation_handler.handle_new_client_event(
|
||||||
requester, event, context, extra_users=[target_user], ratelimit=ratelimit
|
requester, event, context, extra_users=[target_user], ratelimit=ratelimit
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1193,7 +1193,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
|
||||||
|
|
||||||
context = await self.state_handler.compute_event_context(event)
|
context = await self.state_handler.compute_event_context(event)
|
||||||
context.app_service = requester.app_service
|
context.app_service = requester.app_service
|
||||||
stream_id = await self.event_creation_handler.handle_new_client_event(
|
event, stream_id = await self.event_creation_handler.handle_new_client_event(
|
||||||
requester, event, context, extra_users=[UserID.from_string(target_user)],
|
requester, event, context, extra_users=[UserID.from_string(target_user)],
|
||||||
)
|
)
|
||||||
return event.event_id, stream_id
|
return event.event_id, stream_id
|
||||||
|
|
|
@ -116,11 +116,14 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
|
||||||
"Got event to send with ID: %s into room: %s", event.event_id, event.room_id
|
"Got event to send with ID: %s into room: %s", event.event_id, event.room_id
|
||||||
)
|
)
|
||||||
|
|
||||||
stream_id = await self.event_creation_handler.persist_and_notify_client_event(
|
(
|
||||||
|
event,
|
||||||
|
stream_id,
|
||||||
|
) = await self.event_creation_handler.persist_and_notify_client_event(
|
||||||
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
|
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
|
||||||
)
|
)
|
||||||
|
|
||||||
return 200, {"stream_id": stream_id}
|
return 200, {"stream_id": stream_id, "event_id": event.event_id}
|
||||||
|
|
||||||
|
|
||||||
def register_servlets(hs, http_server):
|
def register_servlets(hs, http_server):
|
||||||
|
|
|
@ -199,7 +199,7 @@ class EventsPersistenceStorage:
|
||||||
self,
|
self,
|
||||||
events_and_contexts: Iterable[Tuple[EventBase, EventContext]],
|
events_and_contexts: Iterable[Tuple[EventBase, EventContext]],
|
||||||
backfilled: bool = False,
|
backfilled: bool = False,
|
||||||
) -> RoomStreamToken:
|
) -> Tuple[List[EventBase], RoomStreamToken]:
|
||||||
"""
|
"""
|
||||||
Write events to the database
|
Write events to the database
|
||||||
Args:
|
Args:
|
||||||
|
@ -209,7 +209,11 @@ class EventsPersistenceStorage:
|
||||||
which might update the current state etc.
|
which might update the current state etc.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
the stream ordering of the latest persisted event
|
List of events persisted, the current position room stream position.
|
||||||
|
The list of events persisted may not be the same as those passed in
|
||||||
|
if they were deduplicated due to an event already existing that
|
||||||
|
matched the transcation ID; the existing event is returned in such
|
||||||
|
a case.
|
||||||
"""
|
"""
|
||||||
partitioned = {} # type: Dict[str, List[Tuple[EventBase, EventContext]]]
|
partitioned = {} # type: Dict[str, List[Tuple[EventBase, EventContext]]]
|
||||||
for event, ctx in events_and_contexts:
|
for event, ctx in events_and_contexts:
|
||||||
|
@ -225,19 +229,41 @@ class EventsPersistenceStorage:
|
||||||
for room_id in partitioned:
|
for room_id in partitioned:
|
||||||
self._maybe_start_persisting(room_id)
|
self._maybe_start_persisting(room_id)
|
||||||
|
|
||||||
await make_deferred_yieldable(
|
# The deferred returns a map from event ID to existing event ID if the
|
||||||
|
# event was deduplicated. (The dict may also include other entries if
|
||||||
|
# the event was persisted in a batch with other events).
|
||||||
|
#
|
||||||
|
# Since we use `defer.gatherResults` we need to merge the returned list
|
||||||
|
# of dicts into one.
|
||||||
|
ret_vals = await make_deferred_yieldable(
|
||||||
defer.gatherResults(deferreds, consumeErrors=True)
|
defer.gatherResults(deferreds, consumeErrors=True)
|
||||||
)
|
)
|
||||||
|
replaced_events = {}
|
||||||
|
for d in ret_vals:
|
||||||
|
replaced_events.update(d)
|
||||||
|
|
||||||
return self.main_store.get_room_max_token()
|
events = []
|
||||||
|
for event, _ in events_and_contexts:
|
||||||
|
existing_event_id = replaced_events.get(event.event_id)
|
||||||
|
if existing_event_id:
|
||||||
|
events.append(await self.main_store.get_event(existing_event_id))
|
||||||
|
else:
|
||||||
|
events.append(event)
|
||||||
|
|
||||||
|
return (
|
||||||
|
events,
|
||||||
|
self.main_store.get_room_max_token(),
|
||||||
|
)
|
||||||
|
|
||||||
async def persist_event(
|
async def persist_event(
|
||||||
self, event: EventBase, context: EventContext, backfilled: bool = False
|
self, event: EventBase, context: EventContext, backfilled: bool = False
|
||||||
) -> Tuple[PersistedEventPosition, RoomStreamToken]:
|
) -> Tuple[EventBase, PersistedEventPosition, RoomStreamToken]:
|
||||||
"""
|
"""
|
||||||
Returns:
|
Returns:
|
||||||
The stream ordering of `event`, and the stream ordering of the
|
The event, stream ordering of `event`, and the stream ordering of the
|
||||||
latest persisted event
|
latest persisted event. The returned event may not match the given
|
||||||
|
event if it was deduplicated due to an existing event matching the
|
||||||
|
transaction ID.
|
||||||
"""
|
"""
|
||||||
deferred = self._event_persist_queue.add_to_queue(
|
deferred = self._event_persist_queue.add_to_queue(
|
||||||
event.room_id, [(event, context)], backfilled=backfilled
|
event.room_id, [(event, context)], backfilled=backfilled
|
||||||
|
@ -245,6 +271,9 @@ class EventsPersistenceStorage:
|
||||||
|
|
||||||
self._maybe_start_persisting(event.room_id)
|
self._maybe_start_persisting(event.room_id)
|
||||||
|
|
||||||
|
# The deferred returns a map from event ID to existing event ID if the
|
||||||
|
# event was deduplicated. (The dict may also include other entries if
|
||||||
|
# the event was persisted in a batch with other events.)
|
||||||
replaced_events = await make_deferred_yieldable(deferred)
|
replaced_events = await make_deferred_yieldable(deferred)
|
||||||
replaced_event = replaced_events.get(event.event_id)
|
replaced_event = replaced_events.get(event.event_id)
|
||||||
if replaced_event:
|
if replaced_event:
|
||||||
|
@ -253,12 +282,12 @@ class EventsPersistenceStorage:
|
||||||
event_stream_id = event.internal_metadata.stream_ordering
|
event_stream_id = event.internal_metadata.stream_ordering
|
||||||
|
|
||||||
pos = PersistedEventPosition(self._instance_name, event_stream_id)
|
pos = PersistedEventPosition(self._instance_name, event_stream_id)
|
||||||
return pos, self.main_store.get_room_max_token()
|
return event, pos, self.main_store.get_room_max_token()
|
||||||
|
|
||||||
def _maybe_start_persisting(self, room_id: str):
|
def _maybe_start_persisting(self, room_id: str):
|
||||||
async def persisting_queue(item):
|
async def persisting_queue(item):
|
||||||
with Measure(self._clock, "persist_events"):
|
with Measure(self._clock, "persist_events"):
|
||||||
await self._persist_events(
|
return await self._persist_events(
|
||||||
item.events_and_contexts, backfilled=item.backfilled
|
item.events_and_contexts, backfilled=item.backfilled
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue