From 316ad09a64b455efc18823f92619fd9c7d3a6c63 Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Tue, 22 Sep 2020 11:31:59 +0100 Subject: [PATCH] Add support for device messages, start support for device lists --- synapse/appservice/api.py | 4 +- synapse/handlers/appservice.py | 76 +++++++++++++------ synapse/storage/databases/main/appservice.py | 44 ++++++++++- synapse/storage/databases/main/deviceinbox.py | 32 ++++++++ .../schema/delta/58/18as_device_stream.sql | 25 ++++++ 5 files changed, 153 insertions(+), 28 deletions(-) create mode 100644 synapse/storage/databases/main/schema/delta/58/18as_device_stream.sql diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 48982d2ad3..9523c3a5d9 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -201,7 +201,7 @@ class ApplicationServiceApi(SimpleHttpClient): key = (service.id, protocol) return await self.protocol_meta_cache.wrap(key, _get) - async def push_ephemeral(self, service, events): + async def push_ephemeral(self, service, events, to_device=None, device_lists=None): if service.url is None: return True if service.supports_ephemeral is False: @@ -213,7 +213,7 @@ class ApplicationServiceApi(SimpleHttpClient): try: await self.put_json( uri=uri, - json_body={"events": events}, + json_body={"events": events, "device_messages": to_device, "device_lists": device_lists}, args={"access_token": service.hs_token}, ) return True diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 9813447903..dbbde3db18 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -201,35 +201,61 @@ class ApplicationServicesHandler: ) events = receipts elif stream_key == "presence_key": - events = [] - presence_source = self.event_sources.sources["presence"] - for user in users: - interested = await service.is_interested_in_presence(user, self.store) - if not interested: - continue - presence_events, _key = await presence_source.get_new_events( - user=user, - service=service, - from_key=None, # I don't think this is required - ) - time_now = self.clock.time_msec() - presence_events = [ - { - "type": "m.presence", - "sender": event.user_id, - "content": format_user_presence_state( - event, time_now, include_user_id=False - ), - } - for event in presence_events - ] - events = events + presence_events + events = await self._handle_as_presence(service, users) + elif stream_key == "device_list_key": + # Check if the device lists have changed for any of the users we are interested in + print("device_list_key", users) elif stream_key == "to_device_key": - print("to_device_key", users) + # Check the inbox for any users the bridge owns + events, to_device_token = await self._handle_to_device(service, users, new_token) + if events: + # TODO: Do in background? + await self.scheduler.submit_ephemeral_events_for_as(service, events, new_token) + if stream_key == "to_device_key": + # Update database with new token + await self.store.set_device_messages_token_for_appservice(service, to_device_token) + return if events: # TODO: Do in background? - await self.scheduler.submit_ephemeral_events_for_as(service, events) + await self.scheduler.submit_ephemeral_events_for_as(service, events, new_token) + + async def _handle_device_list(self, service, users, token): + if not any([True for u in users if service.is_interested_in_user(u)]): + return False + + async def _handle_to_device(self, service, users, token): + if not any([True for u in users if service.is_interested_in_user(u)]): + return False + since_token = await self.store.get_device_messages_token_for_appservice(service) + + messages, new_token = await self.store.get_new_messages_for_as(service, since_token, token) + return messages, new_token + + async def _handle_as_presence(self, service, users): + events = [] + presence_source = self.event_sources.sources["presence"] + for user in users: + interested = await service.is_interested_in_presence(user, self.store) + if not interested: + continue + presence_events, _key = await presence_source.get_new_events( + user=user, + service=service, + from_key=None, # TODO: I don't think this is required? + ) + time_now = self.clock.time_msec() + presence_events = [ + { + "type": "m.presence", + "sender": event.user_id, + "content": format_user_presence_state( + event, time_now, include_user_id=False + ), + } + for event in presence_events + ] + events = events + presence_events async def query_user_exists(self, user_id): """Check if any application service knows this user_id exists. diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 85f6b1e3fd..1ebe4504fd 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -320,7 +320,7 @@ class ApplicationServiceTransactionWorkerStore( ) async def get_new_events_for_appservice(self, current_id, limit): - """Get all new evnets""" + """Get all new events for an appservice""" def get_new_events_for_appservice_txn(txn): sql = ( @@ -350,6 +350,48 @@ class ApplicationServiceTransactionWorkerStore( events = await self.get_events_as_list(event_ids) return upper_bound, events + + async def get_device_messages_token_for_appservice(self, service): + txn.execute( + "SELECT device_message_stream_id FROM application_services_state WHERE as_id=?", + (service.id,), + ) + last_txn_id = txn.fetchone() + if last_txn_id is None or last_txn_id[0] is None: # no row exists + return 0 + else: + return int(last_txn_id[0]) # select 'last_txn' col + + async def set_device_messages_token_for_appservice(self, service, pos) -> None: + def set_appservice_last_pos_txn(txn): + txn.execute( + "UPDATE application_services_state SET device_message_stream_id = ? WHERE as_id=?", (pos, service.id) + ) + + await self.db_pool.runInteraction( + "set_device_messages_token_for_appservice", set_appservice_last_pos_txn + ) + + async def get_device_list_token_for_appservice(self, service): + txn.execute( + "SELECT device_list_stream_id FROM application_services_state WHERE as_id=?", + (service.id,), + ) + last_txn_id = txn.fetchone() + if last_txn_id is None or last_txn_id[0] is None: # no row exists + return 0 + else: + return int(last_txn_id[0]) # select 'last_txn' col + + async def set_device_list_token_for_appservice(self, service, pos) -> None: + def set_appservice_last_pos_txn(txn): + txn.execute( + "UPDATE application_services_state SET device_list_stream_id = ?", (pos, service.id) + ) + + await self.db_pool.runInteraction( + "set_device_list_token_for_appservice", set_appservice_last_pos_txn + ) class ApplicationServiceTransactionStore(ApplicationServiceTransactionWorkerStore): diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index e71217a41f..e4fd979a33 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -21,6 +21,7 @@ from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_cla from synapse.storage.database import DatabasePool from synapse.util import json_encoder from synapse.util.caches.expiringcache import ExpiringCache +from synapse.appservice import ApplicationService logger = logging.getLogger(__name__) @@ -29,6 +30,37 @@ class DeviceInboxWorkerStore(SQLBaseStore): def get_to_device_stream_token(self): return self._device_inbox_id_gen.get_current_token() + async def get_new_messages_for_as( + self, + service: ApplicationService, + last_stream_id: int, + current_stream_id: int, + limit: int = 100, + ) -> Tuple[List[dict], int]: + def get_new_messages_for_device_txn(txn): + sql = ( + "SELECT stream_id, message_json, device_id, user_id FROM device_inbox" + " WHERE ? < stream_id AND stream_id <= ?" + " ORDER BY stream_id ASC" + " LIMIT ?" + ) + txn.execute( + sql, (last_stream_id, current_stream_id, limit) + ) + messages = [] + + for row in txn: + stream_pos = row[0] + if service.is_interested_in_user(row.user_id): + messages.append(db_to_json(row[1])) + if len(messages) < limit: + stream_pos = current_stream_id + return messages, stream_pos + + return await self.db_pool.runInteraction( + "get_new_messages_for_device", get_new_messages_for_device_txn + ) + async def get_new_messages_for_device( self, user_id: str, diff --git a/synapse/storage/databases/main/schema/delta/58/18as_device_stream.sql b/synapse/storage/databases/main/schema/delta/58/18as_device_stream.sql new file mode 100644 index 0000000000..d4abfb6183 --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/58/18as_device_stream.sql @@ -0,0 +1,25 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + /* for some reason, we have accumulated duplicate entries in + * device_lists_outbound_pokes, which makes prune_outbound_device_list_pokes less + * efficient. + */ + +ALTER TABLE application_services_state + ADD COLUMN device_list_stream_id INT; + +ALTER TABLE application_services_state + ADD COLUMN device_message_stream_id INT; \ No newline at end of file