Fix types / add docstrings

hs/push-reports-to-as
Will Hunt 2020-10-08 15:05:09 +01:00
parent 6c843884e8
commit d0dd953c27
8 changed files with 95 additions and 66 deletions

View File

@ -14,14 +14,15 @@
# limitations under the License. # limitations under the License.
import logging import logging
import re import re
from typing import TYPE_CHECKING, List from typing import TYPE_CHECKING, List, Optional
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes
from synapse.events import EventBase from synapse.events import EventBase
from synapse.types import GroupID, UserID, get_domain_from_id from synapse.types import GroupID, JsonDict, RoomAlias, UserID, get_domain_from_id
from synapse.util.caches.descriptors import _CacheContext, cached from synapse.util.caches.descriptors import cached
if TYPE_CHECKING: if TYPE_CHECKING:
from synapse.appservice.api import ApplicationServiceApi
from synapse.storage.databases.main import DataStore from synapse.storage.databases.main import DataStore
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -131,19 +132,19 @@ class ApplicationService:
raise ValueError("Expected string for 'regex' in ns '%s'" % ns) raise ValueError("Expected string for 'regex' in ns '%s'" % ns)
return namespaces return namespaces
def _matches_regex(self, test_string, namespace_key): def _matches_regex(self, test_string: str, namespace_key: str):
for regex_obj in self.namespaces[namespace_key]: for regex_obj in self.namespaces[namespace_key]:
if regex_obj["regex"].match(test_string): if regex_obj["regex"].match(test_string):
return regex_obj return regex_obj
return None return None
def _is_exclusive(self, ns_key, test_string): def _is_exclusive(self, ns_key: str, test_string: str):
regex_obj = self._matches_regex(test_string, ns_key) regex_obj = self._matches_regex(test_string, ns_key)
if regex_obj: if regex_obj:
return regex_obj["exclusive"] return regex_obj["exclusive"]
return False return False
async def _matches_user(self, event, store): async def _matches_user(self, event, store: "DataStore"):
if not event: if not event:
return False return False
@ -161,10 +162,16 @@ class ApplicationService:
does_match = await self.matches_user_in_member_list(event.room_id, store) does_match = await self.matches_user_in_member_list(event.room_id, store)
return does_match return does_match
@cached(num_args=1, cache_context=True) @cached(num_args=1)
async def matches_user_in_member_list( async def matches_user_in_member_list(self, room_id: str, store: "DataStore"):
self, room_id: str, store, cache_context: _CacheContext """Check if this service is interested a room based upon it's membership
):
Args:
room_id(RoomId): The room to check.
store(DataStore)
Returns:
True if this service would like to know about this room.
"""
member_list = await store.get_users_in_room(room_id) member_list = await store.get_users_in_room(room_id)
# check joined member events # check joined member events
@ -178,7 +185,7 @@ class ApplicationService:
return self.is_interested_in_room(event.room_id) return self.is_interested_in_room(event.room_id)
return False return False
async def _matches_aliases(self, event, store): async def _matches_aliases(self, event, store: "DataStore"):
if not store or not event: if not store or not event:
return False return False
@ -188,7 +195,7 @@ class ApplicationService:
return True return True
return False return False
async def is_interested(self, event, store=None) -> bool: async def is_interested(self, event, store: "DataStore") -> bool:
"""Check if this service is interested in this event. """Check if this service is interested in this event.
Args: Args:
@ -209,10 +216,16 @@ class ApplicationService:
return False return False
@cached(num_args=1, cache_context=True) @cached(num_args=1)
async def is_interested_in_presence( async def is_interested_in_presence(self, user_id: UserID, store: "DataStore"):
self, user_id: UserID, store, cache_context: _CacheContext """Check if this service is interested a user's presence
):
Args:
user_id(UserID): The user to check.
store(DataStore)
Returns:
True if this service would like to know about presence for this user.
"""
# Find all the rooms the sender is in # Find all the rooms the sender is in
if self.is_interested_in_user(user_id.to_string()): if self.is_interested_in_user(user_id.to_string()):
return True return True
@ -224,31 +237,31 @@ class ApplicationService:
return True return True
return False return False
def is_interested_in_user(self, user_id): def is_interested_in_user(self, user_id: UserID):
return ( return (
self._matches_regex(user_id, ApplicationService.NS_USERS) self._matches_regex(user_id, ApplicationService.NS_USERS)
or user_id == self.sender or user_id == self.sender
) )
def is_interested_in_alias(self, alias): def is_interested_in_alias(self, alias: RoomAlias):
return bool(self._matches_regex(alias, ApplicationService.NS_ALIASES)) return bool(self._matches_regex(alias, ApplicationService.NS_ALIASES))
def is_interested_in_room(self, room_id): def is_interested_in_room(self, room_id: UserID):
return bool(self._matches_regex(room_id, ApplicationService.NS_ROOMS)) return bool(self._matches_regex(room_id, ApplicationService.NS_ROOMS))
def is_exclusive_user(self, user_id): def is_exclusive_user(self, user_id: UserID):
return ( return (
self._is_exclusive(ApplicationService.NS_USERS, user_id) self._is_exclusive(ApplicationService.NS_USERS, user_id)
or user_id == self.sender or user_id == self.sender
) )
def is_interested_in_protocol(self, protocol): def is_interested_in_protocol(self, protocol: str):
return protocol in self.protocols return protocol in self.protocols
def is_exclusive_alias(self, alias): def is_exclusive_alias(self, alias: str):
return self._is_exclusive(ApplicationService.NS_ALIASES, alias) return self._is_exclusive(ApplicationService.NS_ALIASES, alias)
def is_exclusive_room(self, room_id): def is_exclusive_room(self, room_id: str):
return self._is_exclusive(ApplicationService.NS_ROOMS, room_id) return self._is_exclusive(ApplicationService.NS_ROOMS, room_id)
def get_exclusive_user_regexes(self): def get_exclusive_user_regexes(self):
@ -261,7 +274,7 @@ class ApplicationService:
if regex_obj["exclusive"] if regex_obj["exclusive"]
] ]
def get_groups_for_user(self, user_id): def get_groups_for_user(self, user_id: str):
"""Get the groups that this user is associated with by this AS """Get the groups that this user is associated with by this AS
Args: Args:
@ -295,18 +308,18 @@ class AppServiceTransaction:
service: ApplicationService, service: ApplicationService,
id: int, id: int,
events: List[EventBase], events: List[EventBase],
ephemeral=None, ephemeral: Optional[List[JsonDict]] = None,
): ):
self.service = service self.service = service
self.id = id self.id = id
self.events = events self.events = events
self.ephemeral = ephemeral self.ephemeral = ephemeral
async def send(self, as_api) -> bool: async def send(self, as_api: "ApplicationServiceApi") -> bool:
"""Sends this transaction using the provided AS API interface. """Sends this transaction using the provided AS API interface.
Args: Args:
as_api: The API to use to send. as_api(ApplicationServiceApi): The API to use to send.
Returns: Returns:
True if the transaction was sent. True if the transaction was sent.
""" """

View File

@ -14,7 +14,7 @@
# limitations under the License. # limitations under the License.
import logging import logging
import urllib import urllib
from typing import TYPE_CHECKING, Any, List, Optional from typing import TYPE_CHECKING, List, Optional
from prometheus_client import Counter from prometheus_client import Counter
@ -206,7 +206,7 @@ class ApplicationServiceApi(SimpleHttpClient):
self, self,
service: "ApplicationService", service: "ApplicationService",
events: List[EventBase], events: List[EventBase],
ephemeral: Optional[Any] = None, ephemeral: Optional[JsonDict] = None,
txn_id: Optional[int] = None, txn_id: Optional[int] = None,
): ):
if service.url is None: if service.url is None:

View File

@ -55,6 +55,7 @@ from synapse.appservice import ApplicationService, ApplicationServiceState
from synapse.events import EventBase from synapse.events import EventBase
from synapse.logging.context import run_in_background from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import JsonDict
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -110,11 +111,8 @@ class _ServiceQueuer:
self.txn_ctrl = txn_ctrl self.txn_ctrl = txn_ctrl
self.clock = clock self.clock = clock
def enqueue(self, service, event): def _start_background_request(self, service):
self.queued_events.setdefault(service.id, []).append(event)
# start a sender for this appservice if we don't already have one # start a sender for this appservice if we don't already have one
if service.id in self.requests_in_flight: if service.id in self.requests_in_flight:
return return
@ -122,17 +120,13 @@ class _ServiceQueuer:
"as-sender-%s" % (service.id,), self._send_request, service "as-sender-%s" % (service.id,), self._send_request, service
) )
def enqueue(self, service, event):
self.queued_events.setdefault(service.id, []).append(event)
self._start_background_request(service)
def enqueue_ephemeral(self, service: ApplicationService, events: List[Any]): def enqueue_ephemeral(self, service: ApplicationService, events: List[Any]):
self.queued_ephemeral.setdefault(service.id, []).extend(events) self.queued_ephemeral.setdefault(service.id, []).extend(events)
self._start_background_request(service)
# start a sender for this appservice if we don't already have one
if service.id in self.requests_in_flight:
return
run_as_background_process(
"as-sender-%s" % (service.id,), self._send_request, service
)
async def _send_request(self, service: ApplicationService): async def _send_request(self, service: ApplicationService):
# sanity-check: we shouldn't get here if this service already has a sender # sanity-check: we shouldn't get here if this service already has a sender
@ -183,13 +177,13 @@ class _TransactionController:
self, self,
service: ApplicationService, service: ApplicationService,
events: List[EventBase], events: List[EventBase],
ephemeral: Optional[Any] = None, ephemeral: Optional[JsonDict] = None,
): ):
try: try:
txn = await self.store.create_appservice_txn( txn = await self.store.create_appservice_txn(
service=service, events=events, ephemeral=ephemeral service=service, events=events, ephemeral=ephemeral
) )
service_is_up = await self.is_service_up(service) service_is_up = await self._is_service_up(service)
if service_is_up: if service_is_up:
sent = await txn.send(self.as_api) sent = await txn.send(self.as_api)
if sent: if sent:
@ -232,7 +226,7 @@ class _TransactionController:
recoverer.recover() recoverer.recover()
logger.info("Now %i active recoverers", len(self.recoverers)) logger.info("Now %i active recoverers", len(self.recoverers))
async def is_service_up(self, service: ApplicationService): async def _is_service_up(self, service: ApplicationService) -> bool:
state = await self.store.get_appservice_state(service) state = await self.store.get_appservice_state(service)
return state == ApplicationServiceState.UP or state is None return state == ApplicationServiceState.UP or state is None

View File

@ -52,14 +52,14 @@ class ApplicationServicesHandler:
self.current_max = 0 self.current_max = 0
self.is_processing = False self.is_processing = False
async def notify_interested_services(self, current_id): async def notify_interested_services(self, current_id: int):
"""Notifies (pushes) all application services interested in this event. """Notifies (pushes) all application services interested in this event.
Pushing is done asynchronously, so this method won't block for any Pushing is done asynchronously, so this method won't block for any
prolonged length of time. prolonged length of time.
Args: Args:
current_id(int): The current maximum ID. current_id: The current maximum ID.
""" """
services = self.store.get_app_services() services = self.store.get_app_services()
if not services or not self.notify_appservices: if not services or not self.notify_appservices:
@ -169,6 +169,17 @@ class ApplicationServicesHandler:
new_token: Union[int, RoomStreamToken], new_token: Union[int, RoomStreamToken],
users: Collection[UserID] = [], users: Collection[UserID] = [],
): ):
"""This is called by the notifier in the background
when a ephemeral event handled by the homeserver.
This will determine which appservices
are interested in the event, and submit them.
Args:
stream_key: The stream the event came from.
new_token: The latest stream token
users: The user(s) involved with the event.
"""
services = [ services = [
service service
for service in self.store.get_app_services() for service in self.store.get_app_services()
@ -192,7 +203,7 @@ class ApplicationServicesHandler:
service, "read_receipt", new_token service, "read_receipt", new_token
) )
elif stream_key == "presence_key": elif stream_key == "presence_key":
events = await self._handle_as_presence(service, users) events = await self._handle_presence(service, users)
if events: if events:
self.scheduler.submit_ephemeral_events_for_as(service, events) self.scheduler.submit_ephemeral_events_for_as(service, events)
await self.store.set_type_stream_id_for_appservice( await self.store.set_type_stream_id_for_appservice(
@ -211,7 +222,7 @@ class ApplicationServicesHandler:
) )
return typing return typing
async def _handle_receipts(self, service: ApplicationService, token: int): async def _handle_receipts(self, service: ApplicationService):
from_key = await self.store.get_type_stream_id_for_appservice( from_key = await self.store.get_type_stream_id_for_appservice(
service, "read_receipt" service, "read_receipt"
) )
@ -221,7 +232,7 @@ class ApplicationServicesHandler:
) )
return receipts return receipts
async def _handle_as_presence(self, service: ApplicationService, users: List[str]): async def _handle_presence(self, service: ApplicationService, users: List[str]):
events = [] events = []
presence_source = self.event_sources.sources["presence"] presence_source = self.event_sources.sources["presence"]
from_key = await self.store.get_type_stream_id_for_appservice( from_key = await self.store.get_type_stream_id_for_appservice(

View File

@ -13,10 +13,11 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging import logging
from typing import List, Tuple
from synapse.appservice import ApplicationService from synapse.appservice import ApplicationService
from synapse.handlers._base import BaseHandler from synapse.handlers._base import BaseHandler
from synapse.types import ReadReceipt, get_domain_from_id from synapse.types import JsonDict, ReadReceipt, get_domain_from_id
from synapse.util.async_helpers import maybe_awaitable from synapse.util.async_helpers import maybe_awaitable
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -142,8 +143,15 @@ class ReceiptEventSource:
return (events, to_key) return (events, to_key)
async def get_new_events_as( async def get_new_events_as(
self, from_key: int, service: ApplicationService, **kwargs self, from_key: int, service: ApplicationService
): ) -> Tuple[List[JsonDict], int]:
"""Returns a set of new receipt events that an appservice
may be interested in.
Args:
from_key: the stream position at which events should be fetched from
service: The appservice which may be interested
"""
from_key = int(from_key) from_key = int(from_key)
to_key = self.get_current_key() to_key = self.get_current_key()

View File

@ -13,7 +13,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import itertools import itertools
import logging import logging
from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Optional, Set, Tuple from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Optional, Set, Tuple

View File

@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging import logging
import random import random
from collections import namedtuple from collections import namedtuple
@ -22,7 +21,7 @@ from synapse.api.errors import AuthError, ShadowBanError, SynapseError
from synapse.appservice import ApplicationService from synapse.appservice import ApplicationService
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.streams import TypingStream from synapse.replication.tcp.streams import TypingStream
from synapse.types import UserID, get_domain_from_id from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer from synapse.util.wheel_timer import WheelTimer
@ -432,8 +431,15 @@ class TypingNotificationEventSource:
} }
async def get_new_events_as( async def get_new_events_as(
self, from_key: int, service: ApplicationService, **kwargs self, from_key: int, service: ApplicationService
): ) -> Tuple[List[JsonDict], int]:
"""Returns a set of new typing events that an appservice
may be interested in.
Args:
from_key: the stream position at which events should be fetched from
service: The appservice which may be interested
"""
with Measure(self.clock, "typing.get_new_events_as"): with Measure(self.clock, "typing.get_new_events_as"):
from_key = int(from_key) from_key = int(from_key)
handler = self.get_typing_handler() handler = self.get_typing_handler()
@ -441,7 +447,6 @@ class TypingNotificationEventSource:
events = [] events = []
for room_id in handler._room_serials.keys(): for room_id in handler._room_serials.keys():
if handler._room_serials[room_id] <= from_key: if handler._room_serials[room_id] <= from_key:
print("Key too old")
continue continue
if not await service.matches_user_in_member_list( if not await service.matches_user_in_member_list(
room_id, handler.store room_id, handler.store

View File

@ -15,7 +15,7 @@
# limitations under the License. # limitations under the License.
import logging import logging
import re import re
from typing import Any, List, Optional from typing import List, Optional
from synapse.appservice import ApplicationService, AppServiceTransaction from synapse.appservice import ApplicationService, AppServiceTransaction
from synapse.config.appservice import load_appservices from synapse.config.appservice import load_appservices
@ -23,6 +23,7 @@ from synapse.events import EventBase
from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.types import JsonDict
from synapse.util import json_encoder from synapse.util import json_encoder
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -178,16 +179,14 @@ class ApplicationServiceTransactionWorkerStore(
self, self,
service: ApplicationService, service: ApplicationService,
events: List[EventBase], events: List[EventBase],
ephemeral: Optional[Any] = None, ephemeral: Optional[JsonDict] = None,
): ) -> AppServiceTransaction:
"""Atomically creates a new transaction for this application service """Atomically creates a new transaction for this application service
with the given list of events. with the given list of events.
Args: Args:
service(ApplicationService): The service who the transaction is for. service: The service who the transaction is for.
events(list<Event>): A list of events to put in the transaction. events: A list of events to put in the transaction.
Returns:
AppServiceTransaction: A new transaction.
""" """
def _create_appservice_txn(txn): def _create_appservice_txn(txn):