Instrument `_check_sigs_and_hash_and_fetch` to trace time spent in child concurrent calls (#13588)
Instrument `_check_sigs_and_hash_and_fetch` to trace time spent in child concurrent calls because I've seen `_check_sigs_and_hash_and_fetch` take [10.41s to process 100 events](https://github.com/matrix-org/synapse/issues/13587) Fix https://github.com/matrix-org/synapse/issues/13587 Part of https://github.com/matrix-org/synapse/issues/13356 pull/13610/head
parent
a25a37002c
commit
7af07f9716
|
@ -0,0 +1 @@
|
||||||
|
Instrument `_check_sigs_and_hash_and_fetch` to trace time spent in child concurrent calls for understandable traces in Jaeger.
|
|
@ -28,6 +28,7 @@ from synapse.api.errors import Codes, SynapseError
|
||||||
from synapse.api.room_versions import RoomVersion
|
from synapse.api.room_versions import RoomVersion
|
||||||
from synapse.events import EventBase
|
from synapse.events import EventBase
|
||||||
from synapse.events.utils import prune_event, prune_event_dict
|
from synapse.events.utils import prune_event, prune_event_dict
|
||||||
|
from synapse.logging.opentracing import trace
|
||||||
from synapse.types import JsonDict
|
from synapse.types import JsonDict
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
@ -35,6 +36,7 @@ logger = logging.getLogger(__name__)
|
||||||
Hasher = Callable[[bytes], "hashlib._Hash"]
|
Hasher = Callable[[bytes], "hashlib._Hash"]
|
||||||
|
|
||||||
|
|
||||||
|
@trace
|
||||||
def check_event_content_hash(
|
def check_event_content_hash(
|
||||||
event: EventBase, hash_algorithm: Hasher = hashlib.sha256
|
event: EventBase, hash_algorithm: Hasher = hashlib.sha256
|
||||||
) -> bool:
|
) -> bool:
|
||||||
|
|
|
@ -32,6 +32,7 @@ from typing_extensions import Literal
|
||||||
|
|
||||||
import synapse
|
import synapse
|
||||||
from synapse.api.errors import Codes
|
from synapse.api.errors import Codes
|
||||||
|
from synapse.logging.opentracing import trace
|
||||||
from synapse.rest.media.v1._base import FileInfo
|
from synapse.rest.media.v1._base import FileInfo
|
||||||
from synapse.rest.media.v1.media_storage import ReadableFileWrapper
|
from synapse.rest.media.v1.media_storage import ReadableFileWrapper
|
||||||
from synapse.spam_checker_api import RegistrationBehaviour
|
from synapse.spam_checker_api import RegistrationBehaviour
|
||||||
|
@ -378,6 +379,7 @@ class SpamChecker:
|
||||||
if check_media_file_for_spam is not None:
|
if check_media_file_for_spam is not None:
|
||||||
self._check_media_file_for_spam_callbacks.append(check_media_file_for_spam)
|
self._check_media_file_for_spam_callbacks.append(check_media_file_for_spam)
|
||||||
|
|
||||||
|
@trace
|
||||||
async def check_event_for_spam(
|
async def check_event_for_spam(
|
||||||
self, event: "synapse.events.EventBase"
|
self, event: "synapse.events.EventBase"
|
||||||
) -> Union[Tuple[Codes, JsonDict], str]:
|
) -> Union[Tuple[Codes, JsonDict], str]:
|
||||||
|
|
|
@ -23,6 +23,7 @@ from synapse.crypto.keyring import Keyring
|
||||||
from synapse.events import EventBase, make_event_from_dict
|
from synapse.events import EventBase, make_event_from_dict
|
||||||
from synapse.events.utils import prune_event, validate_canonicaljson
|
from synapse.events.utils import prune_event, validate_canonicaljson
|
||||||
from synapse.http.servlet import assert_params_in_dict
|
from synapse.http.servlet import assert_params_in_dict
|
||||||
|
from synapse.logging.opentracing import log_kv, trace
|
||||||
from synapse.types import JsonDict, get_domain_from_id
|
from synapse.types import JsonDict, get_domain_from_id
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
@ -55,6 +56,7 @@ class FederationBase:
|
||||||
self._clock = hs.get_clock()
|
self._clock = hs.get_clock()
|
||||||
self._storage_controllers = hs.get_storage_controllers()
|
self._storage_controllers = hs.get_storage_controllers()
|
||||||
|
|
||||||
|
@trace
|
||||||
async def _check_sigs_and_hash(
|
async def _check_sigs_and_hash(
|
||||||
self, room_version: RoomVersion, pdu: EventBase
|
self, room_version: RoomVersion, pdu: EventBase
|
||||||
) -> EventBase:
|
) -> EventBase:
|
||||||
|
@ -97,17 +99,36 @@ class FederationBase:
|
||||||
"Event %s seems to have been redacted; using our redacted copy",
|
"Event %s seems to have been redacted; using our redacted copy",
|
||||||
pdu.event_id,
|
pdu.event_id,
|
||||||
)
|
)
|
||||||
|
log_kv(
|
||||||
|
{
|
||||||
|
"message": "Event seems to have been redacted; using our redacted copy",
|
||||||
|
"event_id": pdu.event_id,
|
||||||
|
}
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"Event %s content has been tampered, redacting",
|
"Event %s content has been tampered, redacting",
|
||||||
pdu.event_id,
|
pdu.event_id,
|
||||||
)
|
)
|
||||||
|
log_kv(
|
||||||
|
{
|
||||||
|
"message": "Event content has been tampered, redacting",
|
||||||
|
"event_id": pdu.event_id,
|
||||||
|
}
|
||||||
|
)
|
||||||
return redacted_event
|
return redacted_event
|
||||||
|
|
||||||
spam_check = await self.spam_checker.check_event_for_spam(pdu)
|
spam_check = await self.spam_checker.check_event_for_spam(pdu)
|
||||||
|
|
||||||
if spam_check != self.spam_checker.NOT_SPAM:
|
if spam_check != self.spam_checker.NOT_SPAM:
|
||||||
logger.warning("Event contains spam, soft-failing %s", pdu.event_id)
|
logger.warning("Event contains spam, soft-failing %s", pdu.event_id)
|
||||||
|
log_kv(
|
||||||
|
{
|
||||||
|
"message": "Event contains spam, redacting (to save disk space) "
|
||||||
|
"as well as soft-failing (to stop using the event in prev_events)",
|
||||||
|
"event_id": pdu.event_id,
|
||||||
|
}
|
||||||
|
)
|
||||||
# we redact (to save disk space) as well as soft-failing (to stop
|
# we redact (to save disk space) as well as soft-failing (to stop
|
||||||
# using the event in prev_events).
|
# using the event in prev_events).
|
||||||
redacted_event = prune_event(pdu)
|
redacted_event = prune_event(pdu)
|
||||||
|
@ -117,6 +138,7 @@ class FederationBase:
|
||||||
return pdu
|
return pdu
|
||||||
|
|
||||||
|
|
||||||
|
@trace
|
||||||
async def _check_sigs_on_pdu(
|
async def _check_sigs_on_pdu(
|
||||||
keyring: Keyring, room_version: RoomVersion, pdu: EventBase
|
keyring: Keyring, room_version: RoomVersion, pdu: EventBase
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
|
@ -61,7 +61,7 @@ from synapse.federation.federation_base import (
|
||||||
)
|
)
|
||||||
from synapse.federation.transport.client import SendJoinResponse
|
from synapse.federation.transport.client import SendJoinResponse
|
||||||
from synapse.http.types import QueryParams
|
from synapse.http.types import QueryParams
|
||||||
from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace
|
from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, tag_args, trace
|
||||||
from synapse.types import JsonDict, UserID, get_domain_from_id
|
from synapse.types import JsonDict, UserID, get_domain_from_id
|
||||||
from synapse.util.async_helpers import concurrently_execute
|
from synapse.util.async_helpers import concurrently_execute
|
||||||
from synapse.util.caches.expiringcache import ExpiringCache
|
from synapse.util.caches.expiringcache import ExpiringCache
|
||||||
|
@ -587,11 +587,15 @@ class FederationClient(FederationBase):
|
||||||
Returns:
|
Returns:
|
||||||
A list of PDUs that have valid signatures and hashes.
|
A list of PDUs that have valid signatures and hashes.
|
||||||
"""
|
"""
|
||||||
|
set_tag(
|
||||||
|
SynapseTags.RESULT_PREFIX + "pdus.length",
|
||||||
|
str(len(pdus)),
|
||||||
|
)
|
||||||
|
|
||||||
# We limit how many PDUs we check at once, as if we try to do hundreds
|
# We limit how many PDUs we check at once, as if we try to do hundreds
|
||||||
# of thousands of PDUs at once we see large memory spikes.
|
# of thousands of PDUs at once we see large memory spikes.
|
||||||
|
|
||||||
valid_pdus = []
|
valid_pdus: List[EventBase] = []
|
||||||
|
|
||||||
async def _execute(pdu: EventBase) -> None:
|
async def _execute(pdu: EventBase) -> None:
|
||||||
valid_pdu = await self._check_sigs_and_hash_and_fetch_one(
|
valid_pdu = await self._check_sigs_and_hash_and_fetch_one(
|
||||||
|
@ -607,6 +611,8 @@ class FederationClient(FederationBase):
|
||||||
|
|
||||||
return valid_pdus
|
return valid_pdus
|
||||||
|
|
||||||
|
@trace
|
||||||
|
@tag_args
|
||||||
async def _check_sigs_and_hash_and_fetch_one(
|
async def _check_sigs_and_hash_and_fetch_one(
|
||||||
self,
|
self,
|
||||||
pdu: EventBase,
|
pdu: EventBase,
|
||||||
|
@ -639,16 +645,27 @@ class FederationClient(FederationBase):
|
||||||
except InvalidEventSignatureError as e:
|
except InvalidEventSignatureError as e:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"Signature on retrieved event %s was invalid (%s). "
|
"Signature on retrieved event %s was invalid (%s). "
|
||||||
"Checking local store/orgin server",
|
"Checking local store/origin server",
|
||||||
pdu.event_id,
|
pdu.event_id,
|
||||||
e,
|
e,
|
||||||
)
|
)
|
||||||
|
log_kv(
|
||||||
|
{
|
||||||
|
"message": "Signature on retrieved event was invalid. "
|
||||||
|
"Checking local store/origin server",
|
||||||
|
"event_id": pdu.event_id,
|
||||||
|
"InvalidEventSignatureError": e,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
# Check local db.
|
# Check local db.
|
||||||
res = await self.store.get_event(
|
res = await self.store.get_event(
|
||||||
pdu.event_id, allow_rejected=True, allow_none=True
|
pdu.event_id, allow_rejected=True, allow_none=True
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# If the PDU fails its signature check and we don't have it in our
|
||||||
|
# database, we then request it from sender's server (if that is not the
|
||||||
|
# same as `origin`).
|
||||||
pdu_origin = get_domain_from_id(pdu.sender)
|
pdu_origin = get_domain_from_id(pdu.sender)
|
||||||
if not res and pdu_origin != origin:
|
if not res and pdu_origin != origin:
|
||||||
try:
|
try:
|
||||||
|
|
Loading…
Reference in New Issue