Send EDUs over /transaction and drop device stuff

hs/push-reports-to-as
Will Hunt 2020-10-01 14:14:10 +01:00
parent f807c7291f
commit 5cb3d237ae
8 changed files with 52 additions and 185 deletions

View File

@ -35,10 +35,11 @@ class ApplicationServiceState:
class AppServiceTransaction:
"""Represents an application service transaction."""
def __init__(self, service, id, events):
def __init__(self, service, id, events, ephemeral=None):
self.service = service
self.id = id
self.events = events
self.ephemeral = ephemeral
async def send(self, as_api: ApplicationServiceApi) -> bool:
"""Sends this transaction using the provided AS API interface.
@ -49,7 +50,10 @@ class AppServiceTransaction:
True if the transaction was sent.
"""
return await as_api.push_bulk(
service=self.service, events=self.events, txn_id=self.id
service=self.service,
events=self.events,
ephemeral=self.ephemeral,
txn_id=self.id,
)
async def complete(self, store: "DataStore") -> None:

View File

@ -201,33 +201,7 @@ class ApplicationServiceApi(SimpleHttpClient):
key = (service.id, protocol)
return await self.protocol_meta_cache.wrap(key, _get)
async def push_ephemeral(self, service, events, to_device=None, device_lists=None):
if service.url is None:
return True
if service.supports_ephemeral is False:
return True
uri = service.url + (
"%s/uk.half-shot.appservice/ephemeral" % APP_SERVICE_PREFIX
)
try:
await self.put_json(
uri=uri,
json_body={
"events": events,
"device_messages": to_device,
"device_lists": device_lists,
},
args={"access_token": service.hs_token},
)
return True
except CodeMessageException as e:
logger.warning("push_ephemeral to %s received %s", uri, e.code)
except Exception as ex:
logger.warning("push_ephemeral to %s threw exception %s", uri, ex)
return False
async def push_bulk(self, service, events, txn_id=None):
async def push_bulk(self, service, events, ephemeral=None, txn_id=None):
if service.url is None:
return True
@ -241,11 +215,12 @@ class ApplicationServiceApi(SimpleHttpClient):
txn_id = str(txn_id)
uri = service.url + ("/transactions/%s" % urllib.parse.quote(txn_id))
body = {"events": events}
if ephemeral:
body["uk.half-shot.appservice.ephemeral"] = ephemeral
try:
await self.put_json(
uri=uri,
json_body={"events": events},
args={"access_token": service.hs_token},
uri=uri, json_body=body, args={"access_token": service.hs_token},
)
sent_transactions_counter.labels(service.id).inc()
sent_events_counter.labels(service.id).inc(len(events))

View File

@ -85,9 +85,8 @@ class ApplicationServiceScheduler:
def submit_event_for_as(self, service, event):
self.queuer.enqueue(service, event)
async def submit_ephemeral_events_for_as(self, service, events):
if self.txn_ctrl.is_service_up(service):
await self.as_api.push_ephemeral(service, events)
def submit_ephemeral_events_for_as(self, service, events):
self.queuer.enqueue_ephemeral(service, events)
class _ServiceQueuer:
@ -100,6 +99,7 @@ class _ServiceQueuer:
def __init__(self, txn_ctrl, clock):
self.queued_events = {} # dict of {service_id: [events]}
self.queued_ephemeral = {} # dict of {service_id: [events]}
# the appservices which currently have a transaction in flight
self.requests_in_flight = set()
@ -115,10 +115,22 @@ class _ServiceQueuer:
return
run_as_background_process(
"as-sender-%s" % (service.id,), self._send_request, service
"as-sender-%s" % (service.id), self._send_request, service
)
async def _send_request(self, service):
def enqueue_ephemeral(self, service, events):
self.queued_ephemeral.setdefault(service.id, []).extend(events)
# start a sender for this appservice if we don't already have one
if service.id in self.requests_in_flight:
return
run_as_background_process(
"as-sender-%s" % (service.id), self._send_request, service
)
async def _send_request(self, service, ephemeral=None):
# sanity-check: we shouldn't get here if this service already has a sender
# running.
assert service.id not in self.requests_in_flight
@ -127,10 +139,11 @@ class _ServiceQueuer:
try:
while True:
events = self.queued_events.pop(service.id, [])
if not events:
ephemeral = self.queued_ephemeral.pop(service.id, [])
if not events and not ephemeral:
return
try:
await self.txn_ctrl.send(service, events)
await self.txn_ctrl.send(service, events, ephemeral)
except Exception:
logger.exception("AS request failed")
finally:
@ -162,9 +175,9 @@ class _TransactionController:
# for UTs
self.RECOVERER_CLASS = _Recoverer
async def send(self, service, events):
async def send(self, service, events, ephemeral=None):
try:
txn = await self.store.create_appservice_txn(service=service, events=events)
txn = await self.store.create_appservice_txn(service=service, events=events, ephemeral=ephemeral)
service_is_up = await self.is_service_up(service)
if service_is_up:
sent = await txn.send(self.as_api)

View File

@ -179,39 +179,27 @@ class ApplicationServicesHandler:
logger.info("Checking interested services for %s" % (stream_key))
with Measure(self.clock, "notify_interested_services_ephemeral"):
for service in services:
events = []
if stream_key == "typing_key":
events = await self._handle_typing(service, new_token)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)
# We don't persist the token for typing_key for performance reasons
elif stream_key == "receipt_key":
events = await self._handle_receipts(service)
elif stream_key == "presence_key":
events = await self._handle_as_presence(service, users)
elif stream_key == "device_list_key":
# Check if the device lists have changed for any of the users we are interested in
events = await self._handle_device_list(service, users, new_token)
elif stream_key == "to_device_key":
# Check the inbox for any users the bridge owns
events = await self._handle_to_device(service, users, new_token)
if events:
# TODO: Do in background?
await self.scheduler.submit_ephemeral_events_for_as(
service, events, new_token
)
# We don't persist the token for typing_key
if stream_key == "presence_key":
await self.store.set_type_stream_id_for_appservice(
service, "presence", new_token
)
elif stream_key == "receipt_key":
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)
await self.store.set_type_stream_id_for_appservice(
service, "read_receipt", new_token
)
elif stream_key == "to_device_key":
await self.store.set_type_stream_id_for_appservice(
service, "to_device", new_token
)
elif stream_key == "presence_key":
events = await self._handle_as_presence(service, users)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)
await self.store.set_type_stream_id_for_appservice(
service, "presence", new_token
)
async def _handle_typing(self, service, new_token):
async def _handle_typing(self, service: ApplicationService, new_token: int):
typing_source = self.event_sources.sources["typing"]
# Get the typing events from just before current
typing, _key = await typing_source.get_new_events_as(
@ -223,7 +211,7 @@ class ApplicationServicesHandler:
)
return typing
async def _handle_receipts(self, service, token: int):
async def _handle_receipts(self, service: ApplicationService, token: int):
from_key = await self.store.get_type_stream_id_for_appservice(
service, "read_receipt"
)
@ -233,36 +221,7 @@ class ApplicationServicesHandler:
)
return receipts
async def _handle_device_list(
self, service: ApplicationService, users: List[str], new_token: int
):
# TODO: Determine if any user have left and report those
from_token = await self.store.get_type_stream_id_for_appservice(
service, "device_list"
)
changed_user_ids = await self.store.get_device_changes_for_as(
service, from_token, new_token
)
# Return the
return {
"type": "m.device_list_update",
"content": {"changed": changed_user_ids,},
}
async def _handle_to_device(self, service, users, token):
if not any([True for u in users if service.is_interested_in_user(u)]):
return False
since_token = await self.store.get_type_stream_id_for_appservice(
service, "to_device"
)
messages, _ = await self.store.get_new_messages_for_as(
service, since_token, token
)
# This returns user_id -> device_id -> message
return messages
async def _handle_as_presence(self, service, users):
async def _handle_as_presence(self, service: ApplicationService, users: List[str]):
events = []
presence_source = self.event_sources.sources["presence"]
from_key = await self.store.get_type_stream_id_for_appservice(

View File

@ -172,7 +172,7 @@ class ApplicationServiceTransactionWorkerStore(
"application_services_state", {"as_id": service.id}, {"state": state}
)
async def create_appservice_txn(self, service, events):
async def create_appservice_txn(self, service, events, ephemeral=None):
"""Atomically creates a new transaction for this application service
with the given list of events.
@ -207,7 +207,9 @@ class ApplicationServiceTransactionWorkerStore(
"VALUES(?,?,?)",
(service.id, new_txn_id, event_ids),
)
return AppServiceTransaction(service=service, id=new_txn_id, events=events)
return AppServiceTransaction(
service=service, id=new_txn_id, events=events, ephemeral=ephemeral
)
return await self.db_pool.runInteraction(
"create_appservice_txn", _create_appservice_txn

View File

@ -16,7 +16,6 @@
import logging
from typing import List, Tuple
from synapse.appservice import ApplicationService
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool
@ -30,40 +29,6 @@ class DeviceInboxWorkerStore(SQLBaseStore):
def get_to_device_stream_token(self):
return self._device_inbox_id_gen.get_current_token()
async def get_new_messages_for_as(
self,
service: ApplicationService,
last_stream_id: int,
current_stream_id: int,
limit: int = 100,
) -> Tuple[List[dict], int]:
def get_new_messages_for_device_txn(txn):
sql = (
"SELECT stream_id, message_json, device_id, user_id FROM device_inbox"
" WHERE ? < stream_id AND stream_id <= ?"
" ORDER BY stream_id ASC"
" LIMIT ?"
)
txn.execute(sql, (last_stream_id, current_stream_id, limit))
messages = []
for row in txn:
stream_pos = row[0]
if service.is_interested_in_user(row.user_id):
msg = db_to_json(row[1])
msg.recipient = {
"device_id": row.device_id,
"user_id": row.user_id,
}
messages.append(msg)
if len(messages) < limit:
stream_pos = current_stream_id
return messages, stream_pos
return await self.db_pool.runInteraction(
"get_new_messages_for_device", get_new_messages_for_device_txn
)
async def get_new_messages_for_device(
self,
user_id: str,

View File

@ -19,7 +19,6 @@ import logging
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
from synapse.api.errors import Codes, StoreError
from synapse.appservice import ApplicationService
from synapse.logging.opentracing import (
get_active_span_text_map,
set_tag,
@ -526,31 +525,6 @@ class DeviceWorkerStore(SQLBaseStore):
"get_users_whose_devices_changed", _get_users_whose_devices_changed_txn
)
async def get_device_changes_for_as(
self,
service: ApplicationService,
last_stream_id: int,
current_stream_id: int,
limit: int = 100,
) -> Tuple[List[dict], int]:
def get_device_changes_for_as_txn(txn):
sql = (
"SELECT DISTINCT user_ids FROM device_lists_stream"
" WHERE ? < stream_id AND stream_id <= ?"
" ORDER BY stream_id ASC"
" LIMIT ?"
)
txn.execute(sql, (last_stream_id, current_stream_id, limit))
rows = txn.fetchall()
users = []
for user in db_to_json(rows[0]):
if await service.is_interested_in_presence(user):
users.append(user)
return await self.db_pool.runInteraction(
"get_device_changes_for_as", get_device_changes_for_as_txn
)
async def get_users_whose_signatures_changed(
self, user_id: str, from_key: int
) -> Set[str]:

View File

@ -1,25 +0,0 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* for some reason, we have accumulated duplicate entries in
* device_lists_outbound_pokes, which makes prune_outbound_device_list_pokes less
* efficient.
*/
ALTER TABLE application_services_state
ADD COLUMN device_list_stream_id INT;
ALTER TABLE application_services_state
ADD COLUMN device_message_stream_id INT;