diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 27271e468d..19fe8e11e8 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -16,17 +16,11 @@ This module controls the reliability for application service transactions. The nominal flow through this module looks like: - ___________ - \O/ --- event -->| | +--------------+ - | - event ---->| event_pool|<-- poll 1/s for events ---| EventSorter | - / \ ---- event ->|___________| +--------------+ - USERS ____________________________| - | | | - V V V - ASa ASb ASc - [e,e] [e] [e,e,e] - | - V + _________ +---ASa[e]-->| Event | +----ASb[e]->| Grouper |<-poll 1/s--+ +--ASa[e]--->|_________| | ASa[e,e] ASb[e] + V -````````- +------------+ |````````|<--StoreTxn-|Transaction | |Database| | Controller |---> SEND TO AS @@ -43,11 +37,11 @@ Recoverer attempts to recover ASes who have died. The flow for this looks like: V | START ---> Wait exp ------> Get oldest txn ID from ----> FAILURE backoff DB and try to send it - ^ |__________ -Mark AS as | V -UP & quit +---------- YES SUCCESS - | | | - NO <--- Have more txns? <------ Mark txn success & nuke -+ + ^ |___________ +Mark AS as | V +UP & quit +---------- YES SUCCESS + | | | + NO <--- Have more txns? <------ Mark txn success & nuke <-+ from db; incr AS pos. Reset backoff. @@ -62,24 +56,28 @@ class AppServiceScheduler(object): case is a simple array. """ - def __init__(self, store, as_api, services): - self.app_services = services - self.event_pool = [] + def __init__(self, clock, store, as_api): + self.clock = clock + self.store = store + self.as_api = as_api + self.event_grouper = _EventGrouper() - def create_recoverer(service): - return _Recoverer(store, as_api, service) - self.txn_ctrl = _TransactionController(store, as_api, create_recoverer) + def create_recoverer(service, callback): + return _Recoverer(clock, store, as_api, service, callback) - self.event_sorter = _EventSorter(self, self.txn_ctrl, services) + self.txn_ctrl = _TransactionController( + clock, store, as_api, self.event_grouper, create_recoverer + ) def start(self): - self.event_sorter.start_polling() + # check for any DOWN ASes and start recoverers for them. + _Recoverer.start( + self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered + ) + self.txn_ctrl.start_polling() - def store_event(self, event): # event_pool - self.event_pool.append(event) - - def drain_events(self): # event_pool - return self.event_pool + def submit_event_for_as(self, service, event): + self.event_grouper.on_receive(service, event) class AppServiceTransaction(object): @@ -99,71 +97,99 @@ class AppServiceTransaction(object): pass -class _EventSorter(object): +class _EventGrouper(object): + """Groups events for the same application service together. + """ - def __init__(self, event_pool, txn_ctrl, services): - self.event_pool = event_pool - self.txn_ctrl = txn_ctrl - self.services = services + def __init__(self): + self.groups = {} # dict of {service: [events]} - def start_polling(self): - events = self.event_pool.drain_events() - if events: - self._process(events) - # TODO repoll later on - - def _process(self, events): - # TODO sort events - # TODO fe (AS, events) => poke transaction controller on_receive_events + def on_receive(self, service, event): + # TODO group this pass + def drain_groups(self): + return self.groups + class _TransactionController(object): - def __init__(self, store, as_api, recoverer_fn): + def __init__(self, clock, store, as_api, event_grouper, recoverer_fn): + self.clock = clock self.store = store self.as_api = as_api + self.event_grouper = event_grouper self.recoverer_fn = recoverer_fn - def on_receive_events(self, service, events): - txn = self._store_txn(service, events) - if txn.send(self.as_api): - txn.complete(self.store) - else: - self._start_recoverer(service) + def start_polling(self): + groups = self.event_grouper.drain_groups() + for service in groups: + txn_id = self._get_next_txn_id(service) + txn = AppServiceTransaction(service, txn_id, groups[service]) + self._store_txn(txn) + if self._is_service_up(service): + if txn.send(self.as_api): + txn.complete(self.store) + else: + # TODO mark AS as down + self._start_recoverer(service) + self.clock.call_later(1000, self.start_polling) + + + def on_recovered(self, service): + # TODO mark AS as UP + pass def _start_recoverer(self, service): - recoverer = self.recoverer_fn(service) + recoverer = self.recoverer_fn(service, self.on_recovered) recoverer.recover() - def _store_txn(self, service, events): - pass # returns AppServiceTransaction + def _is_service_up(self, service): + pass + + def _get_next_txn_id(self, service): + pass # TODO work out the next txn_id for this service + + def _store_txn(self, txn): + pass class _Recoverer(object): - def __init__(self, store, as_api, service): + @staticmethod + def start(clock, store, as_api, callback): + # TODO check for DOWN ASes and init recoverers + pass + + def __init__(self, clock, store, as_api, service, callback): + self.clock = clock self.store = store self.as_api = as_api self.service = service + self.callback = callback self.backoff_counter = 1 def recover(self): - # TODO wait a bit + self.clock.call_later(2000 ** self.backoff_counter, self.retry) + + def retry(self): txn = self._get_oldest_txn() if txn: if txn.send(self.as_api): txn.complete(self.store) + # reset the backoff counter and retry immediately self.backoff_counter = 1 + self.retry() + return else: self.backoff_counter += 1 - self.recover(self.service) + self.recover() return else: - self._set_service_recovered(self.service) + self._set_service_recovered() - def _set_service_recovered(self, service): - pass + def _set_service_recovered(self): + self.callback(self.service) def _get_oldest_txn(self): pass # returns AppServiceTransaction