Merge pull request #2067 from matrix-org/erikj/notify_on_fed

Notify on new federation traffic
pull/2078/head
Erik Johnston 2017-03-29 11:41:37 +01:00 committed by GitHub
commit c5b0bdd542
1 changed files with 7 additions and 0 deletions

View File

@ -54,6 +54,7 @@ class FederationRemoteSendQueue(object):
def __init__(self, hs): def __init__(self, hs):
self.server_name = hs.hostname self.server_name = hs.hostname
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
self.presence_map = {} self.presence_map = {}
self.presence_changed = sorteddict() self.presence_changed = sorteddict()
@ -186,6 +187,8 @@ class FederationRemoteSendQueue(object):
else: else:
self.edus[pos] = edu self.edus[pos] = edu
self.notifier.on_new_replication_data()
def send_presence(self, destination, states): def send_presence(self, destination, states):
"""As per TransactionQueue""" """As per TransactionQueue"""
pos = self._next_pos() pos = self._next_pos()
@ -199,16 +202,20 @@ class FederationRemoteSendQueue(object):
(destination, state.user_id) for state in states (destination, state.user_id) for state in states
] ]
self.notifier.on_new_replication_data()
def send_failure(self, failure, destination): def send_failure(self, failure, destination):
"""As per TransactionQueue""" """As per TransactionQueue"""
pos = self._next_pos() pos = self._next_pos()
self.failures[pos] = (destination, str(failure)) self.failures[pos] = (destination, str(failure))
self.notifier.on_new_replication_data()
def send_device_messages(self, destination): def send_device_messages(self, destination):
"""As per TransactionQueue""" """As per TransactionQueue"""
pos = self._next_pos() pos = self._next_pos()
self.device_messages[pos] = destination self.device_messages[pos] = destination
self.notifier.on_new_replication_data()
def get_current_token(self): def get_current_token(self):
return self.pos - 1 return self.pos - 1