Poke the notifier to wake up /syncs

markjh/synchrotron
Mark Haines 2016-05-31 13:23:58 +01:00
parent e50861eeb7
commit 86a746982b
3 changed files with 23 additions and 0 deletions

View File

@ -16,10 +16,12 @@
import synapse import synapse
from synapse.api.constants import EventTypes
from synapse.config._base import ConfigError from synapse.config._base import ConfigError
from synapse.config.database import DatabaseConfig from synapse.config.database import DatabaseConfig
from synapse.config.logger import LoggingConfig from synapse.config.logger import LoggingConfig
from synapse.config.appservice import AppServiceConfig from synapse.config.appservice import AppServiceConfig
from synapse.events import FrozenEvent
from synapse.http.site import SynapseSite from synapse.http.site import SynapseSite
from synapse.http.server import JsonResource from synapse.http.server import JsonResource
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
@ -49,6 +51,7 @@ from daemonize import Daemonize
import sys import sys
import logging import logging
import contextlib import contextlib
import ujson as json
logger = logging.getLogger("synapse.app.synchrotron") logger = logging.getLogger("synapse.app.synchrotron")
@ -226,10 +229,27 @@ class SynchrotronServer(HomeServer):
store = self.get_datastore() store = self.get_datastore()
replication_url = self.config.replication_url replication_url = self.config.replication_url
clock = self.get_clock() clock = self.get_clock()
notifier = self.get_notifier()
def expire_broken_caches(): def expire_broken_caches():
store.who_forgot_in_room.invalidate_all() store.who_forgot_in_room.invalidate_all()
def notify(result):
stream = result.get("events")
if stream:
max_position = stream["position"]
for row in stream["rows"]:
position = row[0]
internal = json.loads(row[1])
event_json = json.loads(row[2])
event = FrozenEvent(event_json, internal_metadata_dict=internal)
extra_users = ()
if event.type == EventTypes.Member:
extra_users = (event.state_key,)
notifier.on_new_room_event(
event, position, max_position, extra_users
)
next_expire_broken_caches_ms = 0 next_expire_broken_caches_ms = 0
while True: while True:
try: try:
@ -244,6 +264,7 @@ class SynchrotronServer(HomeServer):
now_ms + store.BROKEN_CACHE_EXPIRY_MS now_ms + store.BROKEN_CACHE_EXPIRY_MS
) )
yield store.process_replication(result) yield store.process_replication(result)
notify(result)
except: except:
logger.exception("Error replicating from %r", replication_url) logger.exception("Error replicating from %r", replication_url)
sleep(5) sleep(5)

View File

@ -123,6 +123,7 @@ class SlavedEventStore(BaseSlavedStore):
get_room_events_stream_for_rooms = ( get_room_events_stream_for_rooms = (
DataStore.get_room_events_stream_for_rooms.__func__ DataStore.get_room_events_stream_for_rooms.__func__
) )
get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__
_set_before_and_after = staticmethod(DataStore._set_before_and_after) _set_before_and_after = staticmethod(DataStore._set_before_and_after)

View File

@ -153,6 +153,7 @@ class SQLBaseStore(object):
def __init__(self, hs): def __init__(self, hs):
self.hs = hs self.hs = hs
self._db_pool = hs.get_db_pool() self._db_pool = hs.get_db_pool()
self._clock = hs.get_clock()
self._previous_txn_total_time = 0 self._previous_txn_total_time = 0
self._current_txn_total_time = 0 self._current_txn_total_time = 0