Split out federated PDU retrieval into a non-cached version (#11242)
Context: https://github.com/matrix-org/synapse/pull/11114/files#r741643968pull/11292/head
parent
0ef69ddbdc
commit
f1d5c2f269
|
@ -0,0 +1 @@
|
||||||
|
Split out federated PDU retrieval function into a non-cached version.
|
|
@ -277,6 +277,58 @@ class FederationClient(FederationBase):
|
||||||
|
|
||||||
return pdus
|
return pdus
|
||||||
|
|
||||||
|
async def get_pdu_from_destination_raw(
|
||||||
|
self,
|
||||||
|
destination: str,
|
||||||
|
event_id: str,
|
||||||
|
room_version: RoomVersion,
|
||||||
|
outlier: bool = False,
|
||||||
|
timeout: Optional[int] = None,
|
||||||
|
) -> Optional[EventBase]:
|
||||||
|
"""Requests the PDU with given origin and ID from the remote home
|
||||||
|
server. Does not have any caching or rate limiting!
|
||||||
|
|
||||||
|
Args:
|
||||||
|
destination: Which homeserver to query
|
||||||
|
event_id: event to fetch
|
||||||
|
room_version: version of the room
|
||||||
|
outlier: Indicates whether the PDU is an `outlier`, i.e. if
|
||||||
|
it's from an arbitrary point in the context as opposed to part
|
||||||
|
of the current block of PDUs. Defaults to `False`
|
||||||
|
timeout: How long to try (in ms) each destination for before
|
||||||
|
moving to the next destination. None indicates no timeout.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The requested PDU, or None if we were unable to find it.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
SynapseError, NotRetryingDestination, FederationDeniedError
|
||||||
|
"""
|
||||||
|
transaction_data = await self.transport_layer.get_event(
|
||||||
|
destination, event_id, timeout=timeout
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.debug(
|
||||||
|
"retrieved event id %s from %s: %r",
|
||||||
|
event_id,
|
||||||
|
destination,
|
||||||
|
transaction_data,
|
||||||
|
)
|
||||||
|
|
||||||
|
pdu_list: List[EventBase] = [
|
||||||
|
event_from_pdu_json(p, room_version, outlier=outlier)
|
||||||
|
for p in transaction_data["pdus"]
|
||||||
|
]
|
||||||
|
|
||||||
|
if pdu_list and pdu_list[0]:
|
||||||
|
pdu = pdu_list[0]
|
||||||
|
|
||||||
|
# Check signatures are correct.
|
||||||
|
signed_pdu = await self._check_sigs_and_hash(room_version, pdu)
|
||||||
|
return signed_pdu
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
async def get_pdu(
|
async def get_pdu(
|
||||||
self,
|
self,
|
||||||
destinations: Iterable[str],
|
destinations: Iterable[str],
|
||||||
|
@ -321,30 +373,14 @@ class FederationClient(FederationBase):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
try:
|
try:
|
||||||
transaction_data = await self.transport_layer.get_event(
|
signed_pdu = await self.get_pdu_from_destination_raw(
|
||||||
destination, event_id, timeout=timeout
|
destination=destination,
|
||||||
|
event_id=event_id,
|
||||||
|
room_version=room_version,
|
||||||
|
outlier=outlier,
|
||||||
|
timeout=timeout,
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.debug(
|
|
||||||
"retrieved event id %s from %s: %r",
|
|
||||||
event_id,
|
|
||||||
destination,
|
|
||||||
transaction_data,
|
|
||||||
)
|
|
||||||
|
|
||||||
pdu_list: List[EventBase] = [
|
|
||||||
event_from_pdu_json(p, room_version, outlier=outlier)
|
|
||||||
for p in transaction_data["pdus"]
|
|
||||||
]
|
|
||||||
|
|
||||||
if pdu_list and pdu_list[0]:
|
|
||||||
pdu = pdu_list[0]
|
|
||||||
|
|
||||||
# Check signatures are correct.
|
|
||||||
signed_pdu = await self._check_sigs_and_hash(room_version, pdu)
|
|
||||||
|
|
||||||
break
|
|
||||||
|
|
||||||
pdu_attempts[destination] = now
|
pdu_attempts[destination] = now
|
||||||
|
|
||||||
except SynapseError as e:
|
except SynapseError as e:
|
||||||
|
|
Loading…
Reference in New Issue