diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 12e8df9cc6..43daf673c0 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -169,7 +169,7 @@ class TransactionQueue(object): while True: last_token = yield self.store.get_federation_out_pos("events") next_token, events = yield self.store.get_all_new_events_stream( - last_token, self._last_poked_id, limit=20, + last_token, self._last_poked_id, limit=100, ) logger.debug("Handling %s -> %s", last_token, next_token) @@ -177,12 +177,13 @@ class TransactionQueue(object): if not events and next_token >= self._last_poked_id: break - for event in events: + @defer.inlineCallbacks + def handle_event(event): # Only send events for this server. send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of() is_mine = self.is_mine_id(event.event_id) if not is_mine and send_on_behalf_of is None: - continue + return try: # Get the state from before the event. @@ -198,7 +199,7 @@ class TransactionQueue(object): ) except Exception: logger.exception("Failed to calculate hosts in room") - continue + return destinations = set(destinations) @@ -212,6 +213,19 @@ class TransactionQueue(object): self._send_pdu(event, destinations) + def handle_room_events(events): + for event in events: + return handle_event(event) + + events_by_room = {} + for event in events: + events_by_room.setdefault(event.room_id, []).append(event) + + yield logcontext.make_deferred_yieldable(defer.gatherResults( + [handle_room_events(evs) for evs in events_by_room.itervalues()], + consumeErrors=True + )) + events_processed_counter.inc_by(len(events)) yield self.store.update_federation_out_pos(