Add basic support for device list updates

hs/super-wip-edus-down-sync
Will Hunt 2020-09-30 12:55:57 +01:00
parent 6201ea56ee
commit f807c7291f
9 changed files with 172 additions and 108 deletions

View File

@@ -19,7 +19,7 @@ from typing import TYPE_CHECKING
from synapse.api.constants import EventTypes
from synapse.appservice.api import ApplicationServiceApi
from synapse.types import GroupID, get_domain_from_id
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.descriptors import cached
if TYPE_CHECKING:
from synapse.storage.databases.main import DataStore

View File

@@ -213,7 +213,11 @@ class ApplicationServiceApi(SimpleHttpClient):
try:
await self.put_json(
uri=uri,
json_body={"events": events, "device_messages": to_device, "device_lists": device_lists},
json_body={
"events": events,
"device_messages": to_device,
"device_lists": device_lists,
},
args={"access_token": service.hs_token},
)
return True
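For context, a rough sketch of the transaction body this PUT now carries, assuming the scheduler routes the ephemeral events collected by the handler into the two new fields (the field names come from the change above; the values and exact shapes are illustrative, not confirmed by this diff):

example_body = {
    "events": [],  # ordinary room events, unchanged
    "device_messages": [
        # to-device messages gathered by _handle_to_device (assumed shape)
    ],
    "device_lists": {
        # users whose device lists changed, per _handle_device_list (assumed shape)
        "changed": ["@alice:example.org"],
    },
}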

View File

@@ -14,36 +14,24 @@
# limitations under the License.
import logging
from typing import Collection, List, Union
from prometheus_client import Counter
from twisted.internet import defer
import synapse
from typing import (
Awaitable,
Callable,
Dict,
Iterable,
List,
Optional,
Set,
Tuple,
TypeVar,
Union,
Collection,
)
from synapse.types import RoomStreamToken, UserID
from synapse.api.constants import EventTypes
from synapse.appservice import ApplicationService
from synapse.handlers.presence import format_user_presence_state
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics import (
event_processing_loop_counter,
event_processing_loop_room_count,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import RoomStreamToken, UserID
from synapse.util.metrics import Measure
from synapse.handlers.presence import format_user_presence_state
logger = logging.getLogger(__name__)
@@ -175,8 +163,17 @@ class ApplicationServicesHandler:
finally:
self.is_processing = False
async def notify_interested_services_ephemeral(self, stream_key: str, new_token: Union[int, RoomStreamToken], users: Collection[UserID] = []):
services = [service for service in self.store.get_app_services() if service.supports_ephemeral]
async def notify_interested_services_ephemeral(
self,
stream_key: str,
new_token: Union[int, RoomStreamToken],
users: Collection[UserID] = [],
):
services = [
service
for service in self.store.get_app_services()
if service.supports_ephemeral
]
if not services or not self.notify_appservices:
return
logger.info("Checking interested services for %s" % (stream_key))
@@ -184,65 +181,99 @@
for service in services:
events = []
if stream_key == "typing_key":
from_key = new_token - 1
typing_source = self.event_sources.sources["typing"]
# Get the typing events from just before current
typing, _key = await typing_source.get_new_events_as(
service=service,
from_key=from_key
)
events = typing
events = await self._handle_typing(service, new_token)
elif stream_key == "receipt_key":
from_key = new_token - 1
receipts_source = self.event_sources.sources["receipt"]
receipts, _key = await receipts_source.get_new_events_as(
service=service,
from_key=from_key
)
events = receipts
events = await self._handle_receipts(service)
elif stream_key == "presence_key":
events = await self._handle_as_presence(service, users)
elif stream_key == "device_list_key":
# Check if the device lists have changed for any of the users we are interested in
print("device_list_key", users)
events = await self._handle_device_list(service, users, new_token)
elif stream_key == "to_device_key":
# Check the inbox for any users the bridge owns
events, to_device_token = await self._handle_to_device(service, users, new_token)
if events:
# TODO: Do in background?
await self.scheduler.submit_ephemeral_events_for_as(service, events, new_token)
if stream_key == "to_device_key":
# Update database with new token
await self.store.set_device_messages_token_for_appservice(service, to_device_token)
return
# Check the inbox for any users the bridge owns
events = await self._handle_to_device(service, users, new_token)
if events:
# TODO: Do in background?
await self.scheduler.submit_ephemeral_events_for_as(service, events, new_token)
await self.scheduler.submit_ephemeral_events_for_as(
service, events, new_token
)
# We don't persist the token for typing_key
if stream_key == "presence_key":
await self.store.set_type_stream_id_for_appservice(
service, "presence", new_token
)
elif stream_key == "receipt_key":
await self.store.set_type_stream_id_for_appservice(
service, "read_receipt", new_token
)
elif stream_key == "to_device_key":
await self.store.set_type_stream_id_for_appservice(
service, "to_device", new_token
)
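A sketch, not part of this diff: the chain above persists stream positions for presence, receipts and to-device, but no branch is shown for the device list stream; given the "device_list" type read back in _handle_device_list below, the matching branch would presumably look like this:

elif stream_key == "device_list_key":
    # Hypothetical: persist the device list position alongside the others.
    await self.store.set_type_stream_id_for_appservice(
        service, "device_list", new_token
    )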
async def _handle_device_list(self, service, users, token):
if not any([True for u in users if service.is_interested_in_user(u)]):
return False
async def _handle_typing(self, service, new_token):
typing_source = self.event_sources.sources["typing"]
# Get the typing events from just before current
typing, _key = await typing_source.get_new_events_as(
service=service,
# For performance reasons, we don't persist the previous
# token in the DB and instead fetch the latest typing information
# for appservices.
from_key=new_token - 1,
)
return typing
async def _handle_receipts(self, service, token: int):
from_key = await self.store.get_type_stream_id_for_appservice(
service, "read_receipt"
)
receipts_source = self.event_sources.sources["receipt"]
receipts, _ = await receipts_source.get_new_events_as(
service=service, from_key=from_key
)
return receipts
async def _handle_device_list(
self, service: ApplicationService, users: List[str], new_token: int
):
# TODO: Determine if any users have left and report those
from_token = await self.store.get_type_stream_id_for_appservice(
service, "device_list"
)
changed_user_ids = await self.store.get_device_changes_for_as(
service, from_token, new_token
)
# Wrap the changes in an m.device_list_update ephemeral event, returned
# as a list to match the other ephemeral streams.
if not changed_user_ids:
    return []
return [
    {
        "type": "m.device_list_update",
        "content": {"changed": changed_user_ids},
    }
]
async def _handle_to_device(self, service, users, token):
if not any(service.is_interested_in_user(u) for u in users):
return False
since_token = await self.store.get_device_messages_token_for_appservice(service)
messages, new_token = await self.store.get_new_messages_for_as(service, since_token, token)
return messages, new_token
since_token = await self.store.get_type_stream_id_for_appservice(
service, "to_device"
)
messages, _ = await self.store.get_new_messages_for_as(
service, since_token, token
)
# The store returns a list of message dicts, each annotated with its recipient
return messages
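A sketch of a single entry in the returned list, based on the deviceinbox change later in this diff; only the "recipient" annotation is added by this commit, the rest is whatever message JSON was stored:

{
    # ... the decoded to-device message JSON from the device_inbox row ...
    "recipient": {"user_id": "@bot:example.org", "device_id": "ABCDEFGH"},
}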
async def _handle_as_presence(self, service, users):
events = []
presence_source = self.event_sources.sources["presence"]
from_key = await self.store.get_type_stream_id_for_appservice(
service, "presence"
)
for user in users:
interested = await service.is_interested_in_presence(user, self.store)
if not interested:
continue
presence_events, _key = await presence_source.get_new_events(
user=user,
service=service,
from_key=None, # TODO: I don't think this is required?
user=user, service=service, from_key=from_key,
)
time_now = self.clock.time_msec()
presence_events = [

View File

@@ -431,7 +431,9 @@ class TypingNotificationEventSource:
"content": {"user_ids": list(typing)},
}
async def get_new_events_as(self, from_key, service, **kwargs):
async def get_new_events_as(
self, from_key: int, service: ApplicationService, **kwargs
):
with Measure(self.clock, "typing.get_new_events_as"):
from_key = int(from_key)
handler = self.get_typing_handler()
@@ -441,8 +443,9 @@
if handler._room_serials[room_id] <= from_key:
print("Key too old")
continue
# XXX: Store gut wrenching
if not await service.matches_user_in_member_list(room_id, handler.store):
if not await service.matches_user_in_member_list(
room_id, handler.store
):
continue
events.append(self._make_event_for(room_id))
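For reference, each event appended here follows the m.typing EDU shape partially visible in the context above; an illustrative per-room example (room and user IDs are made up):

{
    "type": "m.typing",
    "room_id": "!room:example.org",
    "content": {"user_ids": ["@alice:example.org"]},
}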

View File

@@ -329,9 +329,16 @@ class Notifier:
except Exception:
logger.exception("Error notifying application services of event")
async def _notify_app_services_ephemeral(self, stream_key: str, new_token: Union[int, RoomStreamToken], users: Collection[UserID] = []):
async def _notify_app_services_ephemeral(
self,
stream_key: str,
new_token: Union[int, RoomStreamToken],
users: Collection[UserID] = [],
):
try:
await self.appservice_handler.notify_interested_services_ephemeral(stream_key, new_token, users)
await self.appservice_handler.notify_interested_services_ephemeral(
stream_key, new_token, users
)
except Exception:
logger.exception("Error notifying application services of event")
@@ -375,7 +382,11 @@
# Notify appservices
run_as_background_process(
"_notify_app_services_ephemeral", self._notify_app_services_ephemeral, stream_key, new_token, users,
"_notify_app_services_ephemeral",
self._notify_app_services_ephemeral,
stream_key,
new_token,
users,
)
def on_new_replication_data(self) -> None:

View File

@@ -350,47 +350,36 @@ class ApplicationServiceTransactionWorkerStore(
events = await self.get_events_as_list(event_ids)
return upper_bound, events
async def get_device_messages_token_for_appservice(self, service):
txn.execute(
"SELECT device_message_stream_id FROM application_services_state WHERE as_id=?",
(service.id,),
)
last_txn_id = txn.fetchone()
if last_txn_id is None or last_txn_id[0] is None: # no row exists
return 0
else:
return int(last_txn_id[0]) # select 'last_txn' col
async def set_device_messages_token_for_appservice(self, service, pos) -> None:
def set_appservice_last_pos_txn(txn):
async def get_type_stream_id_for_appservice(self, service, type: str) -> int:
def get_type_stream_id_for_appservice_txn(txn):
stream_id_type = "%s_stream_id" % type
txn.execute(
"UPDATE application_services_state SET device_message_stream_id = ? WHERE as_id=?", (pos, service.id)
"SELECT ? FROM application_services_state WHERE as_id=?",
(stream_id_type, service.id,),
)
last_txn_id = txn.fetchone()
if last_txn_id is None or last_txn_id[0] is None: # no row exists
return 0
else:
return int(last_txn_id[0])
return await self.db_pool.runInteraction(
"get_type_stream_id_for_appservice", get_type_stream_id_for_appservice_txn
)
async def set_type_stream_id_for_appservice(
self, service, type: str, pos: int
) -> None:
def set_type_stream_id_for_appservice_txn(txn):
stream_id_type = "%s_stream_id" % type
txn.execute(
"UPDATE ? SET device_list_stream_id = ? WHERE as_id=?",
(stream_id_type, pos, service.id),
)
await self.db_pool.runInteraction(
"set_device_messages_token_for_appservice", set_appservice_last_pos_txn
)
async def get_device_list_token_for_appservice(self, service):
txn.execute(
"SELECT device_list_stream_id FROM application_services_state WHERE as_id=?",
(service.id,),
)
last_txn_id = txn.fetchone()
if last_txn_id is None or last_txn_id[0] is None: # no row exists
return 0
else:
return int(last_txn_id[0]) # select 'last_txn' col
async def set_device_list_token_for_appservice(self, service, pos) -> None:
def set_appservice_last_pos_txn(txn):
txn.execute(
"UPDATE application_services_state SET device_list_stream_id = ?", (pos, service.id)
)
await self.db_pool.runInteraction(
"set_device_list_token_for_appservice", set_appservice_last_pos_txn
"set_type_stream_id_for_appservice", set_type_stream_id_for_appservice_txn
)
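The new get/set_type_stream_id_for_appservice helpers assume application_services_state has one position column per stream type. The schema delta is not part of this commit; inferred from the "%s_stream_id" pattern above, it would presumably add columns along these lines (a sketch only):

# Assumed schema delta (not shown in this commit); column names inferred
# from the "%s_stream_id" pattern.
ASSUMED_DELTA_SQL = """
    ALTER TABLE application_services_state ADD COLUMN read_receipt_stream_id BIGINT;
    ALTER TABLE application_services_state ADD COLUMN presence_stream_id BIGINT;
    ALTER TABLE application_services_state ADD COLUMN to_device_stream_id BIGINT;
    ALTER TABLE application_services_state ADD COLUMN device_list_stream_id BIGINT;
"""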

View File

@@ -16,12 +16,12 @@
import logging
from typing import List, Tuple
from synapse.appservice import ApplicationService
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool
from synapse.util import json_encoder
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.appservice import ApplicationService
logger = logging.getLogger(__name__)
@@ -44,15 +44,18 @@ class DeviceInboxWorkerStore(SQLBaseStore):
" ORDER BY stream_id ASC"
" LIMIT ?"
)
txn.execute(
sql, (last_stream_id, current_stream_id, limit)
)
txn.execute(sql, (last_stream_id, current_stream_id, limit))
messages = []
for row in txn:
stream_pos = row[0]
if service.is_interested_in_user(row.user_id):
messages.append(db_to_json(row[1]))
msg = db_to_json(row[1])
# db_to_json gives us a plain dict, so attach the recipient as a key.
msg["recipient"] = {
    "device_id": row.device_id,
    "user_id": row.user_id,
}
messages.append(msg)
if len(messages) < limit:
stream_pos = current_stream_id
return messages, stream_pos

View File

@@ -19,6 +19,7 @@ import logging
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
from synapse.api.errors import Codes, StoreError
from synapse.appservice import ApplicationService
from synapse.logging.opentracing import (
get_active_span_text_map,
set_tag,
@@ -525,6 +526,31 @@ class DeviceWorkerStore(SQLBaseStore):
"get_users_whose_devices_changed", _get_users_whose_devices_changed_txn
)
async def get_device_changes_for_as(
self,
service: ApplicationService,
last_stream_id: int,
current_stream_id: int,
limit: int = 100,
) -> List[str]:
    def get_device_changes_for_as_txn(txn):
        # device_lists_stream has one row per (stream_id, user_id, device_id)
        # change, so deduplicate on user_id within the token range.
        sql = (
            "SELECT DISTINCT user_id FROM device_lists_stream"
            " WHERE ? < stream_id AND stream_id <= ?"
            " LIMIT ?"
        )
        txn.execute(sql, (last_stream_id, current_stream_id, limit))
        # Filter the changed users down to those this appservice is
        # interested in; is_interested_in_user is a synchronous check,
        # so it can run inside the transaction function.
        return [row[0] for row in txn if service.is_interested_in_user(row[0])]
    return await self.db_pool.runInteraction(
        "get_device_changes_for_as", get_device_changes_for_as_txn
    )
async def get_users_whose_signatures_changed(
self, user_id: str, from_key: int
) -> Set[str]:

View File

@@ -283,9 +283,7 @@ class ReceiptsWorkerStore(SQLBaseStore, metaclass=abc.ABCMeta):
}
return results
@cached(
num_args=2,
)
@cached(num_args=2,)
async def _get_linearized_receipts_for_all_rooms(self, to_key, from_key=None):
def f(txn):
if from_key:
@@ -326,7 +324,6 @@ class ReceiptsWorkerStore(SQLBaseStore, metaclass=abc.ABCMeta):
return results
async def get_users_sent_receipts_between(
self, last_id: int, current_id: int
) -> List[str]: