# TCP Replication

## Motivation

Previously the workers used an HTTP long poll mechanism to get updates
from the master, which had the problem of causing a lot of duplicate
work on the server. This TCP protocol replaces those APIs with the aim
of increased efficiency.

## Overview

The protocol is based on fire and forget, line based commands. An
example flow would be (where '>' indicates master to worker and
'<' worker to master flows):

    > SERVER example.com
    < REPLICATE
    > POSITION events master 53 53
    > RDATA events master 54 ["$foo1:bar.com", ...]
    > RDATA events master 55 ["$foo4:bar.com", ...]

The example shows the server accepting a new connection and sending its identity
with the `SERVER` command, followed by the client sending `REPLICATE` to ask the
server for the position of all streams. The server then periodically sends `RDATA` commands
which have the format `RDATA <stream_name> <instance_name> <token> <row>`, where
the format of `<row>` is defined by the individual streams. The
`<instance_name>` is the name of the Synapse process that generated the data
(usually "master").

Error reporting happens by either the client or server sending an `ERROR`
command, and usually the connection will be closed.

Since the protocol is simple and line based, it's possible to manually
connect to the server using a tool like netcat. A few things should be
noted when manually using the protocol:

-   The federation stream is only available if federation sending has
    been disabled on the main process.
-   The server will only time out connections that have sent a `PING`
    command. If a ping is sent then the connection will be closed if no
    further commands are received within 15s. Both the client and
    server protocol implementations will send an initial `PING` on
    connection and ensure at least one command is sent every 5s (not
    necessarily `PING`).
-   `RDATA` commands *usually* include a numeric token, however if the
    stream has multiple rows to replicate per token the server will send
    multiple `RDATA` commands, with all but the last having a token of
    `batch`. See the documentation on `commands.RdataCommand` for
    further details.

## Architecture

The basic structure of the protocol is line based, where the initial
word of each line specifies the command. The rest of the line is parsed
based on the command. For example, the `RDATA` command is defined as:

    RDATA <stream_name> <instance_name> <token> <row_json>

(Note that `<row_json>` may contain spaces, but cannot contain
newlines.)

Blank lines are ignored.
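
As an illustration of this framing, here is a minimal Python sketch that splits a line into its command word and arguments, and parses and serialises `RDATA`. The names `Rdata`, `parse_line`, `parse_rdata` and `format_rdata` are hypothetical and are not Synapse's own classes; the real definitions live in `synapse/replication/tcp/commands.py`. Mapping the `batch` token to `None` is a choice made only for this sketch.

```python
import json
from typing import Any, NamedTuple, Optional, Tuple


class Rdata(NamedTuple):
    stream_name: str
    instance_name: str
    token: Optional[int]  # None stands for the literal token "batch"
    row: Any


def parse_line(line: str) -> Optional[Tuple[str, str]]:
    """Split a raw line into (command, rest); return None for blank lines."""
    line = line.rstrip("\r\n")
    if not line.strip():
        return None  # blank lines are ignored
    command, _, rest = line.partition(" ")
    return command, rest


def parse_rdata(rest: str) -> Rdata:
    """Parse the arguments of an RDATA command (everything after 'RDATA ')."""
    # maxsplit=3 leaves any spaces inside <row_json> untouched
    stream_name, instance_name, token, row_json = rest.split(" ", 3)
    parsed_token = None if token == "batch" else int(token)
    return Rdata(stream_name, instance_name, parsed_token, json.loads(row_json))


def format_rdata(cmd: Rdata) -> str:
    """Serialise an Rdata back to a single wire line (without the newline)."""
    token = "batch" if cmd.token is None else str(cmd.token)
    # json.dumps escapes newlines inside strings, so the result is one line
    row_json = json.dumps(cmd.row, separators=(",", ":"))
    return f"RDATA {cmd.stream_name} {cmd.instance_name} {token} {row_json}"
```

Splitting with a fixed `maxsplit` is what lets `<row_json>` contain spaces. Encoding the row with `json.dumps` (no indentation) is one way to guarantee it never contains a raw newline.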

### Keep alives

Both sides are expected to send at least one command every 5s or so, and
should send a `PING` command if necessary. If either side does not receive
a command within e.g. 15s then the connection should be closed.

Because the server may be connected to manually using e.g. netcat, the
timeouts aren't enabled until an initial `PING` command is seen. Both
the client and server implementations send a `PING` command
immediately on connection to ensure the timeouts are enabled.

This ensures that both sides can quickly realize if the TCP connection
has gone and handle the situation appropriately.
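
The keep-alive rules can be sketched as a small state machine driven by an injected clock, which makes the timing easy to test. This is a minimal Python sketch, not Synapse's implementation. `KeepAlive` and its methods are hypothetical, the 5s and 15s values are taken from the text above, and the `PING` argument is assumed to be a millisecond timestamp, as the example transcript suggests.

```python
from typing import Callable

PING_INTERVAL_S = 5.0  # send at least one command this often
PING_TIMEOUT_S = 15.0  # close if nothing is received for this long


class KeepAlive:
    """Keep-alive bookkeeping for one connection; call tick() periodically."""

    def __init__(self, send_line: Callable[[str], None]) -> None:
        self._send_line = send_line
        self.last_sent = 0.0
        self.last_received = 0.0
        self.timeouts_enabled = False  # off until the peer sends a PING
        self.closed = False

    def start(self, now: float) -> None:
        # Send a PING straight away so the peer enables its own timeouts.
        self.last_received = now
        self._ping(now)

    def on_send(self, now: float) -> None:
        # Any command counts towards the interval, not just PING.
        self.last_sent = now

    def on_line_received(self, line: str, now: float) -> None:
        self.last_received = now
        if line.split(" ", 1)[0] == "PING":
            self.timeouts_enabled = True

    def tick(self, now: float) -> None:
        if self.closed:
            return
        if self.timeouts_enabled and now - self.last_received > PING_TIMEOUT_S:
            self.closed = True  # a real implementation would drop the connection
            return
        if now - self.last_sent >= PING_INTERVAL_S:
            self._ping(now)

    def _ping(self, now: float) -> None:
        # The PING argument is assumed to be a millisecond timestamp.
        self._send_line(f"PING {int(now * 1000)}")
        self.on_send(now)
```

Keeping `timeouts_enabled` false until a `PING` arrives is what allows a manual netcat session to sit idle without being disconnected.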

### Start up

When a new connection is made, the server:

-   Sends a `SERVER` command, which includes the identity of the server,
    allowing the client to detect if it's connected to the expected
    server.
-   Sends a `PING` command as above, to enable the client to time out
    connections promptly.

The client:

-   Sends a `NAME` command, allowing the server to associate a human
    friendly name with the connection. This is optional.
-   Sends a `PING` as above.
-   Sends a `REPLICATE` to get the current position of all streams.
-   On receipt of a `SERVER` command, checks that the server name
    matches the expected server name.
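
The client's side of the start-up sequence can be sketched in Python as follows. `client_startup_lines` and `check_server_line` are hypothetical helpers. In this sketch a mismatched server name simply raises, whereas a real client would presumably send `ERROR` and close the connection, as described under error handling.

```python
from typing import List, Optional


def client_startup_lines(now_ms: int, name: Optional[str] = None) -> List[str]:
    """Lines a client sends straight after connecting, in order."""
    lines = []
    if name is not None:
        lines.append(f"NAME {name}")  # optional, human friendly label
    lines.append(f"PING {now_ms}")  # enables the server's timeouts
    lines.append("REPLICATE")  # ask for the current position of all streams
    return lines


def check_server_line(line: str, expected_server: str) -> None:
    """Validate a received SERVER command against the expected server name."""
    command, _, server_name = line.partition(" ")
    if command == "SERVER" and server_name != expected_server:
        raise ConnectionError(
            f"connected to {server_name!r}, expected {expected_server!r}"
        )
```

With `name="synapse.app.appservice"`, this produces the same three client lines as the example interaction in this document.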

### Error handling

If either side detects an error it can send an `ERROR` command and close
the connection.

If the client side loses the connection to the server it should
reconnect, following the steps above.

### Congestion

If the server sends messages faster than the client can consume them the
server will first buffer a (fairly large) number of commands and then
disconnect the client. This ensures that we don't queue up an unbounded
number of commands in memory and gives us a potential opportunity to
squawk loudly. When/if the client recovers it can reconnect to the
server and ask for missed messages.
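
The buffer-then-disconnect policy amounts to a bounded queue. The Python sketch below illustrates the idea. `BoundedSendBuffer` is a hypothetical class, `max_pending` stands in for the "fairly large" limit (whose real value is not given here), and `drain` merely simulates a client reading lines.

```python
import logging
from collections import deque
from typing import Deque, List

logger = logging.getLogger(__name__)


class BoundedSendBuffer:
    """Outgoing command queue that disconnects a client which falls too far behind."""

    def __init__(self, max_pending: int) -> None:
        self.max_pending = max_pending  # hypothetical limit; real value not specified
        self.pending: Deque[str] = deque()
        self.disconnected = False

    def send(self, line: str) -> None:
        if self.disconnected:
            return
        if len(self.pending) >= self.max_pending:
            # Don't grow without bound: complain loudly, drop the queue and
            # disconnect. The client can reconnect and catch up from its tokens.
            logger.warning("client too slow, %d commands pending", len(self.pending))
            self.pending.clear()
            self.disconnected = True
            return
        self.pending.append(line)

    def drain(self, n: int) -> List[str]:
        """Simulate the client consuming up to n queued lines."""
        out: List[str] = []
        while self.pending and len(out) < n:
            out.append(self.pending.popleft())
        return out
```

Dropping the queue is safe only because `RDATA` carries tokens: the reconnecting client can ask for everything after its last token rather than relying on the lost commands.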

### Reliability

In general the replication stream should be considered an unreliable
transport since e.g. commands are not resent if the connection
disappears.

The exception to that is the replication streams, i.e. `RDATA` commands,
since these include tokens which can be used to restart the stream on
connection errors.

The client should keep track of the token in the last `RDATA` command
received for each stream so that on reconnection it can start streaming
from the correct place. Note: not all `RDATA` commands have valid tokens due to
batching. See `RdataCommand` for more details.
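
A minimal Python sketch of the per-stream token tracking described above. `StreamPositions` is a hypothetical name, and for simplicity it keys only on the stream name, as the text does, ignoring `<instance_name>`.

```python
from typing import Dict, Optional


class StreamPositions:
    """Remember the last usable token per stream, for resuming after a reconnect."""

    def __init__(self) -> None:
        self._tokens: Dict[str, int] = {}

    def on_rdata(self, stream_name: str, token: str) -> None:
        if token == "batch":
            return  # batched rows carry no valid token; wait for the final row
        self._tokens[stream_name] = int(token)

    def get(self, stream_name: str) -> Optional[int]:
        return self._tokens.get(stream_name)
```

Skipping `batch` tokens means a reconnect after a half-received batch resumes from the previous complete token, so the partial batch is fetched again rather than lost.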

### Example

An example interaction is shown below. Each line is prefixed with '>'
or '<' to indicate which side is sending; these prefixes are *not* included on
the wire:

    * connection established *
    > SERVER localhost:8823
    > PING 1490197665618
    < NAME synapse.app.appservice
    < PING 1490197665618
    < REPLICATE
    > POSITION events master 1 1
    > POSITION backfill master 1 1
    > POSITION caches master 1 1
    > RDATA caches master 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
    > RDATA events master 14 ["$149019767112vOHxz:localhost:8823","!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]
    < PING 1490197675618
    > ERROR server stopping
    * connection closed by server *

The `POSITION` command sent by the server is used to set the client's
position without needing to send data with the `RDATA` command.

An example of a batched set of `RDATA` is:

    > RDATA caches master batch ["get_user_by_id",["@test:localhost:8823"],1490197670513]
    > RDATA caches master batch ["get_user_by_id",["@test2:localhost:8823"],1490197670513]
    > RDATA caches master batch ["get_user_by_id",["@test3:localhost:8823"],1490197670513]
    > RDATA caches master 54 ["get_user_by_id",["@test4:localhost:8823"],1490197670513]

In this case the client shouldn't advance its caches token until it
sees the last `RDATA`.
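
One way a client might honour this is to hold `batch` rows per stream and release them together once the row with a real token arrives. The following Python sketch assumes that approach. `BatchAccumulator` is a hypothetical name, and this is not presented as Synapse's own handler.

```python
from typing import Any, Dict, List, Optional, Tuple


class BatchAccumulator:
    """Hold "batch" rows per stream until the row carrying a real token arrives."""

    def __init__(self) -> None:
        self._pending: Dict[str, List[Any]] = {}

    def on_rdata(
        self, stream_name: str, token: str, row: Any
    ) -> Optional[Tuple[int, List[Any]]]:
        rows = self._pending.setdefault(stream_name, [])
        rows.append(row)
        if token == "batch":
            return None  # not complete yet: don't advance the stream token
        del self._pending[stream_name]
        return int(token), rows  # the whole batch, tagged with its token
```

Fed the four rows above, the first three calls return `None`, and the fourth returns token `54` together with all four rows.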

### List of commands

The list of valid commands, with which side can send it: server (S) or
client (C):

#### SERVER (S)

   Sent at the start to identify which server the client is talking to

#### RDATA (S)

   A single update in a stream

#### POSITION (S)

   On receipt of a POSITION command clients should check if they have missed any
   updates, and if so then fetch them out of band. Sent in response to a
   REPLICATE command (but can happen at any time).

   The POSITION command includes the source of the stream. Currently all streams
   are written by a single process (usually "master"). If fetching missing
   updates via HTTP API, rather than via the DB, then processes should make the
   request to the appropriate process.

   Two positions are included: the "new" position and the last position sent.
   This allows servers to tell instances that the positions have advanced but no
   data has been written, without clients needlessly checking to see if they
   have missed any updates.
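
To make the two-token logic concrete, here is a minimal Python sketch of how a client might react to `POSITION`. It assumes the client already tracks its current token per `(stream, instance)` and has some way to fetch missed rows out of band. `PositionHandler` and the `fetch_updates(stream, instance, from_token, to_token)` callback are hypothetical. The tokens are passed in as named arguments, so the sketch makes no assumption about their order on the wire.

```python
from typing import Callable, Dict, List, Tuple

# Hypothetical callback: return rows with from_token < token <= to_token.
FetchUpdates = Callable[[str, str, int, int], List[Tuple[int, object]]]


class PositionHandler:
    def __init__(self, fetch_updates: FetchUpdates) -> None:
        self._fetch_updates = fetch_updates
        self.current: Dict[Tuple[str, str], int] = {}  # (stream, instance) -> token
        self.applied: List[Tuple[int, object]] = []

    def on_position(
        self, stream: str, instance: str, prev_token: int, new_token: int
    ) -> None:
        key = (stream, instance)
        current = self.current.get(key)
        if current is None:
            # First position seen for this stream: nothing to compare against.
            self.current[key] = new_token
            return
        if current != prev_token:
            # We are not where the sender last left us, so rows may have been
            # missed: fetch them out of band (e.g. via the DB or an HTTP API).
            rows = self._fetch_updates(stream, instance, current, new_token)
            self.applied.extend(rows)
        # Nothing was written between prev_token and new_token, so just advance.
        self.current[key] = new_token
```

The common case, where the client's token equals the last position sent, costs nothing: the client simply advances its token without issuing a fetch.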

#### ERROR (S, C)

   There was an error

#### PING (S, C)

   Sent periodically to ensure the connection is still alive

#### NAME (C)

   Sent at the start by client to inform the server who they are

#### REPLICATE (C)

   Asks the server for the current position of all streams.

#### USER_SYNC (C)

   A user has started or stopped syncing on this process.

#### CLEAR_USER_SYNC (C)

   The server should clear all associated user sync data from the worker.

   This is used when a worker is shutting down.

#### FEDERATION_ACK (C)

   Acknowledge receipt of some federation data

#### REMOTE_SERVER_UP (S, C)

   Inform other processes that a remote server may have come back online.

See `synapse/replication/tcp/commands.py` for a detailed description and
the format of each command.
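
The sender constraints above can be captured as a lookup table, for example to reject a command that arrives from the wrong side. This Python sketch encodes only the commands and sides listed in this document. `ALLOWED_SENDERS` and `check_incoming` are hypothetical names, and treating unknown commands as errors is an assumption of the sketch.

```python
from typing import Dict, Set

# Which side may send each command, transcribed from the list above:
# "S" = server (master), "C" = client (worker).
ALLOWED_SENDERS: Dict[str, Set[str]] = {
    "SERVER": {"S"},
    "RDATA": {"S"},
    "POSITION": {"S"},
    "ERROR": {"S", "C"},
    "PING": {"S", "C"},
    "NAME": {"C"},
    "REPLICATE": {"C"},
    "USER_SYNC": {"C"},
    "CLEAR_USER_SYNC": {"C"},
    "FEDERATION_ACK": {"C"},
    "REMOTE_SERVER_UP": {"S", "C"},
}


def check_incoming(line: str, peer_side: str) -> str:
    """Return the command name, or raise ValueError if the peer may not send it."""
    command = line.split(" ", 1)[0]
    senders = ALLOWED_SENDERS.get(command)
    if senders is None:
        raise ValueError(f"unknown command {command!r}")
    if peer_side not in senders:
        raise ValueError(f"{command} may not be sent by side {peer_side!r}")
    return command
```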

### Cache Invalidation Stream

The cache invalidation stream is used to inform workers when they need
to invalidate any of their caches in the data store. This is done by
streaming all cache invalidations done on master down to the workers,
assuming that any caches on the workers also exist on the master.

Each individual cache invalidation results in a row being sent down
replication, which includes the cache name (the name of the function)
and the key to invalidate. For example:

    > RDATA caches master 550953771 ["get_user_by_id", ["@bob:example.com"], 1550574873251]

Alternatively, an entire cache can be invalidated by sending down a `null`
instead of the key. For example:

    > RDATA caches master 550953772 ["get_user_by_id", null, 1550574873252]
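
Below is a minimal Python sketch of a worker applying these rows, using plain dicts as stand-in caches keyed by a tuple of the key parts. `WorkerCaches` and `apply_invalidation` are hypothetical names. The third row element is assumed, from the example values, to be a millisecond timestamp, and this sketch ignores it.

```python
from typing import Any, Dict, Hashable, List


class WorkerCaches:
    """Toy stand-in for a worker's caches: cache name -> {key tuple: value}."""

    def __init__(self) -> None:
        self.caches: Dict[str, Dict[Hashable, Any]] = {}

    def apply_invalidation(self, row: List[Any]) -> None:
        # Row layout taken from the examples: [cache_name, keys or null, ts].
        # The third field is assumed to be a timestamp and is ignored here.
        cache_name, keys, _ts = row
        cache = self.caches.get(cache_name)
        if cache is None:
            return  # this worker has no such cache; nothing to do
        if keys is None:
            cache.clear()  # null key: invalidate the entire cache
        else:
            cache.pop(tuple(keys), None)  # JSON lists become hashable tuples
```

Converting the JSON key list to a tuple mirrors the fact that cache keys may have several parts, while still giving a hashable dictionary key.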

However, there are times when a number of caches need to be invalidated
at the same time with the same key. To reduce traffic we batch those
invalidations into a single poke by defining a special cache name that
workers understand as a request to invalidate the corresponding set of caches.

Currently the special cache names are declared in
`synapse/storage/_base.py` and are:

1.  `cs_cache_fake` - invalidates caches that depend on the current
    state