Compare commits
4 Commits
7fe855498b
...
c30c38cb6a
Author | SHA1 | Date |
---|---|---|
Will Hunt | c30c38cb6a | |
Will Hunt | de469f56b1 | |
Will Hunt | b8ff93eec7 | |
Will Hunt | 218e22ae56 |
|
@ -17,7 +17,6 @@ import re
|
|||
from typing import TYPE_CHECKING, List
|
||||
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.appservice.api import ApplicationServiceApi
|
||||
from synapse.events import EventBase
|
||||
from synapse.types import GroupID, UserID, get_domain_from_id
|
||||
from synapse.util.caches.descriptors import _CacheContext, cached
|
||||
|
@ -164,7 +163,7 @@ class ApplicationService:
|
|||
|
||||
@cached(num_args=1, cache_context=True)
|
||||
async def matches_user_in_member_list(
|
||||
self, room_id: str, store: DataStore, cache_context: _CacheContext
|
||||
self, room_id: str, store, cache_context: _CacheContext
|
||||
):
|
||||
member_list = await store.get_users_in_room(room_id)
|
||||
|
||||
|
@ -212,7 +211,7 @@ class ApplicationService:
|
|||
|
||||
@cached(num_args=1, cache_context=True)
|
||||
async def is_interested_in_presence(
|
||||
self, user_id: UserID, store: DataStore, cache_context: _CacheContext
|
||||
self, user_id: UserID, store, cache_context: _CacheContext
|
||||
):
|
||||
# Find all the rooms the sender is in
|
||||
if self.is_interested_in_user(user_id.to_string()):
|
||||
|
@ -303,7 +302,7 @@ class AppServiceTransaction:
|
|||
self.events = events
|
||||
self.ephemeral = ephemeral
|
||||
|
||||
async def send(self, as_api: ApplicationServiceApi) -> bool:
|
||||
async def send(self, as_api) -> bool:
|
||||
"""Sends this transaction using the provided AS API interface.
|
||||
|
||||
Args:
|
||||
|
|
|
@ -14,19 +14,21 @@
|
|||
# limitations under the License.
|
||||
import logging
|
||||
import urllib
|
||||
from typing import Any, List, Optional
|
||||
from typing import TYPE_CHECKING, Any, List, Optional
|
||||
|
||||
from prometheus_client import Counter
|
||||
|
||||
from synapse.api.constants import EventTypes, ThirdPartyEntityKind
|
||||
from synapse.api.errors import CodeMessageException
|
||||
from synapse.appservice import ApplicationService
|
||||
from synapse.events import EventBase
|
||||
from synapse.events.utils import serialize_event
|
||||
from synapse.http.client import SimpleHttpClient
|
||||
from synapse.types import JsonDict, ThirdPartyInstanceID
|
||||
from synapse.util.caches.response_cache import ResponseCache
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.appservice import ApplicationService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
sent_transactions_counter = Counter(
|
||||
|
@ -202,7 +204,7 @@ class ApplicationServiceApi(SimpleHttpClient):
|
|||
|
||||
async def push_bulk(
|
||||
self,
|
||||
service: ApplicationService,
|
||||
service: "ApplicationService",
|
||||
events: List[EventBase],
|
||||
ephemeral: Optional[Any] = None,
|
||||
txn_id: Optional[int] = None,
|
||||
|
|
|
@ -134,9 +134,7 @@ class _ServiceQueuer:
|
|||
"as-sender-%s" % (service.id), self._send_request, service
|
||||
)
|
||||
|
||||
async def _send_request(
|
||||
self, service: ApplicationService, ephemeral: Optional[Any] = None
|
||||
):
|
||||
async def _send_request(self, service: ApplicationService):
|
||||
# sanity-check: we shouldn't get here if this service already has a sender
|
||||
# running.
|
||||
assert service.id not in self.requests_in_flight
|
||||
|
@ -145,7 +143,7 @@ class _ServiceQueuer:
|
|||
try:
|
||||
while True:
|
||||
events = self.queued_events.pop(service.id, [])
|
||||
ephemeral = self.queued_ephemeral.pop(service.id, [])
|
||||
ephemeral = self.queued_ephemeral.pop(service.id, None)
|
||||
if not events and not ephemeral:
|
||||
return
|
||||
try:
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from typing import Collection, List, Union
|
||||
from typing import List, Union
|
||||
|
||||
from prometheus_client import Counter
|
||||
|
||||
|
@ -30,7 +30,7 @@ from synapse.metrics import (
|
|||
event_processing_loop_room_count,
|
||||
)
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.types import RoomStreamToken, UserID
|
||||
from synapse.types import Collection, RoomStreamToken, UserID
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
|
@ -60,7 +60,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
|
|||
self.successResultOf(defer.ensureDeferred(self.txnctrl.send(service, events)))
|
||||
|
||||
self.store.create_appservice_txn.assert_called_once_with(
|
||||
service=service, events=events # txn made and saved
|
||||
service=service, events=events, ephemeral=None # txn made and saved
|
||||
)
|
||||
self.assertEquals(0, len(self.txnctrl.recoverers)) # no recoverer made
|
||||
txn.complete.assert_called_once_with(self.store) # txn completed
|
||||
|
@ -81,7 +81,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
|
|||
self.successResultOf(defer.ensureDeferred(self.txnctrl.send(service, events)))
|
||||
|
||||
self.store.create_appservice_txn.assert_called_once_with(
|
||||
service=service, events=events # txn made and saved
|
||||
service=service, events=events, ephemeral=None # txn made and saved
|
||||
)
|
||||
self.assertEquals(0, txn.send.call_count) # txn not sent though
|
||||
self.assertEquals(0, txn.complete.call_count) # or completed
|
||||
|
@ -106,7 +106,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
|
|||
self.successResultOf(defer.ensureDeferred(self.txnctrl.send(service, events)))
|
||||
|
||||
self.store.create_appservice_txn.assert_called_once_with(
|
||||
service=service, events=events
|
||||
service=service, events=events, ephemeral=None
|
||||
)
|
||||
self.assertEquals(1, self.recoverer_fn.call_count) # recoverer made
|
||||
self.assertEquals(1, self.recoverer.recover.call_count) # and invoked
|
||||
|
@ -203,7 +203,7 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase):
|
|||
service = Mock(id=4)
|
||||
event = Mock()
|
||||
self.queuer.enqueue(service, event)
|
||||
self.txn_ctrl.send.assert_called_once_with(service, [event])
|
||||
self.txn_ctrl.send.assert_called_once_with(service, [event], None)
|
||||
|
||||
def test_send_single_event_with_queue(self):
|
||||
d = defer.Deferred()
|
||||
|
@ -217,11 +217,11 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase):
|
|||
# Send more events: expect send() to NOT be called multiple times.
|
||||
self.queuer.enqueue(service, event2)
|
||||
self.queuer.enqueue(service, event3)
|
||||
self.txn_ctrl.send.assert_called_with(service, [event])
|
||||
self.txn_ctrl.send.assert_called_with(service, [event], None)
|
||||
self.assertEquals(1, self.txn_ctrl.send.call_count)
|
||||
# Resolve the send event: expect the queued events to be sent
|
||||
d.callback(service)
|
||||
self.txn_ctrl.send.assert_called_with(service, [event2, event3])
|
||||
self.txn_ctrl.send.assert_called_with(service, [event2, event3], None)
|
||||
self.assertEquals(2, self.txn_ctrl.send.call_count)
|
||||
|
||||
def test_multiple_service_queues(self):
|
||||
|
@ -247,13 +247,13 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase):
|
|||
# send events for different ASes and make sure they are sent
|
||||
self.queuer.enqueue(srv1, srv_1_event)
|
||||
self.queuer.enqueue(srv1, srv_1_event2)
|
||||
self.txn_ctrl.send.assert_called_with(srv1, [srv_1_event])
|
||||
self.txn_ctrl.send.assert_called_with(srv1, [srv_1_event], None)
|
||||
self.queuer.enqueue(srv2, srv_2_event)
|
||||
self.queuer.enqueue(srv2, srv_2_event2)
|
||||
self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event])
|
||||
self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event], None)
|
||||
|
||||
# make sure callbacks for a service only send queued events for THAT
|
||||
# service
|
||||
srv_2_defer.callback(srv2)
|
||||
self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event2])
|
||||
self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event2], None)
|
||||
self.assertEquals(3, self.txn_ctrl.send.call_count)
|
||||
|
|
Loading…
Reference in New Issue