Send `USER_IP` commands on a different Redis channel, in order to reduce traffic to workers that do not process these commands. (#12809)
parent
10280fc943
commit
39dee30f01
|
@ -0,0 +1 @@
|
||||||
|
Send `USER_IP` commands on a different Redis channel, in order to reduce traffic to workers that do not process these commands.
|
|
@ -1 +0,0 @@
|
||||||
Lay some foundation work to allow workers to only subscribe to some kinds of messages, reducing replication traffic.
|
|
|
@ -0,0 +1 @@
|
||||||
|
Send `USER_IP` commands on a different Redis channel, in order to reduce traffic to workers that do not process these commands.
|
|
@ -58,6 +58,15 @@ class Command(metaclass=abc.ABCMeta):
|
||||||
# by default, we just use the command name.
|
# by default, we just use the command name.
|
||||||
return self.NAME
|
return self.NAME
|
||||||
|
|
||||||
|
def redis_channel_name(self, prefix: str) -> str:
|
||||||
|
"""
|
||||||
|
Returns the Redis channel name upon which to publish this command.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
prefix: The prefix for the channel.
|
||||||
|
"""
|
||||||
|
return prefix
|
||||||
|
|
||||||
|
|
||||||
SC = TypeVar("SC", bound="_SimpleCommand")
|
SC = TypeVar("SC", bound="_SimpleCommand")
|
||||||
|
|
||||||
|
@ -395,6 +404,9 @@ class UserIpCommand(Command):
|
||||||
f"{self.user_agent!r}, {self.device_id!r}, {self.last_seen})"
|
f"{self.user_agent!r}, {self.device_id!r}, {self.last_seen})"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def redis_channel_name(self, prefix: str) -> str:
|
||||||
|
return f"{prefix}/USER_IP"
|
||||||
|
|
||||||
|
|
||||||
class RemoteServerUpCommand(_SimpleCommand):
|
class RemoteServerUpCommand(_SimpleCommand):
|
||||||
"""Sent when a worker has detected that a remote server is no longer
|
"""Sent when a worker has detected that a remote server is no longer
|
||||||
|
|
|
@ -221,10 +221,10 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
|
||||||
# remote instances.
|
# remote instances.
|
||||||
tcp_outbound_commands_counter.labels(cmd.NAME, "redis").inc()
|
tcp_outbound_commands_counter.labels(cmd.NAME, "redis").inc()
|
||||||
|
|
||||||
|
channel_name = cmd.redis_channel_name(self.synapse_stream_prefix)
|
||||||
|
|
||||||
await make_deferred_yieldable(
|
await make_deferred_yieldable(
|
||||||
self.synapse_outbound_redis_connection.publish(
|
self.synapse_outbound_redis_connection.publish(channel_name, encoded_string)
|
||||||
self.synapse_stream_prefix, encoded_string
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue