Merge pull request #4852 from matrix-org/rav/move_rr_sending_to_worker

Move client receipt processing to federation sender worker.
pull/4863/head
Richard van der Hoff 2019-03-15 12:30:30 +00:00 committed by GitHub
commit 2dee441bdb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 111 additions and 55 deletions

1
changelog.d/4852.misc Normal file
View File

@ -0,0 +1 @@
Move client read-receipt processing to federation sender worker.

View File

@ -28,6 +28,7 @@ from synapse.config.logger import setup_logging
from synapse.federation import send_queue from synapse.federation import send_queue
from synapse.http.site import SynapseSite from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy from synapse.metrics import RegistryProxy
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.devices import SlavedDeviceStore
@ -37,8 +38,10 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.replication.tcp.streams import ReceiptsStream
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.storage.engines import create_engine from synapse.storage.engines import create_engine
from synapse.types import ReadReceipt
from synapse.util.async_helpers import Linearizer from synapse.util.async_helpers import Linearizer
from synapse.util.httpresourcetree import create_resource_tree from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.logcontext import LoggingContext, run_in_background
@ -202,6 +205,7 @@ class FederationSenderHandler(object):
""" """
def __init__(self, hs, replication_client): def __init__(self, hs, replication_client):
self.store = hs.get_datastore() self.store = hs.get_datastore()
self._is_mine_id = hs.is_mine_id
self.federation_sender = hs.get_federation_sender() self.federation_sender = hs.get_federation_sender()
self.replication_client = replication_client self.replication_client = replication_client
@ -234,6 +238,32 @@ class FederationSenderHandler(object):
elif stream_name == "events": elif stream_name == "events":
self.federation_sender.notify_new_events(token) self.federation_sender.notify_new_events(token)
# ... and when new receipts happen
elif stream_name == ReceiptsStream.NAME:
run_as_background_process(
"process_receipts_for_federation", self._on_new_receipts, rows,
)
@defer.inlineCallbacks
def _on_new_receipts(self, rows):
"""
Args:
rows (iterable[synapse.replication.tcp.streams.ReceiptsStreamRow]):
new receipts to be processed
"""
for receipt in rows:
# we only want to send on receipts for our own users
if not self._is_mine_id(receipt.user_id):
continue
receipt_info = ReadReceipt(
receipt.room_id,
receipt.receipt_type,
receipt.user_id,
[receipt.event_id],
receipt.data,
)
yield self.federation_sender.send_read_receipt(receipt_info)
@defer.inlineCallbacks @defer.inlineCallbacks
def update_token(self, token): def update_token(self, token):
try: try:

View File

@ -183,6 +183,15 @@ class FederationRemoteSendQueue(object):
self.notifier.on_new_replication_data() self.notifier.on_new_replication_data()
def send_read_receipt(self, receipt):
"""As per TransactionQueue
Args:
receipt (synapse.types.ReadReceipt):
"""
# nothing to do here: the replication listener will handle it.
pass
def send_presence(self, states): def send_presence(self, states):
"""As per TransactionQueue """As per TransactionQueue

View File

@ -290,6 +290,41 @@ class TransactionQueue(object):
self._attempt_new_transaction(destination) self._attempt_new_transaction(destination)
@defer.inlineCallbacks
def send_read_receipt(self, receipt):
"""Send a RR to any other servers in the room
Args:
receipt (synapse.types.ReadReceipt): receipt to be sent
"""
# Work out which remote servers should be poked and poke them.
domains = yield self.state.get_current_hosts_in_room(receipt.room_id)
domains = [d for d in domains if d != self.server_name]
if not domains:
return
logger.debug("Sending receipt to: %r", domains)
content = {
receipt.room_id: {
receipt.receipt_type: {
receipt.user_id: {
"event_ids": receipt.event_ids,
"data": receipt.data,
},
},
},
}
key = (receipt.room_id, receipt.receipt_type, receipt.user_id)
for domain in domains:
self.build_and_send_edu(
destination=domain,
edu_type="m.receipt",
content=content,
key=key,
)
@logcontext.preserve_fn # the caller should not yield on this @logcontext.preserve_fn # the caller should not yield on this
@defer.inlineCallbacks @defer.inlineCallbacks
def send_presence(self, states): def send_presence(self, states):

View File

@ -16,9 +16,8 @@ import logging
from twisted.internet import defer from twisted.internet import defer
from synapse.types import get_domain_from_id from synapse.handlers._base import BaseHandler
from synapse.types import ReadReceipt
from ._base import BaseHandler
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -42,13 +41,13 @@ class ReceiptsHandler(BaseHandler):
"""Called when we receive an EDU of type m.receipt from a remote HS. """Called when we receive an EDU of type m.receipt from a remote HS.
""" """
receipts = [ receipts = [
{ ReadReceipt(
"room_id": room_id, room_id=room_id,
"receipt_type": receipt_type, receipt_type=receipt_type,
"user_id": user_id, user_id=user_id,
"event_ids": user_values["event_ids"], event_ids=user_values["event_ids"],
"data": user_values.get("data", {}), data=user_values.get("data", {}),
} )
for room_id, room_values in content.items() for room_id, room_values in content.items()
for receipt_type, users in room_values.items() for receipt_type, users in room_values.items()
for user_id, user_values in users.items() for user_id, user_values in users.items()
@ -64,14 +63,12 @@ class ReceiptsHandler(BaseHandler):
max_batch_id = None max_batch_id = None
for receipt in receipts: for receipt in receipts:
room_id = receipt["room_id"]
receipt_type = receipt["receipt_type"]
user_id = receipt["user_id"]
event_ids = receipt["event_ids"]
data = receipt["data"]
res = yield self.store.insert_receipt( res = yield self.store.insert_receipt(
room_id, receipt_type, user_id, event_ids, data receipt.room_id,
receipt.receipt_type,
receipt.user_id,
receipt.event_ids,
receipt.data,
) )
if not res: if not res:
@ -89,7 +86,7 @@ class ReceiptsHandler(BaseHandler):
# no new receipts # no new receipts
defer.returnValue(False) defer.returnValue(False)
affected_room_ids = list(set([r["room_id"] for r in receipts])) affected_room_ids = list(set([r.room_id for r in receipts]))
self.notifier.on_new_event( self.notifier.on_new_event(
"receipt_key", max_batch_id, rooms=affected_room_ids "receipt_key", max_batch_id, rooms=affected_room_ids
@ -107,49 +104,21 @@ class ReceiptsHandler(BaseHandler):
"""Called when a client tells us a local user has read up to the given """Called when a client tells us a local user has read up to the given
event_id in the room. event_id in the room.
""" """
receipt = { receipt = ReadReceipt(
"room_id": room_id, room_id=room_id,
"receipt_type": receipt_type, receipt_type=receipt_type,
"user_id": user_id, user_id=user_id,
"event_ids": [event_id], event_ids=[event_id],
"data": { data={
"ts": int(self.clock.time_msec()), "ts": int(self.clock.time_msec()),
} },
} )
is_new = yield self._handle_new_receipts([receipt]) is_new = yield self._handle_new_receipts([receipt])
if not is_new: if not is_new:
return return
# Work out which remote servers should be poked and poke them. self.federation.send_read_receipt(receipt)
# TODO: optimise this to move some of the work to the workers.
data = receipt["data"]
# XXX why does this not use state.get_current_hosts_in_room() ?
users = yield self.state.get_current_user_in_room(room_id)
remotedomains = set(get_domain_from_id(u) for u in users)
remotedomains = remotedomains.copy()
remotedomains.discard(self.server_name)
logger.debug("Sending receipt to: %r", remotedomains)
for domain in remotedomains:
self.federation.build_and_send_edu(
destination=domain,
edu_type="m.receipt",
content={
room_id: {
receipt_type: {
user_id: {
"event_ids": [event_id],
"data": data,
}
}
},
},
key=(room_id, receipt_type, user_id),
)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_receipts_for_room(self, room_id, to_key): def get_receipts_for_room(self, room_id, to_key):

View File

@ -16,6 +16,8 @@ import re
import string import string
from collections import namedtuple from collections import namedtuple
import attr
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
@ -455,3 +457,13 @@ class ThirdPartyInstanceID(
@classmethod @classmethod
def create(cls, appservice_id, network_id,): def create(cls, appservice_id, network_id,):
return cls(appservice_id=appservice_id, network_id=network_id) return cls(appservice_id=appservice_id, network_id=network_id)
@attr.s(slots=True)
class ReadReceipt(object):
"""Information about a read-receipt"""
room_id = attr.ib()
receipt_type = attr.ib()
user_id = attr.ib()
event_ids = attr.ib()
data = attr.ib()