diff --git a/jenkins-dendron-postgres.sh b/jenkins-dendron-postgres.sh index 70edae4328..55ff31fd18 100755 --- a/jenkins-dendron-postgres.sh +++ b/jenkins-dendron-postgres.sh @@ -22,3 +22,4 @@ export SYNAPSE_CACHE_FACTOR=1 --federation-reader \ --client-reader \ --appservice \ + --federation-sender \ diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py new file mode 100644 index 0000000000..80ea4c8062 --- /dev/null +++ b/synapse/app/federation_sender.py @@ -0,0 +1,331 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import synapse + +from synapse.server import HomeServer +from synapse.config._base import ConfigError +from synapse.config.logger import setup_logging +from synapse.config.homeserver import HomeServerConfig +from synapse.crypto import context_factory +from synapse.http.site import SynapseSite +from synapse.federation import send_queue +from synapse.federation.units import Edu +from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore +from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.slave.storage.transactions import TransactionStore +from synapse.storage.engines import create_engine +from synapse.storage.presence import UserPresenceState +from synapse.util.async import sleep +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext +from synapse.util.manhole import manhole +from synapse.util.rlimit import change_resource_limit +from synapse.util.versionstring import get_version_string + +from synapse import events + +from twisted.internet import reactor, defer +from twisted.web.resource import Resource + +from daemonize import Daemonize + +import sys +import logging +import gc +import ujson as json + +logger = logging.getLogger("synapse.app.appservice") + + +class FederationSenderSlaveStore( + SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore, + SlavedRegistrationStore, +): + pass + + +class FederationSenderServer(HomeServer): + def get_db_conn(self, run_new_connection=True): + # Any param beginning with cp_ is a parameter for adbapi, and should + # not be passed to the database engine. + db_params = { + k: v for k, v in self.db_config.get("args", {}).items() + if not k.startswith("cp_") + } + db_conn = self.database_engine.module.connect(**db_params) + + if run_new_connection: + self.database_engine.on_new_connection(db_conn) + return db_conn + + def setup(self): + logger.info("Setting up.") + self.datastore = FederationSenderSlaveStore(self.get_db_conn(), self) + logger.info("Finished setting up.") + + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_address = listener_config.get("bind_address", "") + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(self) + + root_resource = create_resource_tree(resources, Resource()) + reactor.listenTCP( + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + ), + interface=bind_address + ) + logger.info("Synapse federation_sender now listening on port %d", port) + + def start_listening(self, listeners): + for listener in listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=listener.get("bind_address", '127.0.0.1') + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + + @defer.inlineCallbacks + def replicate(self): + http_client = self.get_simple_http_client() + store = self.get_datastore() + replication_url = self.config.worker_replication_url + send_handler = FederationSenderHandler(self) + + send_handler.on_start() + + while True: + try: + args = store.stream_positions() + args.update((yield send_handler.stream_positions())) + args["timeout"] = 30000 + result = yield http_client.get_json(replication_url, args=args) + yield store.process_replication(result) + yield send_handler.process_replication(result) + except: + logger.exception("Error replicating from %r", replication_url) + yield sleep(30) + + +def start(config_options): + try: + config = HomeServerConfig.load_config( + "Synapse federation sender", config_options + ) + except ConfigError as e: + sys.stderr.write("\n" + e.message + "\n") + sys.exit(1) + + assert config.worker_app == "synapse.app.federation_sender" + + setup_logging(config.worker_log_config, config.worker_log_file) + + events.USE_FROZEN_DICTS = config.use_frozen_dicts + + database_engine = create_engine(config.database_config) + + if config.send_federation: + sys.stderr.write( + "\nThe send_federation must be disabled in the main synapse process" + "\nbefore they can be run in a separate worker." + "\nPlease add ``send_federation: false`` to the main config" + "\n" + ) + sys.exit(1) + + # Force the pushers to start since they will be disabled in the main config + config.send_federation = True + + tls_server_context_factory = context_factory.ServerContextFactory(config) + + ps = FederationSenderServer( + config.server_name, + db_config=config.database_config, + tls_server_context_factory=tls_server_context_factory, + config=config, + version_string="Synapse/" + get_version_string(synapse), + database_engine=database_engine, + ) + + ps.setup() + ps.start_listening(config.worker_listeners) + + def run(): + with LoggingContext("run"): + logger.info("Running") + change_resource_limit(config.soft_file_limit) + if config.gc_thresholds: + gc.set_threshold(*config.gc_thresholds) + reactor.run() + + def start(): + ps.replicate() + ps.get_datastore().start_profiling() + ps.get_state_handler().start_caching() + + reactor.callWhenRunning(start) + + if config.worker_daemonize: + daemon = Daemonize( + app="synapse-federation-sender", + pid=config.worker_pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + daemon.start() + else: + run() + + +class FederationSenderHandler(object): + """Processes the replication stream and forwards the appropriate entries + to the federation sender. + """ + def __init__(self, hs): + self.store = hs.get_datastore() + self.federation_sender = hs.get_federation_sender() + + self._room_serials = {} + self._room_typing = {} + + def on_start(self): + # There may be some events that are persisted but haven't been sent, + # so send them now. + self.federation_sender.notify_new_events( + self.store.get_room_max_stream_ordering() + ) + + @defer.inlineCallbacks + def stream_positions(self): + stream_id = yield self.store.get_federation_out_pos("federation") + defer.returnValue({ + "federation": stream_id, + + # Ack stuff we've "processed", this should only be called from + # one process. + "federation_ack": stream_id, + }) + + @defer.inlineCallbacks + def process_replication(self, result): + # The federation stream contains things that we want to send out, e.g. + # presence, typing, etc. + fed_stream = result.get("federation") + if fed_stream: + latest_id = int(fed_stream["position"]) + + # The federation stream containis a bunch of different types of + # rows that need to be handled differently. We parse the rows, put + # them into the appropriate collection and then send them off. + presence_to_send = {} + keyed_edus = {} + edus = {} + failures = {} + device_destinations = set() + + # Parse the rows in the stream + for row in fed_stream["rows"]: + position, typ, content_js = row + content = json.loads(content_js) + + if typ == send_queue.PRESENCE_TYPE: + destination = content["destination"] + state = UserPresenceState.from_dict(content["state"]) + + presence_to_send.setdefault(destination, []).append(state) + elif typ == send_queue.KEYED_EDU_TYPE: + key = content["key"] + edu = Edu(**content["edu"]) + + keyed_edus.setdefault( + edu.destination, {} + )[(edu.destination, tuple(key))] = edu + elif typ == send_queue.EDU_TYPE: + edu = Edu(**content) + + edus.setdefault(edu.destination, []).append(edu) + elif typ == send_queue.FAILURE_TYPE: + destination = content["destination"] + failure = content["failure"] + + failures.setdefault(destination, []).append(failure) + elif typ == send_queue.DEVICE_MESSAGE_TYPE: + device_destinations.add(content["destination"]) + else: + raise Exception("Unrecognised federation type: %r", typ) + + # We've finished collecting, send everything off + for destination, states in presence_to_send.items(): + self.federation_sender.send_presence(destination, states) + + for destination, edu_map in keyed_edus.items(): + for key, edu in edu_map.items(): + self.federation_sender.send_edu( + edu.destination, edu.edu_type, edu.content, key=key, + ) + + for destination, edu_list in edus.items(): + for edu in edu_list: + self.federation_sender.send_edu( + edu.destination, edu.edu_type, edu.content, key=None, + ) + + for destination, failure_list in failures.items(): + for failure in failure_list: + self.federation_sender.send_failure(destination, failure) + + for destination in device_destinations: + self.federation_sender.send_device_messages(destination) + + # Record where we are in the stream. + yield self.store.update_federation_out_pos( + "federation", latest_id + ) + + # We also need to poke the federation sender when new events happen + event_stream = result.get("events") + if event_stream: + latest_pos = event_stream["position"] + self.federation_sender.notify_new_events(latest_pos) + + +if __name__ == '__main__': + with LoggingContext("main"): + start(sys.argv[1:]) diff --git a/synapse/config/server.py b/synapse/config/server.py index ed5417d0c3..634d8e6fe5 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -30,6 +30,11 @@ class ServerConfig(Config): self.use_frozen_dicts = config.get("use_frozen_dicts", False) self.public_baseurl = config.get("public_baseurl") + # Whether to send federation traffic out in this process. This only + # applies to some federation traffic, and so shouldn't be used to + # "disable" federation + self.send_federation = config.get("send_federation", True) + if self.public_baseurl is not None: if self.public_baseurl[-1] != '/': self.public_baseurl += '/' diff --git a/synapse/federation/__init__.py b/synapse/federation/__init__.py index 979fdf2431..2e32d245ba 100644 --- a/synapse/federation/__init__.py +++ b/synapse/federation/__init__.py @@ -17,10 +17,9 @@ """ from .replication import ReplicationLayer -from .transport.client import TransportLayerClient -def initialize_http_replication(homeserver): - transport = TransportLayerClient(homeserver) +def initialize_http_replication(hs): + transport = hs.get_federation_transport_client() - return ReplicationLayer(homeserver, transport) + return ReplicationLayer(hs, transport) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 94e76b1978..b255709165 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -18,7 +18,6 @@ from twisted.internet import defer from .federation_base import FederationBase from synapse.api.constants import Membership -from .units import Edu from synapse.api.errors import ( CodeMessageException, HttpResponseException, SynapseError, @@ -45,10 +44,6 @@ logger = logging.getLogger(__name__) # synapse.federation.federation_client is a silly name metrics = synapse.metrics.get_metrics_for("synapse.federation.client") -sent_pdus_destination_dist = metrics.register_distribution("sent_pdu_destinations") - -sent_edus_counter = metrics.register_counter("sent_edus") - sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"]) @@ -92,63 +87,6 @@ class FederationClient(FederationBase): self._get_pdu_cache.start() - @log_function - def send_pdu(self, pdu, destinations): - """Informs the replication layer about a new PDU generated within the - home server that should be transmitted to others. - - TODO: Figure out when we should actually resolve the deferred. - - Args: - pdu (Pdu): The new Pdu. - - Returns: - Deferred: Completes when we have successfully processed the PDU - and replicated it to any interested remote home servers. - """ - order = self._order - self._order += 1 - - sent_pdus_destination_dist.inc_by(len(destinations)) - - logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.event_id) - - # TODO, add errback, etc. - self._transaction_queue.enqueue_pdu(pdu, destinations, order) - - logger.debug( - "[%s] transaction_layer.enqueue_pdu... done", - pdu.event_id - ) - - def send_presence(self, destination, states): - if destination != self.server_name: - self._transaction_queue.enqueue_presence(destination, states) - - @log_function - def send_edu(self, destination, edu_type, content, key=None): - edu = Edu( - origin=self.server_name, - destination=destination, - edu_type=edu_type, - content=content, - ) - - sent_edus_counter.inc() - - self._transaction_queue.enqueue_edu(edu, key=key) - - @log_function - def send_device_messages(self, destination): - """Sends the device messages in the local database to the remote - destination""" - self._transaction_queue.enqueue_device_messages(destination) - - @log_function - def send_failure(self, failure, destination): - self._transaction_queue.enqueue_failure(failure, destination) - return defer.succeed(None) - @log_function def make_query(self, destination, query_type, args, retry_on_dns_fail=False): diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index ea66a5dcbc..62d865ec4b 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -20,8 +20,6 @@ a given transport. from .federation_client import FederationClient from .federation_server import FederationServer -from .transaction_queue import TransactionQueue - from .persistence import TransactionActions import logging @@ -66,9 +64,6 @@ class ReplicationLayer(FederationClient, FederationServer): self._clock = hs.get_clock() self.transaction_actions = TransactionActions(self.store) - self._transaction_queue = TransactionQueue(hs, transport_layer) - - self._order = 0 self.hs = hs diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py new file mode 100644 index 0000000000..5c9f7a86f0 --- /dev/null +++ b/synapse/federation/send_queue.py @@ -0,0 +1,298 @@ +# -*- coding: utf-8 -*- +# Copyright 2014-2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""A federation sender that forwards things to be sent across replication to +a worker process. + +It assumes there is a single worker process feeding off of it. + +Each row in the replication stream consists of a type and some json, where the +types indicate whether they are presence, or edus, etc. + +Ephemeral or non-event data are queued up in-memory. When the worker requests +updates since a particular point, all in-memory data since before that point is +dropped. We also expire things in the queue after 5 minutes, to ensure that a +dead worker doesn't cause the queues to grow limitlessly. + +Events are replicated via a separate events stream. +""" + +from .units import Edu + +from synapse.util.metrics import Measure +import synapse.metrics + +from blist import sorteddict +import ujson + + +metrics = synapse.metrics.get_metrics_for(__name__) + + +PRESENCE_TYPE = "p" +KEYED_EDU_TYPE = "k" +EDU_TYPE = "e" +FAILURE_TYPE = "f" +DEVICE_MESSAGE_TYPE = "d" + + +class FederationRemoteSendQueue(object): + """A drop in replacement for TransactionQueue""" + + def __init__(self, hs): + self.server_name = hs.hostname + self.clock = hs.get_clock() + + self.presence_map = {} + self.presence_changed = sorteddict() + + self.keyed_edu = {} + self.keyed_edu_changed = sorteddict() + + self.edus = sorteddict() + + self.failures = sorteddict() + + self.device_messages = sorteddict() + + self.pos = 1 + self.pos_time = sorteddict() + + # EVERYTHING IS SAD. In particular, python only makes new scopes when + # we make a new function, so we need to make a new function so the inner + # lambda binds to the queue rather than to the name of the queue which + # changes. ARGH. + def register(name, queue): + metrics.register_callback( + queue_name + "_size", + lambda: len(queue), + ) + + for queue_name in [ + "presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed", + "edus", "failures", "device_messages", "pos_time", + ]: + register(queue_name, getattr(self, queue_name)) + + self.clock.looping_call(self._clear_queue, 30 * 1000) + + def _next_pos(self): + pos = self.pos + self.pos += 1 + self.pos_time[self.clock.time_msec()] = pos + return pos + + def _clear_queue(self): + """Clear the queues for anything older than N minutes""" + + FIVE_MINUTES_AGO = 5 * 60 * 1000 + now = self.clock.time_msec() + + keys = self.pos_time.keys() + time = keys.bisect_left(now - FIVE_MINUTES_AGO) + if not keys[:time]: + return + + position_to_delete = max(keys[:time]) + for key in keys[:time]: + del self.pos_time[key] + + self._clear_queue_before_pos(position_to_delete) + + def _clear_queue_before_pos(self, position_to_delete): + """Clear all the queues from before a given position""" + with Measure(self.clock, "send_queue._clear"): + # Delete things out of presence maps + keys = self.presence_changed.keys() + i = keys.bisect_left(position_to_delete) + for key in keys[:i]: + del self.presence_changed[key] + + user_ids = set( + user_id for uids in self.presence_changed.values() for _, user_id in uids + ) + + to_del = [ + user_id for user_id in self.presence_map if user_id not in user_ids + ] + for user_id in to_del: + del self.presence_map[user_id] + + # Delete things out of keyed edus + keys = self.keyed_edu_changed.keys() + i = keys.bisect_left(position_to_delete) + for key in keys[:i]: + del self.keyed_edu_changed[key] + + live_keys = set() + for edu_key in self.keyed_edu_changed.values(): + live_keys.add(edu_key) + + to_del = [edu_key for edu_key in self.keyed_edu if edu_key not in live_keys] + for edu_key in to_del: + del self.keyed_edu[edu_key] + + # Delete things out of edu map + keys = self.edus.keys() + i = keys.bisect_left(position_to_delete) + for key in keys[:i]: + del self.edus[key] + + # Delete things out of failure map + keys = self.failures.keys() + i = keys.bisect_left(position_to_delete) + for key in keys[:i]: + del self.failures[key] + + # Delete things out of device map + keys = self.device_messages.keys() + i = keys.bisect_left(position_to_delete) + for key in keys[:i]: + del self.device_messages[key] + + def notify_new_events(self, current_id): + """As per TransactionQueue""" + # We don't need to replicate this as it gets sent down a different + # stream. + pass + + def send_edu(self, destination, edu_type, content, key=None): + """As per TransactionQueue""" + pos = self._next_pos() + + edu = Edu( + origin=self.server_name, + destination=destination, + edu_type=edu_type, + content=content, + ) + + if key: + assert isinstance(key, tuple) + self.keyed_edu[(destination, key)] = edu + self.keyed_edu_changed[pos] = (destination, key) + else: + self.edus[pos] = edu + + def send_presence(self, destination, states): + """As per TransactionQueue""" + pos = self._next_pos() + + self.presence_map.update({ + state.user_id: state + for state in states + }) + + self.presence_changed[pos] = [ + (destination, state.user_id) for state in states + ] + + def send_failure(self, failure, destination): + """As per TransactionQueue""" + pos = self._next_pos() + + self.failures[pos] = (destination, str(failure)) + + def send_device_messages(self, destination): + """As per TransactionQueue""" + pos = self._next_pos() + self.device_messages[pos] = destination + + def get_current_token(self): + return self.pos - 1 + + def get_replication_rows(self, token, limit, federation_ack=None): + """ + Args: + token (int) + limit (int) + federation_ack (int): Optional. The position where the worker is + explicitly acknowledged it has handled. Allows us to drop + data from before that point + """ + # TODO: Handle limit. + + # To handle restarts where we wrap around + if token > self.pos: + token = -1 + + rows = [] + + # There should be only one reader, so lets delete everything its + # acknowledged its seen. + if federation_ack: + self._clear_queue_before_pos(federation_ack) + + # Fetch changed presence + keys = self.presence_changed.keys() + i = keys.bisect_right(token) + dest_user_ids = set( + (pos, dest_user_id) + for pos in keys[i:] + for dest_user_id in self.presence_changed[pos] + ) + + for (key, (dest, user_id)) in dest_user_ids: + rows.append((key, PRESENCE_TYPE, ujson.dumps({ + "destination": dest, + "state": self.presence_map[user_id].as_dict(), + }))) + + # Fetch changes keyed edus + keys = self.keyed_edu_changed.keys() + i = keys.bisect_right(token) + keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:]) + + for (pos, (destination, edu_key)) in keyed_edus: + rows.append( + (pos, KEYED_EDU_TYPE, ujson.dumps({ + "key": edu_key, + "edu": self.keyed_edu[(destination, edu_key)].get_internal_dict(), + })) + ) + + # Fetch changed edus + keys = self.edus.keys() + i = keys.bisect_right(token) + edus = set((k, self.edus[k]) for k in keys[i:]) + + for (pos, edu) in edus: + rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict()))) + + # Fetch changed failures + keys = self.failures.keys() + i = keys.bisect_right(token) + failures = set((k, self.failures[k]) for k in keys[i:]) + + for (pos, (destination, failure)) in failures: + rows.append((pos, FAILURE_TYPE, ujson.dumps({ + "destination": destination, + "failure": failure, + }))) + + # Fetch changed device messages + keys = self.device_messages.keys() + i = keys.bisect_right(token) + device_messages = set((k, self.device_messages[k]) for k in keys[i:]) + + for (pos, destination) in device_messages: + rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({ + "destination": destination, + }))) + + # Sort rows based on pos + rows.sort() + + return rows diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index f8ca93e4c3..c94c74a67e 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -19,6 +19,7 @@ from twisted.internet import defer from .persistence import TransactionActions from .units import Transaction, Edu +from synapse.api.constants import EventTypes, Membership from synapse.api.errors import HttpResponseException from synapse.util.async import run_on_reactor from synapse.util.logcontext import preserve_context_over_fn @@ -26,6 +27,7 @@ from synapse.util.retryutils import ( get_retry_limiter, NotRetryingDestination, ) from synapse.util.metrics import measure_func +from synapse.types import get_domain_from_id from synapse.handlers.presence import format_user_presence_state import synapse.metrics @@ -36,6 +38,12 @@ logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) +client_metrics = synapse.metrics.get_metrics_for("synapse.federation.client") +sent_pdus_destination_dist = client_metrics.register_distribution( + "sent_pdu_destinations" +) +sent_edus_counter = client_metrics.register_counter("sent_edus") + class TransactionQueue(object): """This class makes sure we only have one transaction in flight at @@ -44,13 +52,14 @@ class TransactionQueue(object): It batches pending PDUs into single transactions. """ - def __init__(self, hs, transport_layer): + def __init__(self, hs): self.server_name = hs.hostname self.store = hs.get_datastore() + self.state = hs.get_state_handler() self.transaction_actions = TransactionActions(self.store) - self.transport_layer = transport_layer + self.transport_layer = hs.get_federation_transport_client() self.clock = hs.get_clock() @@ -95,6 +104,11 @@ class TransactionQueue(object): # HACK to get unique tx id self._next_txn_id = int(self.clock.time_msec()) + self._order = 1 + + self._is_processing = False + self._last_poked_id = -1 + def can_send_to(self, destination): """Can we send messages to the given server? @@ -115,11 +129,61 @@ class TransactionQueue(object): else: return not destination.startswith("localhost") - def enqueue_pdu(self, pdu, destinations, order): + @defer.inlineCallbacks + def notify_new_events(self, current_id): + """This gets called when we have some new events we might want to + send out to other servers. + """ + self._last_poked_id = max(current_id, self._last_poked_id) + + if self._is_processing: + return + + try: + self._is_processing = True + while True: + last_token = yield self.store.get_federation_out_pos("events") + next_token, events = yield self.store.get_all_new_events_stream( + last_token, self._last_poked_id, limit=20, + ) + + logger.debug("Handling %s -> %s", last_token, next_token) + + if not events and next_token >= self._last_poked_id: + break + + for event in events: + users_in_room = yield self.state.get_current_user_in_room( + event.room_id, latest_event_ids=[event.event_id], + ) + + destinations = set( + get_domain_from_id(user_id) for user_id in users_in_room + ) + + if event.type == EventTypes.Member: + if event.content["membership"] == Membership.JOIN: + destinations.add(get_domain_from_id(event.state_key)) + + logger.debug("Sending %s to %r", event, destinations) + + self._send_pdu(event, destinations) + + yield self.store.update_federation_out_pos( + "events", next_token + ) + + finally: + self._is_processing = False + + def _send_pdu(self, pdu, destinations): # We loop through all destinations to see whether we already have # a transaction in progress. If we do, stick it in the pending_pdus # table and we'll get back to it later. + order = self._order + self._order += 1 + destinations = set(destinations) destinations = set( dest for dest in destinations if self.can_send_to(dest) @@ -130,6 +194,8 @@ class TransactionQueue(object): if not destinations: return + sent_pdus_destination_dist.inc_by(len(destinations)) + for destination in destinations: self.pending_pdus_by_dest.setdefault(destination, []).append( (pdu, order) @@ -139,7 +205,10 @@ class TransactionQueue(object): self._attempt_new_transaction, destination ) - def enqueue_presence(self, destination, states): + def send_presence(self, destination, states): + if not self.can_send_to(destination): + return + self.pending_presence_by_dest.setdefault(destination, {}).update({ state.user_id: state for state in states }) @@ -148,12 +217,19 @@ class TransactionQueue(object): self._attempt_new_transaction, destination ) - def enqueue_edu(self, edu, key=None): - destination = edu.destination + def send_edu(self, destination, edu_type, content, key=None): + edu = Edu( + origin=self.server_name, + destination=destination, + edu_type=edu_type, + content=content, + ) if not self.can_send_to(destination): return + sent_edus_counter.inc() + if key: self.pending_edus_keyed_by_dest.setdefault( destination, {} @@ -165,7 +241,7 @@ class TransactionQueue(object): self._attempt_new_transaction, destination ) - def enqueue_failure(self, failure, destination): + def send_failure(self, failure, destination): if destination == self.server_name or destination == "localhost": return @@ -180,7 +256,7 @@ class TransactionQueue(object): self._attempt_new_transaction, destination ) - def enqueue_device_messages(self, destination): + def send_device_messages(self, destination): if destination == self.server_name or destination == "localhost": return @@ -191,6 +267,9 @@ class TransactionQueue(object): self._attempt_new_transaction, destination ) + def get_current_token(self): + return 0 + @defer.inlineCallbacks def _attempt_new_transaction(self, destination): # list of (pending_pdu, deferred, order) diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 63d05f2531..5ad408f549 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -24,7 +24,6 @@ from .profile import ProfileHandler from .directory import DirectoryHandler from .admin import AdminHandler from .identity import IdentityHandler -from .receipts import ReceiptsHandler from .search import SearchHandler @@ -56,7 +55,6 @@ class Handlers(object): self.profile_handler = ProfileHandler(hs) self.directory_handler = DirectoryHandler(hs) self.admin_handler = AdminHandler(hs) - self.receipts_handler = ReceiptsHandler(hs) self.identity_handler = IdentityHandler(hs) self.search_handler = SearchHandler(hs) self.room_context_handler = RoomContextHandler(hs) diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index c5368e5df2..f7fad15c62 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -34,9 +34,9 @@ class DeviceMessageHandler(object): self.store = hs.get_datastore() self.notifier = hs.get_notifier() self.is_mine_id = hs.is_mine_id - self.federation = hs.get_replication_layer() + self.federation = hs.get_federation_sender() - self.federation.register_edu_handler( + hs.get_replication_layer().register_edu_handler( "m.direct_to_device", self.on_direct_to_device_edu ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2d801bad47..771ab3bc43 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -80,22 +80,6 @@ class FederationHandler(BaseHandler): # When joining a room we need to queue any events for that room up self.room_queues = {} - def handle_new_event(self, event, destinations): - """ Takes in an event from the client to server side, that has already - been authed and handled by the state module, and sends it to any - remote home servers that may be interested. - - Args: - event: The event to send - destinations: A list of destinations to send it to - - Returns: - Deferred: Resolved when it has successfully been queued for - processing. - """ - - return self.replication_layer.send_pdu(event, destinations) - @log_function @defer.inlineCallbacks def on_receive_pdu(self, origin, pdu, state=None, auth_chain=None): @@ -830,25 +814,6 @@ class FederationHandler(BaseHandler): user = UserID.from_string(event.state_key) yield user_joined_room(self.distributor, user, event.room_id) - new_pdu = event - - users_in_room = yield self.store.get_joined_users_from_context(event, context) - - destinations = set( - get_domain_from_id(user_id) for user_id in users_in_room - if not self.hs.is_mine_id(user_id) - ) - - destinations.discard(origin) - - logger.debug( - "on_send_join_request: Sending event: %s, signatures: %s", - event.event_id, - event.signatures, - ) - - self.replication_layer.send_pdu(new_pdu, destinations) - state_ids = context.prev_state_ids.values() auth_chain = yield self.store.get_auth_chain(set( [event.event_id] + state_ids @@ -1055,24 +1020,6 @@ class FederationHandler(BaseHandler): event, event_stream_id, max_stream_id, extra_users=extra_users ) - new_pdu = event - - users_in_room = yield self.store.get_joined_users_from_context(event, context) - - destinations = set( - get_domain_from_id(user_id) for user_id in users_in_room - if not self.hs.is_mine_id(user_id) - ) - destinations.discard(origin) - - logger.debug( - "on_send_leave_request: Sending event: %s, signatures: %s", - event.event_id, - event.signatures, - ) - - self.replication_layer.send_pdu(new_pdu, destinations) - defer.returnValue(None) @defer.inlineCallbacks diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index fbfa5a0281..e0ade4c164 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -372,11 +372,12 @@ class InitialSyncHandler(BaseHandler): @defer.inlineCallbacks def get_receipts(): - receipts_handler = self.hs.get_handlers().receipts_handler - receipts = yield receipts_handler.get_receipts_for_room( + receipts = yield self.store.get_linearized_receipts_for_room( room_id, - now_token.receipt_key + to_key=now_token.receipt_key, ) + if not receipts: + receipts = [] defer.returnValue(receipts) presence, receipts, (messages, token) = yield defer.gatherResults( diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 81df45177a..fd09397226 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -22,7 +22,7 @@ from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.push.action_generator import ActionGenerator from synapse.types import ( - UserID, RoomAlias, RoomStreamToken, get_domain_from_id + UserID, RoomAlias, RoomStreamToken, ) from synapse.util.async import run_on_reactor, ReadWriteLock from synapse.util.logcontext import preserve_fn @@ -599,13 +599,6 @@ class MessageHandler(BaseHandler): event_stream_id, max_stream_id ) - users_in_room = yield self.store.get_joined_users_from_context(event, context) - - destinations = [ - get_domain_from_id(user_id) for user_id in users_in_room - if not self.hs.is_mine_id(user_id) - ] - @defer.inlineCallbacks def _notify(): yield run_on_reactor() @@ -618,7 +611,3 @@ class MessageHandler(BaseHandler): # If invite, remove room_state from unsigned before sending. event.unsigned.pop("invite_room_state", None) - - preserve_fn(federation_handler.handle_new_event)( - event, destinations=destinations, - ) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index b047ae2250..1b89dc6274 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -91,28 +91,29 @@ class PresenceHandler(object): self.store = hs.get_datastore() self.wheel_timer = WheelTimer() self.notifier = hs.get_notifier() - self.federation = hs.get_replication_layer() + self.replication = hs.get_replication_layer() + self.federation = hs.get_federation_sender() self.state = hs.get_state_handler() - self.federation.register_edu_handler( + self.replication.register_edu_handler( "m.presence", self.incoming_presence ) - self.federation.register_edu_handler( + self.replication.register_edu_handler( "m.presence_invite", lambda origin, content: self.invite_presence( observed_user=UserID.from_string(content["observed_user"]), observer_user=UserID.from_string(content["observer_user"]), ) ) - self.federation.register_edu_handler( + self.replication.register_edu_handler( "m.presence_accept", lambda origin, content: self.accept_presence( observed_user=UserID.from_string(content["observed_user"]), observer_user=UserID.from_string(content["observer_user"]), ) ) - self.federation.register_edu_handler( + self.replication.register_edu_handler( "m.presence_deny", lambda origin, content: self.deny_presence( observed_user=UserID.from_string(content["observed_user"]), diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index e536a909d0..916e80a48e 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -33,8 +33,8 @@ class ReceiptsHandler(BaseHandler): self.server_name = hs.config.server_name self.store = hs.get_datastore() self.hs = hs - self.federation = hs.get_replication_layer() - self.federation.register_edu_handler( + self.federation = hs.get_federation_sender() + hs.get_replication_layer().register_edu_handler( "m.receipt", self._received_remote_receipt ) self.clock = self.hs.get_clock() diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 27ee715ff0..0eea7f8f9c 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -55,9 +55,9 @@ class TypingHandler(object): self.clock = hs.get_clock() self.wheel_timer = WheelTimer(bucket_size=5000) - self.federation = hs.get_replication_layer() + self.federation = hs.get_federation_sender() - self.federation.register_edu_handler("m.typing", self._recv_edu) + hs.get_replication_layer().register_edu_handler("m.typing", self._recv_edu) hs.get_distributor().observe("user_left_room", self.user_left_room) diff --git a/synapse/notifier.py b/synapse/notifier.py index 48653ae843..054ca59ad2 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -143,6 +143,12 @@ class Notifier(object): self.clock = hs.get_clock() self.appservice_handler = hs.get_application_service_handler() + + if hs.should_send_federation(): + self.federation_sender = hs.get_federation_sender() + else: + self.federation_sender = None + self.state_handler = hs.get_state_handler() self.clock.looping_call( @@ -220,6 +226,9 @@ class Notifier(object): # poke any interested application service. self.appservice_handler.notify_interested_services(room_stream_id) + if self.federation_sender: + self.federation_sender.notify_new_events(room_stream_id) + if event.type == EventTypes.Member and event.membership == Membership.JOIN: self._user_joined_room(event.state_key, event.room_id) diff --git a/synapse/replication/expire_cache.py b/synapse/replication/expire_cache.py new file mode 100644 index 0000000000..c05a50d7a6 --- /dev/null +++ b/synapse/replication/expire_cache.py @@ -0,0 +1,60 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.http.server import respond_with_json_bytes, request_handler +from synapse.http.servlet import parse_json_object_from_request + +from twisted.web.resource import Resource +from twisted.web.server import NOT_DONE_YET + + +class ExpireCacheResource(Resource): + """ + HTTP endpoint for expiring storage caches. + + POST /_synapse/replication/expire_cache HTTP/1.1 + Content-Type: application/json + + { + "invalidate": [ + { + "name": "func_name", + "keys": ["key1", "key2"] + } + ] + } + """ + + def __init__(self, hs): + Resource.__init__(self) # Resource is old-style, so no super() + + self.store = hs.get_datastore() + self.version_string = hs.version_string + self.clock = hs.get_clock() + + def render_POST(self, request): + self._async_render_POST(request) + return NOT_DONE_YET + + @request_handler() + def _async_render_POST(self, request): + content = parse_json_object_from_request(request) + + for row in content["invalidate"]: + name = row["name"] + keys = tuple(row["keys"]) + + getattr(self.store, name).invalidate(keys) + + respond_with_json_bytes(request, 200, "{}") diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 5a14c51d23..d79b421cba 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -17,6 +17,7 @@ from synapse.http.servlet import parse_integer, parse_string from synapse.http.server import request_handler, finish_request from synapse.replication.pusher_resource import PusherResource from synapse.replication.presence_resource import PresenceResource +from synapse.replication.expire_cache import ExpireCacheResource from synapse.api.errors import SynapseError from twisted.web.resource import Resource @@ -44,6 +45,7 @@ STREAM_NAMES = ( ("caches",), ("to_device",), ("public_rooms",), + ("federation",), ) @@ -116,11 +118,14 @@ class ReplicationResource(Resource): self.sources = hs.get_event_sources() self.presence_handler = hs.get_presence_handler() self.typing_handler = hs.get_typing_handler() + self.federation_sender = hs.get_federation_sender() self.notifier = hs.notifier self.clock = hs.get_clock() + self.config = hs.get_config() self.putChild("remove_pushers", PusherResource(hs)) self.putChild("syncing_users", PresenceResource(hs)) + self.putChild("expire_cache", ExpireCacheResource(hs)) def render_GET(self, request): self._async_render_GET(request) @@ -134,6 +139,7 @@ class ReplicationResource(Resource): pushers_token = self.store.get_pushers_stream_token() caches_token = self.store.get_cache_stream_token() public_rooms_token = self.store.get_current_public_room_stream_id() + federation_token = self.federation_sender.get_current_token() defer.returnValue(_ReplicationToken( room_stream_token, @@ -148,6 +154,7 @@ class ReplicationResource(Resource): caches_token, int(stream_token.to_device_key), int(public_rooms_token), + int(federation_token), )) @request_handler() @@ -164,8 +171,13 @@ class ReplicationResource(Resource): } request_streams["streams"] = parse_string(request, "streams") + federation_ack = parse_integer(request, "federation_ack", None) + def replicate(): - return self.replicate(request_streams, limit) + return self.replicate( + request_streams, limit, + federation_ack=federation_ack + ) writer = yield self.notifier.wait_for_replication(replicate, timeout) result = writer.finish() @@ -183,7 +195,7 @@ class ReplicationResource(Resource): finish_request(request) @defer.inlineCallbacks - def replicate(self, request_streams, limit): + def replicate(self, request_streams, limit, federation_ack=None): writer = _Writer() current_token = yield self.current_replication_token() logger.debug("Replicating up to %r", current_token) @@ -202,6 +214,7 @@ class ReplicationResource(Resource): yield self.caches(writer, current_token, limit, request_streams) yield self.to_device(writer, current_token, limit, request_streams) yield self.public_rooms(writer, current_token, limit, request_streams) + self.federation(writer, current_token, limit, request_streams, federation_ack) self.streams(writer, current_token, request_streams) logger.debug("Replicated %d rows", writer.total) @@ -465,6 +478,23 @@ class ReplicationResource(Resource): "position", "room_id", "visibility" ), position=upto_token) + def federation(self, writer, current_token, limit, request_streams, federation_ack): + if self.config.send_federation: + return + + current_position = current_token.federation + + federation = request_streams.get("federation") + + if federation is not None and federation != current_position: + federation_rows = self.federation_sender.get_replication_rows( + federation, limit, federation_ack=federation_ack, + ) + upto_token = _position_from_rows(federation_rows, current_position) + writer.write_header_and_rows("federation", federation_rows, ( + "position", "type", "content", + ), position=upto_token) + class _Writer(object): """Writes the streams as a JSON object as the response to the request""" @@ -497,6 +527,7 @@ class _Writer(object): class _ReplicationToken(collections.namedtuple("_ReplicationToken", ( "events", "presence", "typing", "receipts", "account_data", "backfill", "push_rules", "pushers", "state", "caches", "to_device", "public_rooms", + "federation", ))): __slots__ = [] diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py index f19540d6bb..18076e0f3b 100644 --- a/synapse/replication/slave/storage/_base.py +++ b/synapse/replication/slave/storage/_base.py @@ -34,6 +34,9 @@ class BaseSlavedStore(SQLBaseStore): else: self._cache_id_gen = None + self.expire_cache_url = hs.config.worker_replication_url + "/expire_cache" + self.http_client = hs.get_simple_http_client() + def stream_positions(self): pos = {} if self._cache_id_gen: @@ -54,3 +57,19 @@ class BaseSlavedStore(SQLBaseStore): logger.info("Got unexpected cache_func: %r", cache_func) self._cache_id_gen.advance(int(stream["position"])) return defer.succeed(None) + + def _invalidate_cache_and_stream(self, txn, cache_func, keys): + txn.call_after(cache_func.invalidate, keys) + txn.call_after(self._send_invalidation_poke, cache_func, keys) + + @defer.inlineCallbacks + def _send_invalidation_poke(self, cache_func, keys): + try: + yield self.http_client.post_json_get_json(self.expire_cache_url, { + "invalidate": [{ + "name": cache_func.__name__, + "keys": list(keys), + }] + }) + except: + logger.exception("Failed to poke on expire_cache") diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py index 3bfd5e8213..cc860f9f9b 100644 --- a/synapse/replication/slave/storage/deviceinbox.py +++ b/synapse/replication/slave/storage/deviceinbox.py @@ -29,10 +29,16 @@ class SlavedDeviceInboxStore(BaseSlavedStore): "DeviceInboxStreamChangeCache", self._device_inbox_id_gen.get_current_token() ) + self._device_federation_outbox_stream_cache = StreamChangeCache( + "DeviceFederationOutboxStreamChangeCache", + self._device_inbox_id_gen.get_current_token() + ) get_to_device_stream_token = DataStore.get_to_device_stream_token.__func__ get_new_messages_for_device = DataStore.get_new_messages_for_device.__func__ + get_new_device_msgs_for_remote = DataStore.get_new_device_msgs_for_remote.__func__ delete_messages_for_device = DataStore.delete_messages_for_device.__func__ + delete_device_msgs_for_remote = DataStore.delete_device_msgs_for_remote.__func__ def stream_positions(self): result = super(SlavedDeviceInboxStore, self).stream_positions() @@ -45,9 +51,15 @@ class SlavedDeviceInboxStore(BaseSlavedStore): self._device_inbox_id_gen.advance(int(stream["position"])) for row in stream["rows"]: stream_id = row[0] - user_id = row[1] - self._device_inbox_stream_cache.entity_has_changed( - user_id, stream_id - ) + entity = row[1] + + if entity.startswith("@"): + self._device_inbox_stream_cache.entity_has_changed( + entity, stream_id + ) + else: + self._device_federation_outbox_stream_cache.entity_has_changed( + entity, stream_id + ) return super(SlavedDeviceInboxStore, self).process_replication(result) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 0c26e96e98..64f18bbb3e 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -26,6 +26,11 @@ from synapse.storage.stream import StreamStore from synapse.util.caches.stream_change_cache import StreamChangeCache import ujson as json +import logging + + +logger = logging.getLogger(__name__) + # So, um, we want to borrow a load of functions intended for reading from # a DataStore, but we don't want to take functions that either write to the @@ -180,6 +185,11 @@ class SlavedEventStore(BaseSlavedStore): EventFederationStore.__dict__["_get_forward_extremeties_for_room"] ) + get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__ + + get_federation_out_pos = DataStore.get_federation_out_pos.__func__ + update_federation_out_pos = DataStore.update_federation_out_pos.__func__ + def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() result["events"] = self._stream_id_gen.get_current_token() @@ -194,6 +204,10 @@ class SlavedEventStore(BaseSlavedStore): stream = result.get("events") if stream: self._stream_id_gen.advance(int(stream["position"])) + + if stream["rows"]: + logger.info("Got %d event rows", len(stream["rows"])) + for row in stream["rows"]: self._process_replication_row( row, backfilled=False, state_resets=state_resets diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py index 6f2ba98af5..fbb58f35da 100644 --- a/synapse/replication/slave/storage/transactions.py +++ b/synapse/replication/slave/storage/transactions.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer from ._base import BaseSlavedStore from synapse.storage import DataStore from synapse.storage.transactions import TransactionStore @@ -22,9 +21,10 @@ from synapse.storage.transactions import TransactionStore class TransactionStore(BaseSlavedStore): get_destination_retry_timings = TransactionStore.__dict__[ "get_destination_retry_timings" - ].orig + ] _get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__ + set_destination_retry_timings = DataStore.set_destination_retry_timings.__func__ + _set_destination_retry_timings = DataStore._set_destination_retry_timings.__func__ - # For now, don't record the destination rety timings - def set_destination_retry_timings(*args, **kwargs): - return defer.succeed(None) + prep_send_transaction = DataStore.prep_send_transaction.__func__ + delivered_txn = DataStore.delivered_txn.__func__ diff --git a/synapse/rest/client/v2_alpha/receipts.py b/synapse/rest/client/v2_alpha/receipts.py index 891cef99c6..1fbff2edd8 100644 --- a/synapse/rest/client/v2_alpha/receipts.py +++ b/synapse/rest/client/v2_alpha/receipts.py @@ -36,7 +36,7 @@ class ReceiptRestServlet(RestServlet): super(ReceiptRestServlet, self).__init__() self.hs = hs self.auth = hs.get_auth() - self.receipts_handler = hs.get_handlers().receipts_handler + self.receipts_handler = hs.get_receipts_handler() self.presence_handler = hs.get_presence_handler() @defer.inlineCallbacks diff --git a/synapse/server.py b/synapse/server.py index 374124a147..0bfb411269 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -32,6 +32,9 @@ from synapse.appservice.scheduler import ApplicationServiceScheduler from synapse.crypto.keyring import Keyring from synapse.events.builder import EventBuilderFactory from synapse.federation import initialize_http_replication +from synapse.federation.send_queue import FederationRemoteSendQueue +from synapse.federation.transport.client import TransportLayerClient +from synapse.federation.transaction_queue import TransactionQueue from synapse.handlers import Handlers from synapse.handlers.appservice import ApplicationServicesHandler from synapse.handlers.auth import AuthHandler @@ -44,6 +47,7 @@ from synapse.handlers.sync import SyncHandler from synapse.handlers.typing import TypingHandler from synapse.handlers.events import EventHandler, EventStreamHandler from synapse.handlers.initial_sync import InitialSyncHandler +from synapse.handlers.receipts import ReceiptsHandler from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.notifier import Notifier @@ -124,6 +128,9 @@ class HomeServer(object): 'http_client_context_factory', 'simple_http_client', 'media_repository', + 'federation_transport_client', + 'federation_sender', + 'receipts_handler', ] def __init__(self, hostname, **kwargs): @@ -265,9 +272,30 @@ class HomeServer(object): def build_media_repository(self): return MediaRepository(self) + def build_federation_transport_client(self): + return TransportLayerClient(self) + + def build_federation_sender(self): + if self.should_send_federation(): + return TransactionQueue(self) + elif not self.config.worker_app: + return FederationRemoteSendQueue(self) + else: + raise Exception("Workers cannot send federation traffic") + + def build_receipts_handler(self): + return ReceiptsHandler(self) + def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) + def should_send_federation(self): + "Should this server be sending federation traffic directly?" + return self.config.send_federation and ( + not self.config.worker_app + or self.config.worker_app == "synapse.app.federation_sender" + ) + def _make_dependency_method(depname): def _get(hs): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index d828d6ee1d..b62c459d8b 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -561,12 +561,17 @@ class SQLBaseStore(object): @staticmethod def _simple_select_onecol_txn(txn, table, keyvalues, retcol): + if keyvalues: + where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys()) + else: + where = "" + sql = ( - "SELECT %(retcol)s FROM %(table)s WHERE %(where)s" + "SELECT %(retcol)s FROM %(table)s %(where)s" ) % { "retcol": retcol, "table": table, - "where": " AND ".join("%s = ?" % k for k in keyvalues.keys()), + "where": where, } txn.execute(sql, keyvalues.values()) @@ -744,10 +749,15 @@ class SQLBaseStore(object): @staticmethod def _simple_update_one_txn(txn, table, keyvalues, updatevalues): - update_sql = "UPDATE %s SET %s WHERE %s" % ( + if keyvalues: + where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys()) + else: + where = "" + + update_sql = "UPDATE %s SET %s %s" % ( table, ", ".join("%s = ?" % (k,) for k in updatevalues), - " AND ".join("%s = ?" % (k,) for k in keyvalues) + where, ) txn.execute( diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index f640e73714..87398d60bc 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -269,27 +269,29 @@ class DeviceInboxStore(SQLBaseStore): return defer.succeed([]) def get_all_new_device_messages_txn(txn): + # We limit like this as we might have multiple rows per stream_id, and + # we want to make sure we always get all entries for any stream_id + # we return. + upper_pos = min(current_pos, last_pos + limit) sql = ( - "SELECT stream_id FROM device_inbox" - " WHERE ? < stream_id AND stream_id <= ?" - " GROUP BY stream_id" - " ORDER BY stream_id ASC" - " LIMIT ?" - ) - txn.execute(sql, (last_pos, current_pos, limit)) - stream_ids = txn.fetchall() - if not stream_ids: - return [] - max_stream_id_in_limit = stream_ids[-1] - - sql = ( - "SELECT stream_id, user_id, device_id, message_json" + "SELECT stream_id, user_id" " FROM device_inbox" " WHERE ? < stream_id AND stream_id <= ?" " ORDER BY stream_id ASC" ) - txn.execute(sql, (last_pos, max_stream_id_in_limit)) - return txn.fetchall() + txn.execute(sql, (last_pos, upper_pos)) + rows = txn.fetchall() + + sql = ( + "SELECT stream_id, destination" + " FROM device_federation_outbox" + " WHERE ? < stream_id AND stream_id <= ?" + " ORDER BY stream_id ASC" + ) + txn.execute(sql, (last_pos, upper_pos)) + rows.extend(txn.fetchall()) + + return rows return self.runInteraction( "get_all_new_device_messages", get_all_new_device_messages_txn diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 6576a30098..e46ae6502e 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 38 +SCHEMA_VERSION = 39 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 21d0696640..7460f98a1f 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -37,6 +37,13 @@ class UserPresenceState(namedtuple("UserPresenceState", status_msg (str): User set status message. """ + def as_dict(self): + return dict(self._asdict()) + + @staticmethod + def from_dict(d): + return UserPresenceState(**d) + def copy_and_replace(self, **kwargs): return self._replace(**kwargs) diff --git a/synapse/storage/schema/delta/39/device_federation_stream_idx.sql b/synapse/storage/schema/delta/39/device_federation_stream_idx.sql new file mode 100644 index 0000000000..00be801e90 --- /dev/null +++ b/synapse/storage/schema/delta/39/device_federation_stream_idx.sql @@ -0,0 +1,16 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE INDEX device_federation_outbox_id ON device_federation_outbox(stream_id); diff --git a/synapse/storage/schema/delta/39/federation_out_position.sql b/synapse/storage/schema/delta/39/federation_out_position.sql new file mode 100644 index 0000000000..edbd8e132f --- /dev/null +++ b/synapse/storage/schema/delta/39/federation_out_position.sql @@ -0,0 +1,22 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + CREATE TABLE federation_stream_position( + type TEXT NOT NULL, + stream_id INTEGER NOT NULL + ); + + INSERT INTO federation_stream_position (type, stream_id) VALUES ('federation', -1); + INSERT INTO federation_stream_position (type, stream_id) VALUES ('events', -1); diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 888b1cb35d..7fa63b58a7 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -765,3 +765,50 @@ class StreamStore(SQLBaseStore): "token": end_token, }, } + + @defer.inlineCallbacks + def get_all_new_events_stream(self, from_id, current_id, limit): + """Get all new events""" + + def get_all_new_events_stream_txn(txn): + sql = ( + "SELECT e.stream_ordering, e.event_id" + " FROM events AS e" + " WHERE" + " ? < e.stream_ordering AND e.stream_ordering <= ?" + " ORDER BY e.stream_ordering ASC" + " LIMIT ?" + ) + + txn.execute(sql, (from_id, current_id, limit)) + rows = txn.fetchall() + + upper_bound = current_id + if len(rows) == limit: + upper_bound = rows[-1][0] + + return upper_bound, [row[1] for row in rows] + + upper_bound, event_ids = yield self.runInteraction( + "get_all_new_events_stream", get_all_new_events_stream_txn, + ) + + events = yield self._get_events(event_ids) + + defer.returnValue((upper_bound, events)) + + def get_federation_out_pos(self, typ): + return self._simple_select_one_onecol( + table="federation_stream_position", + retcol="stream_id", + keyvalues={"type": typ}, + desc="get_federation_out_pos" + ) + + def update_federation_out_pos(self, typ, stream_id): + return self._simple_update_one( + table="federation_stream_position", + keyvalues={"type": typ}, + updatevalues={"stream_id": stream_id}, + desc="update_federation_out_pos", + ) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index adab520c78..809fdd311f 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -200,25 +200,48 @@ class TransactionStore(SQLBaseStore): def _set_destination_retry_timings(self, txn, destination, retry_last_ts, retry_interval): - txn.call_after(self.get_destination_retry_timings.invalidate, (destination,)) + self.database_engine.lock_table(txn, "destinations") - self._simple_upsert_txn( + self._invalidate_cache_and_stream( + txn, self.get_destination_retry_timings, (destination,) + ) + + # We need to be careful here as the data may have changed from under us + # due to a worker setting the timings. + + prev_row = self._simple_select_one_txn( txn, - "destinations", + table="destinations", keyvalues={ "destination": destination, }, - values={ - "retry_last_ts": retry_last_ts, - "retry_interval": retry_interval, - }, - insertion_values={ - "destination": destination, - "retry_last_ts": retry_last_ts, - "retry_interval": retry_interval, - } + retcols=("retry_last_ts", "retry_interval"), + allow_none=True, ) + if not prev_row: + self._simple_insert_txn( + txn, + table="destinations", + values={ + "destination": destination, + "retry_last_ts": retry_last_ts, + "retry_interval": retry_interval, + } + ) + elif retry_interval == 0 or prev_row["retry_interval"] < retry_interval: + self._simple_update_one_txn( + txn, + "destinations", + keyvalues={ + "destination": destination, + }, + updatevalues={ + "retry_last_ts": retry_last_ts, + "retry_interval": retry_interval, + }, + ) + def get_destinations_needing_retry(self): """Get all destinations which are due a retry for sending a transaction. diff --git a/synapse/util/jsonobject.py b/synapse/util/jsonobject.py index 3fd5c3d9fd..d668e5a6b8 100644 --- a/synapse/util/jsonobject.py +++ b/synapse/util/jsonobject.py @@ -76,15 +76,26 @@ class JsonEncodedObject(object): d.update(self.unrecognized_keys) return d + def get_internal_dict(self): + d = { + k: _encode(v, internal=True) for (k, v) in self.__dict__.items() + if k in self.valid_keys + } + d.update(self.unrecognized_keys) + return d + def __str__(self): return "(%s, %s)" % (self.__class__.__name__, repr(self.__dict__)) -def _encode(obj): +def _encode(obj, internal=False): if type(obj) is list: - return [_encode(o) for o in obj] + return [_encode(o, internal=internal) for o in obj] if isinstance(obj, JsonEncodedObject): - return obj.get_dict() + if internal: + return obj.get_internal_dict() + else: + return obj.get_dict() return obj diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 49527f4d21..46ef5a8ec7 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -121,12 +121,6 @@ class RetryDestinationLimiter(object): pass def __exit__(self, exc_type, exc_val, exc_tb): - def err(failure): - logger.exception( - "Failed to store set_destination_retry_timings", - failure.value - ) - valid_err_code = False if exc_type is not None and issubclass(exc_type, CodeMessageException): valid_err_code = 0 <= exc_val.code < 500 @@ -151,6 +145,15 @@ class RetryDestinationLimiter(object): retry_last_ts = int(self.clock.time_msec()) - self.store.set_destination_retry_timings( - self.destination, retry_last_ts, self.retry_interval - ).addErrback(err) + @defer.inlineCallbacks + def store_retry_timings(): + try: + yield self.store.set_destination_retry_timings( + self.destination, retry_last_ts, self.retry_interval + ) + except: + logger.exception( + "Failed to store set_destination_retry_timings", + ) + + store_retry_timings() diff --git a/tests/replication/test_resource.py b/tests/replication/test_resource.py index f406934a62..93b9fad012 100644 --- a/tests/replication/test_resource.py +++ b/tests/replication/test_resource.py @@ -103,7 +103,7 @@ class ReplicationResourceCase(unittest.TestCase): room_id = yield self.create_room() event_id = yield self.send_text_message(room_id, "Hello, World") get = self.get(receipts="-1") - yield self.hs.get_handlers().receipts_handler.received_client_receipt( + yield self.hs.get_receipts_handler().received_client_receipt( room_id, "m.read", self.user_id, event_id ) code, body = yield get diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 02a67b733d..9ff1abcd80 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -39,7 +39,7 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): event_cache_size=1, password_providers=[], ) - hs = yield setup_test_homeserver(config=config) + hs = yield setup_test_homeserver(config=config, federation_sender=Mock()) self.as_token = "token1" self.as_url = "some_url" @@ -112,7 +112,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): event_cache_size=1, password_providers=[], ) - hs = yield setup_test_homeserver(config=config) + hs = yield setup_test_homeserver(config=config, federation_sender=Mock()) self.db_pool = hs.get_db_pool() self.as_list = [ @@ -443,7 +443,11 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): app_service_config_files=[f1, f2], event_cache_size=1, password_providers=[] ) - hs = yield setup_test_homeserver(config=config, datastore=Mock()) + hs = yield setup_test_homeserver( + config=config, + datastore=Mock(), + federation_sender=Mock() + ) ApplicationServiceStore(hs) @@ -456,7 +460,11 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): app_service_config_files=[f1, f2], event_cache_size=1, password_providers=[] ) - hs = yield setup_test_homeserver(config=config, datastore=Mock()) + hs = yield setup_test_homeserver( + config=config, + datastore=Mock(), + federation_sender=Mock() + ) with self.assertRaises(ConfigError) as cm: ApplicationServiceStore(hs) @@ -475,7 +483,11 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): app_service_config_files=[f1, f2], event_cache_size=1, password_providers=[] ) - hs = yield setup_test_homeserver(config=config, datastore=Mock()) + hs = yield setup_test_homeserver( + config=config, + datastore=Mock(), + federation_sender=Mock() + ) with self.assertRaises(ConfigError) as cm: ApplicationServiceStore(hs) diff --git a/tests/utils.py b/tests/utils.py index 5929f1c729..2d0bd205fd 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -53,6 +53,8 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs): config.trusted_third_party_id_servers = [] config.room_invite_state_types = [] config.password_providers = [] + config.worker_replication_url = "" + config.worker_app = None config.use_frozen_dicts = True config.database_config = {"name": "sqlite3"} @@ -70,6 +72,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs): database_engine=create_engine(config.database_config), get_db_conn=db_pool.get_db_conn, room_list_handler=object(), + tls_server_context_factory=Mock(), **kargs ) hs.setup() @@ -79,6 +82,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs): version_string="Synapse/tests", database_engine=create_engine(config.database_config), room_list_handler=object(), + tls_server_context_factory=Mock(), **kargs )