Merge pull request #2520 from matrix-org/rav/process_incoming_rooms_in_parallel
fed server: process PDUs for different rooms in parallel
pull/2039/merge
commit
79bea15830
|
@ -12,14 +12,12 @@
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
from .federation_base import FederationBase
|
from .federation_base import FederationBase
|
||||||
from .units import Transaction, Edu
|
from .units import Transaction, Edu
|
||||||
|
|
||||||
from synapse.util.async import Linearizer
|
from synapse.util import async
|
||||||
from synapse.util.logutils import log_function
|
from synapse.util.logutils import log_function
|
||||||
from synapse.util.caches.response_cache import ResponseCache
|
from synapse.util.caches.response_cache import ResponseCache
|
||||||
from synapse.events import FrozenEvent
|
from synapse.events import FrozenEvent
|
||||||
|
@ -33,6 +31,9 @@ from synapse.crypto.event_signing import compute_event_signature
|
||||||
import simplejson as json
|
import simplejson as json
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
# when processing incoming transactions, we try to handle multiple rooms in
|
||||||
|
# parallel, up to this limit.
|
||||||
|
TRANSACTION_CONCURRENCY_LIMIT = 10
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -52,8 +53,8 @@ class FederationServer(FederationBase):
|
||||||
|
|
||||||
self.auth = hs.get_auth()
|
self.auth = hs.get_auth()
|
||||||
|
|
||||||
self._server_linearizer = Linearizer("fed_server")
|
self._server_linearizer = async.Linearizer("fed_server")
|
||||||
self._transaction_linearizer = Linearizer("fed_txn_handler")
|
self._transaction_linearizer = async.Linearizer("fed_txn_handler")
|
||||||
|
|
||||||
# We cache responses to state queries, as they take a while and often
|
# We cache responses to state queries, as they take a while and often
|
||||||
# come in waves.
|
# come in waves.
|
||||||
|
@ -159,7 +160,7 @@ class FederationServer(FederationBase):
|
||||||
|
|
||||||
received_pdus_counter.inc_by(len(transaction.pdus))
|
received_pdus_counter.inc_by(len(transaction.pdus))
|
||||||
|
|
||||||
pdu_list = []
|
pdus_by_room = {}
|
||||||
|
|
||||||
for p in transaction.pdus:
|
for p in transaction.pdus:
|
||||||
if "unsigned" in p:
|
if "unsigned" in p:
|
||||||
|
@ -171,22 +172,36 @@ class FederationServer(FederationBase):
|
||||||
del p["age"]
|
del p["age"]
|
||||||
|
|
||||||
event = self.event_from_pdu_json(p)
|
event = self.event_from_pdu_json(p)
|
||||||
pdu_list.append(event)
|
room_id = event.room_id
|
||||||
|
pdus_by_room.setdefault(room_id, []).append(event)
|
||||||
|
|
||||||
pdu_results = {}
|
pdu_results = {}
|
||||||
|
|
||||||
for pdu in pdu_list:
|
# we can process different rooms in parallel (which is useful if they
|
||||||
event_id = pdu.event_id
|
# require callouts to other servers to fetch missing events), but
|
||||||
try:
|
# impose a limit to avoid going too crazy with ram/cpu.
|
||||||
yield self._handle_received_pdu(transaction.origin, pdu)
|
@defer.inlineCallbacks
|
||||||
pdu_results[event_id] = {}
|
def process_pdus_for_room(room_id):
|
||||||
except FederationError as e:
|
logger.debug("Processing PDUs for %s", room_id)
|
||||||
logger.warn("Error handling PDU %s: %s", event_id, e)
|
for pdu in pdus_by_room[room_id]:
|
||||||
self.send_failure(e, transaction.origin)
|
event_id = pdu.event_id
|
||||||
pdu_results[event_id] = {"error": str(e)}
|
try:
|
||||||
except Exception as e:
|
yield self._handle_received_pdu(
|
||||||
pdu_results[event_id] = {"error": str(e)}
|
transaction.origin, pdu
|
||||||
logger.exception("Failed to handle PDU")
|
)
|
||||||
|
pdu_results[event_id] = {}
|
||||||
|
except FederationError as e:
|
||||||
|
logger.warn("Error handling PDU %s: %s", event_id, e)
|
||||||
|
self.send_failure(e, transaction.origin)
|
||||||
|
pdu_results[event_id] = {"error": str(e)}
|
||||||
|
except Exception as e:
|
||||||
|
pdu_results[event_id] = {"error": str(e)}
|
||||||
|
logger.exception("Failed to handle PDU %s", event_id)
|
||||||
|
|
||||||
|
yield async.concurrently_execute(
|
||||||
|
process_pdus_for_room, pdus_by_room.keys(),
|
||||||
|
TRANSACTION_CONCURRENCY_LIMIT,
|
||||||
|
)
|
||||||
|
|
||||||
if hasattr(transaction, "edus"):
|
if hasattr(transaction, "edus"):
|
||||||
for edu in (Edu(**x) for x in transaction.edus):
|
for edu in (Edu(**x) for x in transaction.edus):
|
||||||
|
|
Loading…
Reference in New Issue