parent
							
								
									11ea16777f
								
							
						
					
					
						commit
						130f932cbc
					
				|  | @ -40,8 +40,7 @@ logger = logging.getLogger(__name__) | |||
| 
 | ||||
| 
 | ||||
| sent_edus_counter = Counter( | ||||
|     "synapse_federation_client_sent_edus", | ||||
|     "Total number of EDUs successfully sent", | ||||
|     "synapse_federation_client_sent_edus", "Total number of EDUs successfully sent" | ||||
| ) | ||||
| 
 | ||||
| sent_edus_by_type = Counter( | ||||
|  | @ -61,6 +60,7 @@ class PerDestinationQueue(object): | |||
|         destination (str): the server_name of the destination that we are managing | ||||
|             transmission for. | ||||
|     """ | ||||
| 
 | ||||
|     def __init__(self, hs, transaction_manager, destination): | ||||
|         self._server_name = hs.hostname | ||||
|         self._clock = hs.get_clock() | ||||
|  | @ -71,17 +71,17 @@ class PerDestinationQueue(object): | |||
|         self.transmission_loop_running = False | ||||
| 
 | ||||
|         # a list of tuples of (pending pdu, order) | ||||
|         self._pending_pdus = []    # type: list[tuple[EventBase, int]] | ||||
|         self._pending_edus = []    # type: list[Edu] | ||||
|         self._pending_pdus = []  # type: list[tuple[EventBase, int]] | ||||
|         self._pending_edus = []  # type: list[Edu] | ||||
| 
 | ||||
|         # Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered | ||||
|         # based on their key (e.g. typing events by room_id) | ||||
|         # Map of (edu_type, key) -> Edu | ||||
|         self._pending_edus_keyed = {}   # type: dict[tuple[str, str], Edu] | ||||
|         self._pending_edus_keyed = {}  # type: dict[tuple[str, str], Edu] | ||||
| 
 | ||||
|         # Map of user_id -> UserPresenceState of pending presence to be sent to this | ||||
|         # destination | ||||
|         self._pending_presence = {}   # type: dict[str, UserPresenceState] | ||||
|         self._pending_presence = {}  # type: dict[str, UserPresenceState] | ||||
| 
 | ||||
|         # room_id -> receipt_type -> user_id -> receipt_dict | ||||
|         self._pending_rrs = {} | ||||
|  | @ -123,9 +123,7 @@ class PerDestinationQueue(object): | |||
|         Args: | ||||
|             states (iterable[UserPresenceState]): presence to send | ||||
|         """ | ||||
|         self._pending_presence.update({ | ||||
|             state.user_id: state for state in states | ||||
|         }) | ||||
|         self._pending_presence.update({state.user_id: state for state in states}) | ||||
|         self.attempt_new_transaction() | ||||
| 
 | ||||
|     def queue_read_receipt(self, receipt): | ||||
|  | @ -135,14 +133,9 @@ class PerDestinationQueue(object): | |||
|         Args: | ||||
|             receipt (synapse.api.receipt_info.ReceiptInfo): receipt to be queued | ||||
|         """ | ||||
|         self._pending_rrs.setdefault( | ||||
|             receipt.room_id, {}, | ||||
|         ).setdefault( | ||||
|         self._pending_rrs.setdefault(receipt.room_id, {}).setdefault( | ||||
|             receipt.receipt_type, {} | ||||
|         )[receipt.user_id] = { | ||||
|             "event_ids": receipt.event_ids, | ||||
|             "data": receipt.data, | ||||
|         } | ||||
|         )[receipt.user_id] = {"event_ids": receipt.event_ids, "data": receipt.data} | ||||
| 
 | ||||
|     def flush_read_receipts_for_room(self, room_id): | ||||
|         # if we don't have any read-receipts for this room, it may be that we've already | ||||
|  | @ -173,10 +166,7 @@ class PerDestinationQueue(object): | |||
|             # request at which point pending_pdus just keeps growing. | ||||
|             # we need application-layer timeouts of some flavour of these | ||||
|             # requests | ||||
|             logger.debug( | ||||
|                 "TX [%s] Transaction already in progress", | ||||
|                 self._destination | ||||
|             ) | ||||
|             logger.debug("TX [%s] Transaction already in progress", self._destination) | ||||
|             return | ||||
| 
 | ||||
|         logger.debug("TX [%s] Starting transaction loop", self._destination) | ||||
|  | @ -243,14 +233,22 @@ class PerDestinationQueue(object): | |||
|                     ) | ||||
| 
 | ||||
|                 pending_edus.extend(device_message_edus) | ||||
|                 pending_edus.extend(self._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))) | ||||
|                 while len(pending_edus) < MAX_EDUS_PER_TRANSACTION and self._pending_edus_keyed: | ||||
|                 pending_edus.extend( | ||||
|                     self._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus)) | ||||
|                 ) | ||||
|                 while ( | ||||
|                     len(pending_edus) < MAX_EDUS_PER_TRANSACTION | ||||
|                     and self._pending_edus_keyed | ||||
|                 ): | ||||
|                     _, val = self._pending_edus_keyed.popitem() | ||||
|                     pending_edus.append(val) | ||||
| 
 | ||||
|                 if pending_pdus: | ||||
|                     logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", | ||||
|                                  self._destination, len(pending_pdus)) | ||||
|                     logger.debug( | ||||
|                         "TX [%s] len(pending_pdus_by_dest[dest]) = %d", | ||||
|                         self._destination, | ||||
|                         len(pending_pdus), | ||||
|                     ) | ||||
| 
 | ||||
|                 if not pending_pdus and not pending_edus: | ||||
|                     logger.debug("TX [%s] Nothing to send", self._destination) | ||||
|  | @ -303,22 +301,25 @@ class PerDestinationQueue(object): | |||
|         except HttpResponseException as e: | ||||
|             logger.warning( | ||||
|                 "TX [%s] Received %d response to transaction: %s", | ||||
|                 self._destination, e.code, e, | ||||
|                 self._destination, | ||||
|                 e.code, | ||||
|                 e, | ||||
|             ) | ||||
|         except RequestSendFailed as e: | ||||
|             logger.warning("TX [%s] Failed to send transaction: %s", self._destination, e) | ||||
|             logger.warning( | ||||
|                 "TX [%s] Failed to send transaction: %s", self._destination, e | ||||
|             ) | ||||
| 
 | ||||
|             for p, _ in pending_pdus: | ||||
|                 logger.info("Failed to send event %s to %s", p.event_id, | ||||
|                             self._destination) | ||||
|                 logger.info( | ||||
|                     "Failed to send event %s to %s", p.event_id, self._destination | ||||
|                 ) | ||||
|         except Exception: | ||||
|             logger.exception( | ||||
|                 "TX [%s] Failed to send transaction", | ||||
|                 self._destination, | ||||
|             ) | ||||
|             logger.exception("TX [%s] Failed to send transaction", self._destination) | ||||
|             for p, _ in pending_pdus: | ||||
|                 logger.info("Failed to send event %s to %s", p.event_id, | ||||
|                             self._destination) | ||||
|                 logger.info( | ||||
|                     "Failed to send event %s to %s", p.event_id, self._destination | ||||
|                 ) | ||||
|         finally: | ||||
|             # We want to be *very* sure we clear this after we stop processing | ||||
|             self.transmission_loop_running = False | ||||
|  | @ -367,7 +368,10 @@ class PerDestinationQueue(object): | |||
|         last_device_stream_id = self._last_device_stream_id | ||||
|         to_device_stream_id = self._store.get_to_device_stream_token() | ||||
|         contents, stream_id = yield self._store.get_new_device_msgs_for_remote( | ||||
|             self._destination, last_device_stream_id, to_device_stream_id, limit - len(edus) | ||||
|             self._destination, | ||||
|             last_device_stream_id, | ||||
|             to_device_stream_id, | ||||
|             limit - len(edus), | ||||
|         ) | ||||
|         edus.extend( | ||||
|             Edu( | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	 Richard van der Hoff
						Richard van der Hoff