Improve logging when processing incoming transactions (#9596)
Put the room id in the logcontext, to make it easier to understand what's going on.pull/9615/head
							parent
							
								
									464e5da7b2
								
							
						
					
					
						commit
						2b328d7e02
					
				| 
						 | 
				
			
			@ -0,0 +1 @@
 | 
			
		|||
Improve logging when processing incoming transactions.
 | 
			
		||||
| 
						 | 
				
			
			@ -335,34 +335,41 @@ class FederationServer(FederationBase):
 | 
			
		|||
        # impose a limit to avoid going too crazy with ram/cpu.
 | 
			
		||||
 | 
			
		||||
        async def process_pdus_for_room(room_id: str):
 | 
			
		||||
            with nested_logging_context(room_id):
 | 
			
		||||
                logger.debug("Processing PDUs for %s", room_id)
 | 
			
		||||
 | 
			
		||||
                try:
 | 
			
		||||
                    await self.check_server_matches_acl(origin_host, room_id)
 | 
			
		||||
                except AuthError as e:
 | 
			
		||||
                logger.warning("Ignoring PDUs for room %s from banned server", room_id)
 | 
			
		||||
                    logger.warning(
 | 
			
		||||
                        "Ignoring PDUs for room %s from banned server", room_id
 | 
			
		||||
                    )
 | 
			
		||||
                    for pdu in pdus_by_room[room_id]:
 | 
			
		||||
                        event_id = pdu.event_id
 | 
			
		||||
                        pdu_results[event_id] = e.error_dict()
 | 
			
		||||
                    return
 | 
			
		||||
 | 
			
		||||
                for pdu in pdus_by_room[room_id]:
 | 
			
		||||
                    pdu_results[pdu.event_id] = await process_pdu(pdu)
 | 
			
		||||
 | 
			
		||||
        async def process_pdu(pdu: EventBase) -> JsonDict:
 | 
			
		||||
            event_id = pdu.event_id
 | 
			
		||||
            with pdu_process_time.time():
 | 
			
		||||
                with nested_logging_context(event_id):
 | 
			
		||||
                    try:
 | 
			
		||||
                        await self._handle_received_pdu(origin, pdu)
 | 
			
		||||
                            pdu_results[event_id] = {}
 | 
			
		||||
                        return {}
 | 
			
		||||
                    except FederationError as e:
 | 
			
		||||
                        logger.warning("Error handling PDU %s: %s", event_id, e)
 | 
			
		||||
                            pdu_results[event_id] = {"error": str(e)}
 | 
			
		||||
                        return {"error": str(e)}
 | 
			
		||||
                    except Exception as e:
 | 
			
		||||
                        f = failure.Failure()
 | 
			
		||||
                            pdu_results[event_id] = {"error": str(e)}
 | 
			
		||||
                        logger.error(
 | 
			
		||||
                            "Failed to handle PDU %s",
 | 
			
		||||
                            event_id,
 | 
			
		||||
                            exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore
 | 
			
		||||
                        )
 | 
			
		||||
                        return {"error": str(e)}
 | 
			
		||||
 | 
			
		||||
        await concurrently_execute(
 | 
			
		||||
            process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -201,7 +201,7 @@ class FederationHandler(BaseHandler):
 | 
			
		|||
            or pdu.internal_metadata.is_outlier()
 | 
			
		||||
        )
 | 
			
		||||
        if already_seen:
 | 
			
		||||
            logger.debug("[%s %s]: Already seen pdu", room_id, event_id)
 | 
			
		||||
            logger.debug("Already seen pdu")
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        # do some initial sanity-checking of the event. In particular, make
 | 
			
		||||
| 
						 | 
				
			
			@ -210,18 +210,14 @@ class FederationHandler(BaseHandler):
 | 
			
		|||
        try:
 | 
			
		||||
            self._sanity_check_event(pdu)
 | 
			
		||||
        except SynapseError as err:
 | 
			
		||||
            logger.warning(
 | 
			
		||||
                "[%s %s] Received event failed sanity checks", room_id, event_id
 | 
			
		||||
            )
 | 
			
		||||
            logger.warning("Received event failed sanity checks")
 | 
			
		||||
            raise FederationError("ERROR", err.code, err.msg, affected=pdu.event_id)
 | 
			
		||||
 | 
			
		||||
        # If we are currently in the process of joining this room, then we
 | 
			
		||||
        # queue up events for later processing.
 | 
			
		||||
        if room_id in self.room_queues:
 | 
			
		||||
            logger.info(
 | 
			
		||||
                "[%s %s] Queuing PDU from %s for now: join in progress",
 | 
			
		||||
                room_id,
 | 
			
		||||
                event_id,
 | 
			
		||||
                "Queuing PDU from %s for now: join in progress",
 | 
			
		||||
                origin,
 | 
			
		||||
            )
 | 
			
		||||
            self.room_queues[room_id].append((pdu, origin))
 | 
			
		||||
| 
						 | 
				
			
			@ -236,9 +232,7 @@ class FederationHandler(BaseHandler):
 | 
			
		|||
        is_in_room = await self.auth.check_host_in_room(room_id, self.server_name)
 | 
			
		||||
        if not is_in_room:
 | 
			
		||||
            logger.info(
 | 
			
		||||
                "[%s %s] Ignoring PDU from %s as we're not in the room",
 | 
			
		||||
                room_id,
 | 
			
		||||
                event_id,
 | 
			
		||||
                "Ignoring PDU from %s as we're not in the room",
 | 
			
		||||
                origin,
 | 
			
		||||
            )
 | 
			
		||||
            return None
 | 
			
		||||
| 
						 | 
				
			
			@ -250,7 +244,7 @@ class FederationHandler(BaseHandler):
 | 
			
		|||
            # We only backfill backwards to the min depth.
 | 
			
		||||
            min_depth = await self.get_min_depth_for_context(pdu.room_id)
 | 
			
		||||
 | 
			
		||||
            logger.debug("[%s %s] min_depth: %d", room_id, event_id, min_depth)
 | 
			
		||||
            logger.debug("min_depth: %d", min_depth)
 | 
			
		||||
 | 
			
		||||
            prevs = set(pdu.prev_event_ids())
 | 
			
		||||
            seen = await self.store.have_events_in_timeline(prevs)
 | 
			
		||||
| 
						 | 
				
			
			@ -267,17 +261,13 @@ class FederationHandler(BaseHandler):
 | 
			
		|||
                    # If we're missing stuff, ensure we only fetch stuff one
 | 
			
		||||
                    # at a time.
 | 
			
		||||
                    logger.info(
 | 
			
		||||
                        "[%s %s] Acquiring room lock to fetch %d missing prev_events: %s",
 | 
			
		||||
                        room_id,
 | 
			
		||||
                        event_id,
 | 
			
		||||
                        "Acquiring room lock to fetch %d missing prev_events: %s",
 | 
			
		||||
                        len(missing_prevs),
 | 
			
		||||
                        shortstr(missing_prevs),
 | 
			
		||||
                    )
 | 
			
		||||
                    with (await self._room_pdu_linearizer.queue(pdu.room_id)):
 | 
			
		||||
                        logger.info(
 | 
			
		||||
                            "[%s %s] Acquired room lock to fetch %d missing prev_events",
 | 
			
		||||
                            room_id,
 | 
			
		||||
                            event_id,
 | 
			
		||||
                            "Acquired room lock to fetch %d missing prev_events",
 | 
			
		||||
                            len(missing_prevs),
 | 
			
		||||
                        )
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -297,9 +287,7 @@ class FederationHandler(BaseHandler):
 | 
			
		|||
 | 
			
		||||
                        if not prevs - seen:
 | 
			
		||||
                            logger.info(
 | 
			
		||||
                                "[%s %s] Found all missing prev_events",
 | 
			
		||||
                                room_id,
 | 
			
		||||
                                event_id,
 | 
			
		||||
                                "Found all missing prev_events",
 | 
			
		||||
                            )
 | 
			
		||||
 | 
			
		||||
            if prevs - seen:
 | 
			
		||||
| 
						 | 
				
			
			@ -329,9 +317,7 @@ class FederationHandler(BaseHandler):
 | 
			
		|||
 | 
			
		||||
                if sent_to_us_directly:
 | 
			
		||||
                    logger.warning(
 | 
			
		||||
                        "[%s %s] Rejecting: failed to fetch %d prev events: %s",
 | 
			
		||||
                        room_id,
 | 
			
		||||
                        event_id,
 | 
			
		||||
                        "Rejecting: failed to fetch %d prev events: %s",
 | 
			
		||||
                        len(prevs - seen),
 | 
			
		||||
                        shortstr(prevs - seen),
 | 
			
		||||
                    )
 | 
			
		||||
| 
						 | 
				
			
			@ -414,10 +400,7 @@ class FederationHandler(BaseHandler):
 | 
			
		|||
                    state = [event_map[e] for e in state_map.values()]
 | 
			
		||||
                except Exception:
 | 
			
		||||
                    logger.warning(
 | 
			
		||||
                        "[%s %s] Error attempting to resolve state at missing "
 | 
			
		||||
                        "prev_events",
 | 
			
		||||
                        room_id,
 | 
			
		||||
                        event_id,
 | 
			
		||||
                        "Error attempting to resolve state at missing " "prev_events",
 | 
			
		||||
                        exc_info=True,
 | 
			
		||||
                    )
 | 
			
		||||
                    raise FederationError(
 | 
			
		||||
| 
						 | 
				
			
			@ -454,9 +437,7 @@ class FederationHandler(BaseHandler):
 | 
			
		|||
        latest |= seen
 | 
			
		||||
 | 
			
		||||
        logger.info(
 | 
			
		||||
            "[%s %s]: Requesting missing events between %s and %s",
 | 
			
		||||
            room_id,
 | 
			
		||||
            event_id,
 | 
			
		||||
            "Requesting missing events between %s and %s",
 | 
			
		||||
            shortstr(latest),
 | 
			
		||||
            event_id,
 | 
			
		||||
        )
 | 
			
		||||
| 
						 | 
				
			
			@ -523,15 +504,11 @@ class FederationHandler(BaseHandler):
 | 
			
		|||
            # We failed to get the missing events, but since we need to handle
 | 
			
		||||
            # the case of `get_missing_events` not returning the necessary
 | 
			
		||||
            # events anyway, it is safe to simply log the error and continue.
 | 
			
		||||
            logger.warning(
 | 
			
		||||
                "[%s %s]: Failed to get prev_events: %s", room_id, event_id, e
 | 
			
		||||
            )
 | 
			
		||||
            logger.warning("Failed to get prev_events: %s", e)
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        logger.info(
 | 
			
		||||
            "[%s %s]: Got %d prev_events: %s",
 | 
			
		||||
            room_id,
 | 
			
		||||
            event_id,
 | 
			
		||||
            "Got %d prev_events: %s",
 | 
			
		||||
            len(missing_events),
 | 
			
		||||
            shortstr(missing_events),
 | 
			
		||||
        )
 | 
			
		||||
| 
						 | 
				
			
			@ -542,9 +519,7 @@ class FederationHandler(BaseHandler):
 | 
			
		|||
 | 
			
		||||
        for ev in missing_events:
 | 
			
		||||
            logger.info(
 | 
			
		||||
                "[%s %s] Handling received prev_event %s",
 | 
			
		||||
                room_id,
 | 
			
		||||
                event_id,
 | 
			
		||||
                "Handling received prev_event %s",
 | 
			
		||||
                ev.event_id,
 | 
			
		||||
            )
 | 
			
		||||
            with nested_logging_context(ev.event_id):
 | 
			
		||||
| 
						 | 
				
			
			@ -553,9 +528,7 @@ class FederationHandler(BaseHandler):
 | 
			
		|||
                except FederationError as e:
 | 
			
		||||
                    if e.code == 403:
 | 
			
		||||
                        logger.warning(
 | 
			
		||||
                            "[%s %s] Received prev_event %s failed history check.",
 | 
			
		||||
                            room_id,
 | 
			
		||||
                            event_id,
 | 
			
		||||
                            "Received prev_event %s failed history check.",
 | 
			
		||||
                            ev.event_id,
 | 
			
		||||
                        )
 | 
			
		||||
                    else:
 | 
			
		||||
| 
						 | 
				
			
			@ -707,10 +680,7 @@ class FederationHandler(BaseHandler):
 | 
			
		|||
                (ie, we are missing one or more prev_events), the resolved state at the
 | 
			
		||||
                event
 | 
			
		||||
        """
 | 
			
		||||
        room_id = event.room_id
 | 
			
		||||
        event_id = event.event_id
 | 
			
		||||
 | 
			
		||||
        logger.debug("[%s %s] Processing event: %s", room_id, event_id, event)
 | 
			
		||||
        logger.debug("Processing event: %s", event)
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            await self._handle_new_event(origin, event, state=state)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue