Merge pull request #780 from matrix-org/dbkr/email_notifs_on_pusher

Make email notifs work on the pusher synapse
pull/785/head
Mark Haines 2016-05-13 17:27:46 +01:00
commit 077468f6a9
3 changed files with 43 additions and 12 deletions

View File

@ -23,10 +23,11 @@ from synapse.config.logger import LoggingConfig
from synapse.config.emailconfig import EmailConfig from synapse.config.emailconfig import EmailConfig
from synapse.http.site import SynapseSite from synapse.http.site import SynapseSite
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from synapse.storage.state import StateStore from synapse.storage.roommember import RoomMemberStore
from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.pushers import SlavedPusherStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.storage.engines import create_engine from synapse.storage.engines import create_engine
from synapse.storage import DataStore from synapse.storage import DataStore
from synapse.util.async import sleep from synapse.util.async import sleep
@ -60,6 +61,7 @@ class SlaveConfig(DatabaseConfig):
self.soft_file_limit = config.get("soft_file_limit") self.soft_file_limit = config.get("soft_file_limit")
self.daemonize = config.get("daemonize") self.daemonize = config.get("daemonize")
self.pid_file = self.abspath(config.get("pid_file")) self.pid_file = self.abspath(config.get("pid_file"))
self.public_baseurl = config["public_baseurl"]
def default_config(self, server_name, **kwargs): def default_config(self, server_name, **kwargs):
pid_file = self.abspath("pusher.pid") pid_file = self.abspath("pusher.pid")
@ -98,7 +100,8 @@ class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig):
class PusherSlaveStore( class PusherSlaveStore(
SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore,
SlavedAccountDataStore
): ):
update_pusher_last_stream_ordering_and_success = ( update_pusher_last_stream_ordering_and_success = (
DataStore.update_pusher_last_stream_ordering_and_success.__func__ DataStore.update_pusher_last_stream_ordering_and_success.__func__
@ -128,16 +131,13 @@ class PusherSlaveStore(
DataStore.get_profile_displayname.__func__ DataStore.get_profile_displayname.__func__
) )
get_state_groups = ( # XXX: This is a bit broken because we don't persist forgotten rooms
DataStore.get_state_groups.__func__ # in a way that they can be streamed. This means that we don't have a
) # way to invalidate the forgotten rooms cache correctly.
# For now we expire the cache every 10 minutes.
_get_state_group_for_events = ( BROKEN_CACHE_EXPIRY_MS = 60 * 60 * 1000
StateStore.__dict__["_get_state_group_for_events"] who_forgot_in_room = (
) RoomMemberStore.__dict__["who_forgot_in_room"]
_get_state_group_for_event = (
StateStore.__dict__["_get_state_group_for_event"]
) )
@ -219,6 +219,7 @@ class PusherServer(HomeServer):
store = self.get_datastore() store = self.get_datastore()
replication_url = self.config.replication_url replication_url = self.config.replication_url
pusher_pool = self.get_pusherpool() pusher_pool = self.get_pusherpool()
clock = self.get_clock()
def stop_pusher(user_id, app_id, pushkey): def stop_pusher(user_id, app_id, pushkey):
key = "%s:%s" % (app_id, pushkey) key = "%s:%s" % (app_id, pushkey)
@ -270,11 +271,21 @@ class PusherServer(HomeServer):
min_stream_id, max_stream_id, affected_room_ids min_stream_id, max_stream_id, affected_room_ids
) )
def expire_broken_caches():
store.who_forgot_in_room.invalidate_all()
next_expire_broken_caches_ms = 0
while True: while True:
try: try:
args = store.stream_positions() args = store.stream_positions()
args["timeout"] = 30000 args["timeout"] = 30000
result = yield http_client.get_json(replication_url, args=args) result = yield http_client.get_json(replication_url, args=args)
now_ms = clock.time_msec()
if now_ms > next_expire_broken_caches_ms:
expire_broken_caches()
next_expire_broken_caches_ms = (
now_ms + store.BROKEN_CACHE_EXPIRY_MS
)
yield store.process_replication(result) yield store.process_replication(result)
poke_pushers(result) poke_pushers(result)
except: except:

View File

@ -397,6 +397,7 @@ class Mailer(object):
return "" return ""
serverAndMediaId = value[6:] serverAndMediaId = value[6:]
fragment = None
if '#' in serverAndMediaId: if '#' in serverAndMediaId:
(serverAndMediaId, fragment) = serverAndMediaId.split('#', 1) (serverAndMediaId, fragment) = serverAndMediaId.split('#', 1)
fragment = "#" + fragment fragment = "#" + fragment

View File

@ -75,6 +75,18 @@ class SlavedEventStore(BaseSlavedStore):
get_unread_event_push_actions_by_room_for_user = ( get_unread_event_push_actions_by_room_for_user = (
EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"] EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"]
) )
_get_state_group_for_events = (
StateStore.__dict__["_get_state_group_for_events"]
)
_get_state_group_for_event = (
StateStore.__dict__["_get_state_group_for_event"]
)
_get_state_groups_from_groups = (
StateStore.__dict__["_get_state_groups_from_groups"]
)
_get_state_group_from_group = (
StateStore.__dict__["_get_state_group_from_group"]
)
get_unread_push_actions_for_user_in_range = ( get_unread_push_actions_for_user_in_range = (
DataStore.get_unread_push_actions_for_user_in_range.__func__ DataStore.get_unread_push_actions_for_user_in_range.__func__
@ -96,6 +108,9 @@ class SlavedEventStore(BaseSlavedStore):
get_room_events_stream_for_room = ( get_room_events_stream_for_room = (
DataStore.get_room_events_stream_for_room.__func__ DataStore.get_room_events_stream_for_room.__func__
) )
get_events_around = DataStore.get_events_around.__func__
get_state_for_events = DataStore.get_state_for_events.__func__
get_state_groups = DataStore.get_state_groups.__func__
_set_before_and_after = DataStore._set_before_and_after _set_before_and_after = DataStore._set_before_and_after
@ -116,6 +131,10 @@ class SlavedEventStore(BaseSlavedStore):
DataStore._get_rooms_for_user_where_membership_is_txn.__func__ DataStore._get_rooms_for_user_where_membership_is_txn.__func__
) )
_get_members_rows_txn = DataStore._get_members_rows_txn.__func__ _get_members_rows_txn = DataStore._get_members_rows_txn.__func__
_get_state_for_groups = DataStore._get_state_for_groups.__func__
_get_all_state_from_cache = DataStore._get_all_state_from_cache.__func__
_get_events_around_txn = DataStore._get_events_around_txn.__func__
_get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__
def stream_positions(self): def stream_positions(self):
result = super(SlavedEventStore, self).stream_positions() result = super(SlavedEventStore, self).stream_positions()