Optionally split out the pusher into a separate process

markjh/split_pusher
Mark Haines 2016-04-14 11:20:48 +01:00
parent aa5ce4d450
commit 1209d3174e
7 changed files with 275 additions and 1 deletions

182
synapse/app/pusher.py Normal file
View File

@ -0,0 +1,182 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import synapse
from synapse.server import HomeServer
from synapse.util.versionstring import get_version_string
from synapse.config._base import ConfigError
from synapse.config.database import DatabaseConfig
from synapse.config.logger import LoggingConfig
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.pushers import SlavedPusherStore
from synapse.replication.slave.storage.event_push_actions import SlavedPushActionsStore
from synapse.storage.engines import create_engine
from synapse.util.async import sleep
from synapse.util.logcontext import (LoggingContext, preserve_fn)
from twisted.internet import reactor, defer
import sys
import logging
logger = logging.getLogger("synapse.app.pusher")
class SlaveConfig(DatabaseConfig):
def read_config(self, config):
self.replication_url = config["replication_url"]
self.server_name = config["server_name"]
self.use_insecure_ssl_client_just_for_testing_do_not_use = False
self.user_agent_suffix = None
self.start_pushers = True
def default_config(self, **kwargs):
return """\
## Slave ##
#replication_url: https://localhost:{replication_port}/_synapse/replication
report_stats: False
"""
class PusherSlaveConfig(SlaveConfig, LoggingConfig):
pass
class PusherSlaveStore(
SlavedEventStore, SlavedPusherStore, SlavedPushActionsStore
):
pass
class PusherServer(HomeServer):
def get_db_conn(self, run_new_connection=True):
# Any param beginning with cp_ is a parameter for adbapi, and should
# not be passed to the database engine.
db_params = {
k: v for k, v in self.db_config.get("args", {}).items()
if not k.startswith("cp_")
}
db_conn = self.database_engine.module.connect(**db_params)
if run_new_connection:
self.database_engine.on_new_connection(db_conn)
return db_conn
def setup(self):
logger.info("Setting up.")
self.datastore = PusherSlaveStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
@defer.inlineCallbacks
def replicate(self):
http_client = self.get_simple_http_client()
store = self.get_datastore()
replication_url = self.config.replication_url
pusher_pool = self.get_pusherpool()
def stop_pusher(user_id, app_id, pushkey):
key = "%s:%s" % (app_id, pushkey)
pushers_for_user = pusher_pool.pushers.get(user_id, {})
pusher = pushers_for_user.pop(key, None)
if pusher is None:
return
logger.info("Stopping pusher %r / %r", user_id, key)
pusher.on_stop()
def start_pusher(user_id, app_id, pushkey):
key = "%s:%s" % (app_id, pushkey)
logger.info("Starting pusher %r / %r", user_id, key)
return pusher_pool._refresh_pusher(app_id, pushkey, user_id)
@defer.inlineCallbacks
def poke_pushers(results):
pushers_rows = set(
map(tuple, results.get("pushers", {}).get("rows", []))
)
deleted_pushers_rows = set(
map(tuple, results.get("deleted_pushers", {}).get("rows", []))
)
for row in sorted(pushers_rows | deleted_pushers_rows):
if row in deleted_pushers_rows:
user_id, app_id, pushkey = row[1:4]
stop_pusher(user_id, app_id, pushkey)
elif row in pushers_rows:
user_id = row[1]
app_id = row[5]
pushkey = row[8]
yield start_pusher(user_id, app_id, pushkey)
stream = results.get("events")
if stream:
min_stream_id = stream["rows"][0][0]
max_stream_id = stream["position"]
preserve_fn(pusher_pool.on_new_notifications)(
min_stream_id, max_stream_id
)
while True:
try:
args = store.stream_positions()
args["timeout"] = 30000
result = yield http_client.get_json(replication_url, args=args)
logger.error("FNARG %r", result)
yield store.process_replication(result)
poke_pushers(result)
except:
logger.exception("Error replicating from %r", replication_url)
sleep(30)
def setup(config_options):
try:
config = PusherSlaveConfig.load_config(
"Synapse pusher", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.exit(1)
config.setup_logging()
database_engine = create_engine(config.database_config)
ps = PusherServer(
config.server_name,
db_config=config.database_config,
config=config,
version_string=get_version_string("Synapse", synapse),
database_engine=database_engine,
)
ps.setup()
def start():
ps.replicate()
ps.get_pusherpool().start()
ps.get_datastore().start_profiling()
reactor.callWhenRunning(start)
return ps
if __name__ == '__main__':
with LoggingContext("main"):
ps = setup(sys.argv[1:])
reactor.run()

View File

@ -28,6 +28,7 @@ class ServerConfig(Config):
self.print_pidfile = config.get("print_pidfile")
self.user_agent_suffix = config.get("user_agent_suffix")
self.use_frozen_dicts = config.get("use_frozen_dicts", True)
self.start_pushers = config.get("start_pushers", True)
self.listeners = config.get("listeners", [])

View File

@ -29,6 +29,7 @@ logger = logging.getLogger(__name__)
class PusherPool:
def __init__(self, _hs):
self.hs = _hs
self.start_pushers = _hs.config.start_pushers
self.store = self.hs.get_datastore()
self.clock = self.hs.get_clock()
self.pushers = {}
@ -178,6 +179,9 @@ class PusherPool:
self._start_pushers([p])
def _start_pushers(self, pushers):
if not self.start_pushers:
logger.info("Not starting pushers because they are disabled in the config")
return
logger.info("Starting %d pushers", len(pushers))
for pusherdict in pushers:
try:

View File

@ -343,7 +343,7 @@ class ReplicationResource(Resource):
"app_id", "app_display_name", "device_display_name", "pushkey",
"ts", "lang", "data"
))
writer.write_header_and_rows("deleted", deleted, (
writer.write_header_and_rows("deleted_pushers", deleted, (
"position", "user_id", "app_id", "pushkey"
))

View File

@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import BaseSlavedStore
from synapse.storage import DataStore
class SlavedPushActionsStore(BaseSlavedStore):
get_unread_push_actions_for_user_in_range = (
DataStore.get_unread_push_actions_for_user_in_range.__func__
)
get_push_action_users_in_range = (
DataStore.get_push_action_users_in_range.__func__
)

View File

@ -68,6 +68,9 @@ class SlavedEventStore(BaseSlavedStore):
_get_current_state_for_key = StateStore.__dict__[
"_get_current_state_for_key"
]
get_invited_rooms_for_user = RoomMemberStore.__dict__[
"get_invited_rooms_for_user"
]
get_event = DataStore.get_event.__func__
get_current_state = DataStore.get_current_state.__func__
@ -82,6 +85,7 @@ class SlavedEventStore(BaseSlavedStore):
get_room_events_stream_for_room = (
DataStore.get_room_events_stream_for_room.__func__
)
_set_before_and_after = DataStore._set_before_and_after
_get_events = DataStore._get_events.__func__
@ -182,6 +186,7 @@ class SlavedEventStore(BaseSlavedStore):
# self._membership_stream_cache.entity_has_changed(
# event.state_key, event.internal_metadata.stream_ordering
# )
self.get_invited_rooms_for_user.invalidate((event.state_key,))
if not event.is_state():
return

View File

@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
from synapse.storage import DataStore
class SlavedPusherStore(BaseSlavedStore):
def __init__(self, db_conn, hs):
super(SlavedPusherStore, self).__init__(db_conn, hs)
self._pushers_id_gen = SlavedIdTracker(
db_conn, "pushers", "id",
extra_tables=[("deleted_pushers", "stream_id")],
)
get_all_pushers = DataStore.get_all_pushers.__func__
get_pushers_by = DataStore.get_pushers_by.__func__
get_pushers_by_app_id_and_pushkey = (
DataStore.get_pushers_by_app_id_and_pushkey.__func__
)
_decode_pushers_rows = DataStore._decode_pushers_rows.__func__
def stream_positions(self):
result = super(SlavedPusherStore, self).stream_positions()
result["pushers"] = self._pushers_id_gen.get_current_token()
return result
def process_replication(self, result):
stream = result.get("pushers")
if stream:
self._pushers_id_gen.advance(stream["position"])
stream = result.get("deleted_pushers")
if stream:
self._pushers_id_gen.advance(stream["position"])
return super(SlavedPusherStore, self).process_replication(result)