Remove import loop

pull/7024/head
Erik Johnston 2020-03-24 11:47:57 +00:00
parent 3204b0e79f
commit 2380e401e4
4 changed files with 22 additions and 8 deletions

View File

@ -47,9 +47,9 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint):
def __init__(self, hs): def __init__(self, hs):
super().__init__(hs) super().__init__(hs)
from synapse.replication.tcp.streams import STREAMS_MAP # We pull the streams from the replication steamer (if we try and make
# them ourselves we end up in an import loop).
self.streams = {stream.NAME: stream(hs) for stream in STREAMS_MAP.values()} self.streams = hs.get_replication_streamer().get_streams()
@staticmethod @staticmethod
def _serialize_payload(stream_name, from_token, upto_token, limit): def _serialize_payload(stream_name, from_token, upto_token, limit):

View File

@ -79,11 +79,15 @@ from synapse.replication.tcp.commands import (
UserSyncCommand, UserSyncCommand,
) )
from synapse.replication.tcp.streams import STREAMS_MAP, Stream from synapse.replication.tcp.streams import STREAMS_MAP, Stream
from synapse.server import HomeServer
from synapse.types import Collection from synapse.types import Collection
from synapse.util import Clock from synapse.util import Clock
from synapse.util.stringutils import random_string from synapse.util.stringutils import random_string
MYPY = False
if MYPY:
from synapse.server import HomeServer
connection_close_counter = Counter( connection_close_counter = Counter(
"synapse_replication_tcp_protocol_close_reason", "", ["reason_type"] "synapse_replication_tcp_protocol_close_reason", "", ["reason_type"]
) )
@ -539,7 +543,7 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
def __init__( def __init__(
self, self,
hs: HomeServer, hs: "HomeServer",
client_name: str, client_name: str,
server_name: str, server_name: str,
clock: Clock, clock: Clock,

View File

@ -17,7 +17,7 @@
import logging import logging
import random import random
from typing import Any, List from typing import Any, Dict, List
from six import itervalues from six import itervalues
@ -30,7 +30,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.metrics import Measure, measure_func from synapse.util.metrics import Measure, measure_func
from .protocol import ServerReplicationStreamProtocol from .protocol import ServerReplicationStreamProtocol
from .streams import STREAMS_MAP from .streams import STREAMS_MAP, Stream
from .streams.federation import FederationStream from .streams.federation import FederationStream
stream_updates_counter = Counter( stream_updates_counter = Counter(
@ -52,7 +52,7 @@ class ReplicationStreamProtocolFactory(Factory):
""" """
def __init__(self, hs): def __init__(self, hs):
self.streamer = ReplicationStreamer(hs) self.streamer = hs.get_replication_streamer()
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.server_name = hs.config.server_name self.server_name = hs.config.server_name
@ -133,6 +133,11 @@ class ReplicationStreamer(object):
for conn in self.connections: for conn in self.connections:
conn.send_error("server shutting down") conn.send_error("server shutting down")
def get_streams(self) -> Dict[str, Stream]:
"""Get a mapp from stream name to stream instance.
"""
return self.streams_by_name
def on_notifier_poke(self): def on_notifier_poke(self):
"""Checks if there is actually any new data and sends it to the """Checks if there is actually any new data and sends it to the
connections if there are. connections if there are.

View File

@ -85,6 +85,7 @@ from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.notifier import Notifier from synapse.notifier import Notifier
from synapse.push.action_generator import ActionGenerator from synapse.push.action_generator import ActionGenerator
from synapse.push.pusherpool import PusherPool from synapse.push.pusherpool import PusherPool
from synapse.replication.tcp.resource import ReplicationStreamer
from synapse.rest.media.v1.media_repository import ( from synapse.rest.media.v1.media_repository import (
MediaRepository, MediaRepository,
MediaRepositoryResource, MediaRepositoryResource,
@ -199,6 +200,7 @@ class HomeServer(object):
"saml_handler", "saml_handler",
"event_client_serializer", "event_client_serializer",
"storage", "storage",
"replication_streamer",
] ]
REQUIRED_ON_MASTER_STARTUP = ["user_directory_handler", "stats_handler"] REQUIRED_ON_MASTER_STARTUP = ["user_directory_handler", "stats_handler"]
@ -536,6 +538,9 @@ class HomeServer(object):
def build_storage(self) -> Storage: def build_storage(self) -> Storage:
return Storage(self, self.datastores) return Storage(self, self.datastores)
def build_replication_streamer(self) -> ReplicationStreamer:
return ReplicationStreamer(self)
def remove_pusher(self, app_id, push_key, user_id): def remove_pusher(self, app_id, push_key, user_id):
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)