Last little bits

hs/super-wip-edus-down-sync
Will Hunt 2020-09-21 16:22:11 +01:00
parent 3bf1b79d3c
commit 4392526bf0
2 changed files with 35 additions and 11 deletions

View File

@ -31,9 +31,10 @@ from typing import (
Tuple, Tuple,
TypeVar, TypeVar,
Union, Union,
Collection,
) )
from synapse.types import RoomStreamToken from synapse.types import RoomStreamToken, UserID
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes
from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics import ( from synapse.metrics import (
@ -42,6 +43,7 @@ from synapse.metrics import (
) )
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
from synapse.handlers.presence import format_user_presence_state
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -173,7 +175,7 @@ class ApplicationServicesHandler:
finally: finally:
self.is_processing = False self.is_processing = False
async def notify_interested_services_ephemeral(self, stream_key: str, new_token: Union[int, RoomStreamToken]): async def notify_interested_services_ephemeral(self, stream_key: str, new_token: Union[int, RoomStreamToken], users: Collection[UserID] = []):
services = [service for service in self.store.get_app_services() if service.supports_ephemeral] services = [service for service in self.store.get_app_services() if service.supports_ephemeral]
if not services or not self.notify_appservices: if not services or not self.notify_appservices:
return return
@ -185,7 +187,7 @@ class ApplicationServicesHandler:
from_key = new_token - 1 from_key = new_token - 1
typing_source = self.event_sources.sources["typing"] typing_source = self.event_sources.sources["typing"]
# Get the typing events from just before current # Get the typing events from just before current
typing, _typing_key = await typing_source.get_new_events_as( typing, _key = await typing_source.get_new_events_as(
service=service, service=service,
from_key=from_key from_key=from_key
) )
@ -193,15 +195,37 @@ class ApplicationServicesHandler:
elif stream_key == "receipt_key": elif stream_key == "receipt_key":
from_key = new_token - 1 from_key = new_token - 1
receipts_source = self.event_sources.sources["receipt"] receipts_source = self.event_sources.sources["receipt"]
receipts, _receipts_key = await receipts_source.get_new_events_as( receipts, _key = await receipts_source.get_new_events_as(
service=service, service=service,
from_key=from_key from_key=from_key
) )
events = receipts events = receipts
elif stream_key == "presence": elif stream_key == "presence_key":
# TODO: This. Presence means trying to determine all the events = []
# users the appservice cares about, which means checking presence_source = self.event_sources.sources["presence"]
# all the rooms the appservice is in. for user in users:
interested = await service.is_interested_in_presence(user, self.store)
if not interested:
continue
presence_events, _key = await presence_source.get_new_events(
user=user,
service=service,
from_key=None, # I don't think this is required
)
time_now = self.clock.time_msec()
presence_events = [
{
"type": "m.presence",
"sender": event.user_id,
"content": format_user_presence_state(
event, time_now, include_user_id=False
),
}
for event in presence_events
]
events = events + presence_events
elif stream_key == "to_device_key":
print("to_device_key", users)
if events: if events:
# TODO: Do in background? # TODO: Do in background?
await self.scheduler.submit_ephemeral_events_for_as(service, events) await self.scheduler.submit_ephemeral_events_for_as(service, events)

View File

@ -326,9 +326,9 @@ class Notifier:
except Exception: except Exception:
logger.exception("Error notifying application services of event") logger.exception("Error notifying application services of event")
async def _notify_app_services_ephemeral(self, stream_key: str, new_token: Union[int, RoomStreamToken]): async def _notify_app_services_ephemeral(self, stream_key: str, new_token: Union[int, RoomStreamToken], users: Collection[UserID] = []):
try: try:
await self.appservice_handler.notify_interested_services_ephemeral(stream_key, new_token) await self.appservice_handler.notify_interested_services_ephemeral(stream_key, new_token, users)
except Exception: except Exception:
logger.exception("Error notifying application services of event") logger.exception("Error notifying application services of event")
@ -372,7 +372,7 @@ class Notifier:
# Notify appservices # Notify appservices
run_as_background_process( run_as_background_process(
"_notify_app_services_ephemeral", self._notify_app_services_ephemeral, stream_key, new_token, "_notify_app_services_ephemeral", self._notify_app_services_ephemeral, stream_key, new_token, users,
) )
def on_new_replication_data(self) -> None: def on_new_replication_data(self) -> None: