Appservice API changes

hs/super-wip-edus-down-sync
Will Hunt 2020-09-21 15:09:31 +01:00
parent babc027543
commit 78911ca46a
4 changed files with 35 additions and 4 deletions

View File

@ -91,6 +91,7 @@ class ApplicationService:
protocols=None,
rate_limited=True,
ip_range_whitelist=None,
supports_ephemeral=False,
):
self.token = token
self.url = (
@ -102,6 +103,7 @@ class ApplicationService:
self.namespaces = self._check_namespaces(namespaces)
self.id = id
self.ip_range_whitelist = ip_range_whitelist
self.supports_ephemeral = supports_ephemeral
if "|" in self.id:
raise Exception("application service ID cannot contain '|' character")
@ -188,11 +190,11 @@ class ApplicationService:
if not store:
return False
does_match = await self._matches_user_in_member_list(event.room_id, store)
does_match = await self.matches_user_in_member_list(event.room_id, store)
return does_match
@cached(num_args=1, cache_context=True)
async def _matches_user_in_member_list(self, room_id, store, cache_context):
async def matches_user_in_member_list(self, room_id, store, cache_context):
member_list = await store.get_users_in_room(
room_id, on_invalidate=cache_context.invalidate
)

View File

@ -201,6 +201,28 @@ class ApplicationServiceApi(SimpleHttpClient):
key = (service.id, protocol)
return await self.protocol_meta_cache.wrap(key, _get)
async def push_ephemeral(self, service, events):
if service.url is None:
return True
if service.supports_ephemeral is False:
return True
uri = service.url + (
"%s/uk.half-shot.appservice/ephemeral" % APP_SERVICE_PREFIX
)
try:
await self.put_json(
uri=uri,
json_body={"events": events},
args={"access_token": service.hs_token},
)
return True
except CodeMessageException as e:
logger.warning("push_ephemeral to %s received %s", uri, e.code)
except Exception as ex:
logger.warning("push_ephemeral to %s threw exception %s", uri, ex)
return False
async def push_bulk(self, service, events, txn_id=None):
if service.url is None:
return True

View File

@ -85,6 +85,10 @@ class ApplicationServiceScheduler:
def submit_event_for_as(self, service, event):
self.queuer.enqueue(service, event)
async def submit_ephemeral_events_for_as(self, service, events):
if self.txn_ctrl.is_service_up(service):
await self.as_api.push_ephemeral(service, events)
class _ServiceQueuer:
"""Queue of events waiting to be sent to appservices.
@ -161,7 +165,7 @@ class _TransactionController:
async def send(self, service, events):
try:
txn = await self.store.create_appservice_txn(service=service, events=events)
service_is_up = await self._is_service_up(service)
service_is_up = await self.is_service_up(service)
if service_is_up:
sent = await txn.send(self.as_api)
if sent:
@ -204,7 +208,7 @@ class _TransactionController:
recoverer.recover()
logger.info("Now %i active recoverers", len(self.recoverers))
async def _is_service_up(self, service):
async def is_service_up(self, service):
state = await self.store.get_appservice_state(service)
return state == ApplicationServiceState.UP or state is None

View File

@ -160,6 +160,8 @@ def _load_appservice(hostname, as_info, config_filename):
if as_info.get("ip_range_whitelist"):
ip_range_whitelist = IPSet(as_info.get("ip_range_whitelist"))
supports_ephemeral = as_info.get("uk.half-shot.appservice.push_ephemeral", False)
return ApplicationService(
token=as_info["as_token"],
hostname=hostname,
@ -168,6 +170,7 @@ def _load_appservice(hostname, as_info, config_filename):
hs_token=as_info["hs_token"],
sender=user_id,
id=as_info["id"],
supports_ephemeral=supports_ephemeral,
protocols=protocols,
rate_limited=rate_limited,
ip_range_whitelist=ip_range_whitelist,