Hook up receipts to v1 initialSync
parent
f0dd6d4cbd
commit
87311d1b8c
|
@ -278,6 +278,11 @@ class MessageHandler(BaseHandler):
|
||||||
user, pagination_config.get_source_config("presence"), None
|
user, pagination_config.get_source_config("presence"), None
|
||||||
)
|
)
|
||||||
|
|
||||||
|
receipt_stream = self.hs.get_event_sources().sources["receipt"]
|
||||||
|
receipt, _ = yield receipt_stream.get_pagination_rows(
|
||||||
|
user, pagination_config.get_source_config("receipt"), None
|
||||||
|
)
|
||||||
|
|
||||||
public_room_ids = yield self.store.get_public_room_ids()
|
public_room_ids = yield self.store.get_public_room_ids()
|
||||||
|
|
||||||
limit = pagin_config.limit
|
limit = pagin_config.limit
|
||||||
|
@ -344,7 +349,8 @@ class MessageHandler(BaseHandler):
|
||||||
ret = {
|
ret = {
|
||||||
"rooms": rooms_ret,
|
"rooms": rooms_ret,
|
||||||
"presence": presence,
|
"presence": presence,
|
||||||
"end": now_token.to_string()
|
"receipts": receipt,
|
||||||
|
"end": now_token.to_string(),
|
||||||
}
|
}
|
||||||
|
|
||||||
defer.returnValue(ret)
|
defer.returnValue(ret)
|
||||||
|
@ -405,9 +411,12 @@ class MessageHandler(BaseHandler):
|
||||||
|
|
||||||
defer.returnValue([p for success, p in presence_defs if success])
|
defer.returnValue([p for success, p in presence_defs if success])
|
||||||
|
|
||||||
presence, (messages, token) = yield defer.gatherResults(
|
receipts_handler = self.hs.get_handlers().receipts_handler
|
||||||
|
|
||||||
|
presence, receipts, (messages, token) = yield defer.gatherResults(
|
||||||
[
|
[
|
||||||
get_presence(),
|
get_presence(),
|
||||||
|
receipts_handler.get_receipts_for_room(room_id, now_token.receipt_key),
|
||||||
self.store.get_recent_events_for_room(
|
self.store.get_recent_events_for_room(
|
||||||
room_id,
|
room_id,
|
||||||
limit=limit,
|
limit=limit,
|
||||||
|
@ -431,5 +440,6 @@ class MessageHandler(BaseHandler):
|
||||||
"end": end_token.to_string(),
|
"end": end_token.to_string(),
|
||||||
},
|
},
|
||||||
"state": state,
|
"state": state,
|
||||||
"presence": presence
|
"presence": presence,
|
||||||
|
"receipts": receipts,
|
||||||
})
|
})
|
||||||
|
|
|
@ -133,6 +133,24 @@ class ReceiptsHandler(BaseHandler):
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def get_receipts_for_room(self, room_id, to_key):
|
||||||
|
result = yield self.store.get_linearized_receipts_for_room(
|
||||||
|
room_id, None, to_key
|
||||||
|
)
|
||||||
|
|
||||||
|
if not result:
|
||||||
|
defer.returnValue([])
|
||||||
|
|
||||||
|
event = {
|
||||||
|
"type": "m.receipt",
|
||||||
|
"content": {
|
||||||
|
room_id: result,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
defer.returnValue([event])
|
||||||
|
|
||||||
|
|
||||||
class ReceiptEventSource(object):
|
class ReceiptEventSource(object):
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
|
@ -168,4 +186,29 @@ class ReceiptEventSource(object):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_pagination_rows(self, user, config, key):
|
def get_pagination_rows(self, user, config, key):
|
||||||
defer.returnValue(([{}], 0))
|
to_key = int(config.from_key)
|
||||||
|
|
||||||
|
if config.to_key:
|
||||||
|
from_key = int(config.to_key)
|
||||||
|
else:
|
||||||
|
from_key = None
|
||||||
|
|
||||||
|
rooms = yield self.store.get_rooms_for_user(user.to_string())
|
||||||
|
rooms = [room.room_id for room in rooms]
|
||||||
|
content = {}
|
||||||
|
for room_id in rooms:
|
||||||
|
result = yield self.store.get_linearized_receipts_for_room(
|
||||||
|
room_id, from_key, to_key
|
||||||
|
)
|
||||||
|
if result:
|
||||||
|
content[room_id] = result
|
||||||
|
|
||||||
|
if not content:
|
||||||
|
defer.returnValue(([], to_key))
|
||||||
|
|
||||||
|
event = {
|
||||||
|
"type": "m.receipt",
|
||||||
|
"content": content,
|
||||||
|
}
|
||||||
|
|
||||||
|
defer.returnValue(([event], to_key))
|
||||||
|
|
|
@ -28,15 +28,26 @@ class ReceiptsStore(SQLBaseStore):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_linearized_receipts_for_room(self, room_id, from_key, to_key):
|
def get_linearized_receipts_for_room(self, room_id, from_key, to_key):
|
||||||
def f(txn):
|
def f(txn):
|
||||||
sql = (
|
if from_key:
|
||||||
"SELECT * FROM receipts_linearized WHERE"
|
sql = (
|
||||||
" room_id = ? AND stream_id > ? AND stream_id <= ?"
|
"SELECT * FROM receipts_linearized WHERE"
|
||||||
)
|
" room_id = ? AND stream_id > ? AND stream_id <= ?"
|
||||||
|
)
|
||||||
|
|
||||||
txn.execute(
|
txn.execute(
|
||||||
sql,
|
sql,
|
||||||
(room_id, from_key, to_key)
|
(room_id, from_key, to_key)
|
||||||
)
|
)
|
||||||
|
else:
|
||||||
|
sql = (
|
||||||
|
"SELECT * FROM receipts_linearized WHERE"
|
||||||
|
" room_id = ? AND stream_id <= ?"
|
||||||
|
)
|
||||||
|
|
||||||
|
txn.execute(
|
||||||
|
sql,
|
||||||
|
(room_id, to_key)
|
||||||
|
)
|
||||||
|
|
||||||
rows = self.cursor_to_dict(txn)
|
rows = self.cursor_to_dict(txn)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue