From bc67e7d260631d3fa7bc78653376e15dc0771364 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 17 Jan 2018 16:43:03 +0000 Subject: [PATCH 01/30] Add decent impl of a FileConsumer Twisted core doesn't have a general purpose one, so we need to write one ourselves. Features: - All writing happens in background thread - Supports both push and pull producers - Push producers get paused if the consumer falls behind --- synapse/util/file_consumer.py | 158 +++++++++++++++++++++++++++++++ tests/util/test_file_consumer.py | 138 +++++++++++++++++++++++++++ 2 files changed, 296 insertions(+) create mode 100644 synapse/util/file_consumer.py create mode 100644 tests/util/test_file_consumer.py diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py new file mode 100644 index 0000000000..de478fcb3e --- /dev/null +++ b/synapse/util/file_consumer.py @@ -0,0 +1,158 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vecotr Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer, threads, reactor + +from synapse.util.logcontext import make_deferred_yieldable + +import Queue + + +class BackgroundFileConsumer(object): + """A consumer that writes to a file like object. Supports both push + and pull producers + + Args: + file_obj (file): The file like object to write to. Closed when + finished. + """ + + # For PushProducers pause if we have this many unwritten slices + _PAUSE_ON_QUEUE_SIZE = 5 + # And resume once the size of the queue is less than this + _RESUME_ON_QUEUE_SIZE = 2 + + def __init__(self, file_obj): + self.file_obj = file_obj + + # Producer we're registered with + self.producer = None + + # True if PushProducer, false if PullProducer + self.streaming = False + + # Queue of slices of bytes to be written. When producer calls + # unregister a final None is sent. + self.bytes_queue = Queue.Queue() + + # Deferred that is resolved when finished writing + self.finished_deferred = None + + # If the _writer thread throws an exception it gets stored here. + self._write_exception = None + + # A deferred that gets resolved when the bytes_queue gets empty. + # Mainly used for tests. + self._notify_empty_deferred = None + + def registerProducer(self, producer, streaming): + """Part of IProducer interface + + Args: + producer (IProducer) + streaming (bool): True if push based producer, False if pull + based. + """ + self.producer = producer + self.streaming = streaming + self.finished_deferred = threads.deferToThread(self._writer) + if not streaming: + self.producer.resumeProducing() + + self.paused_producer = False + + def unregisterProducer(self): + """Part of IProducer interface + """ + self.producer = None + if not self.finished_deferred.called: + self.bytes_queue.put_nowait(None) + + def write(self, bytes): + """Part of IProducer interface + """ + if self._write_exception: + raise self._write_exception + + if self.finished_deferred.called: + raise Exception("consumer has closed") + + self.bytes_queue.put_nowait(bytes) + + # If this is a pushed based consumer and the queue is getting behind + # then we pause the producer. + if self.streaming and self.bytes_queue.qsize() >= self._PAUSE_ON_QUEUE_SIZE: + self.paused_producer = True + self.producer.pauseProducing() + + def _writer(self): + """This is run in a background thread to write to the file. + """ + try: + while self.producer or not self.bytes_queue.empty(): + # If we've paused the producer check if we should resume the + # producer. + if self.producer and self.paused_producer: + if self.bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE: + reactor.callFromThread(self._resume_paused_producer) + + if self._notify_empty and self.bytes_queue.empty(): + reactor.callFromThread(self._notify_empty) + + bytes = self.bytes_queue.get() + + # If we get a None (or empty list) then that's a signal used + # to indicate we should check if we should stop. + if bytes: + self.file_obj.write(bytes) + + # If its a pull producer then we need to explicitly ask for + # more stuff. + if not self.streaming and self.producer: + reactor.callFromThread(self.producer.resumeProducing) + except Exception as e: + self._write_exception = e + raise + finally: + self.file_obj.close() + + def wait(self): + """Returns a deferred that resolves when finished writing to file + """ + return make_deferred_yieldable(self.finished_deferred) + + def _resume_paused_producer(self): + """Gets called if we should resume producing after being paused + """ + if self.paused_producer and self.producer: + self.paused_producer = False + self.producer.resumeProducing() + + def _notify_empty(self): + """Called when the _writer thread thinks the queue may be empty and + we should notify anything waiting on `wait_for_writes` + """ + if self._notify_empty_deferred and self.bytes_queue.empty(): + d = self._notify_empty_deferred + self._notify_empty_deferred = None + d.callback(None) + + def wait_for_writes(self): + """Wait for the write queue to be empty and for writes to have + finished. This is mainly useful for tests. + """ + if not self._notify_empty_deferred: + self._notify_empty_deferred = defer.Deferred() + return self._notify_empty_deferred diff --git a/tests/util/test_file_consumer.py b/tests/util/test_file_consumer.py new file mode 100644 index 0000000000..8acb68f0c3 --- /dev/null +++ b/tests/util/test_file_consumer.py @@ -0,0 +1,138 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from twisted.internet import defer +from mock import NonCallableMock + +from synapse.util.file_consumer import BackgroundFileConsumer + +from tests import unittest +from StringIO import StringIO + +import threading + + +class FileConsumerTests(unittest.TestCase): + + @defer.inlineCallbacks + def test_pull_consumer(self): + string_file = StringIO() + consumer = BackgroundFileConsumer(string_file) + + try: + producer = DummyPullProducer() + + yield producer.register_with_consumer(consumer) + + yield producer.write_and_wait("Foo") + + self.assertEqual(string_file.getvalue(), "Foo") + + yield producer.write_and_wait("Bar") + + self.assertEqual(string_file.getvalue(), "FooBar") + finally: + consumer.unregisterProducer() + + yield consumer.wait() + + self.assertTrue(string_file.closed) + + @defer.inlineCallbacks + def test_push_consumer(self): + string_file = StringIO() + consumer = BackgroundFileConsumer(string_file) + + try: + producer = NonCallableMock(spec_set=[]) + + consumer.registerProducer(producer, True) + + consumer.write("Foo") + yield consumer.wait_for_writes() + + self.assertEqual(string_file.getvalue(), "Foo") + + consumer.write("Bar") + yield consumer.wait_for_writes() + + self.assertEqual(string_file.getvalue(), "FooBar") + finally: + consumer.unregisterProducer() + + yield consumer.wait() + + self.assertTrue(string_file.closed) + + @defer.inlineCallbacks + def test_push_producer_feedback(self): + string_file = BlockingStringWrite() + consumer = BackgroundFileConsumer(string_file) + + try: + producer = NonCallableMock(spec_set=["pauseProducing", "resumeProducing"]) + + consumer.registerProducer(producer, True) + + with string_file.write_lock: + for _ in range(consumer._PAUSE_ON_QUEUE_SIZE): + consumer.write("Foo") + + producer.pauseProducing.assert_called_once() + + yield consumer.wait_for_writes() + producer.resumeProducing.assert_called_once() + finally: + consumer.unregisterProducer() + + yield consumer.wait() + + self.assertTrue(string_file.closed) + + +class DummyPullProducer(object): + def __init__(self): + self.consumer = None + self.deferred = defer.Deferred() + + def resumeProducing(self): + d = self.deferred + self.deferred = defer.Deferred() + d.callback(None) + + def write_and_wait(self, bytes): + d = self.deferred + self.consumer.write(bytes) + return d + + def register_with_consumer(self, consumer): + d = self.deferred + self.consumer = consumer + self.consumer.registerProducer(self, False) + return d + + +class BlockingStringWrite(object): + def __init__(self): + self.buffer = "" + self.closed = False + self.write_lock = threading.Lock() + + def write(self, bytes): + self.buffer += bytes + + def close(self): + self.closed = True From a177325b49be4793c8ed21147f8d301a0649a2b6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Jan 2018 11:02:43 +0000 Subject: [PATCH 02/30] Fix comments --- synapse/util/file_consumer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py index de478fcb3e..5284c7967e 100644 --- a/synapse/util/file_consumer.py +++ b/synapse/util/file_consumer.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2018 New Vecotr Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -58,7 +58,7 @@ class BackgroundFileConsumer(object): self._notify_empty_deferred = None def registerProducer(self, producer, streaming): - """Part of IProducer interface + """Part of IConsumer interface Args: producer (IProducer) @@ -91,7 +91,7 @@ class BackgroundFileConsumer(object): self.bytes_queue.put_nowait(bytes) - # If this is a pushed based consumer and the queue is getting behind + # If this is a PushProducer and the queue is getting behind # then we pause the producer. if self.streaming and self.bytes_queue.qsize() >= self._PAUSE_ON_QUEUE_SIZE: self.paused_producer = True From 28b338ed9bafc2017a635848e14a2a25b78d0016 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Jan 2018 11:04:41 +0000 Subject: [PATCH 03/30] Move definition of paused_producer to __init__ --- synapse/util/file_consumer.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py index 5284c7967e..54c9da9573 100644 --- a/synapse/util/file_consumer.py +++ b/synapse/util/file_consumer.py @@ -43,6 +43,10 @@ class BackgroundFileConsumer(object): # True if PushProducer, false if PullProducer self.streaming = False + # For PushProducers, indicates whether we've paused the producer and + # need to call resumeProducing before we get more data. + self.paused_producer = False + # Queue of slices of bytes to be written. When producer calls # unregister a final None is sent. self.bytes_queue = Queue.Queue() @@ -71,8 +75,6 @@ class BackgroundFileConsumer(object): if not streaming: self.producer.resumeProducing() - self.paused_producer = False - def unregisterProducer(self): """Part of IProducer interface """ From 17b54389feb3855a33406149a8a59f0327bb3ad1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Jan 2018 11:05:34 +0000 Subject: [PATCH 04/30] Fix _notify_empty typo --- synapse/util/file_consumer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py index 54c9da9573..479e480614 100644 --- a/synapse/util/file_consumer.py +++ b/synapse/util/file_consumer.py @@ -110,7 +110,7 @@ class BackgroundFileConsumer(object): if self.bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE: reactor.callFromThread(self._resume_paused_producer) - if self._notify_empty and self.bytes_queue.empty(): + if self._notify_empty_deferred and self.bytes_queue.empty(): reactor.callFromThread(self._notify_empty) bytes = self.bytes_queue.get() From dc519602ac0f35d39a70c91f0e6057e865a61dfc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Jan 2018 11:07:17 +0000 Subject: [PATCH 05/30] Ensure we registerProducer isn't called twice --- synapse/util/file_consumer.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py index 479e480614..d7bbb0aeb8 100644 --- a/synapse/util/file_consumer.py +++ b/synapse/util/file_consumer.py @@ -69,6 +69,9 @@ class BackgroundFileConsumer(object): streaming (bool): True if push based producer, False if pull based. """ + if self.producer: + raise Exception("registerProducer called twice") + self.producer = producer self.streaming = streaming self.finished_deferred = threads.deferToThread(self._writer) From ce236f8ac890842e105fee0df96c79f3d8ab8783 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 18 Jan 2018 11:30:49 +0000 Subject: [PATCH 06/30] better exception logging in callbackmetrics when we fail to render a metric, give a clue as to which metric it was --- synapse/metrics/metric.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py index f480aae614..1e783e5ff4 100644 --- a/synapse/metrics/metric.py +++ b/synapse/metrics/metric.py @@ -15,6 +15,9 @@ from itertools import chain +import logging + +logger = logging.getLogger(__name__) def flatten(items): @@ -153,7 +156,11 @@ class CallbackMetric(BaseMetric): self.callback = callback def render(self): - value = self.callback() + try: + value = self.callback() + except Exception: + logger.exception("Failed to render %s", self.name) + return ["# FAILED to render " + self.name] if self.is_scalar(): return list(self._render_for_labels([], value)) From 2f18a2647b6b9cc07c3cc5f2bec3e1bab67d0eea Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Jan 2018 11:10:12 +0000 Subject: [PATCH 07/30] Make all fields private --- synapse/util/file_consumer.py | 62 +++++++++++++++++------------------ 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py index d7bbb0aeb8..d19d48665c 100644 --- a/synapse/util/file_consumer.py +++ b/synapse/util/file_consumer.py @@ -35,24 +35,24 @@ class BackgroundFileConsumer(object): _RESUME_ON_QUEUE_SIZE = 2 def __init__(self, file_obj): - self.file_obj = file_obj + self._file_obj = file_obj # Producer we're registered with - self.producer = None + self._producer = None # True if PushProducer, false if PullProducer self.streaming = False # For PushProducers, indicates whether we've paused the producer and # need to call resumeProducing before we get more data. - self.paused_producer = False + self._paused_producer = False # Queue of slices of bytes to be written. When producer calls # unregister a final None is sent. - self.bytes_queue = Queue.Queue() + self._bytes_queue = Queue.Queue() # Deferred that is resolved when finished writing - self.finished_deferred = None + self._finished_deferred = None # If the _writer thread throws an exception it gets stored here. self._write_exception = None @@ -69,21 +69,21 @@ class BackgroundFileConsumer(object): streaming (bool): True if push based producer, False if pull based. """ - if self.producer: + if self._producer: raise Exception("registerProducer called twice") - self.producer = producer + self._producer = producer self.streaming = streaming - self.finished_deferred = threads.deferToThread(self._writer) + self._finished_deferred = threads.deferToThread(self._writer) if not streaming: - self.producer.resumeProducing() + self._producer.resumeProducing() def unregisterProducer(self): """Part of IProducer interface """ - self.producer = None - if not self.finished_deferred.called: - self.bytes_queue.put_nowait(None) + self._producer = None + if not self._finished_deferred.called: + self._bytes_queue.put_nowait(None) def write(self, bytes): """Part of IProducer interface @@ -91,65 +91,65 @@ class BackgroundFileConsumer(object): if self._write_exception: raise self._write_exception - if self.finished_deferred.called: + if self._finished_deferred.called: raise Exception("consumer has closed") - self.bytes_queue.put_nowait(bytes) + self._bytes_queue.put_nowait(bytes) # If this is a PushProducer and the queue is getting behind # then we pause the producer. - if self.streaming and self.bytes_queue.qsize() >= self._PAUSE_ON_QUEUE_SIZE: - self.paused_producer = True - self.producer.pauseProducing() + if self.streaming and self._bytes_queue.qsize() >= self._PAUSE_ON_QUEUE_SIZE: + self._paused_producer = True + self._producer.pauseProducing() def _writer(self): """This is run in a background thread to write to the file. """ try: - while self.producer or not self.bytes_queue.empty(): + while self._producer or not self._bytes_queue.empty(): # If we've paused the producer check if we should resume the # producer. - if self.producer and self.paused_producer: - if self.bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE: + if self._producer and self._paused_producer: + if self._bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE: reactor.callFromThread(self._resume_paused_producer) - if self._notify_empty_deferred and self.bytes_queue.empty(): + if self._notify_empty_deferred and self._bytes_queue.empty(): reactor.callFromThread(self._notify_empty) - bytes = self.bytes_queue.get() + bytes = self._bytes_queue.get() # If we get a None (or empty list) then that's a signal used # to indicate we should check if we should stop. if bytes: - self.file_obj.write(bytes) + self._file_obj.write(bytes) # If its a pull producer then we need to explicitly ask for # more stuff. - if not self.streaming and self.producer: - reactor.callFromThread(self.producer.resumeProducing) + if not self.streaming and self._producer: + reactor.callFromThread(self._producer.resumeProducing) except Exception as e: self._write_exception = e raise finally: - self.file_obj.close() + self._file_obj.close() def wait(self): """Returns a deferred that resolves when finished writing to file """ - return make_deferred_yieldable(self.finished_deferred) + return make_deferred_yieldable(self._finished_deferred) def _resume_paused_producer(self): """Gets called if we should resume producing after being paused """ - if self.paused_producer and self.producer: - self.paused_producer = False - self.producer.resumeProducing() + if self._paused_producer and self._producer: + self._paused_producer = False + self._producer.resumeProducing() def _notify_empty(self): """Called when the _writer thread thinks the queue may be empty and we should notify anything waiting on `wait_for_writes` """ - if self._notify_empty_deferred and self.bytes_queue.empty(): + if self._notify_empty_deferred and self._bytes_queue.empty(): d = self._notify_empty_deferred self._notify_empty_deferred = None d.callback(None) From 1432f7ccd5a01e43d0c5417f3d2f4a6a0fbf5bfb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Jan 2018 11:53:21 +0000 Subject: [PATCH 08/30] Move test stuff to tests --- synapse/util/file_consumer.py | 26 +-------------- tests/util/test_file_consumer.py | 54 +++++++++++++++++++++++++++----- 2 files changed, 47 insertions(+), 33 deletions(-) diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py index d19d48665c..3241035247 100644 --- a/synapse/util/file_consumer.py +++ b/synapse/util/file_consumer.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer, threads, reactor +from twisted.internet import threads, reactor from synapse.util.logcontext import make_deferred_yieldable @@ -57,10 +57,6 @@ class BackgroundFileConsumer(object): # If the _writer thread throws an exception it gets stored here. self._write_exception = None - # A deferred that gets resolved when the bytes_queue gets empty. - # Mainly used for tests. - self._notify_empty_deferred = None - def registerProducer(self, producer, streaming): """Part of IConsumer interface @@ -113,9 +109,6 @@ class BackgroundFileConsumer(object): if self._bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE: reactor.callFromThread(self._resume_paused_producer) - if self._notify_empty_deferred and self._bytes_queue.empty(): - reactor.callFromThread(self._notify_empty) - bytes = self._bytes_queue.get() # If we get a None (or empty list) then that's a signal used @@ -144,20 +137,3 @@ class BackgroundFileConsumer(object): if self._paused_producer and self._producer: self._paused_producer = False self._producer.resumeProducing() - - def _notify_empty(self): - """Called when the _writer thread thinks the queue may be empty and - we should notify anything waiting on `wait_for_writes` - """ - if self._notify_empty_deferred and self._bytes_queue.empty(): - d = self._notify_empty_deferred - self._notify_empty_deferred = None - d.callback(None) - - def wait_for_writes(self): - """Wait for the write queue to be empty and for writes to have - finished. This is mainly useful for tests. - """ - if not self._notify_empty_deferred: - self._notify_empty_deferred = defer.Deferred() - return self._notify_empty_deferred diff --git a/tests/util/test_file_consumer.py b/tests/util/test_file_consumer.py index 8acb68f0c3..76e2234255 100644 --- a/tests/util/test_file_consumer.py +++ b/tests/util/test_file_consumer.py @@ -14,7 +14,7 @@ # limitations under the License. -from twisted.internet import defer +from twisted.internet import defer, reactor from mock import NonCallableMock from synapse.util.file_consumer import BackgroundFileConsumer @@ -53,7 +53,7 @@ class FileConsumerTests(unittest.TestCase): @defer.inlineCallbacks def test_push_consumer(self): - string_file = StringIO() + string_file = BlockingStringWrite() consumer = BackgroundFileConsumer(string_file) try: @@ -62,14 +62,14 @@ class FileConsumerTests(unittest.TestCase): consumer.registerProducer(producer, True) consumer.write("Foo") - yield consumer.wait_for_writes() + yield string_file.wait_for_n_writes(1) - self.assertEqual(string_file.getvalue(), "Foo") + self.assertEqual(string_file.buffer, "Foo") consumer.write("Bar") - yield consumer.wait_for_writes() + yield string_file.wait_for_n_writes(2) - self.assertEqual(string_file.getvalue(), "FooBar") + self.assertEqual(string_file.buffer, "FooBar") finally: consumer.unregisterProducer() @@ -85,15 +85,22 @@ class FileConsumerTests(unittest.TestCase): try: producer = NonCallableMock(spec_set=["pauseProducing", "resumeProducing"]) + resume_deferred = defer.Deferred() + producer.resumeProducing.side_effect = lambda: resume_deferred.callback(None) + consumer.registerProducer(producer, True) + number_writes = 0 with string_file.write_lock: for _ in range(consumer._PAUSE_ON_QUEUE_SIZE): consumer.write("Foo") + number_writes += 1 producer.pauseProducing.assert_called_once() - yield consumer.wait_for_writes() + yield string_file.wait_for_n_writes(number_writes) + + yield resume_deferred producer.resumeProducing.assert_called_once() finally: consumer.unregisterProducer() @@ -131,8 +138,39 @@ class BlockingStringWrite(object): self.closed = False self.write_lock = threading.Lock() + self._notify_write_deferred = None + self._number_of_writes = 0 + def write(self, bytes): - self.buffer += bytes + with self.write_lock: + self.buffer += bytes + self._number_of_writes += 1 + + reactor.callFromThread(self._notify_write) def close(self): self.closed = True + + def _notify_write(self): + "Called by write to indicate a write happened" + with self.write_lock: + if not self._notify_write_deferred: + return + d = self._notify_write_deferred + self._notify_write_deferred = None + d.callback(None) + + @defer.inlineCallbacks + def wait_for_n_writes(self, n): + "Wait for n writes to have happened" + while True: + with self.write_lock: + if n <= self._number_of_writes: + return + + if not self._notify_write_deferred: + self._notify_write_deferred = defer.Deferred() + + d = self._notify_write_deferred + + yield d From be0dfcd4a29859f4c707c2b3cf1da38c5115d251 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Jan 2018 11:57:23 +0000 Subject: [PATCH 09/30] Do logcontexts correctly --- synapse/util/file_consumer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py index 3241035247..90a2608d6f 100644 --- a/synapse/util/file_consumer.py +++ b/synapse/util/file_consumer.py @@ -15,7 +15,7 @@ from twisted.internet import threads, reactor -from synapse.util.logcontext import make_deferred_yieldable +from synapse.util.logcontext import make_deferred_yieldable, preserve_fn import Queue @@ -70,7 +70,7 @@ class BackgroundFileConsumer(object): self._producer = producer self.streaming = streaming - self._finished_deferred = threads.deferToThread(self._writer) + self._finished_deferred = preserve_fn(threads.deferToThread)(self._writer) if not streaming: self._producer.resumeProducing() From d57765fc8a1b54dae001bbb97b2b529991292fbc Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 18 Jan 2018 12:23:04 +0000 Subject: [PATCH 10/30] Fix bugs in block metrics ... which I introduced in #2785 --- synapse/util/metrics.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 059bb7fedf..e4b5687a4b 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -28,7 +28,7 @@ logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) # total number of times we have hit this block -response_count = metrics.register_counter( +block_counter = metrics.register_counter( "block_count", labels=["block_name"], alternative_names=( @@ -76,7 +76,7 @@ block_db_txn_count = metrics.register_counter( block_db_txn_duration = metrics.register_counter( "block_db_txn_duration_seconds", labels=["block_name"], alternative_names=( - metrics.name_prefix + "_block_db_txn_count:total", + metrics.name_prefix + "_block_db_txn_duration:total", ), ) @@ -131,6 +131,8 @@ class Measure(object): return duration = self.clock.time_msec() - self.start + + block_counter.inc(self.name) block_timer.inc_by(duration, self.name) context = LoggingContext.current_context() From 0af5dc63a8d180a2b610c14ce415fdf9be96e2ff Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 16 Jan 2018 15:44:08 +0000 Subject: [PATCH 11/30] Make storage providers more configurable --- synapse/config/repository.py | 83 ++++++++++++++++++++--- synapse/rest/media/v1/media_repository.py | 18 ++--- synapse/rest/media/v1/storage_provider.py | 28 +++++--- 3 files changed, 98 insertions(+), 31 deletions(-) diff --git a/synapse/config/repository.py b/synapse/config/repository.py index 6baa474931..81db0193fb 100644 --- a/synapse/config/repository.py +++ b/synapse/config/repository.py @@ -16,6 +16,8 @@ from ._base import Config, ConfigError from collections import namedtuple +from synapse.util.module_loader import load_module + MISSING_NETADDR = ( "Missing netaddr library. This is required for URL preview API." @@ -36,6 +38,10 @@ ThumbnailRequirement = namedtuple( "ThumbnailRequirement", ["width", "height", "method", "media_type"] ) +MediaStorageProviderConfig = namedtuple( + "MediaStorageProviderConfig", ("store_local", "store_remote", "store_synchronous",) +) + def parse_thumbnail_requirements(thumbnail_sizes): """ Takes a list of dictionaries with "width", "height", and "method" keys @@ -73,16 +79,65 @@ class ContentRepositoryConfig(Config): self.media_store_path = self.ensure_directory(config["media_store_path"]) - self.backup_media_store_path = config.get("backup_media_store_path") - if self.backup_media_store_path: - self.backup_media_store_path = self.ensure_directory( + backup_media_store_path = config.get("backup_media_store_path") + if backup_media_store_path: + backup_media_store_path = self.ensure_directory( self.backup_media_store_path ) - self.synchronous_backup_media_store = config.get( + synchronous_backup_media_store = config.get( "synchronous_backup_media_store", False ) + storage_providers = config.get("media_storage_providers", []) + + if backup_media_store_path: + if storage_providers: + raise ConfigError( + "Cannot use both 'backup_media_store_path' and 'storage_providers'" + ) + + storage_providers = [{ + "module": "file_system", + "store_local": True, + "store_synchronous": synchronous_backup_media_store, + "store_remote": True, + "config": { + "directory": backup_media_store_path, + } + }] + + # This is a list of config that can be used to create the storage + # providers. The entries are tuples of (Class, class_config, + # MediaStorageProviderConfig), where Class is the class of the provider, + # the class_config the config to pass to it, and + # MediaStorageProviderConfig are options for StorageProviderWrapper. + # + # We don't create the storage providers here as not all workers need + # them to be started. + self.media_storage_providers = [] + + for provider_config in storage_providers: + # We special case the module "file_system" so as not to need to + # expose FileStorageProviderBackend + if provider_config["module"] == "file_system": + provider_config["module"] = ( + "synapse.rest.media.v1.storage_provider" + ".FileStorageProviderBackend" + ) + + provider_class, parsed_config = load_module(provider_config) + + wrapper_config = MediaStorageProviderConfig( + provider_config.get("store_local", False), + provider_config.get("store_remote", False), + provider_config.get("store_synchronous", False), + ) + + self.media_storage_providers.append( + (provider_class, provider_config, wrapper_config,) + ) + self.uploads_path = self.ensure_directory(config["uploads_path"]) self.dynamic_thumbnails = config["dynamic_thumbnails"] self.thumbnail_requirements = parse_thumbnail_requirements( @@ -127,13 +182,19 @@ class ContentRepositoryConfig(Config): # Directory where uploaded images and attachments are stored. media_store_path: "%(media_store)s" - # A secondary directory where uploaded images and attachments are - # stored as a backup. - # backup_media_store_path: "%(media_store)s" - - # Whether to wait for successful write to backup media store before - # returning successfully. - # synchronous_backup_media_store: false + # Media storage providers allow media to be stored in different + # locations. + # media_storage_providers: + # - module: file_system + # # Whether to write new local files. + # store_local: false + # # Whether to write new remote media + # store_remote: false + # # Whether to block upload requests waiting for write to this + # # provider to complete + # store_synchronous: false + # config: + # directory: /mnt/some/other/directory # Directory where in-progress uploads are stored. uploads_path: "%(uploads_path)s" diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 97c82c150e..4163c9e416 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -27,9 +27,7 @@ from .identicon_resource import IdenticonResource from .preview_url_resource import PreviewUrlResource from .filepath import MediaFilePaths from .thumbnailer import Thumbnailer -from .storage_provider import ( - StorageProviderWrapper, FileStorageProviderBackend, -) +from .storage_provider import StorageProviderWrapper from .media_storage import MediaStorage from synapse.http.matrixfederationclient import MatrixFederationHttpClient @@ -80,17 +78,13 @@ class MediaRepository(object): # potentially upload to. storage_providers = [] - # TODO: Move this into config and allow other storage providers to be - # defined. - if hs.config.backup_media_store_path: - backend = FileStorageProviderBackend( - self.primary_base_path, hs.config.backup_media_store_path, - ) + for clz, provider_config, wrapper_config in hs.config.media_storage_providers: + backend = clz(hs, provider_config) provider = StorageProviderWrapper( backend, - store=True, - store_synchronous=hs.config.synchronous_backup_media_store, - store_remote=True, + store_local=wrapper_config.store_local, + store_remote=wrapper_config.store_remote, + store_synchronous=wrapper_config.store_synchronous, ) storage_providers.append(provider) diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py index 2ad602e101..0074d2d426 100644 --- a/synapse/rest/media/v1/storage_provider.py +++ b/synapse/rest/media/v1/storage_provider.py @@ -17,6 +17,7 @@ from twisted.internet import defer, threads from .media_storage import FileResponder +from synapse.config._base import Config from synapse.util.logcontext import preserve_fn import logging @@ -64,14 +65,14 @@ class StorageProviderWrapper(StorageProvider): Args: backend (StorageProvider) - store (bool): Whether to store new files or not. + store_local (bool): Whether to store new local files or not. store_synchronous (bool): Whether to wait for file to be successfully uploaded, or todo the upload in the backgroud. store_remote (bool): Whether remote media should be uploaded """ - def __init__(self, backend, store, store_synchronous, store_remote): + def __init__(self, backend, store_local, store_synchronous, store_remote): self.backend = backend - self.store = store + self.store_local = store_local self.store_synchronous = store_synchronous self.store_remote = store_remote @@ -97,13 +98,13 @@ class FileStorageProviderBackend(StorageProvider): """A storage provider that stores files in a directory on a filesystem. Args: - cache_directory (str): Base path of the local media repository - base_directory (str): Base path to store new files + hs (HomeServer) + config: The config returned by `parse_config`, i """ - def __init__(self, cache_directory, base_directory): - self.cache_directory = cache_directory - self.base_directory = base_directory + def __init__(self, hs, config): + self.cache_directory = hs.config.media_store_path + self.base_directory = config def store_file(self, path, file_info): """See StorageProvider.store_file""" @@ -125,3 +126,14 @@ class FileStorageProviderBackend(StorageProvider): backup_fname = os.path.join(self.base_directory, path) if os.path.isfile(backup_fname): return FileResponder(open(backup_fname, "rb")) + + def parse_config(config): + """Called on startup to parse config supplied. This should parse + the config and raise if there is a problem. + + The returned value is passed into the constructor. + + In this case we only care about a single param, the directory, so lets + just pull that out. + """ + return Config.ensure_directory(config["directory"]) From 9a89dae8c53140c74f968538f72a4045b6aca90d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Jan 2018 15:06:24 +0000 Subject: [PATCH 12/30] Fix typo in thumbnail resource causing access times to be incorrect --- synapse/rest/media/v1/thumbnail_resource.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py index c09f2dec41..12e84a2b7c 100644 --- a/synapse/rest/media/v1/thumbnail_resource.py +++ b/synapse/rest/media/v1/thumbnail_resource.py @@ -67,7 +67,7 @@ class ThumbnailResource(Resource): yield self._respond_local_thumbnail( request, media_id, width, height, method, m_type ) - self.media_repo.mark_recently_accessed(server_name, media_id) + self.media_repo.mark_recently_accessed(None, media_id) else: if self.dynamic_thumbnails: yield self._select_or_generate_remote_thumbnail( @@ -79,7 +79,7 @@ class ThumbnailResource(Resource): request, server_name, media_id, width, height, method, m_type ) - self.media_repo.mark_recently_accessed(None, media_id) + self.media_repo.mark_recently_accessed(server_name, media_id) @defer.inlineCallbacks def _respond_local_thumbnail(self, request, media_id, width, height, From aae77da73ffc89c31d0b17fa8ce5d8b58605de63 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Jan 2018 17:11:20 +0000 Subject: [PATCH 13/30] Fixup comments --- synapse/config/repository.py | 6 +++++- synapse/rest/media/v1/storage_provider.py | 4 ++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/synapse/config/repository.py b/synapse/config/repository.py index 81db0193fb..8bbc16ba40 100644 --- a/synapse/config/repository.py +++ b/synapse/config/repository.py @@ -39,7 +39,11 @@ ThumbnailRequirement = namedtuple( ) MediaStorageProviderConfig = namedtuple( - "MediaStorageProviderConfig", ("store_local", "store_remote", "store_synchronous",) + "MediaStorageProviderConfig", ( + "store_local", # Whether to store newly uploaded local files + "store_remote", # Whether to store newly downloaded remote files + "store_synchronous", # Whether to wait for successful storage for local uploads + ), ) diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py index 0074d2d426..9bf88f01f9 100644 --- a/synapse/rest/media/v1/storage_provider.py +++ b/synapse/rest/media/v1/storage_provider.py @@ -99,7 +99,7 @@ class FileStorageProviderBackend(StorageProvider): Args: hs (HomeServer) - config: The config returned by `parse_config`, i + config: The config returned by `parse_config`. """ def __init__(self, hs, config): @@ -133,7 +133,7 @@ class FileStorageProviderBackend(StorageProvider): The returned value is passed into the constructor. - In this case we only care about a single param, the directory, so lets + In this case we only care about a single param, the directory, so let's just pull that out. """ return Config.ensure_directory(config["directory"]) From 3fe2bae857cda58055c32329e15d3fe9828cf8f8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Jan 2018 17:11:45 +0000 Subject: [PATCH 14/30] Missing staticmethod --- synapse/rest/media/v1/storage_provider.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py index 9bf88f01f9..0a84aba861 100644 --- a/synapse/rest/media/v1/storage_provider.py +++ b/synapse/rest/media/v1/storage_provider.py @@ -127,6 +127,7 @@ class FileStorageProviderBackend(StorageProvider): if os.path.isfile(backup_fname): return FileResponder(open(backup_fname, "rb")) + @staticmethod def parse_config(config): """Called on startup to parse config supplied. This should parse the config and raise if there is a problem. From 8e85220373ee7a0396f36ffd3fddab8b1d6a7a12 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Jan 2018 17:12:35 +0000 Subject: [PATCH 15/30] Remove duplicate directory test --- synapse/config/repository.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/synapse/config/repository.py b/synapse/config/repository.py index 8bbc16ba40..364e823cd3 100644 --- a/synapse/config/repository.py +++ b/synapse/config/repository.py @@ -84,10 +84,6 @@ class ContentRepositoryConfig(Config): self.media_store_path = self.ensure_directory(config["media_store_path"]) backup_media_store_path = config.get("backup_media_store_path") - if backup_media_store_path: - backup_media_store_path = self.ensure_directory( - self.backup_media_store_path - ) synchronous_backup_media_store = config.get( "synchronous_backup_media_store", False From d69768348fc053dd6e243479acceabbdd6167238 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Jan 2018 17:14:05 +0000 Subject: [PATCH 16/30] Fix passing wrong config to provider constructor --- synapse/config/repository.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/config/repository.py b/synapse/config/repository.py index 364e823cd3..25ea77738a 100644 --- a/synapse/config/repository.py +++ b/synapse/config/repository.py @@ -135,7 +135,7 @@ class ContentRepositoryConfig(Config): ) self.media_storage_providers.append( - (provider_class, provider_config, wrapper_config,) + (provider_class, parsed_config, wrapper_config,) ) self.uploads_path = self.ensure_directory(config["uploads_path"]) From cd871a305708bca1acb5289f93cb62c15ba438da Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Jan 2018 18:37:59 +0000 Subject: [PATCH 17/30] Fix storage provider bug introduced when renamed to store_local --- synapse/rest/media/v1/storage_provider.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py index 0a84aba861..c188192f2b 100644 --- a/synapse/rest/media/v1/storage_provider.py +++ b/synapse/rest/media/v1/storage_provider.py @@ -77,7 +77,7 @@ class StorageProviderWrapper(StorageProvider): self.store_remote = store_remote def store_file(self, path, file_info): - if not self.store: + if not file_info.server_name and not self.store_local: return defer.succeed(None) if file_info.server_name and not self.store_remote: From 28a6ccb49c57cc686761b9e674b501b3b402e616 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 19 Jan 2018 00:19:58 +0000 Subject: [PATCH 18/30] add registrations_require_3pid lets homeservers specify a whitelist for 3PIDs that users are allowed to associate with. Typically useful for stopping people from registering with non-work emails --- synapse/api/errors.py | 1 + synapse/config/registration.py | 13 +++++ synapse/rest/client/v2_alpha/_base.py | 22 +++++++ synapse/rest/client/v2_alpha/account.py | 14 ++++- synapse/rest/client/v2_alpha/register.py | 73 ++++++++++++++++++++---- 5 files changed, 110 insertions(+), 13 deletions(-) diff --git a/synapse/api/errors.py b/synapse/api/errors.py index 79b35b3e7c..46b0d7b34c 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -46,6 +46,7 @@ class Codes(object): THREEPID_AUTH_FAILED = "M_THREEPID_AUTH_FAILED" THREEPID_IN_USE = "M_THREEPID_IN_USE" THREEPID_NOT_FOUND = "M_THREEPID_NOT_FOUND" + THREEPID_DENIED = "M_THREEPID_DENIED" INVALID_USERNAME = "M_INVALID_USERNAME" SERVER_NOT_TRUSTED = "M_SERVER_NOT_TRUSTED" diff --git a/synapse/config/registration.py b/synapse/config/registration.py index ef917fc9f2..e5e4f77872 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -31,6 +31,7 @@ class RegistrationConfig(Config): strtobool(str(config["disable_registration"])) ) + self.registrations_require_3pid = config.get("registrations_require_3pid", []) self.registration_shared_secret = config.get("registration_shared_secret") self.bcrypt_rounds = config.get("bcrypt_rounds", 12) @@ -52,6 +53,18 @@ class RegistrationConfig(Config): # Enable registration for new users. enable_registration: False + # Mandate that registrations require a 3PID which matches one or more + # of these 3PIDs. N.B. regexp escape backslashes are doubled (once for + # YAML and once for the regexp itself) + # + # registrations_require_3pid: + # - medium: email + # pattern: ".*@matrix\\.org" + # - medium: email + # pattern: ".*@vector\\.im" + # - medium: msisdn + # pattern: "\\+44" + # If set, allows registration by anyone who also has the shared # secret, even if registration is otherwise disabled. registration_shared_secret: "%(registration_shared_secret)s" diff --git a/synapse/rest/client/v2_alpha/_base.py b/synapse/rest/client/v2_alpha/_base.py index 77434937ff..7c46ef7cab 100644 --- a/synapse/rest/client/v2_alpha/_base.py +++ b/synapse/rest/client/v2_alpha/_base.py @@ -60,6 +60,28 @@ def set_timeline_upper_limit(filter_json, filter_timeline_limit): filter_timeline_limit) +def check_3pid_allowed(hs, medium, address): + # check whether the HS has whitelisted the given 3PID + + allow = False + if hs.config.registrations_require_3pid: + for constraint in hs.config.registrations_require_3pid: + logger.debug("Checking 3PID %s (%s) against %s (%s)" % ( + address, medium, constraint['pattern'], constraint['medium'] + ) + ) + if ( + medium == constraint['medium'] and + re.match(constraint['pattern'], address) + ): + allow = True + break + else: + allow = True + + return allow + + def interactive_auth_handler(orig): """Wraps an on_POST method to handle InteractiveAuthIncompleteErrors diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index 385a3ad2ec..66221e8f00 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -26,7 +26,7 @@ from synapse.http.servlet import ( ) from synapse.util.async import run_on_reactor from synapse.util.msisdn import phone_number_to_msisdn -from ._base import client_v2_patterns, interactive_auth_handler +from ._base import client_v2_patterns, interactive_auth_handler, check_3pid_allowed logger = logging.getLogger(__name__) @@ -47,6 +47,9 @@ class EmailPasswordRequestTokenRestServlet(RestServlet): 'id_server', 'client_secret', 'email', 'send_attempt' ]) + if not check_3pid_allowed(self.hs, "email", body['email']): + raise SynapseError(403, "3PID denied", Codes.THREEPID_DENIED) + existingUid = yield self.hs.get_datastore().get_user_id_by_threepid( 'email', body['email'] ) @@ -78,6 +81,9 @@ class MsisdnPasswordRequestTokenRestServlet(RestServlet): msisdn = phone_number_to_msisdn(body['country'], body['phone_number']) + if not check_3pid_allowed(self.hs, "msisdn", msisdn): + raise SynapseError(403, "3PID denied", Codes.THREEPID_DENIED) + existingUid = yield self.datastore.get_user_id_by_threepid( 'msisdn', msisdn ) @@ -217,6 +223,9 @@ class EmailThreepidRequestTokenRestServlet(RestServlet): if absent: raise SynapseError(400, "Missing params: %r" % absent, Codes.MISSING_PARAM) + if not check_3pid_allowed(self.hs, "email", body['email']): + raise SynapseError(403, "3PID denied", Codes.THREEPID_DENIED) + existingUid = yield self.datastore.get_user_id_by_threepid( 'email', body['email'] ) @@ -255,6 +264,9 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet): msisdn = phone_number_to_msisdn(body['country'], body['phone_number']) + if not check_3pid_allowed(self.hs, "msisdn", msisdn): + raise SynapseError(403, "3PID denied", Codes.THREEPID_DENIED) + existingUid = yield self.datastore.get_user_id_by_threepid( 'msisdn', msisdn ) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index e9d88a8895..762782c1f0 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -27,9 +27,10 @@ from synapse.http.servlet import ( ) from synapse.util.msisdn import phone_number_to_msisdn -from ._base import client_v2_patterns, interactive_auth_handler +from ._base import client_v2_patterns, interactive_auth_handler, check_3pid_allowed import logging +import re import hmac from hashlib import sha1 from synapse.util.async import run_on_reactor @@ -70,6 +71,9 @@ class EmailRegisterRequestTokenRestServlet(RestServlet): 'id_server', 'client_secret', 'email', 'send_attempt' ]) + if not check_3pid_allowed(self.hs, "email", body['email']): + raise SynapseError(403, "3PID denied", Codes.THREEPID_DENIED) + existingUid = yield self.hs.get_datastore().get_user_id_by_threepid( 'email', body['email'] ) @@ -105,6 +109,9 @@ class MsisdnRegisterRequestTokenRestServlet(RestServlet): msisdn = phone_number_to_msisdn(body['country'], body['phone_number']) + if not check_3pid_allowed(self.hs, "msisdn", msisdn): + raise SynapseError(403, "3PID denied", Codes.THREEPID_DENIED) + existingUid = yield self.hs.get_datastore().get_user_id_by_threepid( 'msisdn', msisdn ) @@ -305,31 +312,73 @@ class RegisterRestServlet(RestServlet): if 'x_show_msisdn' in body and body['x_show_msisdn']: show_msisdn = True + require_email = False + require_msisdn = False + for constraint in self.hs.config.registrations_require_3pid: + if constraint['medium'] == 'email': + require_email = True + elif constraint['medium'] == 'msisdn': + require_msisdn = True + else: + logger.warn( + "Unrecognised 3PID medium %s in registrations_require_3pid" % + constraint['medium'] + ) + + flows = [] if self.hs.config.enable_registration_captcha: - flows = [ - [LoginType.RECAPTCHA], - [LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA], - ] + if not require_email and not require_msisdn: + flows.extend([[LoginType.RECAPTCHA]]) + if require_email or not require_msisdn: + flows.extend([[LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA]]) + if show_msisdn: + if not require_email or require_msisdn: + flows.extend([[LoginType.MSISDN, LoginType.RECAPTCHA]]) flows.extend([ - [LoginType.MSISDN, LoginType.RECAPTCHA], [LoginType.MSISDN, LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA], ]) else: - flows = [ - [LoginType.DUMMY], - [LoginType.EMAIL_IDENTITY], - ] + if not require_email and not require_msisdn: + flows.extend([[LoginType.DUMMY]]) + if require_email or not require_msisdn: + flows.extend([[LoginType.EMAIL_IDENTITY]]) + if show_msisdn: + if not require_email or require_msisdn: + flows.extend([[LoginType.MSISDN]]) flows.extend([ - [LoginType.MSISDN], - [LoginType.MSISDN, LoginType.EMAIL_IDENTITY], + [LoginType.MSISDN, LoginType.EMAIL_IDENTITY] ]) auth_result, params, session_id = yield self.auth_handler.check_auth( flows, body, self.hs.get_ip_from_request(request) ) + # doublecheck that we're not trying to register an denied 3pid. + # the user-facing checks should already have happened when we requested + # a 3PID token to validate them in /register/email/requestToken etc + + for constraint in self.hs.config.registrations_require_3pid: + if ( + constraint['medium'] == 'email' and + auth_result and LoginType.EMAIL_IDENTITY in auth_result and + re.match( + constraint['pattern'], + auth_result[LoginType.EMAIL_IDENTITY].threepid.address + ) + ): + raise SynapseError(403, "3PID denied", Codes.THREEPID_DENIED) + elif ( + constraint['medium'] == 'msisdn' and + auth_result and LoginType.MSISDN in auth_result and + re.match( + constraint['pattern'], + auth_result[LoginType.MSISDN].threepid.address + ) + ): + raise SynapseError(403, "3PID denied", Codes.THREEPID_DENIED) + if registered_user_id is not None: logger.info( "Already registered user ID %r for this session", From 81d037dbd8c6616b33339f198f70134f73bbff5f Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 19 Jan 2018 00:28:08 +0000 Subject: [PATCH 19/30] mock registrations_require_3pid --- tests/rest/client/v2_alpha/test_register.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/rest/client/v2_alpha/test_register.py b/tests/rest/client/v2_alpha/test_register.py index 096f771bea..8aba456510 100644 --- a/tests/rest/client/v2_alpha/test_register.py +++ b/tests/rest/client/v2_alpha/test_register.py @@ -49,6 +49,7 @@ class RegisterRestServletTestCase(unittest.TestCase): self.hs.get_auth_handler = Mock(return_value=self.auth_handler) self.hs.get_device_handler = Mock(return_value=self.device_handler) self.hs.config.enable_registration = True + self.hs.config.registrations_require_3pid = [] self.hs.config.auto_join_rooms = [] # init the thing we're testing From 0af58f14ee351e7d52d7139df9218ff692764f20 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 19 Jan 2018 00:33:51 +0000 Subject: [PATCH 20/30] fix pep8 --- synapse/rest/client/v2_alpha/_base.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/synapse/rest/client/v2_alpha/_base.py b/synapse/rest/client/v2_alpha/_base.py index 7c46ef7cab..b286ff0d95 100644 --- a/synapse/rest/client/v2_alpha/_base.py +++ b/synapse/rest/client/v2_alpha/_base.py @@ -68,8 +68,7 @@ def check_3pid_allowed(hs, medium, address): for constraint in hs.config.registrations_require_3pid: logger.debug("Checking 3PID %s (%s) against %s (%s)" % ( address, medium, constraint['pattern'], constraint['medium'] - ) - ) + )) if ( medium == constraint['medium'] and re.match(constraint['pattern'], address) From 9d332e0f797e4f302a08b3708df4ac8b42b08216 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 19 Jan 2018 00:53:58 +0000 Subject: [PATCH 21/30] fix up v1, and improve errors --- synapse/handlers/register.py | 13 +++++++- synapse/rest/client/v1/register.py | 40 +++++++++++++++++------- synapse/rest/client/v2_alpha/account.py | 16 +++++++--- synapse/rest/client/v2_alpha/register.py | 16 +++++++--- 4 files changed, 65 insertions(+), 20 deletions(-) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 5b808beac1..157ebaf251 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -15,6 +15,7 @@ """Contains functions for registering clients.""" import logging +import re from twisted.internet import defer @@ -293,7 +294,7 @@ class RegistrationHandler(BaseHandler): """ for c in threepidCreds: - logger.info("validating theeepidcred sid %s on id server %s", + logger.info("validating threepidcred sid %s on id server %s", c['sid'], c['idServer']) try: identity_handler = self.hs.get_handlers().identity_handler @@ -307,6 +308,16 @@ class RegistrationHandler(BaseHandler): logger.info("got threepid with medium '%s' and address '%s'", threepid['medium'], threepid['address']) + for constraint in self.hs.config.registrations_require_3pid: + if ( + constraint['medium'] == 'email' and + threepid['medium'] == 'email' and + re.match(constraint['pattern'], threepid['address']) + ): + raise RegistrationError( + 403, "Third party identifier is not allowed" + ) + @defer.inlineCallbacks def bind_emails(self, user_id, threepidCreds): """Links emails with a user ID and informs an identity server. diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py index 32ed1d3ab2..f793542ad6 100644 --- a/synapse/rest/client/v1/register.py +++ b/synapse/rest/client/v1/register.py @@ -70,10 +70,24 @@ class RegisterRestServlet(ClientV1RestServlet): self.handlers = hs.get_handlers() def on_GET(self, request): + + require_email = False + require_msisdn = False + for constraint in self.hs.config.registrations_require_3pid: + if constraint['medium'] == 'email': + require_email = True + elif constraint['medium'] == 'msisdn': + require_msisdn = True + else: + logger.warn( + "Unrecognised 3PID medium %s in registrations_require_3pid" % + constraint['medium'] + ) + + flows = [] if self.hs.config.enable_registration_captcha: - return ( - 200, - {"flows": [ + if require_email or not require_msisdn: + flows.extend([ { "type": LoginType.RECAPTCHA, "stages": [ @@ -82,27 +96,31 @@ class RegisterRestServlet(ClientV1RestServlet): LoginType.PASSWORD ] }, + ]) + if not require_email and not require_msisdn: + flows.extend([ { "type": LoginType.RECAPTCHA, "stages": [LoginType.RECAPTCHA, LoginType.PASSWORD] } - ]} - ) + ]) else: - return ( - 200, - {"flows": [ + if require_email or not require_msisdn: + flows.extend([ { "type": LoginType.EMAIL_IDENTITY, "stages": [ LoginType.EMAIL_IDENTITY, LoginType.PASSWORD ] - }, + } + ]) + if not require_email and not require_msisdn: + flows.extend([ { "type": LoginType.PASSWORD } - ]} - ) + ]) + return (200, {"flows": flows}) @defer.inlineCallbacks def on_POST(self, request): diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index 66221e8f00..2977ad439f 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -48,7 +48,9 @@ class EmailPasswordRequestTokenRestServlet(RestServlet): ]) if not check_3pid_allowed(self.hs, "email", body['email']): - raise SynapseError(403, "3PID denied", Codes.THREEPID_DENIED) + raise SynapseError( + 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED + ) existingUid = yield self.hs.get_datastore().get_user_id_by_threepid( 'email', body['email'] @@ -82,7 +84,9 @@ class MsisdnPasswordRequestTokenRestServlet(RestServlet): msisdn = phone_number_to_msisdn(body['country'], body['phone_number']) if not check_3pid_allowed(self.hs, "msisdn", msisdn): - raise SynapseError(403, "3PID denied", Codes.THREEPID_DENIED) + raise SynapseError( + 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED + ) existingUid = yield self.datastore.get_user_id_by_threepid( 'msisdn', msisdn @@ -224,7 +228,9 @@ class EmailThreepidRequestTokenRestServlet(RestServlet): raise SynapseError(400, "Missing params: %r" % absent, Codes.MISSING_PARAM) if not check_3pid_allowed(self.hs, "email", body['email']): - raise SynapseError(403, "3PID denied", Codes.THREEPID_DENIED) + raise SynapseError( + 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED + ) existingUid = yield self.datastore.get_user_id_by_threepid( 'email', body['email'] @@ -265,7 +271,9 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet): msisdn = phone_number_to_msisdn(body['country'], body['phone_number']) if not check_3pid_allowed(self.hs, "msisdn", msisdn): - raise SynapseError(403, "3PID denied", Codes.THREEPID_DENIED) + raise SynapseError( + 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED + ) existingUid = yield self.datastore.get_user_id_by_threepid( 'msisdn', msisdn diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 762782c1f0..898d8b133a 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -72,7 +72,9 @@ class EmailRegisterRequestTokenRestServlet(RestServlet): ]) if not check_3pid_allowed(self.hs, "email", body['email']): - raise SynapseError(403, "3PID denied", Codes.THREEPID_DENIED) + raise SynapseError( + 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED + ) existingUid = yield self.hs.get_datastore().get_user_id_by_threepid( 'email', body['email'] @@ -110,7 +112,9 @@ class MsisdnRegisterRequestTokenRestServlet(RestServlet): msisdn = phone_number_to_msisdn(body['country'], body['phone_number']) if not check_3pid_allowed(self.hs, "msisdn", msisdn): - raise SynapseError(403, "3PID denied", Codes.THREEPID_DENIED) + raise SynapseError( + 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED + ) existingUid = yield self.hs.get_datastore().get_user_id_by_threepid( 'msisdn', msisdn @@ -368,7 +372,9 @@ class RegisterRestServlet(RestServlet): auth_result[LoginType.EMAIL_IDENTITY].threepid.address ) ): - raise SynapseError(403, "3PID denied", Codes.THREEPID_DENIED) + raise SynapseError( + 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED + ) elif ( constraint['medium'] == 'msisdn' and auth_result and LoginType.MSISDN in auth_result and @@ -377,7 +383,9 @@ class RegisterRestServlet(RestServlet): auth_result[LoginType.MSISDN].threepid.address ) ): - raise SynapseError(403, "3PID denied", Codes.THREEPID_DENIED) + raise SynapseError( + 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED + ) if registered_user_id is not None: logger.info( From 447f4f0d5f136dcadd5fdc286ded2d6e24a3f686 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 19 Jan 2018 15:33:55 +0000 Subject: [PATCH 22/30] rewrite based on PR feedback: * [ ] split config options into allowed_local_3pids and registrations_require_3pid * [ ] simplify and comment logic for picking registration flows * [ ] fix docstring and move check_3pid_allowed into a new util module * [ ] use check_3pid_allowed everywhere @erikjohnston PTAL --- synapse/config/registration.py | 12 +++- synapse/handlers/register.py | 15 ++--- synapse/rest/client/v1/register.py | 20 +++---- synapse/rest/client/v2_alpha/_base.py | 21 ------- synapse/rest/client/v2_alpha/account.py | 3 +- synapse/rest/client/v2_alpha/register.py | 73 +++++++++++------------- synapse/util/threepids.py | 45 +++++++++++++++ 7 files changed, 101 insertions(+), 88 deletions(-) create mode 100644 synapse/util/threepids.py diff --git a/synapse/config/registration.py b/synapse/config/registration.py index e5e4f77872..336959094b 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -32,6 +32,7 @@ class RegistrationConfig(Config): ) self.registrations_require_3pid = config.get("registrations_require_3pid", []) + self.allowed_local_3pids = config.get("allowed_local_3pids", []) self.registration_shared_secret = config.get("registration_shared_secret") self.bcrypt_rounds = config.get("bcrypt_rounds", 12) @@ -53,11 +54,16 @@ class RegistrationConfig(Config): # Enable registration for new users. enable_registration: False - # Mandate that registrations require a 3PID which matches one or more - # of these 3PIDs. N.B. regexp escape backslashes are doubled (once for - # YAML and once for the regexp itself) + # The user must provide all of the below types of 3PID when registering. # # registrations_require_3pid: + # - email + # - msisdn + + # Mandate that users are only allowed to associate certain formats of + # 3PIDs with accounts on this server. + # + # allowed_local_3pids: # - medium: email # pattern: ".*@matrix\\.org" # - medium: email diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 157ebaf251..9021d4d57f 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -15,7 +15,6 @@ """Contains functions for registering clients.""" import logging -import re from twisted.internet import defer @@ -26,6 +25,7 @@ from synapse.http.client import CaptchaServerHttpClient from synapse import types from synapse.types import UserID from synapse.util.async import run_on_reactor +from synapse.util.threepids import check_3pid_allowed from ._base import BaseHandler logger = logging.getLogger(__name__) @@ -308,15 +308,10 @@ class RegistrationHandler(BaseHandler): logger.info("got threepid with medium '%s' and address '%s'", threepid['medium'], threepid['address']) - for constraint in self.hs.config.registrations_require_3pid: - if ( - constraint['medium'] == 'email' and - threepid['medium'] == 'email' and - re.match(constraint['pattern'], threepid['address']) - ): - raise RegistrationError( - 403, "Third party identifier is not allowed" - ) + if not check_3pid_allowed(self.hs, threepid['medium'], threepid['address']): + raise RegistrationError( + 403, "Third party identifier is not allowed" + ) @defer.inlineCallbacks def bind_emails(self, user_id, threepidCreds): diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py index f793542ad6..5c5fa8f7ab 100644 --- a/synapse/rest/client/v1/register.py +++ b/synapse/rest/client/v1/register.py @@ -71,22 +71,13 @@ class RegisterRestServlet(ClientV1RestServlet): def on_GET(self, request): - require_email = False - require_msisdn = False - for constraint in self.hs.config.registrations_require_3pid: - if constraint['medium'] == 'email': - require_email = True - elif constraint['medium'] == 'msisdn': - require_msisdn = True - else: - logger.warn( - "Unrecognised 3PID medium %s in registrations_require_3pid" % - constraint['medium'] - ) + require_email = 'email' in self.hs.config.registrations_require_3pid + require_msisdn = 'msisdn' in self.hs.config.registrations_require_3pid flows = [] if self.hs.config.enable_registration_captcha: - if require_email or not require_msisdn: + # only support the email-only flow if we don't require MSISDN 3PIDs + if not require_msisdn: flows.extend([ { "type": LoginType.RECAPTCHA, @@ -97,6 +88,7 @@ class RegisterRestServlet(ClientV1RestServlet): ] }, ]) + # only support 3PIDless registration if no 3PIDs are required if not require_email and not require_msisdn: flows.extend([ { @@ -105,6 +97,7 @@ class RegisterRestServlet(ClientV1RestServlet): } ]) else: + # only support the email-only flow if we don't require MSISDN 3PIDs if require_email or not require_msisdn: flows.extend([ { @@ -114,6 +107,7 @@ class RegisterRestServlet(ClientV1RestServlet): ] } ]) + # only support 3PIDless registration if no 3PIDs are required if not require_email and not require_msisdn: flows.extend([ { diff --git a/synapse/rest/client/v2_alpha/_base.py b/synapse/rest/client/v2_alpha/_base.py index b286ff0d95..77434937ff 100644 --- a/synapse/rest/client/v2_alpha/_base.py +++ b/synapse/rest/client/v2_alpha/_base.py @@ -60,27 +60,6 @@ def set_timeline_upper_limit(filter_json, filter_timeline_limit): filter_timeline_limit) -def check_3pid_allowed(hs, medium, address): - # check whether the HS has whitelisted the given 3PID - - allow = False - if hs.config.registrations_require_3pid: - for constraint in hs.config.registrations_require_3pid: - logger.debug("Checking 3PID %s (%s) against %s (%s)" % ( - address, medium, constraint['pattern'], constraint['medium'] - )) - if ( - medium == constraint['medium'] and - re.match(constraint['pattern'], address) - ): - allow = True - break - else: - allow = True - - return allow - - def interactive_auth_handler(orig): """Wraps an on_POST method to handle InteractiveAuthIncompleteErrors diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index 2977ad439f..514bb37da1 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -26,7 +26,8 @@ from synapse.http.servlet import ( ) from synapse.util.async import run_on_reactor from synapse.util.msisdn import phone_number_to_msisdn -from ._base import client_v2_patterns, interactive_auth_handler, check_3pid_allowed +from synapse.util.threepids import check_3pid_allowed +from ._base import client_v2_patterns, interactive_auth_handler logger = logging.getLogger(__name__) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 898d8b133a..c3479e29de 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -26,11 +26,11 @@ from synapse.http.servlet import ( RestServlet, parse_json_object_from_request, assert_params_in_request, parse_string ) from synapse.util.msisdn import phone_number_to_msisdn +from synapse.util.threepids import check_3pid_allowed -from ._base import client_v2_patterns, interactive_auth_handler, check_3pid_allowed +from ._base import client_v2_patterns, interactive_auth_handler import logging -import re import hmac from hashlib import sha1 from synapse.util.async import run_on_reactor @@ -316,41 +316,41 @@ class RegisterRestServlet(RestServlet): if 'x_show_msisdn' in body and body['x_show_msisdn']: show_msisdn = True - require_email = False - require_msisdn = False - for constraint in self.hs.config.registrations_require_3pid: - if constraint['medium'] == 'email': - require_email = True - elif constraint['medium'] == 'msisdn': - require_msisdn = True - else: - logger.warn( - "Unrecognised 3PID medium %s in registrations_require_3pid" % - constraint['medium'] - ) + # FIXME: need a better error than "no auth flow found" for scenarios + # where we required 3PID for registration but the user didn't give one + require_email = 'email' in self.hs.config.registrations_require_3pid + require_msisdn = 'msisdn' in self.hs.config.registrations_require_3pid flows = [] if self.hs.config.enable_registration_captcha: + # only support 3PIDless registration if no 3PIDs are required if not require_email and not require_msisdn: flows.extend([[LoginType.RECAPTCHA]]) - if require_email or not require_msisdn: + # only support the email-only flow if we don't require MSISDN 3PIDs + if not require_msisdn: flows.extend([[LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA]]) if show_msisdn: - if not require_email or require_msisdn: + # only support the MSISDN-only flow if we don't require email 3PIDs + if not require_email: flows.extend([[LoginType.MSISDN, LoginType.RECAPTCHA]]) + # always let users provide both MSISDN & email flows.extend([ [LoginType.MSISDN, LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA], ]) else: + # only support 3PIDless registration if no 3PIDs are required if not require_email and not require_msisdn: flows.extend([[LoginType.DUMMY]]) - if require_email or not require_msisdn: + # only support the email-only flow if we don't require MSISDN 3PIDs + if not require_msisdn: flows.extend([[LoginType.EMAIL_IDENTITY]]) if show_msisdn: + # only support the MSISDN-only flow if we don't require email 3PIDs if not require_email or require_msisdn: flows.extend([[LoginType.MSISDN]]) + # always let users provide both MSISDN & email flows.extend([ [LoginType.MSISDN, LoginType.EMAIL_IDENTITY] ]) @@ -359,30 +359,23 @@ class RegisterRestServlet(RestServlet): flows, body, self.hs.get_ip_from_request(request) ) - # doublecheck that we're not trying to register an denied 3pid. - # the user-facing checks should already have happened when we requested - # a 3PID token to validate them in /register/email/requestToken etc + # Check that we're not trying to register a denied 3pid. + # + # the user-facing checks will probably already have happened in + # /register/email/requestToken when we requested a 3pid, but that's not + # guaranteed. - for constraint in self.hs.config.registrations_require_3pid: - if ( - constraint['medium'] == 'email' and - auth_result and LoginType.EMAIL_IDENTITY in auth_result and - re.match( - constraint['pattern'], - auth_result[LoginType.EMAIL_IDENTITY].threepid.address - ) - ): - raise SynapseError( - 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED - ) - elif ( - constraint['medium'] == 'msisdn' and - auth_result and LoginType.MSISDN in auth_result and - re.match( - constraint['pattern'], - auth_result[LoginType.MSISDN].threepid.address - ) - ): + if ( + auth_result and + ( + LoginType.EMAIL_IDENTITY in auth_result or + LoginType.EMAIL_MSISDN in auth_result + ) + ): + medium = auth_result[LoginType.EMAIL_IDENTITY].threepid['medium'] + address = auth_result[LoginType.EMAIL_IDENTITY].threepid['address'] + + if not check_3pid_allowed(self.hs, medium, address): raise SynapseError( 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED ) diff --git a/synapse/util/threepids.py b/synapse/util/threepids.py new file mode 100644 index 0000000000..e921b97796 --- /dev/null +++ b/synapse/util/threepids.py @@ -0,0 +1,45 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import re + +logger = logging.getLogger(__name__) + + +def check_3pid_allowed(hs, medium, address): + """Checks whether a given format of 3PID is allowed to be used on this HS + + Args: + hs (synapse.server.HomeServer): server + medium (str): 3pid medium - e.g. email, msisdn + address (str): address within that medium (e.g. "wotan@matrix.org") + msisdns need to first have been canonicalised + """ + + if hs.config.allowed_local_3pids: + for constraint in hs.config.allowed_local_3pids: + logger.debug("Checking 3PID %s (%s) against %s (%s)" % ( + address, medium, constraint['pattern'], constraint['medium'] + )) + if ( + medium == constraint['medium'] and + re.match(constraint['pattern'], address) + ): + return True + else: + return True + + return False From 293380bef761b67479a90d2837bbf6dfa5a70a90 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 19 Jan 2018 15:38:53 +0000 Subject: [PATCH 23/30] trailing commas --- synapse/rest/client/v2_alpha/account.py | 8 ++++---- synapse/rest/client/v2_alpha/register.py | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index 514bb37da1..30523995af 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -50,7 +50,7 @@ class EmailPasswordRequestTokenRestServlet(RestServlet): if not check_3pid_allowed(self.hs, "email", body['email']): raise SynapseError( - 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED + 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED, ) existingUid = yield self.hs.get_datastore().get_user_id_by_threepid( @@ -86,7 +86,7 @@ class MsisdnPasswordRequestTokenRestServlet(RestServlet): if not check_3pid_allowed(self.hs, "msisdn", msisdn): raise SynapseError( - 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED + 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED, ) existingUid = yield self.datastore.get_user_id_by_threepid( @@ -230,7 +230,7 @@ class EmailThreepidRequestTokenRestServlet(RestServlet): if not check_3pid_allowed(self.hs, "email", body['email']): raise SynapseError( - 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED + 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED, ) existingUid = yield self.datastore.get_user_id_by_threepid( @@ -273,7 +273,7 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet): if not check_3pid_allowed(self.hs, "msisdn", msisdn): raise SynapseError( - 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED + 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED, ) existingUid = yield self.datastore.get_user_id_by_threepid( diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index c3479e29de..bf68e34a58 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -73,7 +73,7 @@ class EmailRegisterRequestTokenRestServlet(RestServlet): if not check_3pid_allowed(self.hs, "email", body['email']): raise SynapseError( - 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED + 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED, ) existingUid = yield self.hs.get_datastore().get_user_id_by_threepid( @@ -113,7 +113,7 @@ class MsisdnRegisterRequestTokenRestServlet(RestServlet): if not check_3pid_allowed(self.hs, "msisdn", msisdn): raise SynapseError( - 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED + 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED, ) existingUid = yield self.hs.get_datastore().get_user_id_by_threepid( @@ -377,7 +377,7 @@ class RegisterRestServlet(RestServlet): if not check_3pid_allowed(self.hs, medium, address): raise SynapseError( - 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED + 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED, ) if registered_user_id is not None: From 8fe253f19b1c61c38111948cce00a7d260d2925a Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 19 Jan 2018 18:23:45 +0000 Subject: [PATCH 24/30] fix PR nitpicking --- synapse/util/threepids.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/synapse/util/threepids.py b/synapse/util/threepids.py index e921b97796..75efa0117b 100644 --- a/synapse/util/threepids.py +++ b/synapse/util/threepids.py @@ -27,13 +27,16 @@ def check_3pid_allowed(hs, medium, address): medium (str): 3pid medium - e.g. email, msisdn address (str): address within that medium (e.g. "wotan@matrix.org") msisdns need to first have been canonicalised + Returns: + bool: whether the 3PID medium/address is allowed to be added to this HS """ if hs.config.allowed_local_3pids: for constraint in hs.config.allowed_local_3pids: - logger.debug("Checking 3PID %s (%s) against %s (%s)" % ( - address, medium, constraint['pattern'], constraint['medium'] - )) + logger.debug( + "Checking 3PID %s (%s) against %s (%s)", + address, medium, constraint['pattern'], constraint['medium'], + ) if ( medium == constraint['medium'] and re.match(constraint['pattern'], address) From 62d7d66ae5592175bb35a2a8d2f69b1924d6e1f2 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 19 Jan 2018 18:23:56 +0000 Subject: [PATCH 25/30] oops, check all login types --- synapse/rest/client/v2_alpha/register.py | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index bf68e34a58..4e73f9a40a 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -365,20 +365,17 @@ class RegisterRestServlet(RestServlet): # /register/email/requestToken when we requested a 3pid, but that's not # guaranteed. - if ( - auth_result and - ( - LoginType.EMAIL_IDENTITY in auth_result or - LoginType.EMAIL_MSISDN in auth_result - ) - ): - medium = auth_result[LoginType.EMAIL_IDENTITY].threepid['medium'] - address = auth_result[LoginType.EMAIL_IDENTITY].threepid['address'] + if auth_result: + for login_type in [LoginType.EMAIL_IDENTITY, LoginType.EMAIL_MSISDN]: + if login_type in auth_result: + medium = auth_result[login_type].threepid['medium'] + address = auth_result[login_type].threepid['address'] - if not check_3pid_allowed(self.hs, medium, address): - raise SynapseError( - 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED, - ) + if not check_3pid_allowed(self.hs, medium, address): + raise SynapseError( + 403, "Third party identifier is not allowed", + Codes.THREEPID_DENIED, + ) if registered_user_id is not None: logger.info( From ad7ec63d08c9d814766ea4764e187bc7ba5589d7 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 19 Jan 2018 18:29:39 +0000 Subject: [PATCH 26/30] Use the right path for url_preview thumbnails This was introduced by #2627: we were overwriting the original media for url previews with the thumbnails :/ (fixes https://github.com/vector-im/riot-web/issues/6012, hopefully) --- synapse/rest/media/v1/media_storage.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py index 001e84578e..041ae396cd 100644 --- a/synapse/rest/media/v1/media_storage.py +++ b/synapse/rest/media/v1/media_storage.py @@ -164,6 +164,14 @@ class MediaStorage(object): str """ if file_info.url_cache: + if file_info.thumbnail: + return self.filepaths.url_cache_thumbnail_rel( + media_id=file_info.file_id, + width=file_info.thumbnail_width, + height=file_info.thumbnail_height, + content_type=file_info.thumbnail_type, + method=file_info.thumbnail_method, + ) return self.filepaths.url_cache_filepath_rel(file_info.file_id) if file_info.server_name: From 49fce046243ae3e2c38ac6ac39001172667b8dfa Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 19 Jan 2018 19:55:33 +0000 Subject: [PATCH 27/30] fix typo (thanks sytest) --- synapse/rest/client/v2_alpha/register.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 4e73f9a40a..3abfe35479 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -366,7 +366,7 @@ class RegisterRestServlet(RestServlet): # guaranteed. if auth_result: - for login_type in [LoginType.EMAIL_IDENTITY, LoginType.EMAIL_MSISDN]: + for login_type in [LoginType.EMAIL_IDENTITY, LoginType.MSISDN]: if login_type in auth_result: medium = auth_result[login_type].threepid['medium'] address = auth_result[login_type].threepid['address'] From 5c431f421c0edcdc4582da7e0d780355ebf647e1 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 22 Jan 2018 16:45:43 +0000 Subject: [PATCH 28/30] Matthew's fixes to the unit tests Extracted from https://github.com/matrix-org/synapse/pull/2820 --- tests/replication/slave/storage/_base.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py index 81063f19a1..74f104e3b8 100644 --- a/tests/replication/slave/storage/_base.py +++ b/tests/replication/slave/storage/_base.py @@ -15,6 +15,8 @@ from twisted.internet import defer, reactor from tests import unittest +import tempfile + from mock import Mock, NonCallableMock from tests.utils import setup_test_homeserver from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory @@ -41,7 +43,9 @@ class BaseSlavedStoreTestCase(unittest.TestCase): self.event_id = 0 server_factory = ReplicationStreamProtocolFactory(self.hs) - listener = reactor.listenUNIX("\0xxx", server_factory) + # XXX: mktemp is unsafe and should never be used. but we're just a test. + path = tempfile.mktemp(prefix="base_slaved_store_test_case_socket") + listener = reactor.listenUNIX(path, server_factory) self.addCleanup(listener.stopListening) self.streamer = server_factory.streamer @@ -49,7 +53,7 @@ class BaseSlavedStoreTestCase(unittest.TestCase): client_factory = ReplicationClientFactory( self.hs, "client_name", self.replication_handler ) - client_connector = reactor.connectUNIX("\0xxx", client_factory) + client_connector = reactor.connectUNIX(path, client_factory) self.addCleanup(client_factory.stopTrying) self.addCleanup(client_connector.disconnect) From ab9f844aaf3662a64dbc4c56077e9fa37bc7d5d0 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 22 Jan 2018 19:11:18 +0100 Subject: [PATCH 29/30] Add federation_domain_whitelist option (#2820) Add federation_domain_whitelist gives a way to restrict which domains your HS is allowed to federate with. useful mainly for gracefully preventing a private but internet-connected HS from trying to federate to the wider public Matrix network --- synapse/api/errors.py | 26 ++++++++++++++++++++ synapse/config/server.py | 22 +++++++++++++++++ synapse/federation/federation_client.py | 5 +++- synapse/federation/transaction_queue.py | 4 +++- synapse/federation/transport/client.py | 3 +++ synapse/federation/transport/server.py | 9 ++++++- synapse/handlers/device.py | 4 ++++ synapse/handlers/e2e_keys.py | 8 ++++++- synapse/handlers/federation.py | 4 ++++ synapse/http/matrixfederationclient.py | 28 +++++++++++++++++++++- synapse/rest/key/v2/remote_key_resource.py | 8 +++++++ synapse/rest/media/v1/media_repository.py | 19 +++++++++++++-- synapse/util/retryutils.py | 12 ++++++++++ tests/utils.py | 1 + 14 files changed, 146 insertions(+), 7 deletions(-) diff --git a/synapse/api/errors.py b/synapse/api/errors.py index 46b0d7b34c..aa15f73f36 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -141,6 +141,32 @@ class RegistrationError(SynapseError): pass +class FederationDeniedError(SynapseError): + """An error raised when the server tries to federate with a server which + is not on its federation whitelist. + + Attributes: + destination (str): The destination which has been denied + """ + + def __init__(self, destination): + """Raised by federation client or server to indicate that we are + are deliberately not attempting to contact a given server because it is + not on our federation whitelist. + + Args: + destination (str): the domain in question + """ + + self.destination = destination + + super(FederationDeniedError, self).__init__( + code=403, + msg="Federation denied with %s." % (self.destination,), + errcode=Codes.FORBIDDEN, + ) + + class InteractiveAuthIncompleteError(Exception): """An error raised when UI auth is not yet complete diff --git a/synapse/config/server.py b/synapse/config/server.py index 436dd8a6fe..8f0b6d1f28 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -55,6 +55,17 @@ class ServerConfig(Config): "block_non_admin_invites", False, ) + # FIXME: federation_domain_whitelist needs sytests + self.federation_domain_whitelist = None + federation_domain_whitelist = config.get( + "federation_domain_whitelist", None + ) + # turn the whitelist into a hash for speed of lookup + if federation_domain_whitelist is not None: + self.federation_domain_whitelist = {} + for domain in federation_domain_whitelist: + self.federation_domain_whitelist[domain] = True + if self.public_baseurl is not None: if self.public_baseurl[-1] != '/': self.public_baseurl += '/' @@ -210,6 +221,17 @@ class ServerConfig(Config): # (except those sent by local server admins). The default is False. # block_non_admin_invites: True + # Restrict federation to the following whitelist of domains. + # N.B. we recommend also firewalling your federation listener to limit + # inbound federation traffic as early as possible, rather than relying + # purely on this application-layer restriction. If not specified, the + # default is to whitelist everything. + # + # federation_domain_whitelist: + # - lon.example.com + # - nyc.example.com + # - syd.example.com + # List of ports that Synapse should listen on, their purpose and their # configuration. listeners: diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index b1fe03f702..813907f7f2 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -23,7 +23,7 @@ from twisted.internet import defer from synapse.api.constants import Membership from synapse.api.errors import ( - CodeMessageException, HttpResponseException, SynapseError, + CodeMessageException, HttpResponseException, SynapseError, FederationDeniedError ) from synapse.events import builder from synapse.federation.federation_base import ( @@ -266,6 +266,9 @@ class FederationClient(FederationBase): except NotRetryingDestination as e: logger.info(e.message) continue + except FederationDeniedError as e: + logger.info(e.message) + continue except Exception as e: pdu_attempts[destination] = now diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 9d39f46583..a141ec9953 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -19,7 +19,7 @@ from twisted.internet import defer from .persistence import TransactionActions from .units import Transaction, Edu -from synapse.api.errors import HttpResponseException +from synapse.api.errors import HttpResponseException, FederationDeniedError from synapse.util import logcontext, PreserveLoggingContext from synapse.util.async import run_on_reactor from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter @@ -490,6 +490,8 @@ class TransactionQueue(object): (e.retry_last_ts + e.retry_interval) / 1000.0 ), ) + except FederationDeniedError as e: + logger.info(e) except Exception as e: logger.warn( "TX [%s] Failed to send transaction: %s", diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 1f3ce238f6..5488e82985 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -212,6 +212,9 @@ class TransportLayerClient(object): Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. + + Fails with ``FederationDeniedError`` if the remote destination + is not in our federation whitelist """ valid_memberships = {Membership.JOIN, Membership.LEAVE} if membership not in valid_memberships: diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 2b02b021ec..06c16ba4fa 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.api.urls import FEDERATION_PREFIX as PREFIX -from synapse.api.errors import Codes, SynapseError +from synapse.api.errors import Codes, SynapseError, FederationDeniedError from synapse.http.server import JsonResource from synapse.http.servlet import ( parse_json_object_from_request, parse_integer_from_args, parse_string_from_args, @@ -81,6 +81,7 @@ class Authenticator(object): self.keyring = hs.get_keyring() self.server_name = hs.hostname self.store = hs.get_datastore() + self.federation_domain_whitelist = hs.config.federation_domain_whitelist # A method just so we can pass 'self' as the authenticator to the Servlets @defer.inlineCallbacks @@ -92,6 +93,12 @@ class Authenticator(object): "signatures": {}, } + if ( + self.federation_domain_whitelist is not None and + self.server_name not in self.federation_domain_whitelist + ): + raise FederationDeniedError(self.server_name) + if content is not None: json_request["content"] = content diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 2152efc692..0e83453851 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -14,6 +14,7 @@ # limitations under the License. from synapse.api import errors from synapse.api.constants import EventTypes +from synapse.api.errors import FederationDeniedError from synapse.util import stringutils from synapse.util.async import Linearizer from synapse.util.caches.expiringcache import ExpiringCache @@ -513,6 +514,9 @@ class DeviceListEduUpdater(object): # This makes it more likely that the device lists will # eventually become consistent. return + except FederationDeniedError as e: + logger.info(e) + return except Exception: # TODO: Remember that we are now out of sync and try again # later diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 5af8abf66b..9aa95f89e6 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -19,7 +19,9 @@ import logging from canonicaljson import encode_canonical_json from twisted.internet import defer -from synapse.api.errors import SynapseError, CodeMessageException +from synapse.api.errors import ( + SynapseError, CodeMessageException, FederationDeniedError, +) from synapse.types import get_domain_from_id, UserID from synapse.util.logcontext import preserve_fn, make_deferred_yieldable from synapse.util.retryutils import NotRetryingDestination @@ -140,6 +142,10 @@ class E2eKeysHandler(object): failures[destination] = { "status": 503, "message": "Not ready for retry", } + except FederationDeniedError as e: + failures[destination] = { + "status": 403, "message": "Federation Denied", + } except Exception as e: # include ConnectionRefused and other errors failures[destination] = { diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ac70730885..677532c87b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -22,6 +22,7 @@ from ._base import BaseHandler from synapse.api.errors import ( AuthError, FederationError, StoreError, CodeMessageException, SynapseError, + FederationDeniedError, ) from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.events.validator import EventValidator @@ -782,6 +783,9 @@ class FederationHandler(BaseHandler): except NotRetryingDestination as e: logger.info(e.message) continue + except FederationDeniedError as e: + logger.info(e) + continue except Exception as e: logger.exception( "Failed to backfill from %s because %s", diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 833496b72d..9145405cb0 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -27,7 +27,7 @@ import synapse.metrics from canonicaljson import encode_canonical_json from synapse.api.errors import ( - SynapseError, Codes, HttpResponseException, + SynapseError, Codes, HttpResponseException, FederationDeniedError, ) from signedjson.sign import sign_json @@ -123,11 +123,22 @@ class MatrixFederationHttpClient(object): Fails with ``HTTPRequestException``: if we get an HTTP response code >= 300. + Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. + + Fails with ``FederationDeniedError`` if this destination + is not on our federation whitelist + (May also fail with plenty of other Exceptions for things like DNS failures, connection failures, SSL failures.) """ + if ( + self.hs.config.federation_domain_whitelist and + destination not in self.hs.config.federation_domain_whitelist + ): + raise FederationDeniedError(destination) + limiter = yield synapse.util.retryutils.get_retry_limiter( destination, self.clock, @@ -308,6 +319,9 @@ class MatrixFederationHttpClient(object): Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. + + Fails with ``FederationDeniedError`` if this destination + is not on our federation whitelist """ if not json_data_callback: @@ -368,6 +382,9 @@ class MatrixFederationHttpClient(object): Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. + + Fails with ``FederationDeniedError`` if this destination + is not on our federation whitelist """ def body_callback(method, url_bytes, headers_dict): @@ -422,6 +439,9 @@ class MatrixFederationHttpClient(object): Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. + + Fails with ``FederationDeniedError`` if this destination + is not on our federation whitelist """ logger.debug("get_json args: %s", args) @@ -475,6 +495,9 @@ class MatrixFederationHttpClient(object): Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. + + Fails with ``FederationDeniedError`` if this destination + is not on our federation whitelist """ response = yield self._request( @@ -518,6 +541,9 @@ class MatrixFederationHttpClient(object): Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. + + Fails with ``FederationDeniedError`` if this destination + is not on our federation whitelist """ encoded_args = {} diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py index cc2842aa72..17e6079cba 100644 --- a/synapse/rest/key/v2/remote_key_resource.py +++ b/synapse/rest/key/v2/remote_key_resource.py @@ -93,6 +93,7 @@ class RemoteKey(Resource): self.store = hs.get_datastore() self.version_string = hs.version_string self.clock = hs.get_clock() + self.federation_domain_whitelist = hs.config.federation_domain_whitelist def render_GET(self, request): self.async_render_GET(request) @@ -137,6 +138,13 @@ class RemoteKey(Resource): logger.info("Handling query for keys %r", query) store_queries = [] for server_name, key_ids in query.items(): + if ( + self.federation_domain_whitelist is not None and + server_name not in self.federation_domain_whitelist + ): + logger.debug("Federation denied with %s", server_name) + continue + if not key_ids: key_ids = (None,) for key_id in key_ids: diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 4f56bcf577..485db8577a 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -32,8 +32,9 @@ from .media_storage import MediaStorage from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.util.stringutils import random_string -from synapse.api.errors import SynapseError, HttpResponseException, \ - NotFoundError +from synapse.api.errors import ( + SynapseError, HttpResponseException, NotFoundError, FederationDeniedError, +) from synapse.util.async import Linearizer from synapse.util.stringutils import is_ascii @@ -75,6 +76,8 @@ class MediaRepository(object): self.recently_accessed_remotes = set() self.recently_accessed_locals = set() + self.federation_domain_whitelist = hs.config.federation_domain_whitelist + # List of StorageProviders where we should search for media and # potentially upload to. storage_providers = [] @@ -216,6 +219,12 @@ class MediaRepository(object): Deferred: Resolves once a response has successfully been written to request """ + if ( + self.federation_domain_whitelist is not None and + server_name not in self.federation_domain_whitelist + ): + raise FederationDeniedError(server_name) + self.mark_recently_accessed(server_name, media_id) # We linearize here to ensure that we don't try and download remote @@ -250,6 +259,12 @@ class MediaRepository(object): Returns: Deferred[dict]: The media_info of the file """ + if ( + self.federation_domain_whitelist is not None and + server_name not in self.federation_domain_whitelist + ): + raise FederationDeniedError(server_name) + # We linearize here to ensure that we don't try and download remote # media multiple times concurrently key = (server_name, media_id) diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 1adedbb361..47b0bb5eb3 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -26,6 +26,18 @@ logger = logging.getLogger(__name__) class NotRetryingDestination(Exception): def __init__(self, retry_last_ts, retry_interval, destination): + """Raised by the limiter (and federation client) to indicate that we are + are deliberately not attempting to contact a given server. + + Args: + retry_last_ts (int): the unix ts in milliseconds of our last attempt + to contact the server. 0 indicates that the last attempt was + successful or that we've never actually attempted to connect. + retry_interval (int): the time in milliseconds to wait until the next + attempt. + destination (str): the domain in question + """ + msg = "Not retrying server %s." % (destination,) super(NotRetryingDestination, self).__init__(msg) diff --git a/tests/utils.py b/tests/utils.py index 44e5f75093..3116047892 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -57,6 +57,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs): config.worker_app = None config.email_enable_notifs = False config.block_non_admin_invites = False + config.federation_domain_whitelist = None # disable user directory updates, because they get done in the # background, which upsets the test runner. From d32385336f2edf2c98ae9eb560bad9402860d7cc Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 23 Jan 2018 09:59:06 +0100 Subject: [PATCH 30/30] add ?ts massaging for ASes (#2754) blindly implement ?ts for AS. untested --- synapse/rest/client/v1/room.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 682a0af9fc..867ec8602c 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -195,15 +195,20 @@ class RoomSendEventRestServlet(ClientV1RestServlet): requester = yield self.auth.get_user_by_req(request, allow_guest=True) content = parse_json_object_from_request(request) + event_dict = { + "type": event_type, + "content": content, + "room_id": room_id, + "sender": requester.user.to_string(), + } + + if 'ts' in request.args and requester.app_service: + event_dict['origin_server_ts'] = parse_integer(request, "ts", 0) + msg_handler = self.handlers.message_handler event = yield msg_handler.create_and_send_nonmember_event( requester, - { - "type": event_type, - "content": content, - "room_id": room_id, - "sender": requester.user.to_string(), - }, + event_dict, txn_id=txn_id, )