Store if we fail to fetch an event from a destination
							parent
							
								
									d45489474d
								
							
						
					
					
						commit
						f91df1f761
					
				|  | @ -51,10 +51,34 @@ sent_edus_counter = metrics.register_counter("sent_edus") | |||
| sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"]) | ||||
| 
 | ||||
| 
 | ||||
| PDU_RETRY_TIME_MS = 1 * 60 * 1000 | ||||
| 
 | ||||
| 
 | ||||
| class FederationClient(FederationBase): | ||||
|     def __init__(self, hs): | ||||
|         super(FederationClient, self).__init__(hs) | ||||
| 
 | ||||
|         self.pdu_destination_tried = {} | ||||
|         self._clock.looping_call( | ||||
|             self._clear_tried_cache, 60 * 1000, | ||||
|         ) | ||||
| 
 | ||||
|     def _clear_tried_cache(self): | ||||
|         """Clear pdu_destination_tried cache""" | ||||
|         now = self._clock.time_msec() | ||||
| 
 | ||||
|         old_dict = self.pdu_destination_tried | ||||
|         self.pdu_destination_tried = {} | ||||
| 
 | ||||
|         for event_id, destination_dict in old_dict.items(): | ||||
|             destination_dict = { | ||||
|                 dest: time | ||||
|                 for dest, time in destination_dict.items() | ||||
|                 if time + PDU_RETRY_TIME_MS > now | ||||
|             } | ||||
|             if destination_dict: | ||||
|                 self.pdu_destination_tried[event_id] = destination_dict | ||||
| 
 | ||||
|     def start_get_pdu_cache(self): | ||||
|         self._get_pdu_cache = ExpiringCache( | ||||
|             cache_name="get_pdu_cache", | ||||
|  | @ -240,8 +264,15 @@ class FederationClient(FederationBase): | |||
|             if ev: | ||||
|                 defer.returnValue(ev) | ||||
| 
 | ||||
|         pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {}) | ||||
| 
 | ||||
|         pdu = None | ||||
|         for destination in destinations: | ||||
|             now = self._clock.time_msec() | ||||
|             last_attempt = pdu_attempts.get(destination, 0) | ||||
|             if last_attempt + PDU_RETRY_TIME_MS > now: | ||||
|                 continue | ||||
| 
 | ||||
|             try: | ||||
|                 limiter = yield get_retry_limiter( | ||||
|                     destination, | ||||
|  | @ -276,9 +307,11 @@ class FederationClient(FederationBase): | |||
|                 ) | ||||
|                 continue | ||||
|             except CodeMessageException as e: | ||||
|                 if 400 <= e.code < 500: | ||||
|                 if 400 <= e.code < 500 and e.code != 404: | ||||
|                     raise | ||||
| 
 | ||||
|                 pdu_attempts[destination] = now | ||||
| 
 | ||||
|                 logger.info( | ||||
|                     "Failed to get PDU %s from %s because %s", | ||||
|                     event_id, destination, e, | ||||
|  | @ -288,6 +321,8 @@ class FederationClient(FederationBase): | |||
|                 logger.info(e.message) | ||||
|                 continue | ||||
|             except Exception as e: | ||||
|                 pdu_attempts[destination] = now | ||||
| 
 | ||||
|                 logger.info( | ||||
|                     "Failed to get PDU %s from %s because %s", | ||||
|                     event_id, destination, e, | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	 Erik Johnston
						Erik Johnston