Add support for device messages, start support for device lists

hs/super-wip-edus-down-sync
Will Hunt 2020-09-22 11:31:59 +01:00
parent 4392526bf0
commit 316ad09a64
5 changed files with 153 additions and 28 deletions

View File

@ -201,7 +201,7 @@ class ApplicationServiceApi(SimpleHttpClient):
key = (service.id, protocol)
return await self.protocol_meta_cache.wrap(key, _get)
async def push_ephemeral(self, service, events):
async def push_ephemeral(self, service, events, to_device=None, device_lists=None):
if service.url is None:
return True
if service.supports_ephemeral is False:
@ -213,7 +213,7 @@ class ApplicationServiceApi(SimpleHttpClient):
try:
await self.put_json(
uri=uri,
json_body={"events": events},
json_body={"events": events, "device_messages": to_device, "device_lists": device_lists},
args={"access_token": service.hs_token},
)
return True

View File

@ -201,35 +201,61 @@ class ApplicationServicesHandler:
)
events = receipts
elif stream_key == "presence_key":
events = []
presence_source = self.event_sources.sources["presence"]
for user in users:
interested = await service.is_interested_in_presence(user, self.store)
if not interested:
continue
presence_events, _key = await presence_source.get_new_events(
user=user,
service=service,
from_key=None, # I don't think this is required
)
time_now = self.clock.time_msec()
presence_events = [
{
"type": "m.presence",
"sender": event.user_id,
"content": format_user_presence_state(
event, time_now, include_user_id=False
),
}
for event in presence_events
]
events = events + presence_events
events = await self._handle_as_presence(service, users)
elif stream_key == "device_list_key":
# Check if the device lists have changed for any of the users we are interested in
print("device_list_key", users)
elif stream_key == "to_device_key":
print("to_device_key", users)
# Check the inbox for any users the bridge owns
events, to_device_token = await self._handle_to_device(service, users, new_token)
if events:
# TODO: Do in background?
await self.scheduler.submit_ephemeral_events_for_as(service, events, new_token)
if stream_key == "to_device_key":
# Update database with new token
await self.store.set_device_messages_token_for_appservice(service, to_device_token)
return
if events:
# TODO: Do in background?
await self.scheduler.submit_ephemeral_events_for_as(service, events)
await self.scheduler.submit_ephemeral_events_for_as(service, events, new_token)
async def _handle_device_list(self, service, users, token):
if not any([True for u in users if service.is_interested_in_user(u)]):
return False
async def _handle_to_device(self, service, users, token):
if not any([True for u in users if service.is_interested_in_user(u)]):
return False
since_token = await self.store.get_device_messages_token_for_appservice(service)
messages, new_token = await self.store.get_new_messages_for_as(service, since_token, token)
return messages, new_token
async def _handle_as_presence(self, service, users):
events = []
presence_source = self.event_sources.sources["presence"]
for user in users:
interested = await service.is_interested_in_presence(user, self.store)
if not interested:
continue
presence_events, _key = await presence_source.get_new_events(
user=user,
service=service,
from_key=None, # TODO: I don't think this is required?
)
time_now = self.clock.time_msec()
presence_events = [
{
"type": "m.presence",
"sender": event.user_id,
"content": format_user_presence_state(
event, time_now, include_user_id=False
),
}
for event in presence_events
]
events = events + presence_events
async def query_user_exists(self, user_id):
"""Check if any application service knows this user_id exists.

View File

@ -320,7 +320,7 @@ class ApplicationServiceTransactionWorkerStore(
)
async def get_new_events_for_appservice(self, current_id, limit):
"""Get all new evnets"""
"""Get all new events for an appservice"""
def get_new_events_for_appservice_txn(txn):
sql = (
@ -350,6 +350,48 @@ class ApplicationServiceTransactionWorkerStore(
events = await self.get_events_as_list(event_ids)
return upper_bound, events
async def get_device_messages_token_for_appservice(self, service):
txn.execute(
"SELECT device_message_stream_id FROM application_services_state WHERE as_id=?",
(service.id,),
)
last_txn_id = txn.fetchone()
if last_txn_id is None or last_txn_id[0] is None: # no row exists
return 0
else:
return int(last_txn_id[0]) # select 'last_txn' col
async def set_device_messages_token_for_appservice(self, service, pos) -> None:
def set_appservice_last_pos_txn(txn):
txn.execute(
"UPDATE application_services_state SET device_message_stream_id = ? WHERE as_id=?", (pos, service.id)
)
await self.db_pool.runInteraction(
"set_device_messages_token_for_appservice", set_appservice_last_pos_txn
)
async def get_device_list_token_for_appservice(self, service):
txn.execute(
"SELECT device_list_stream_id FROM application_services_state WHERE as_id=?",
(service.id,),
)
last_txn_id = txn.fetchone()
if last_txn_id is None or last_txn_id[0] is None: # no row exists
return 0
else:
return int(last_txn_id[0]) # select 'last_txn' col
async def set_device_list_token_for_appservice(self, service, pos) -> None:
def set_appservice_last_pos_txn(txn):
txn.execute(
"UPDATE application_services_state SET device_list_stream_id = ?", (pos, service.id)
)
await self.db_pool.runInteraction(
"set_device_list_token_for_appservice", set_appservice_last_pos_txn
)
class ApplicationServiceTransactionStore(ApplicationServiceTransactionWorkerStore):

View File

@ -21,6 +21,7 @@ from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_cla
from synapse.storage.database import DatabasePool
from synapse.util import json_encoder
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.appservice import ApplicationService
logger = logging.getLogger(__name__)
@ -29,6 +30,37 @@ class DeviceInboxWorkerStore(SQLBaseStore):
def get_to_device_stream_token(self):
return self._device_inbox_id_gen.get_current_token()
async def get_new_messages_for_as(
self,
service: ApplicationService,
last_stream_id: int,
current_stream_id: int,
limit: int = 100,
) -> Tuple[List[dict], int]:
def get_new_messages_for_device_txn(txn):
sql = (
"SELECT stream_id, message_json, device_id, user_id FROM device_inbox"
" WHERE ? < stream_id AND stream_id <= ?"
" ORDER BY stream_id ASC"
" LIMIT ?"
)
txn.execute(
sql, (last_stream_id, current_stream_id, limit)
)
messages = []
for row in txn:
stream_pos = row[0]
if service.is_interested_in_user(row.user_id):
messages.append(db_to_json(row[1]))
if len(messages) < limit:
stream_pos = current_stream_id
return messages, stream_pos
return await self.db_pool.runInteraction(
"get_new_messages_for_device", get_new_messages_for_device_txn
)
async def get_new_messages_for_device(
self,
user_id: str,

View File

@ -0,0 +1,25 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* for some reason, we have accumulated duplicate entries in
* device_lists_outbound_pokes, which makes prune_outbound_device_list_pokes less
* efficient.
*/
ALTER TABLE application_services_state
ADD COLUMN device_list_stream_id INT;
ALTER TABLE application_services_state
ADD COLUMN device_message_stream_id INT;