Clean up _ServiceQueuer
parent
92d39126d7
commit
7321f45457
|
@ -48,9 +48,12 @@ UP & quit +---------- YES SUCCESS
|
||||||
This is all tied together by the AppServiceScheduler which DIs the required
|
This is all tied together by the AppServiceScheduler which DIs the required
|
||||||
components.
|
components.
|
||||||
"""
|
"""
|
||||||
|
from twisted.internet import defer
|
||||||
|
|
||||||
from synapse.appservice import ApplicationServiceState
|
from synapse.appservice import ApplicationServiceState
|
||||||
from twisted.internet import defer
|
from synapse.util.logcontext import preserve_fn
|
||||||
|
from synapse.util.metrics import Measure
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
@ -73,7 +76,7 @@ class ApplicationServiceScheduler(object):
|
||||||
self.txn_ctrl = _TransactionController(
|
self.txn_ctrl = _TransactionController(
|
||||||
self.clock, self.store, self.as_api, create_recoverer
|
self.clock, self.store, self.as_api, create_recoverer
|
||||||
)
|
)
|
||||||
self.queuer = _ServiceQueuer(self.txn_ctrl)
|
self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def start(self):
|
def start(self):
|
||||||
|
@ -94,38 +97,36 @@ class _ServiceQueuer(object):
|
||||||
this schedules any other events in the queue to run.
|
this schedules any other events in the queue to run.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, txn_ctrl):
|
def __init__(self, txn_ctrl, clock):
|
||||||
self.queued_events = {} # dict of {service_id: [events]}
|
self.queued_events = {} # dict of {service_id: [events]}
|
||||||
self.pending_requests = {} # dict of {service_id: Deferred}
|
self.requests_in_flight = set()
|
||||||
self.txn_ctrl = txn_ctrl
|
self.txn_ctrl = txn_ctrl
|
||||||
|
self.clock = clock
|
||||||
|
|
||||||
def enqueue(self, service, event):
|
def enqueue(self, service, event):
|
||||||
# if this service isn't being sent something
|
# if this service isn't being sent something
|
||||||
if not self.pending_requests.get(service.id):
|
self.queued_events.setdefault(service.id, []).append(event)
|
||||||
self._send_request(service, [event])
|
preserve_fn(self._send_request)(service)
|
||||||
else:
|
|
||||||
# add to queue for this service
|
|
||||||
if service.id not in self.queued_events:
|
|
||||||
self.queued_events[service.id] = []
|
|
||||||
self.queued_events[service.id].append(event)
|
|
||||||
|
|
||||||
def _send_request(self, service, events):
|
@defer.inlineCallbacks
|
||||||
# send request and add callbacks
|
def _send_request(self, service):
|
||||||
d = self.txn_ctrl.send(service, events)
|
if service.id in self.requests_in_flight:
|
||||||
d.addBoth(self._on_request_finish)
|
return
|
||||||
d.addErrback(self._on_request_fail)
|
|
||||||
self.pending_requests[service.id] = d
|
|
||||||
|
|
||||||
def _on_request_finish(self, service):
|
with Measure(self.clock, "_ServiceQueuer._send_request"):
|
||||||
self.pending_requests[service.id] = None
|
self.requests_in_flight.add(service.id)
|
||||||
# if there are queued events, then send them.
|
try:
|
||||||
if (service.id in self.queued_events
|
while True:
|
||||||
and len(self.queued_events[service.id]) > 0):
|
events = self.queued_events.pop(service.id, [])
|
||||||
self._send_request(service, self.queued_events[service.id])
|
if not events:
|
||||||
self.queued_events[service.id] = []
|
return
|
||||||
|
|
||||||
def _on_request_fail(self, err):
|
try:
|
||||||
logger.error("AS request failed: %s", err)
|
yield self.txn_ctrl.send(service, events)
|
||||||
|
except:
|
||||||
|
logger.exception("AS request failed")
|
||||||
|
finally:
|
||||||
|
self.requests_in_flight.discard(service.id)
|
||||||
|
|
||||||
|
|
||||||
class _TransactionController(object):
|
class _TransactionController(object):
|
||||||
|
@ -155,8 +156,6 @@ class _TransactionController(object):
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.exception(e)
|
logger.exception(e)
|
||||||
self._start_recoverer(service)
|
self._start_recoverer(service)
|
||||||
# request has finished
|
|
||||||
defer.returnValue(service)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_recovered(self, recoverer):
|
def on_recovered(self, recoverer):
|
||||||
|
|
|
@ -193,7 +193,7 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.txn_ctrl = Mock()
|
self.txn_ctrl = Mock()
|
||||||
self.queuer = _ServiceQueuer(self.txn_ctrl)
|
self.queuer = _ServiceQueuer(self.txn_ctrl, MockClock())
|
||||||
|
|
||||||
def test_send_single_event_no_queue(self):
|
def test_send_single_event_no_queue(self):
|
||||||
# Expect the event to be sent immediately.
|
# Expect the event to be sent immediately.
|
||||||
|
|
Loading…
Reference in New Issue