Base public room list off of public_rooms stream
parent
5810cffd33
commit
4fb65a1091
|
@ -55,16 +55,26 @@ class RoomListHandler(BaseHandler):
|
||||||
else:
|
else:
|
||||||
since_token = None
|
since_token = None
|
||||||
|
|
||||||
room_ids = yield self.store.get_public_room_ids()
|
|
||||||
|
|
||||||
rooms_to_order_value = {}
|
rooms_to_order_value = {}
|
||||||
rooms_to_num_joined = {}
|
rooms_to_num_joined = {}
|
||||||
rooms_to_latest_event_ids = {}
|
rooms_to_latest_event_ids = {}
|
||||||
|
|
||||||
|
newly_visible = []
|
||||||
|
newly_unpublished = []
|
||||||
if since_token:
|
if since_token:
|
||||||
current_stream_token = since_token.stream_ordering
|
stream_token = since_token.stream_ordering
|
||||||
|
current_public_id = yield self.store.get_current_public_room_stream_id()
|
||||||
|
public_room_stream_id = since_token.public_room_stream_id
|
||||||
|
newly_visible, newly_unpublished = yield self.store.get_public_room_changes(
|
||||||
|
public_room_stream_id, current_public_id
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
current_stream_token = yield self.store.get_room_max_stream_ordering()
|
stream_token = yield self.store.get_room_max_stream_ordering()
|
||||||
|
public_room_stream_id = yield self.store.get_current_public_room_stream_id()
|
||||||
|
|
||||||
|
room_ids = yield self.store.get_public_room_ids_at_stream_id(
|
||||||
|
public_room_stream_id
|
||||||
|
)
|
||||||
|
|
||||||
# We want to return rooms in a particular order: the number of joined
|
# We want to return rooms in a particular order: the number of joined
|
||||||
# users. We then arbitrarily use the room_id as a tie breaker.
|
# users. We then arbitrarily use the room_id as a tie breaker.
|
||||||
|
@ -74,7 +84,7 @@ class RoomListHandler(BaseHandler):
|
||||||
latest_event_ids = rooms_to_latest_event_ids.get(room_id, None)
|
latest_event_ids = rooms_to_latest_event_ids.get(room_id, None)
|
||||||
if not latest_event_ids:
|
if not latest_event_ids:
|
||||||
latest_event_ids = yield self.store.get_forward_extremeties_for_room(
|
latest_event_ids = yield self.store.get_forward_extremeties_for_room(
|
||||||
room_id, current_stream_token
|
room_id, stream_token
|
||||||
)
|
)
|
||||||
rooms_to_latest_event_ids[room_id] = latest_event_ids
|
rooms_to_latest_event_ids[room_id] = latest_event_ids
|
||||||
|
|
||||||
|
@ -125,6 +135,9 @@ class RoomListHandler(BaseHandler):
|
||||||
if num_joined_users == 0:
|
if num_joined_users == 0:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if room_id in newly_unpublished:
|
||||||
|
return
|
||||||
|
|
||||||
result = {
|
result = {
|
||||||
"room_id": room_id,
|
"room_id": room_id,
|
||||||
"num_joined_members": num_joined_users,
|
"num_joined_members": num_joined_users,
|
||||||
|
@ -207,10 +220,14 @@ class RoomListHandler(BaseHandler):
|
||||||
"chunk": chunk,
|
"chunk": chunk,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if since_token:
|
||||||
|
results["new_rooms"] = bool(newly_visible)
|
||||||
|
|
||||||
if not since_token or since_token.direction_is_forward:
|
if not since_token or since_token.direction_is_forward:
|
||||||
if new_limit:
|
if new_limit:
|
||||||
results["next_batch"] = RoomListNextBatch(
|
results["next_batch"] = RoomListNextBatch(
|
||||||
stream_ordering=current_stream_token,
|
stream_ordering=stream_token,
|
||||||
|
public_room_stream_id=public_room_stream_id,
|
||||||
current_limit=new_limit,
|
current_limit=new_limit,
|
||||||
direction_is_forward=True,
|
direction_is_forward=True,
|
||||||
).to_token()
|
).to_token()
|
||||||
|
@ -222,7 +239,8 @@ class RoomListHandler(BaseHandler):
|
||||||
else:
|
else:
|
||||||
if new_limit:
|
if new_limit:
|
||||||
results["prev_batch"] = RoomListNextBatch(
|
results["prev_batch"] = RoomListNextBatch(
|
||||||
stream_ordering=current_stream_token,
|
stream_ordering=stream_token,
|
||||||
|
public_room_stream_id=public_room_stream_id,
|
||||||
current_limit=new_limit,
|
current_limit=new_limit,
|
||||||
direction_is_forward=False,
|
direction_is_forward=False,
|
||||||
).to_token()
|
).to_token()
|
||||||
|
@ -245,12 +263,14 @@ class RoomListHandler(BaseHandler):
|
||||||
|
|
||||||
class RoomListNextBatch(namedtuple("RoomListNextBatch", (
|
class RoomListNextBatch(namedtuple("RoomListNextBatch", (
|
||||||
"stream_ordering", # stream_ordering of the first public room list
|
"stream_ordering", # stream_ordering of the first public room list
|
||||||
|
"public_room_stream_id", # public room stream id for first public room list
|
||||||
"current_limit", # The number of previous rooms returned
|
"current_limit", # The number of previous rooms returned
|
||||||
"direction_is_forward", # Bool if this is a next_batch, false if prev_batch
|
"direction_is_forward", # Bool if this is a next_batch, false if prev_batch
|
||||||
))):
|
))):
|
||||||
|
|
||||||
KEY_DICT = {
|
KEY_DICT = {
|
||||||
"stream_ordering": "s",
|
"stream_ordering": "s",
|
||||||
|
"public_room_stream_id": "p",
|
||||||
"current_limit": "n",
|
"current_limit": "n",
|
||||||
"direction_is_forward": "d",
|
"direction_is_forward": "d",
|
||||||
}
|
}
|
||||||
|
|
|
@ -255,3 +255,55 @@ class RoomStore(SQLBaseStore):
|
||||||
},
|
},
|
||||||
desc="add_event_report"
|
desc="add_event_report"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def get_current_public_room_stream_id(self):
|
||||||
|
return self._public_room_id_gen.get_current_token()
|
||||||
|
|
||||||
|
def get_public_room_ids_at_stream_id(self, stream_id):
|
||||||
|
return self.runInteraction(
|
||||||
|
"get_public_room_ids_at_stream_id",
|
||||||
|
self.get_public_room_ids_at_stream_id_txn, stream_id
|
||||||
|
)
|
||||||
|
|
||||||
|
def get_public_room_ids_at_stream_id_txn(self, txn, stream_id):
|
||||||
|
return {
|
||||||
|
rm
|
||||||
|
for rm, vis in self.get_published_at_stream_id_txn(txn, stream_id).items()
|
||||||
|
if vis
|
||||||
|
}
|
||||||
|
|
||||||
|
def get_published_at_stream_id_txn(self, txn, stream_id):
|
||||||
|
sql = ("""
|
||||||
|
SELECT room_id, visibility FROM public_room_list_stream
|
||||||
|
INNER JOIN (
|
||||||
|
SELECT room_id, max(stream_id) AS stream_id
|
||||||
|
FROM public_room_list_stream
|
||||||
|
WHERE stream_id <= ?
|
||||||
|
GROUP BY room_id
|
||||||
|
) grouped USING (room_id, stream_id)
|
||||||
|
""")
|
||||||
|
|
||||||
|
txn.execute(sql, (stream_id,))
|
||||||
|
return dict(txn.fetchall())
|
||||||
|
|
||||||
|
def get_public_room_changes(self, prev_stream_id, new_stream_id):
|
||||||
|
def get_public_room_changes_txn(txn):
|
||||||
|
then_rooms = self.get_public_room_ids_at_stream_id_txn(txn, prev_stream_id)
|
||||||
|
|
||||||
|
now_rooms_dict = self.get_published_at_stream_id_txn(txn, new_stream_id)
|
||||||
|
|
||||||
|
now_rooms_visible = set(
|
||||||
|
rm for rm, vis in now_rooms_dict.items() if vis
|
||||||
|
)
|
||||||
|
now_rooms_not_visible = set(
|
||||||
|
rm for rm, vis in now_rooms_dict.items() if not vis
|
||||||
|
)
|
||||||
|
|
||||||
|
newly_visible = now_rooms_visible - then_rooms
|
||||||
|
newly_unpublished = now_rooms_not_visible & then_rooms
|
||||||
|
|
||||||
|
return newly_visible, newly_unpublished
|
||||||
|
|
||||||
|
return self.runInteraction(
|
||||||
|
"get_public_room_changes", get_public_room_changes_txn
|
||||||
|
)
|
||||||
|
|
Loading…
Reference in New Issue