Logcontexts for replication command handlers
Run the handlers for replication commands as background processes. This should improve the visibility in our metrics, and reduce the number of "running db transaction from sentinel context" warnings. Ideally it means converting the things that fire off deferreds into the night into things that actually return a Deferred when they are done. I've made a bit of a stab at this, but it will probably be leaky.pull/3709/head
parent
b4d6db5c4a
commit
0e8d78f6aa
|
@ -117,8 +117,9 @@ class ASReplicationHandler(ReplicationClientHandler):
|
|||
super(ASReplicationHandler, self).__init__(hs.get_datastore())
|
||||
self.appservice_handler = hs.get_application_service_handler()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_rdata(self, stream_name, token, rows):
|
||||
super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
|
||||
yield super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
|
||||
|
||||
if stream_name == "events":
|
||||
max_stream_id = self.store.get_room_max_stream_ordering()
|
||||
|
|
|
@ -144,8 +144,9 @@ class FederationSenderReplicationHandler(ReplicationClientHandler):
|
|||
super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore())
|
||||
self.send_handler = FederationSenderHandler(hs, self)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_rdata(self, stream_name, token, rows):
|
||||
super(FederationSenderReplicationHandler, self).on_rdata(
|
||||
yield super(FederationSenderReplicationHandler, self).on_rdata(
|
||||
stream_name, token, rows
|
||||
)
|
||||
self.send_handler.process_replication_rows(stream_name, token, rows)
|
||||
|
|
|
@ -148,8 +148,9 @@ class PusherReplicationHandler(ReplicationClientHandler):
|
|||
|
||||
self.pusher_pool = hs.get_pusherpool()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_rdata(self, stream_name, token, rows):
|
||||
super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
|
||||
yield super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
|
||||
run_in_background(self.poke_pushers, stream_name, token, rows)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
|
|
@ -332,8 +332,9 @@ class SyncReplicationHandler(ReplicationClientHandler):
|
|||
self.presence_handler = hs.get_presence_handler()
|
||||
self.notifier = hs.get_notifier()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_rdata(self, stream_name, token, rows):
|
||||
super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
|
||||
yield super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
|
||||
run_in_background(self.process_and_notify, stream_name, token, rows)
|
||||
|
||||
def get_streams_to_replicate(self):
|
||||
|
|
|
@ -169,8 +169,9 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
|
|||
super(UserDirectoryReplicationHandler, self).__init__(hs.get_datastore())
|
||||
self.user_directory = hs.get_user_directory_handler()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_rdata(self, stream_name, token, rows):
|
||||
super(UserDirectoryReplicationHandler, self).on_rdata(
|
||||
yield super(UserDirectoryReplicationHandler, self).on_rdata(
|
||||
stream_name, token, rows
|
||||
)
|
||||
if stream_name == "current_state_deltas":
|
||||
|
|
|
@ -107,7 +107,7 @@ class ReplicationClientHandler(object):
|
|||
Can be overriden in subclasses to handle more.
|
||||
"""
|
||||
logger.info("Received rdata %s -> %s", stream_name, token)
|
||||
self.store.process_replication_rows(stream_name, token, rows)
|
||||
return self.store.process_replication_rows(stream_name, token, rows)
|
||||
|
||||
def on_position(self, stream_name, token):
|
||||
"""Called when we get new position data. By default this just pokes
|
||||
|
@ -115,7 +115,7 @@ class ReplicationClientHandler(object):
|
|||
|
||||
Can be overriden in subclasses to handle more.
|
||||
"""
|
||||
self.store.process_replication_rows(stream_name, token, [])
|
||||
return self.store.process_replication_rows(stream_name, token, [])
|
||||
|
||||
def on_sync(self, data):
|
||||
"""When we received a SYNC we wake up any deferreds that were waiting
|
||||
|
|
|
@ -59,6 +59,12 @@ class Command(object):
|
|||
"""
|
||||
return self.data
|
||||
|
||||
def get_logcontext_id(self):
|
||||
"""Get a suitable string for the logcontext when processing this command"""
|
||||
|
||||
# by default, we just use the command name.
|
||||
return self.NAME
|
||||
|
||||
|
||||
class ServerCommand(Command):
|
||||
"""Sent by the server on new connection and includes the server_name.
|
||||
|
@ -116,6 +122,9 @@ class RdataCommand(Command):
|
|||
_json_encoder.encode(self.row),
|
||||
))
|
||||
|
||||
def get_logcontext_id(self):
|
||||
return "RDATA-" + self.stream_name
|
||||
|
||||
|
||||
class PositionCommand(Command):
|
||||
"""Sent by the client to tell the client the stream postition without
|
||||
|
@ -190,6 +199,9 @@ class ReplicateCommand(Command):
|
|||
def to_line(self):
|
||||
return " ".join((self.stream_name, str(self.token),))
|
||||
|
||||
def get_logcontext_id(self):
|
||||
return "REPLICATE-" + self.stream_name
|
||||
|
||||
|
||||
class UserSyncCommand(Command):
|
||||
"""Sent by the client to inform the server that a user has started or
|
||||
|
|
|
@ -63,6 +63,8 @@ from twisted.protocols.basic import LineOnlyReceiver
|
|||
from twisted.python.failure import Failure
|
||||
|
||||
from synapse.metrics import LaterGauge
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||
from synapse.util.stringutils import random_string
|
||||
|
||||
from .commands import (
|
||||
|
@ -222,7 +224,11 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
|
|||
|
||||
# Now lets try and call on_<CMD_NAME> function
|
||||
try:
|
||||
getattr(self, "on_%s" % (cmd_name,))(cmd)
|
||||
run_as_background_process(
|
||||
"replication-" + cmd.get_logcontext_id(),
|
||||
getattr(self, "on_%s" % (cmd_name,)),
|
||||
cmd,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("[%s] Failed to handle line: %r", self.id(), line)
|
||||
|
||||
|
@ -387,7 +393,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
|
|||
self.name = cmd.data
|
||||
|
||||
def on_USER_SYNC(self, cmd):
|
||||
self.streamer.on_user_sync(
|
||||
return self.streamer.on_user_sync(
|
||||
self.conn_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms,
|
||||
)
|
||||
|
||||
|
@ -397,22 +403,33 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
|
|||
|
||||
if stream_name == "ALL":
|
||||
# Subscribe to all streams we're publishing to.
|
||||
for stream in iterkeys(self.streamer.streams_by_name):
|
||||
self.subscribe_to_stream(stream, token)
|
||||
deferreds = [
|
||||
run_in_background(
|
||||
self.subscribe_to_stream,
|
||||
stream, token,
|
||||
)
|
||||
for stream in iterkeys(self.streamer.streams_by_name)
|
||||
]
|
||||
|
||||
return make_deferred_yieldable(
|
||||
defer.gatherResults(deferreds, consumeErrors=True)
|
||||
)
|
||||
else:
|
||||
self.subscribe_to_stream(stream_name, token)
|
||||
return self.subscribe_to_stream(stream_name, token)
|
||||
|
||||
def on_FEDERATION_ACK(self, cmd):
|
||||
self.streamer.federation_ack(cmd.token)
|
||||
return self.streamer.federation_ack(cmd.token)
|
||||
|
||||
def on_REMOVE_PUSHER(self, cmd):
|
||||
self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id)
|
||||
return self.streamer.on_remove_pusher(
|
||||
cmd.app_id, cmd.push_key, cmd.user_id,
|
||||
)
|
||||
|
||||
def on_INVALIDATE_CACHE(self, cmd):
|
||||
self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
|
||||
return self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
|
||||
|
||||
def on_USER_IP(self, cmd):
|
||||
self.streamer.on_user_ip(
|
||||
return self.streamer.on_user_ip(
|
||||
cmd.user_id, cmd.access_token, cmd.ip, cmd.user_agent, cmd.device_id,
|
||||
cmd.last_seen,
|
||||
)
|
||||
|
@ -542,14 +559,13 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
|
|||
# Check if this is the last of a batch of updates
|
||||
rows = self.pending_batches.pop(stream_name, [])
|
||||
rows.append(row)
|
||||
|
||||
self.handler.on_rdata(stream_name, cmd.token, rows)
|
||||
return self.handler.on_rdata(stream_name, cmd.token, rows)
|
||||
|
||||
def on_POSITION(self, cmd):
|
||||
self.handler.on_position(cmd.stream_name, cmd.token)
|
||||
return self.handler.on_position(cmd.stream_name, cmd.token)
|
||||
|
||||
def on_SYNC(self, cmd):
|
||||
self.handler.on_sync(cmd.data)
|
||||
return self.handler.on_sync(cmd.data)
|
||||
|
||||
def replicate(self, stream_name, token):
|
||||
"""Send the subscription request to the server
|
||||
|
|
Loading…
Reference in New Issue