diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py index 920cde1dd0..e87b2b80a7 100644 --- a/synapse/metrics/metric.py +++ b/synapse/metrics/metric.py @@ -83,9 +83,6 @@ class CounterMetric(BaseMetric): def render(self): return map_concat(self.render_item, sorted(self.counts.keys())) - def unregister_counter(self, *values): - self.counts.pop(values, None) - class CallbackMetric(BaseMetric): """A metric that returns the numeric value returned by a callback whenever diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 95ea256e77..d4d672aafe 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -61,6 +61,7 @@ from commands import ( from streams import STREAMS_MAP from synapse.util.stringutils import random_string +from synapse.metrics.metric import CounterMetric import logging import synapse.metrics @@ -70,12 +71,6 @@ import fcntl metrics = synapse.metrics.get_metrics_for(__name__) -inbound_commands_counter = metrics.register_counter( - "inbound_commands", labels=["command", "name", "conn_id"], -) -outbound_commands_counter = metrics.register_counter( - "outbound_commands", labels=["command", "name", "conn_id"], -) connection_close_counter = metrics.register_counter( "close_reason", labels=["reason_type"], ) @@ -139,6 +134,13 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): # The LoopingCall for sending pings. self._send_ping_loop = None + self.inbound_commands_counter = CounterMetric( + "inbound_commands", labels=["command"], + ) + self.outbound_commands_counter = CounterMetric( + "outbound_commands", labels=["command"], + ) + def connectionMade(self): logger.info("[%s] Connection established", self.id()) @@ -197,7 +199,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): self.last_received_command = self.clock.time_msec() - inbound_commands_counter.inc(cmd_name, self.name, self.conn_id) + self.inbound_commands_counter.inc(cmd_name) cmd_cls = COMMAND_MAP[cmd_name] try: @@ -246,7 +248,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): self._queue_command(cmd) return - outbound_commands_counter.inc(cmd.NAME, self.name, self.conn_id) + self.outbound_commands_counter.inc(cmd.NAME) string = "%s %s" % (cmd.NAME, cmd.to_line(),) if "\n" in string: @@ -334,14 +336,6 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): self.state = ConnectionStates.CLOSED self.pending_commands = [] - for cmd in COMMAND_MAP: - outbound_commands_counter.unregister_counter( - cmd, self.name, self.conn_id - ) - inbound_commands_counter.unregister_counter( - cmd, self.name, self.conn_id - ) - if self.transport: self.transport.unregisterProducer() @@ -620,3 +614,24 @@ metrics.register_callback( }, labels=["name", "conn_id"], ) + + +metrics.register_callback( + "inbound_commands", + lambda: { + (k[0], p.name, p.conn_id): count + for p in connected_connections + for k, count in p.inbound_commands_counter.counts.iteritems() + }, + labels=["command", "name", "conn_id"], +) + +metrics.register_callback( + "outbound_commands", + lambda: { + (k[0], p.name, p.conn_id): count + for p in connected_connections + for k, count in p.outbound_commands_counter.counts.iteritems() + }, + labels=["command", "name", "conn_id"], +)