68 lines
		
	
	
		
			1.7 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			68 lines
		
	
	
		
			1.7 KiB
		
	
	
	
		
			Python
		
	
	
| import collections
 | |
| import json
 | |
| import sys
 | |
| import time
 | |
| 
 | |
| import requests
 | |
| 
 | |
| Entry = collections.namedtuple("Entry", "name position rows")
 | |
| 
 | |
| ROW_TYPES = {}
 | |
| 
 | |
| 
 | |
| def row_type_for_columns(name, column_names):
 | |
|     column_names = tuple(column_names)
 | |
|     row_type = ROW_TYPES.get((name, column_names))
 | |
|     if row_type is None:
 | |
|         row_type = collections.namedtuple(name, column_names)
 | |
|         ROW_TYPES[(name, column_names)] = row_type
 | |
|     return row_type
 | |
| 
 | |
| 
 | |
| def parse_response(content):
 | |
|     streams = json.loads(content)
 | |
|     result = {}
 | |
|     for name, value in streams.items():
 | |
|         row_type = row_type_for_columns(name, value["field_names"])
 | |
|         position = value["position"]
 | |
|         rows = [row_type(*row) for row in value["rows"]]
 | |
|         result[name] = Entry(name, position, rows)
 | |
|     return result
 | |
| 
 | |
| 
 | |
| def replicate(server, streams):
 | |
|     return parse_response(
 | |
|         requests.get(
 | |
|             server + "/_synapse/replication", verify=False, params=streams
 | |
|         ).content
 | |
|     )
 | |
| 
 | |
| 
 | |
| def main():
 | |
|     server = sys.argv[1]
 | |
| 
 | |
|     streams = None
 | |
|     while not streams:
 | |
|         try:
 | |
|             streams = {
 | |
|                 row.name: row.position
 | |
|                 for row in replicate(server, {"streams": "-1"})["streams"].rows
 | |
|             }
 | |
|         except requests.exceptions.ConnectionError:
 | |
|             time.sleep(0.1)
 | |
| 
 | |
|     while True:
 | |
|         try:
 | |
|             results = replicate(server, streams)
 | |
|         except Exception:
 | |
|             sys.stdout.write("connection_lost(" + repr(streams) + ")\n")
 | |
|             break
 | |
|         for update in results.values():
 | |
|             for row in update.rows:
 | |
|                 sys.stdout.write(repr(row) + "\n")
 | |
|             streams[update.name] = update.position
 | |
| 
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     main()
 |