Compare commits

...

4 Commits

Author SHA1 Message Date
Will Hunt c30c38cb6a Fix 3 more tests 2020-10-01 17:36:59 +01:00
Will Hunt de469f56b1 fix collection bug 2020-10-01 17:26:03 +01:00
Will Hunt b8ff93eec7 Default to None if no ephemeral events are ready to be sent 2020-10-01 17:13:48 +01:00
Will Hunt 218e22ae56 Fix circular dependency 2020-10-01 16:23:43 +01:00
5 changed files with 21 additions and 22 deletions

View File

@ -17,7 +17,6 @@ import re
from typing import TYPE_CHECKING, List
from synapse.api.constants import EventTypes
from synapse.appservice.api import ApplicationServiceApi
from synapse.events import EventBase
from synapse.types import GroupID, UserID, get_domain_from_id
from synapse.util.caches.descriptors import _CacheContext, cached
@ -164,7 +163,7 @@ class ApplicationService:
@cached(num_args=1, cache_context=True)
async def matches_user_in_member_list(
self, room_id: str, store: DataStore, cache_context: _CacheContext
self, room_id: str, store, cache_context: _CacheContext
):
member_list = await store.get_users_in_room(room_id)
@ -212,7 +211,7 @@ class ApplicationService:
@cached(num_args=1, cache_context=True)
async def is_interested_in_presence(
self, user_id: UserID, store: DataStore, cache_context: _CacheContext
self, user_id: UserID, store, cache_context: _CacheContext
):
# Find all the rooms the sender is in
if self.is_interested_in_user(user_id.to_string()):
@ -303,7 +302,7 @@ class AppServiceTransaction:
self.events = events
self.ephemeral = ephemeral
async def send(self, as_api: ApplicationServiceApi) -> bool:
async def send(self, as_api) -> bool:
"""Sends this transaction using the provided AS API interface.
Args:

View File

@ -14,19 +14,21 @@
# limitations under the License.
import logging
import urllib
from typing import Any, List, Optional
from typing import TYPE_CHECKING, Any, List, Optional
from prometheus_client import Counter
from synapse.api.constants import EventTypes, ThirdPartyEntityKind
from synapse.api.errors import CodeMessageException
from synapse.appservice import ApplicationService
from synapse.events import EventBase
from synapse.events.utils import serialize_event
from synapse.http.client import SimpleHttpClient
from synapse.types import JsonDict, ThirdPartyInstanceID
from synapse.util.caches.response_cache import ResponseCache
if TYPE_CHECKING:
from synapse.appservice import ApplicationService
logger = logging.getLogger(__name__)
sent_transactions_counter = Counter(
@ -202,7 +204,7 @@ class ApplicationServiceApi(SimpleHttpClient):
async def push_bulk(
self,
service: ApplicationService,
service: "ApplicationService",
events: List[EventBase],
ephemeral: Optional[Any] = None,
txn_id: Optional[int] = None,

View File

@ -134,9 +134,7 @@ class _ServiceQueuer:
"as-sender-%s" % (service.id), self._send_request, service
)
async def _send_request(
self, service: ApplicationService, ephemeral: Optional[Any] = None
):
async def _send_request(self, service: ApplicationService):
# sanity-check: we shouldn't get here if this service already has a sender
# running.
assert service.id not in self.requests_in_flight
@ -145,7 +143,7 @@ class _ServiceQueuer:
try:
while True:
events = self.queued_events.pop(service.id, [])
ephemeral = self.queued_ephemeral.pop(service.id, [])
ephemeral = self.queued_ephemeral.pop(service.id, None)
if not events and not ephemeral:
return
try:

View File

@ -14,7 +14,7 @@
# limitations under the License.
import logging
from typing import Collection, List, Union
from typing import List, Union
from prometheus_client import Counter
@ -30,7 +30,7 @@ from synapse.metrics import (
event_processing_loop_room_count,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import RoomStreamToken, UserID
from synapse.types import Collection, RoomStreamToken, UserID
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)

View File

@ -60,7 +60,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
self.successResultOf(defer.ensureDeferred(self.txnctrl.send(service, events)))
self.store.create_appservice_txn.assert_called_once_with(
service=service, events=events # txn made and saved
service=service, events=events, ephemeral=None # txn made and saved
)
self.assertEquals(0, len(self.txnctrl.recoverers)) # no recoverer made
txn.complete.assert_called_once_with(self.store) # txn completed
@ -81,7 +81,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
self.successResultOf(defer.ensureDeferred(self.txnctrl.send(service, events)))
self.store.create_appservice_txn.assert_called_once_with(
service=service, events=events # txn made and saved
service=service, events=events, ephemeral=None # txn made and saved
)
self.assertEquals(0, txn.send.call_count) # txn not sent though
self.assertEquals(0, txn.complete.call_count) # or completed
@ -106,7 +106,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
self.successResultOf(defer.ensureDeferred(self.txnctrl.send(service, events)))
self.store.create_appservice_txn.assert_called_once_with(
service=service, events=events
service=service, events=events, ephemeral=None
)
self.assertEquals(1, self.recoverer_fn.call_count) # recoverer made
self.assertEquals(1, self.recoverer.recover.call_count) # and invoked
@ -203,7 +203,7 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase):
service = Mock(id=4)
event = Mock()
self.queuer.enqueue(service, event)
self.txn_ctrl.send.assert_called_once_with(service, [event])
self.txn_ctrl.send.assert_called_once_with(service, [event], None)
def test_send_single_event_with_queue(self):
d = defer.Deferred()
@ -217,11 +217,11 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase):
# Send more events: expect send() to NOT be called multiple times.
self.queuer.enqueue(service, event2)
self.queuer.enqueue(service, event3)
self.txn_ctrl.send.assert_called_with(service, [event])
self.txn_ctrl.send.assert_called_with(service, [event], None)
self.assertEquals(1, self.txn_ctrl.send.call_count)
# Resolve the send event: expect the queued events to be sent
d.callback(service)
self.txn_ctrl.send.assert_called_with(service, [event2, event3])
self.txn_ctrl.send.assert_called_with(service, [event2, event3], None)
self.assertEquals(2, self.txn_ctrl.send.call_count)
def test_multiple_service_queues(self):
@ -247,13 +247,13 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase):
# send events for different ASes and make sure they are sent
self.queuer.enqueue(srv1, srv_1_event)
self.queuer.enqueue(srv1, srv_1_event2)
self.txn_ctrl.send.assert_called_with(srv1, [srv_1_event])
self.txn_ctrl.send.assert_called_with(srv1, [srv_1_event], None)
self.queuer.enqueue(srv2, srv_2_event)
self.queuer.enqueue(srv2, srv_2_event2)
self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event])
self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event], None)
# make sure callbacks for a service only send queued events for THAT
# service
srv_2_defer.callback(srv2)
self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event2])
self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event2], None)
self.assertEquals(3, self.txn_ctrl.send.call_count)