Fix a bug in the federation API which could cause occasional "Failed to get PDU" errors (#7089).
parent
a319cb1dd1
commit
c2db6599c8
|
@ -0,0 +1 @@
|
||||||
|
Fix a bug in the federation API which could cause occasional "Failed to get PDU" errors.
|
|
@ -25,11 +25,7 @@ from twisted.python.failure import Failure
|
||||||
|
|
||||||
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
|
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
|
||||||
from synapse.api.errors import Codes, SynapseError
|
from synapse.api.errors import Codes, SynapseError
|
||||||
from synapse.api.room_versions import (
|
from synapse.api.room_versions import EventFormatVersions, RoomVersion
|
||||||
KNOWN_ROOM_VERSIONS,
|
|
||||||
EventFormatVersions,
|
|
||||||
RoomVersion,
|
|
||||||
)
|
|
||||||
from synapse.crypto.event_signing import check_event_content_hash
|
from synapse.crypto.event_signing import check_event_content_hash
|
||||||
from synapse.crypto.keyring import Keyring
|
from synapse.crypto.keyring import Keyring
|
||||||
from synapse.events import EventBase, make_event_from_dict
|
from synapse.events import EventBase, make_event_from_dict
|
||||||
|
@ -55,13 +51,15 @@ class FederationBase(object):
|
||||||
self.store = hs.get_datastore()
|
self.store = hs.get_datastore()
|
||||||
self._clock = hs.get_clock()
|
self._clock = hs.get_clock()
|
||||||
|
|
||||||
def _check_sigs_and_hash(self, room_version: str, pdu: EventBase) -> Deferred:
|
def _check_sigs_and_hash(
|
||||||
|
self, room_version: RoomVersion, pdu: EventBase
|
||||||
|
) -> Deferred:
|
||||||
return make_deferred_yieldable(
|
return make_deferred_yieldable(
|
||||||
self._check_sigs_and_hashes(room_version, [pdu])[0]
|
self._check_sigs_and_hashes(room_version, [pdu])[0]
|
||||||
)
|
)
|
||||||
|
|
||||||
def _check_sigs_and_hashes(
|
def _check_sigs_and_hashes(
|
||||||
self, room_version: str, pdus: List[EventBase]
|
self, room_version: RoomVersion, pdus: List[EventBase]
|
||||||
) -> List[Deferred]:
|
) -> List[Deferred]:
|
||||||
"""Checks that each of the received events is correctly signed by the
|
"""Checks that each of the received events is correctly signed by the
|
||||||
sending server.
|
sending server.
|
||||||
|
@ -146,7 +144,7 @@ class PduToCheckSig(
|
||||||
|
|
||||||
|
|
||||||
def _check_sigs_on_pdus(
|
def _check_sigs_on_pdus(
|
||||||
keyring: Keyring, room_version: str, pdus: Iterable[EventBase]
|
keyring: Keyring, room_version: RoomVersion, pdus: Iterable[EventBase]
|
||||||
) -> List[Deferred]:
|
) -> List[Deferred]:
|
||||||
"""Check that the given events are correctly signed
|
"""Check that the given events are correctly signed
|
||||||
|
|
||||||
|
@ -191,10 +189,6 @@ def _check_sigs_on_pdus(
|
||||||
for p in pdus
|
for p in pdus
|
||||||
]
|
]
|
||||||
|
|
||||||
v = KNOWN_ROOM_VERSIONS.get(room_version)
|
|
||||||
if not v:
|
|
||||||
raise RuntimeError("Unrecognized room version %s" % (room_version,))
|
|
||||||
|
|
||||||
# First we check that the sender event is signed by the sender's domain
|
# First we check that the sender event is signed by the sender's domain
|
||||||
# (except if its a 3pid invite, in which case it may be sent by any server)
|
# (except if its a 3pid invite, in which case it may be sent by any server)
|
||||||
pdus_to_check_sender = [p for p in pdus_to_check if not _is_invite_via_3pid(p.pdu)]
|
pdus_to_check_sender = [p for p in pdus_to_check if not _is_invite_via_3pid(p.pdu)]
|
||||||
|
@ -204,7 +198,7 @@ def _check_sigs_on_pdus(
|
||||||
(
|
(
|
||||||
p.sender_domain,
|
p.sender_domain,
|
||||||
p.redacted_pdu_json,
|
p.redacted_pdu_json,
|
||||||
p.pdu.origin_server_ts if v.enforce_key_validity else 0,
|
p.pdu.origin_server_ts if room_version.enforce_key_validity else 0,
|
||||||
p.pdu.event_id,
|
p.pdu.event_id,
|
||||||
)
|
)
|
||||||
for p in pdus_to_check_sender
|
for p in pdus_to_check_sender
|
||||||
|
@ -227,7 +221,7 @@ def _check_sigs_on_pdus(
|
||||||
# event id's domain (normally only the case for joins/leaves), and add additional
|
# event id's domain (normally only the case for joins/leaves), and add additional
|
||||||
# checks. Only do this if the room version has a concept of event ID domain
|
# checks. Only do this if the room version has a concept of event ID domain
|
||||||
# (ie, the room version uses old-style non-hash event IDs).
|
# (ie, the room version uses old-style non-hash event IDs).
|
||||||
if v.event_format == EventFormatVersions.V1:
|
if room_version.event_format == EventFormatVersions.V1:
|
||||||
pdus_to_check_event_id = [
|
pdus_to_check_event_id = [
|
||||||
p
|
p
|
||||||
for p in pdus_to_check
|
for p in pdus_to_check
|
||||||
|
@ -239,7 +233,7 @@ def _check_sigs_on_pdus(
|
||||||
(
|
(
|
||||||
get_domain_from_id(p.pdu.event_id),
|
get_domain_from_id(p.pdu.event_id),
|
||||||
p.redacted_pdu_json,
|
p.redacted_pdu_json,
|
||||||
p.pdu.origin_server_ts if v.enforce_key_validity else 0,
|
p.pdu.origin_server_ts if room_version.enforce_key_validity else 0,
|
||||||
p.pdu.event_id,
|
p.pdu.event_id,
|
||||||
)
|
)
|
||||||
for p in pdus_to_check_event_id
|
for p in pdus_to_check_event_id
|
||||||
|
|
|
@ -220,8 +220,7 @@ class FederationClient(FederationBase):
|
||||||
# FIXME: We should handle signature failures more gracefully.
|
# FIXME: We should handle signature failures more gracefully.
|
||||||
pdus[:] = await make_deferred_yieldable(
|
pdus[:] = await make_deferred_yieldable(
|
||||||
defer.gatherResults(
|
defer.gatherResults(
|
||||||
self._check_sigs_and_hashes(room_version.identifier, pdus),
|
self._check_sigs_and_hashes(room_version, pdus), consumeErrors=True,
|
||||||
consumeErrors=True,
|
|
||||||
).addErrback(unwrapFirstError)
|
).addErrback(unwrapFirstError)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -291,9 +290,7 @@ class FederationClient(FederationBase):
|
||||||
pdu = pdu_list[0]
|
pdu = pdu_list[0]
|
||||||
|
|
||||||
# Check signatures are correct.
|
# Check signatures are correct.
|
||||||
signed_pdu = await self._check_sigs_and_hash(
|
signed_pdu = await self._check_sigs_and_hash(room_version, pdu)
|
||||||
room_version.identifier, pdu
|
|
||||||
)
|
|
||||||
|
|
||||||
break
|
break
|
||||||
|
|
||||||
|
@ -350,7 +347,7 @@ class FederationClient(FederationBase):
|
||||||
self,
|
self,
|
||||||
origin: str,
|
origin: str,
|
||||||
pdus: List[EventBase],
|
pdus: List[EventBase],
|
||||||
room_version: str,
|
room_version: RoomVersion,
|
||||||
outlier: bool = False,
|
outlier: bool = False,
|
||||||
include_none: bool = False,
|
include_none: bool = False,
|
||||||
) -> List[EventBase]:
|
) -> List[EventBase]:
|
||||||
|
@ -396,7 +393,7 @@ class FederationClient(FederationBase):
|
||||||
self.get_pdu(
|
self.get_pdu(
|
||||||
destinations=[pdu.origin],
|
destinations=[pdu.origin],
|
||||||
event_id=pdu.event_id,
|
event_id=pdu.event_id,
|
||||||
room_version=room_version, # type: ignore
|
room_version=room_version,
|
||||||
outlier=outlier,
|
outlier=outlier,
|
||||||
timeout=10000,
|
timeout=10000,
|
||||||
)
|
)
|
||||||
|
@ -434,7 +431,7 @@ class FederationClient(FederationBase):
|
||||||
]
|
]
|
||||||
|
|
||||||
signed_auth = await self._check_sigs_and_hash_and_fetch(
|
signed_auth = await self._check_sigs_and_hash_and_fetch(
|
||||||
destination, auth_chain, outlier=True, room_version=room_version.identifier
|
destination, auth_chain, outlier=True, room_version=room_version
|
||||||
)
|
)
|
||||||
|
|
||||||
signed_auth.sort(key=lambda e: e.depth)
|
signed_auth.sort(key=lambda e: e.depth)
|
||||||
|
@ -661,7 +658,7 @@ class FederationClient(FederationBase):
|
||||||
destination,
|
destination,
|
||||||
list(pdus.values()),
|
list(pdus.values()),
|
||||||
outlier=True,
|
outlier=True,
|
||||||
room_version=room_version.identifier,
|
room_version=room_version,
|
||||||
)
|
)
|
||||||
|
|
||||||
valid_pdus_map = {p.event_id: p for p in valid_pdus}
|
valid_pdus_map = {p.event_id: p for p in valid_pdus}
|
||||||
|
@ -756,7 +753,7 @@ class FederationClient(FederationBase):
|
||||||
pdu = event_from_pdu_json(pdu_dict, room_version)
|
pdu = event_from_pdu_json(pdu_dict, room_version)
|
||||||
|
|
||||||
# Check signatures are correct.
|
# Check signatures are correct.
|
||||||
pdu = await self._check_sigs_and_hash(room_version.identifier, pdu)
|
pdu = await self._check_sigs_and_hash(room_version, pdu)
|
||||||
|
|
||||||
# FIXME: We should handle signature failures more gracefully.
|
# FIXME: We should handle signature failures more gracefully.
|
||||||
|
|
||||||
|
@ -948,7 +945,7 @@ class FederationClient(FederationBase):
|
||||||
]
|
]
|
||||||
|
|
||||||
signed_events = await self._check_sigs_and_hash_and_fetch(
|
signed_events = await self._check_sigs_and_hash_and_fetch(
|
||||||
destination, events, outlier=False, room_version=room_version.identifier
|
destination, events, outlier=False, room_version=room_version
|
||||||
)
|
)
|
||||||
except HttpResponseException as e:
|
except HttpResponseException as e:
|
||||||
if not e.code == 400:
|
if not e.code == 400:
|
||||||
|
|
|
@ -409,7 +409,7 @@ class FederationServer(FederationBase):
|
||||||
pdu = event_from_pdu_json(content, room_version)
|
pdu = event_from_pdu_json(content, room_version)
|
||||||
origin_host, _ = parse_server_name(origin)
|
origin_host, _ = parse_server_name(origin)
|
||||||
await self.check_server_matches_acl(origin_host, pdu.room_id)
|
await self.check_server_matches_acl(origin_host, pdu.room_id)
|
||||||
pdu = await self._check_sigs_and_hash(room_version.identifier, pdu)
|
pdu = await self._check_sigs_and_hash(room_version, pdu)
|
||||||
ret_pdu = await self.handler.on_invite_request(origin, pdu, room_version)
|
ret_pdu = await self.handler.on_invite_request(origin, pdu, room_version)
|
||||||
time_now = self._clock.time_msec()
|
time_now = self._clock.time_msec()
|
||||||
return {"event": ret_pdu.get_pdu_json(time_now)}
|
return {"event": ret_pdu.get_pdu_json(time_now)}
|
||||||
|
@ -425,7 +425,7 @@ class FederationServer(FederationBase):
|
||||||
|
|
||||||
logger.debug("on_send_join_request: pdu sigs: %s", pdu.signatures)
|
logger.debug("on_send_join_request: pdu sigs: %s", pdu.signatures)
|
||||||
|
|
||||||
pdu = await self._check_sigs_and_hash(room_version.identifier, pdu)
|
pdu = await self._check_sigs_and_hash(room_version, pdu)
|
||||||
|
|
||||||
res_pdus = await self.handler.on_send_join_request(origin, pdu)
|
res_pdus = await self.handler.on_send_join_request(origin, pdu)
|
||||||
time_now = self._clock.time_msec()
|
time_now = self._clock.time_msec()
|
||||||
|
@ -455,7 +455,7 @@ class FederationServer(FederationBase):
|
||||||
|
|
||||||
logger.debug("on_send_leave_request: pdu sigs: %s", pdu.signatures)
|
logger.debug("on_send_leave_request: pdu sigs: %s", pdu.signatures)
|
||||||
|
|
||||||
pdu = await self._check_sigs_and_hash(room_version.identifier, pdu)
|
pdu = await self._check_sigs_and_hash(room_version, pdu)
|
||||||
|
|
||||||
await self.handler.on_send_leave_request(origin, pdu)
|
await self.handler.on_send_leave_request(origin, pdu)
|
||||||
return {}
|
return {}
|
||||||
|
@ -611,7 +611,7 @@ class FederationServer(FederationBase):
|
||||||
logger.info("Accepting join PDU %s from %s", pdu.event_id, origin)
|
logger.info("Accepting join PDU %s from %s", pdu.event_id, origin)
|
||||||
|
|
||||||
# We've already checked that we know the room version by this point
|
# We've already checked that we know the room version by this point
|
||||||
room_version = await self.store.get_room_version_id(pdu.room_id)
|
room_version = await self.store.get_room_version(pdu.room_id)
|
||||||
|
|
||||||
# Check signature.
|
# Check signature.
|
||||||
try:
|
try:
|
||||||
|
|
Loading…
Reference in New Issue