Fix HTTP update_function
parent
bd64b8fcd5
commit
f8038f4670
|
@ -147,6 +147,28 @@ def db_query_to_update_function(
|
||||||
return update_function
|
return update_function
|
||||||
|
|
||||||
|
|
||||||
|
def make_http_update_function(
|
||||||
|
hs, stream_name: str
|
||||||
|
) -> Callable[[int, int, int], Awaitable[Tuple[List[Tuple[int, tuple]], int, bool]]]:
|
||||||
|
"""Makes a suitable function for use as an `update_function` that queries
|
||||||
|
the master process for updates.
|
||||||
|
"""
|
||||||
|
|
||||||
|
client = ReplicationGetStreamUpdates.make_client(hs)
|
||||||
|
|
||||||
|
async def update_function(
|
||||||
|
from_token: int, upto_token: int, limit: int
|
||||||
|
) -> Tuple[List[Tuple[int, tuple]], int, bool]:
|
||||||
|
return await client(
|
||||||
|
stream_name=stream_name,
|
||||||
|
from_token=from_token,
|
||||||
|
upto_token=upto_token,
|
||||||
|
limit=limit,
|
||||||
|
)
|
||||||
|
|
||||||
|
return update_function
|
||||||
|
|
||||||
|
|
||||||
class BackfillStream(Stream):
|
class BackfillStream(Stream):
|
||||||
"""We fetched some old events and either we had never seen that event before
|
"""We fetched some old events and either we had never seen that event before
|
||||||
or it went from being an outlier to not.
|
or it went from being an outlier to not.
|
||||||
|
@ -204,7 +226,7 @@ class PresenceStream(Stream):
|
||||||
self.update_function = db_query_to_update_function(presence_handler.get_all_presence_updates) # type: ignore
|
self.update_function = db_query_to_update_function(presence_handler.get_all_presence_updates) # type: ignore
|
||||||
else:
|
else:
|
||||||
# Query master process
|
# Query master process
|
||||||
self.update_function = ReplicationGetStreamUpdates.make_client(hs) # type: ignore
|
self.update_function = make_http_update_function(hs, self.NAME) # type: ignore
|
||||||
|
|
||||||
super(PresenceStream, self).__init__(hs)
|
super(PresenceStream, self).__init__(hs)
|
||||||
|
|
||||||
|
@ -226,7 +248,7 @@ class TypingStream(Stream):
|
||||||
self.update_function = db_query_to_update_function(typing_handler.get_all_typing_updates) # type: ignore
|
self.update_function = db_query_to_update_function(typing_handler.get_all_typing_updates) # type: ignore
|
||||||
else:
|
else:
|
||||||
# Query master process
|
# Query master process
|
||||||
self.update_function = ReplicationGetStreamUpdates.make_client(hs) # type: ignore
|
self.update_function = make_http_update_function(hs, self.NAME) # type: ignore
|
||||||
|
|
||||||
super(TypingStream, self).__init__(hs)
|
super(TypingStream, self).__init__(hs)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue