Add check to EDUs and move PDUs check in the event storage controller
parent
7c224e149b
commit
7ad75e6d20
|
@ -1033,6 +1033,10 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
|
||||||
)
|
)
|
||||||
prev_ids = [str(p) for p in prev_ids] # They may come as ints
|
prev_ids = [str(p) for p in prev_ids] # They may come as ints
|
||||||
|
|
||||||
|
# The result of `validate` is not used yet because for now we only want to
|
||||||
|
# log invalid mxids in the wild.
|
||||||
|
UserID.from_string(user_id).validate(allow_historical_mxids=True)
|
||||||
|
|
||||||
if get_domain_from_id(user_id) != origin:
|
if get_domain_from_id(user_id) != origin:
|
||||||
# TODO: Raise?
|
# TODO: Raise?
|
||||||
logger.warning(
|
logger.warning(
|
||||||
|
|
|
@ -110,6 +110,10 @@ class DeviceMessageHandler:
|
||||||
origin,
|
origin,
|
||||||
sender_user_id,
|
sender_user_id,
|
||||||
)
|
)
|
||||||
|
# The result of `validate` is not used yet because for now we only want to
|
||||||
|
# log invalid mxids in the wild.
|
||||||
|
UserID.from_string(sender_user_id).validate(allow_historical_mxids=True)
|
||||||
|
|
||||||
message_type = content["type"]
|
message_type = content["type"]
|
||||||
message_id = content["message_id"]
|
message_id = content["message_id"]
|
||||||
for user_id, by_device in content["messages"].items():
|
for user_id, by_device in content["messages"].items():
|
||||||
|
|
|
@ -1593,6 +1593,10 @@ class SigningKeyEduUpdater:
|
||||||
logger.warning("Got signing key update edu for %r from %r", user_id, origin)
|
logger.warning("Got signing key update edu for %r from %r", user_id, origin)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# The result of `validate` is not used yet because for now we only want to
|
||||||
|
# log invalid mxids in the wild.
|
||||||
|
UserID.from_string(user_id).validate(allow_historical_mxids=True)
|
||||||
|
|
||||||
room_ids = await self.store.get_rooms_for_user(user_id)
|
room_ids = await self.store.get_rooms_for_user(user_id)
|
||||||
if not room_ids:
|
if not room_ids:
|
||||||
# We don't share any rooms with this user. Ignore update, as we
|
# We don't share any rooms with this user. Ignore update, as we
|
||||||
|
|
|
@ -117,6 +117,10 @@ class ReceiptsHandler:
|
||||||
max_batch_id: Optional[int] = None
|
max_batch_id: Optional[int] = None
|
||||||
|
|
||||||
for receipt in receipts:
|
for receipt in receipts:
|
||||||
|
# The result of `validate` is not used yet because for now we only want to
|
||||||
|
# log invalid mxids in the wild.
|
||||||
|
UserID.from_string(receipt.user_id).validate(allow_historical_mxids=True)
|
||||||
|
|
||||||
res = await self.store.insert_receipt(
|
res = await self.store.insert_receipt(
|
||||||
receipt.room_id,
|
receipt.room_id,
|
||||||
receipt.receipt_type,
|
receipt.receipt_type,
|
||||||
|
|
|
@ -362,6 +362,10 @@ class TypingWriterHandler(FollowerTypingHandler):
|
||||||
room_id = content["room_id"]
|
room_id = content["room_id"]
|
||||||
user_id = content["user_id"]
|
user_id = content["user_id"]
|
||||||
|
|
||||||
|
# The result of `validate` is not used yet because for now we only want to
|
||||||
|
# log invalid mxids in the wild.
|
||||||
|
UserID.from_string(user_id).validate(allow_historical_mxids=True)
|
||||||
|
|
||||||
# If we're not in the room just ditch the event entirely. This is
|
# If we're not in the room just ditch the event entirely. This is
|
||||||
# probably an old server that has come back and thinks we're still in
|
# probably an old server that has come back and thinks we're still in
|
||||||
# the room (or we've been rejoined to the room by a state reset).
|
# the room (or we've been rejoined to the room by a state reset).
|
||||||
|
|
|
@ -63,6 +63,7 @@ from synapse.types import (
|
||||||
PersistedEventPosition,
|
PersistedEventPosition,
|
||||||
RoomStreamToken,
|
RoomStreamToken,
|
||||||
StateMap,
|
StateMap,
|
||||||
|
UserID,
|
||||||
get_domain_from_id,
|
get_domain_from_id,
|
||||||
)
|
)
|
||||||
from synapse.types.state import StateFilter
|
from synapse.types.state import StateFilter
|
||||||
|
@ -397,6 +398,10 @@ class EventsPersistenceStorageController:
|
||||||
event_ids: List[str] = []
|
event_ids: List[str] = []
|
||||||
partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {}
|
partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {}
|
||||||
for event, ctx in events_and_contexts:
|
for event, ctx in events_and_contexts:
|
||||||
|
# The result of `validate` is not used yet because for now we only want to
|
||||||
|
# log invalid mxids in the wild.
|
||||||
|
UserID.from_string(event.user_id).validate(allow_historical_mxids=True)
|
||||||
|
|
||||||
partitioned.setdefault(event.room_id, []).append((event, ctx))
|
partitioned.setdefault(event.room_id, []).append((event, ctx))
|
||||||
event_ids.append(event.event_id)
|
event_ids.append(event.event_id)
|
||||||
|
|
||||||
|
|
|
@ -393,10 +393,6 @@ class PersistEventsStore:
|
||||||
# Once the txn completes, invalidate all of the relevant caches. Note that we do this
|
# Once the txn completes, invalidate all of the relevant caches. Note that we do this
|
||||||
# up here because it captures all the events_and_contexts before any are removed.
|
# up here because it captures all the events_and_contexts before any are removed.
|
||||||
for event, _ in events_and_contexts:
|
for event, _ in events_and_contexts:
|
||||||
sender = UserID.from_string(event.sender)
|
|
||||||
# The result of `validate` is not used yet because for now we only want to
|
|
||||||
# log invalid mxids in the wild.
|
|
||||||
sender.validate(allow_historical_mxids=True)
|
|
||||||
self.store.invalidate_get_event_cache_after_txn(txn, event.event_id)
|
self.store.invalidate_get_event_cache_after_txn(txn, event.event_id)
|
||||||
if event.redacts:
|
if event.redacts:
|
||||||
self.store.invalidate_get_event_cache_after_txn(txn, event.redacts)
|
self.store.invalidate_get_event_cache_after_txn(txn, event.redacts)
|
||||||
|
|
Loading…
Reference in New Issue