diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index fc2f38c1c0..94f0810057 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -37,7 +37,8 @@ class ReceiptsHandler(BaseHandler): "m.receipt", self._received_remote_receipt ) - self._latest_serial = 0 + # self._earliest_cached_serial = 0 + # self._rooms_to_latest_serial = {} @defer.inlineCallbacks def received_client_receipt(self, room_id, receipt_type, user_id, @@ -53,8 +54,10 @@ class ReceiptsHandler(BaseHandler): "event_ids": [event_id], } - yield self._handle_new_receipts([receipt]) - self._push_remotes([receipt]) + is_new = yield self._handle_new_receipts([receipt]) + + if is_new: + self._push_remotes([receipt]) @defer.inlineCallbacks def _received_remote_receipt(self, origin, content): @@ -81,33 +84,24 @@ class ReceiptsHandler(BaseHandler): user_id = receipt["user_id"] event_ids = receipt["event_ids"] - stream_id, max_persisted_id = yield self.store.insert_receipt( + res = yield self.store.insert_receipt( room_id, receipt_type, user_id, event_ids, ) - # TODO: Use max_persisted_id + if not res: + # res will be None if this read receipt is 'old' + defer.returnValue(False) - self._latest_serial = max(self._latest_serial, stream_id) + stream_id, max_persisted_id = res with PreserveLoggingContext(): self.notifier.on_new_event( - "receipt_key", self._latest_serial, rooms=[room_id] + "receipt_key", max_persisted_id, rooms=[room_id] ) - localusers = set() - remotedomains = set() - - rm_handler = self.hs.get_handlers().room_member_handler - yield rm_handler.fetch_room_distributions_into( - room_id, localusers=localusers, remotedomains=remotedomains - ) - - receipt["remotedomains"] = remotedomains - - self.notifier.on_new_event( - "receipt_key", self._latest_room_serial, rooms=[room_id] - ) + defer.returnValue(True) + @defer.inlineCallbacks def _push_remotes(self, receipts): # TODO: Some of this stuff should be coallesced. for receipt in receipts: @@ -115,7 +109,15 @@ class ReceiptsHandler(BaseHandler): receipt_type = receipt["receipt_type"] user_id = receipt["user_id"] event_ids = receipt["event_ids"] - remotedomains = receipt["remotedomains"] + + remotedomains = set() + + rm_handler = self.hs.get_handlers().room_member_handler + yield rm_handler.fetch_room_distributions_into( + room_id, localusers=None, remotedomains=remotedomains + ) + + logger.debug("Sending receipt to: %r", remotedomains) for domain in remotedomains: self.federation.send_edu( @@ -130,3 +132,40 @@ class ReceiptsHandler(BaseHandler): }, }, ) + + +class ReceiptEventSource(object): + def __init__(self, hs): + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def get_new_events_for_user(self, user, from_key, limit): + from_key = int(from_key) + to_key = yield self.get_current_key() + + rooms = yield self.store.get_rooms_for_user(user.to_string()) + rooms = [room.room_id for room in rooms] + content = {} + for room_id in rooms: + result = yield self.store.get_linearized_receipts_for_room( + room_id, from_key, to_key + ) + if result: + content[room_id] = result + + if not content: + defer.returnValue(([], to_key)) + + event = { + "type": "m.receipt", + "content": content, + } + + defer.returnValue(([event], to_key)) + + def get_current_key(self, direction='f'): + return self.store.get_max_receipt_stream_id() + + @defer.inlineCallbacks + def get_pagination_rows(self, user, config, key): + defer.returnValue(([{}], 0)) diff --git a/synapse/rest/client/v2_alpha/receipts.py b/synapse/rest/client/v2_alpha/receipts.py index 829427b7b6..40406e2ede 100644 --- a/synapse/rest/client/v2_alpha/receipts.py +++ b/synapse/rest/client/v2_alpha/receipts.py @@ -28,7 +28,7 @@ class ReceiptRestServlet(RestServlet): PATTERN = client_v2_pattern( "/rooms/(?P[^/]*)" "/receipt/(?P[^/]*)" - "/(?P[^/])*" + "/(?P[^/]*)$" ) def __init__(self, hs): @@ -41,7 +41,6 @@ class ReceiptRestServlet(RestServlet): def on_POST(self, request, room_id, receipt_type, event_id): user, client = yield self.auth.get_user_by_req(request) - # TODO: STUFF yield self.receipts_handler.received_client_receipt( room_id, receipt_type, diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 15c11fd410..5a02c80252 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -17,17 +17,33 @@ from ._base import SQLBaseStore, cached from twisted.internet import defer +import logging + + +logger = logging.getLogger(__name__) + class ReceiptsStore(SQLBaseStore): - @cached @defer.inlineCallbacks - def get_linearized_receipts_for_room(self, room_id): - rows = yield self._simple_select_list( - table="receipts_linearized", - keyvalues={"room_id": room_id}, - retcols=["receipt_type", "user_id", "event_id"], - desc="get_linearized_receipts_for_room", + def get_linearized_receipts_for_room(self, room_id, from_key, to_key): + def f(txn): + sql = ( + "SELECT * FROM receipts_linearized WHERE" + " room_id = ? AND stream_id > ? AND stream_id <= ?" + ) + + txn.execute( + sql, + (room_id, from_key, to_key) + ) + + rows = self.cursor_to_dict(txn) + + return rows + + rows = yield self.runInteraction( + "get_linearized_receipts_for_room", f ) result = {} @@ -40,6 +56,9 @@ class ReceiptsStore(SQLBaseStore): defer.returnValue(result) + def get_max_receipt_stream_id(self): + return self._receipts_id_gen.get_max_token(self) + @cached @defer.inlineCallbacks def get_graph_receipts_for_room(self, room_id): @@ -62,11 +81,38 @@ class ReceiptsStore(SQLBaseStore): def insert_linearized_receipt_txn(self, txn, room_id, receipt_type, user_id, event_id, stream_id): + + # We don't want to clobber receipts for more recent events, so we + # have to compare orderings of existing receipts + sql = ( + "SELECT topological_ordering, stream_ordering, event_id FROM events" + " INNER JOIN receipts_linearized as r USING (event_id, room_id)" + " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?" + ) + + txn.execute(sql, (room_id, receipt_type, user_id)) + results = txn.fetchall() + + if results: + res = self._simple_select_one_txn( + txn, + table="events", + retcols=["topological_ordering", "stream_ordering"], + keyvalues={"event_id": event_id}, + ) + topological_ordering = int(res["topological_ordering"]) + stream_ordering = int(res["stream_ordering"]) + + for to, so, _ in results: + if int(to) > topological_ordering: + return False + elif int(to) == topological_ordering and int(so) >= stream_ordering: + return False + self._simple_delete_txn( txn, table="receipts_linearized", keyvalues={ - "stream_id": stream_id, "room_id": room_id, "receipt_type": receipt_type, "user_id": user_id, @@ -85,6 +131,8 @@ class ReceiptsStore(SQLBaseStore): } ) + return True + @defer.inlineCallbacks def insert_receipt(self, room_id, receipt_type, user_id, event_ids): if not event_ids: @@ -115,13 +163,16 @@ class ReceiptsStore(SQLBaseStore): stream_id_manager = yield self._receipts_id_gen.get_next(self) with stream_id_manager as stream_id: - yield self.runInteraction( + have_persisted = yield self.runInteraction( "insert_linearized_receipt", self.insert_linearized_receipt_txn, room_id, receipt_type, user_id, linearized_event_id, stream_id=stream_id, ) + if not have_persisted: + defer.returnValue(None) + yield self.insert_graph_receipt( room_id, receipt_type, user_id, event_ids ) diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 0a1a3a3d03..aaa3609aa5 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -20,6 +20,7 @@ from synapse.types import StreamToken from synapse.handlers.presence import PresenceEventSource from synapse.handlers.room import RoomEventSource from synapse.handlers.typing import TypingNotificationEventSource +from synapse.handlers.receipts import ReceiptEventSource class NullSource(object): @@ -43,6 +44,7 @@ class EventSources(object): "room": RoomEventSource, "presence": PresenceEventSource, "typing": TypingNotificationEventSource, + "receipt": ReceiptEventSource, } def __init__(self, hs): @@ -63,7 +65,9 @@ class EventSources(object): typing_key=( yield self.sources["typing"].get_current_key() ), - receipt_key="0", + receipt_key=( + yield self.sources["receipt"].get_current_key() + ), ) defer.returnValue(token)