From 4c1b32d7e246a97fad97d03e450e2df078a86bf3 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 14 Apr 2016 18:28:42 +0100 Subject: [PATCH 01/22] Fix login to error for nonexistent users Fixes SYN-680 --- synapse/handlers/auth.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 7a13a8b11c..9341cb5cfe 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -441,7 +441,7 @@ class AuthHandler(BaseHandler): user_id, password_hash = yield self._find_user_id_and_pwd_hash(user_id) defer.returnValue(not self.validate_hash(password, password_hash)) except LoginError: - defer.returnValue(False) + defer.returnValue(True) @defer.inlineCallbacks def _check_ldap_password(self, user_id, password): From 3c79bdd7a06bcebb547e5a7abb7393fbede7ef0c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 14 Apr 2016 19:00:21 +0100 Subject: [PATCH 02/22] Fix check_password rather than inverting the meaning of _check_local_password (#730) --- synapse/handlers/auth.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 9341cb5cfe..916632c7d7 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -428,24 +428,27 @@ class AuthHandler(BaseHandler): @defer.inlineCallbacks def _check_password(self, user_id, password): - defer.returnValue( - not ( - (yield self._check_ldap_password(user_id, password)) - or - (yield self._check_local_password(user_id, password)) - )) + """ + Returns: + True if the user_id successfully authenticated + """ + defer.returnValue(( + (yield self._check_ldap_password(user_id, password)) + or + (yield self._check_local_password(user_id, password)) + )) @defer.inlineCallbacks def _check_local_password(self, user_id, password): try: user_id, password_hash = yield self._find_user_id_and_pwd_hash(user_id) - defer.returnValue(not self.validate_hash(password, password_hash)) + defer.returnValue(self.validate_hash(password, password_hash)) except LoginError: - defer.returnValue(True) + defer.returnValue(False) @defer.inlineCallbacks def _check_ldap_password(self, user_id, password): - if self.ldap_enabled is not True: + if not self.ldap_enabled: logger.debug("LDAP not configured") defer.returnValue(False) From cb9c465707f265fb2b4606269495ddb53d84db05 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Apr 2016 10:21:32 +0100 Subject: [PATCH 03/22] Use SynapseError 504 for Timeout errors --- synapse/util/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index b462495eb8..2b3f0bef3c 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
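Returning a SynapseError with status 504 means a timed-out federation call now surfaces to the client as a gateway timeout rather than an opaque 500 from the old RuntimeError. A minimal sketch of the timeout pattern around the hunk below, assuming a hypothetical wrap_deferred_with_timeout helper; only the errback type is taken from this patch:

    from twisted.internet import defer, reactor
    from synapse.api.errors import SynapseError

    def wrap_deferred_with_timeout(given_deferred, seconds):
        # Hypothetical helper: fire with the wrapped result, or errback
        # with a 504 SynapseError if `seconds` elapse first.
        ret_deferred = defer.Deferred()

        def timed_out_fn():
            if not ret_deferred.called:
                ret_deferred.errback(SynapseError(504, "Timed out"))

        timer = reactor.callLater(seconds, timed_out_fn)

        def on_result(result):
            if timer.active():
                timer.cancel()
            if not ret_deferred.called:
                ret_deferred.callback(result)

        def on_failure(failure):
            if timer.active():
                timer.cancel()
            if not ret_deferred.called:
                ret_deferred.errback(failure)

        given_deferred.addCallbacks(on_result, on_failure)
        return ret_deferred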
+from synapse.api.errors import SynapseError from synapse.util.logcontext import PreserveLoggingContext from twisted.internet import defer, reactor, task @@ -80,7 +81,7 @@ class Clock(object): def timed_out_fn(): try: - ret_deferred.errback(RuntimeError("Timed out")) + ret_deferred.errback(SynapseError(504, "Timed out")) except: pass From 6fd2f685fe5722326a7719892368d0fa2aa92efa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Apr 2016 11:17:18 +0100 Subject: [PATCH 04/22] Simplify _check_password --- synapse/handlers/auth.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 916632c7d7..61fe56032a 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -432,11 +432,15 @@ class AuthHandler(BaseHandler): Returns: True if the user_id successfully authenticated """ - defer.returnValue(( - (yield self._check_ldap_password(user_id, password)) - or - (yield self._check_local_password(user_id, password)) - )) + valid_ldap = yield self._check_ldap_password(user_id, password) + if valid_ldap: + defer.returnValue(True) + + valid_local_password = yield self._check_local_password(user_id, password) + if valid_local_password: + defer.returnValue(True) + + defer.returnValue(False) @defer.inlineCallbacks def _check_local_password(self, user_id, password): From 914f1eafac9fe600e4d83a2b9bb6dac6d1dcb707 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Apr 2016 11:22:23 +0100 Subject: [PATCH 05/22] Lower timeout for make_membership_event Calls to make_membership_event are done in response to client requests, and so should not be retried over long timeframes. --- synapse/federation/transport/client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 2237e3413c..cd2841c4db 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -179,7 +179,8 @@ class TransportLayerClient(object): content = yield self.client.get_json( destination=destination, path=path, - retry_on_dns_fail=True, + retry_on_dns_fail=False, + timeout=20000, ) defer.returnValue(content) From 84f9cac4d0a7f19b432e683981f66c20339a60f5 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 15 Apr 2016 13:19:57 +0100 Subject: [PATCH 06/22] fix cyrillic URL previews by hardcoding all page decoding to UTF-8 for now, rather than relying on lxml's heuristics which seem to get it wrong --- synapse/rest/media/v1/preview_url_resource.py | 24 +++++++------------ 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index c27ba72735..7e937b0446 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -181,22 +181,14 @@ class PreviewUrlResource(BaseMediaResource): from lxml import html - try: - tree = html.parse(media_info['filename']) - og = yield self._calc_og(tree, media_info, requester) - except UnicodeDecodeError: - # XXX: evil evil bodge - # Empirically, sites like google.com mix Latin-1 and utf-8 - # encodings in the same page. The rogue Latin-1 characters - # cause lxml to choke with a UnicodeDecodeError, so if we - # see this we go and do a manual decode of the HTML before - # handing it to lxml as utf-8 encoding, counter-intuitively, - # which seems to make it happier... 
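The replacement below drops the try/except bodge entirely: instead of letting lxml guess the charset and only falling back on UnicodeDecodeError, the body is always decoded by hand as UTF-8 with invalid sequences dropped, since pages that mix Latin-1 and UTF-8 defeat lxml's heuristics. A minimal standalone sketch of the approach, assuming lxml is installed:

    from lxml import html

    def parse_html_leniently(filename):
        # Read raw bytes and force a lenient UTF-8 decode, discarding
        # byte sequences (e.g. stray Latin-1) that don't parse, rather
        # than trusting lxml's own charset detection.
        with open(filename, "rb") as f:
            body = f.read()
        return html.fromstring(body.decode("utf-8", "ignore"))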
- file = open(media_info['filename']) - body = file.read() - file.close() - tree = html.fromstring(body.decode('utf-8', 'ignore')) - og = yield self._calc_og(tree, media_info, requester) + # XXX: always manually try to decode body as utf-8 first, which + # seems to help with most character encoding woes. + # XXX: handle non-utf-8 encodings? + file = open(media_info['filename']) + body = file.read() + file.close() + tree = html.fromstring(body.decode('utf-8', 'ignore')) + og = yield self._calc_og(tree, media_info, requester) else: logger.warn("Failed to find any OG data in %s", url) From aaabbd3e9e514b3779b2004ff8e9f74dd9dc4b6a Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 15 Apr 2016 14:32:25 +0100 Subject: [PATCH 07/22] explicitly pass in the charset from Content-Type to lxml to fix cyrillic woes better --- synapse/rest/media/v1/preview_url_resource.py | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 7e937b0446..9bb7c72cfc 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -179,16 +179,28 @@ class PreviewUrlResource(BaseMediaResource): elif self._is_html(media_info['media_type']): # TODO: somehow stop a big HTML tree from exploding synapse's RAM - from lxml import html + from lxml import etree - # XXX: always manually try to decode body as utf-8 first, which - # seems to help with most character encoding woes. - # XXX: handle non-utf-8 encodings? file = open(media_info['filename']) body = file.read() file.close() - tree = html.fromstring(body.decode('utf-8', 'ignore')) - og = yield self._calc_og(tree, media_info, requester) + + # clobber the encoding from the content-type, or default to utf-8 + # XXX: this overrides any or XML charset headers in the body + # which may pose problems, but so far seems to work okay. 
+ match = re.match(r'.*; *charset=(.*?)(;|$)', media_info['media_type'], re.I) + encoding = match.group(1) if match else "utf-8" + + try: + parser = etree.HTMLParser(recover=True, encoding=encoding) + tree = etree.fromstring(body, parser) + og = yield self._calc_og(tree, media_info, requester) + except UnicodeDecodeError: + # blindly try decoding the body as utf-8, which seems to fix + # the charset mismatches on https://google.com + parser = etree.HTMLParser(recover=True, encoding=encoding) + tree = etree.fromstring(body.decode('utf-8', 'ignore'), parser) + og = yield self._calc_og(tree, media_info, requester) else: logger.warn("Failed to find any OG data in %s", url) From eb8619e2562432cdd22a625bb612c907c63f723c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 Apr 2016 16:08:32 +0100 Subject: [PATCH 08/22] Create log context in Measure if one doesn't exist --- synapse/util/metrics.py | 23 +++++++++++++++++------ tests/test_state.py | 4 ++-- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index c51b641125..e1f374807e 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -50,7 +50,7 @@ block_db_txn_duration = metrics.register_distribution( class Measure(object): __slots__ = [ "clock", "name", "start_context", "start", "new_context", "ru_utime", - "ru_stime", "db_txn_count", "db_txn_duration" + "ru_stime", "db_txn_count", "db_txn_duration", "created_context" ] def __init__(self, clock, name): @@ -58,14 +58,20 @@ class Measure(object): self.name = name self.start_context = None self.start = None + self.created_context = False def __enter__(self): self.start = self.clock.time_msec() self.start_context = LoggingContext.current_context() - if self.start_context: - self.ru_utime, self.ru_stime = self.start_context.get_resource_usage() - self.db_txn_count = self.start_context.db_txn_count - self.db_txn_duration = self.start_context.db_txn_duration + if not self.start_context: + logger.warn("Entered Measure without log context: %s", self.name) + self.start_context = LoggingContext("Measure") + self.start_context.__enter__() + self.created_context = True + + self.ru_utime, self.ru_stime = self.start_context.get_resource_usage() + self.db_txn_count = self.start_context.db_txn_count + self.db_txn_duration = self.start_context.db_txn_duration def __exit__(self, exc_type, exc_val, exc_tb): if exc_type is not None or not self.start_context: @@ -91,7 +97,12 @@ class Measure(object): block_ru_utime.inc_by(ru_utime - self.ru_utime, self.name) block_ru_stime.inc_by(ru_stime - self.ru_stime, self.name) - block_db_txn_count.inc_by(context.db_txn_count - self.db_txn_count, self.name) + block_db_txn_count.inc_by( + context.db_txn_count - self.db_txn_count, self.name + ) block_db_txn_duration.inc_by( context.db_txn_duration - self.db_txn_duration, self.name ) + + if self.created_context: + self.start_context.__exit__(exc_type, exc_val, exc_tb) diff --git a/tests/test_state.py b/tests/test_state.py index a1ea7ef672..1a11bbcee0 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -140,13 +140,13 @@ class StateTestCase(unittest.TestCase): "add_event_hashes", ] ) - hs = Mock(spec=[ + hs = Mock(spec_set=[ "get_datastore", "get_auth", "get_state_handler", "get_clock", ]) hs.get_datastore.return_value = self.store hs.get_state_handler.return_value = None - hs.get_auth.return_value = Auth(hs) hs.get_clock.return_value = MockClock() + hs.get_auth.return_value = Auth(hs) self.state = StateHandler(hs) self.event_id = 0 From 
43f0941e8f478e22558db6fdc205b730c0087556 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 Apr 2016 11:24:59 +0100 Subject: [PATCH 09/22] Split out BaseMediaResource into MediaRepository This is so that a single MediaRepository can be shared across all resources, rather than having a "copy" per resource. In particular this allows us to guard against both the thumbnail and download resource triggering a download of remote content at the same time. --- synapse/rest/media/v1/base_resource.py | 160 ++++++++++-------- synapse/rest/media/v1/download_resource.py | 24 ++- synapse/rest/media/v1/media_repository.py | 12 +- synapse/rest/media/v1/preview_url_resource.py | 20 ++- synapse/rest/media/v1/thumbnail_resource.py | 51 +++--- synapse/rest/media/v1/upload_resource.py | 51 ++---- 6 files changed, 180 insertions(+), 138 deletions(-) diff --git a/synapse/rest/media/v1/base_resource.py b/synapse/rest/media/v1/base_resource.py index 2b1938dc8e..ade4e28034 100644 --- a/synapse/rest/media/v1/base_resource.py +++ b/synapse/rest/media/v1/base_resource.py @@ -23,7 +23,6 @@ from synapse.api.errors import ( ) from twisted.internet import defer, threads -from twisted.web.resource import Resource from twisted.protocols.basic import FileSender from synapse.util.async import ObservableDeferred @@ -60,11 +59,66 @@ def parse_media_id(request): ) -class BaseMediaResource(Resource): - isLeaf = True +def respond_404(request): + respond_with_json( + request, 404, + cs_error( + "Not found %r" % (request.postpath,), + code=Codes.NOT_FOUND, + ), + send_cors=True + ) + +@defer.inlineCallbacks +def respond_with_file(request, media_type, file_path, + file_size=None, upload_name=None): + logger.debug("Responding with %r", file_path) + + if os.path.isfile(file_path): + request.setHeader(b"Content-Type", media_type.encode("UTF-8")) + if upload_name: + if is_ascii(upload_name): + request.setHeader( + b"Content-Disposition", + b"inline; filename=%s" % ( + urllib.quote(upload_name.encode("utf-8")), + ), + ) + else: + request.setHeader( + b"Content-Disposition", + b"inline; filename*=utf-8''%s" % ( + urllib.quote(upload_name.encode("utf-8")), + ), + ) + + # cache for at least a day. + # XXX: we might want to turn this off for data we don't want to + # recommend caching as it's sensitive or private - or at least + # select private. 
don't bother setting Expires as all our + # clients are smart enough to be happy with Cache-Control + request.setHeader( + b"Cache-Control", b"public,max-age=86400,s-maxage=86400" + ) + if file_size is None: + stat = os.stat(file_path) + file_size = stat.st_size + + request.setHeader( + b"Content-Length", b"%d" % (file_size,) + ) + + with open(file_path, "rb") as f: + yield FileSender().beginFileTransfer(f, request) + + finish_request(request) + else: + respond_404(request) + + +class MediaRepository(object): def __init__(self, hs, filepaths): - Resource.__init__(self) self.auth = hs.get_auth() self.client = MatrixFederationHttpClient(hs) self.clock = hs.get_clock() @@ -72,30 +126,48 @@ class BaseMediaResource(Resource): self.store = hs.get_datastore() self.max_upload_size = hs.config.max_upload_size self.max_image_pixels = hs.config.max_image_pixels - self.max_spider_size = hs.config.max_spider_size self.filepaths = filepaths - self.version_string = hs.version_string self.downloads = {} self.dynamic_thumbnails = hs.config.dynamic_thumbnails self.thumbnail_requirements = hs.config.thumbnail_requirements - def _respond_404(self, request): - respond_with_json( - request, 404, - cs_error( - "Not found %r" % (request.postpath,), - code=Codes.NOT_FOUND, - ), - send_cors=True - ) - @staticmethod def _makedirs(filepath): dirname = os.path.dirname(filepath) if not os.path.exists(dirname): os.makedirs(dirname) - def _get_remote_media(self, server_name, media_id): + @defer.inlineCallbacks + def create_content(self, media_type, upload_name, content, content_length, + auth_user): + media_id = random_string(24) + + fname = self.filepaths.local_media_filepath(media_id) + self._makedirs(fname) + + # This shouldn't block for very long because the content will have + # already been uploaded at this point. + with open(fname, "wb") as f: + f.write(content) + + yield self.store.store_local_media( + media_id=media_id, + media_type=media_type, + time_now_ms=self.clock.time_msec(), + upload_name=upload_name, + media_length=content_length, + user_id=auth_user, + ) + media_info = { + "media_type": media_type, + "media_length": content_length, + } + + yield self._generate_local_thumbnails(media_id, media_info) + + defer.returnValue("mxc://%s/%s" % (self.server_name, media_id)) + + def get_remote_media(self, server_name, media_id): key = (server_name, media_id) download = self.downloads.get(key) if download is None: @@ -197,52 +269,6 @@ class BaseMediaResource(Resource): defer.returnValue(media_info) - @defer.inlineCallbacks - def _respond_with_file(self, request, media_type, file_path, - file_size=None, upload_name=None): - logger.debug("Responding with %r", file_path) - - if os.path.isfile(file_path): - request.setHeader(b"Content-Type", media_type.encode("UTF-8")) - if upload_name: - if is_ascii(upload_name): - request.setHeader( - b"Content-Disposition", - b"inline; filename=%s" % ( - urllib.quote(upload_name.encode("utf-8")), - ), - ) - else: - request.setHeader( - b"Content-Disposition", - b"inline; filename*=utf-8''%s" % ( - urllib.quote(upload_name.encode("utf-8")), - ), - ) - - # cache for at least a day. - # XXX: we might want to turn this off for data we don't want to - # recommend caching as it's sensitive or private - or at least - # select private. 
don't bother setting Expires as all our - # clients are smart enough to be happy with Cache-Control - request.setHeader( - b"Cache-Control", b"public,max-age=86400,s-maxage=86400" - ) - if file_size is None: - stat = os.stat(file_path) - file_size = stat.st_size - - request.setHeader( - b"Content-Length", b"%d" % (file_size,) - ) - - with open(file_path, "rb") as f: - yield FileSender().beginFileTransfer(f, request) - - finish_request(request) - else: - self._respond_404(request) - def _get_thumbnail_requirements(self, media_type): return self.thumbnail_requirements.get(media_type, ()) @@ -269,8 +295,8 @@ class BaseMediaResource(Resource): return t_len @defer.inlineCallbacks - def _generate_local_exact_thumbnail(self, media_id, t_width, t_height, - t_method, t_type): + def generate_local_exact_thumbnail(self, media_id, t_width, t_height, + t_method, t_type): input_path = self.filepaths.local_media_filepath(media_id) t_path = self.filepaths.local_media_thumbnail( @@ -292,8 +318,8 @@ class BaseMediaResource(Resource): defer.returnValue(t_path) @defer.inlineCallbacks - def _generate_remote_exact_thumbnail(self, server_name, file_id, media_id, - t_width, t_height, t_method, t_type): + def generate_remote_exact_thumbnail(self, server_name, file_id, media_id, + t_width, t_height, t_method, t_type): input_path = self.filepaths.remote_media_filepath(server_name, file_id) t_path = self.filepaths.remote_media_thumbnail( diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py index 1aad6b3551..97f4e9b54b 100644 --- a/synapse/rest/media/v1/download_resource.py +++ b/synapse/rest/media/v1/download_resource.py @@ -13,7 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
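The payoff of the shared MediaRepository shows in get_remote_media above: in-flight fetches live in one dict keyed by (server_name, media_id), so when the download and thumbnail resources ask for the same remote file at the same time, the second caller simply observes the first caller's fetch. The shape of that guard, restated as a self-contained illustration:

    from synapse.util.async import ObservableDeferred

    class SharedDownloads(object):
        # Illustration of the guard in MediaRepository.get_remote_media;
        # fetch_fn(server_name, media_id) must return a Deferred.
        def __init__(self, fetch_fn):
            self.fetch_fn = fetch_fn
            self.downloads = {}  # (server_name, media_id) -> ObservableDeferred

        def get_remote_media(self, server_name, media_id):
            key = (server_name, media_id)
            download = self.downloads.get(key)
            if download is None:
                # First request for this file: start the real fetch and
                # publish it so concurrent callers share the result.
                download = ObservableDeferred(
                    self.fetch_fn(server_name, media_id),
                    consumeErrors=True,
                )
                self.downloads[key] = download

                @download.addBoth
                def callback(media_info):
                    # Fetch finished (or failed): stop advertising it.
                    del self.downloads[key]
                    return media_info
            return download.observe()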
-from .base_resource import BaseMediaResource, parse_media_id +from .base_resource import parse_media_id, respond_with_file, respond_404 +from twisted.web.resource import Resource from synapse.http.server import request_handler from twisted.web.server import NOT_DONE_YET @@ -24,7 +25,18 @@ import logging logger = logging.getLogger(__name__) -class DownloadResource(BaseMediaResource): +class DownloadResource(Resource): + isLeaf = True + + def __init__(self, hs, media_repo): + Resource.__init__(self) + + self.filepaths = media_repo.filepaths + self.media_repo = media_repo + self.server_name = hs.hostname + self.store = hs.get_datastore() + self.version_string = hs.version_string + def render_GET(self, request): self._async_render_GET(request) return NOT_DONE_YET @@ -44,7 +56,7 @@ class DownloadResource(BaseMediaResource): def _respond_local_file(self, request, media_id, name): media_info = yield self.store.get_local_media(media_id) if not media_info: - self._respond_404(request) + respond_404(request) return media_type = media_info["media_type"] @@ -52,14 +64,14 @@ class DownloadResource(BaseMediaResource): upload_name = name if name else media_info["upload_name"] file_path = self.filepaths.local_media_filepath(media_id) - yield self._respond_with_file( + yield respond_with_file( request, media_type, file_path, media_length, upload_name=upload_name, ) @defer.inlineCallbacks def _respond_remote_file(self, request, server_name, media_id, name): - media_info = yield self._get_remote_media(server_name, media_id) + media_info = yield self.media_repo.get_remote_media(server_name, media_id) media_type = media_info["media_type"] media_length = media_info["media_length"] @@ -70,7 +82,7 @@ class DownloadResource(BaseMediaResource): server_name, filesystem_id ) - yield self._respond_with_file( + yield respond_with_file( request, media_type, file_path, media_length, upload_name=upload_name, ) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 77fb0313c5..e8fe3302b2 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
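The wiring change below is the crux of the refactor: MediaRepositoryResource builds exactly one MediaRepository and passes the same instance to every child, so state like the in-flight downloads map is genuinely shared across upload, download, thumbnail and preview handling. Sketched without the surrounding plumbing (the classes are parameters only to keep the sketch self-contained):

    from twisted.web.resource import Resource

    def build_media_tree(hs, filepaths, MediaRepository,
                         UploadResource, DownloadResource, ThumbnailResource):
        root = Resource()
        media_repo = MediaRepository(hs, filepaths)  # the single shared instance
        root.putChild("upload", UploadResource(hs, media_repo))
        root.putChild("download", DownloadResource(hs, media_repo))
        root.putChild("thumbnail", ThumbnailResource(hs, media_repo))
        return root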
+from .base_resource import MediaRepository from .upload_resource import UploadResource from .download_resource import DownloadResource from .thumbnail_resource import ThumbnailResource @@ -75,9 +76,12 @@ class MediaRepositoryResource(Resource): def __init__(self, hs): Resource.__init__(self) filepaths = MediaFilePaths(hs.config.media_store_path) - self.putChild("upload", UploadResource(hs, filepaths)) - self.putChild("download", DownloadResource(hs, filepaths)) - self.putChild("thumbnail", ThumbnailResource(hs, filepaths)) + + media_repo = MediaRepository(hs, filepaths) + + self.putChild("upload", UploadResource(hs, media_repo)) + self.putChild("download", DownloadResource(hs, media_repo)) + self.putChild("thumbnail", ThumbnailResource(hs, media_repo)) self.putChild("identicon", IdenticonResource()) if hs.config.url_preview_enabled: - self.putChild("preview_url", PreviewUrlResource(hs, filepaths)) + self.putChild("preview_url", PreviewUrlResource(hs, media_repo)) diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 9bb7c72cfc..fecdf8ed86 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -13,10 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .base_resource import BaseMediaResource - from twisted.web.server import NOT_DONE_YET from twisted.internet import defer +from twisted.web.resource import Resource from synapse.api.errors import ( SynapseError, Codes, @@ -41,11 +40,11 @@ import logging logger = logging.getLogger(__name__) -class PreviewUrlResource(BaseMediaResource): +class PreviewUrlResource(Resource): isLeaf = True - def __init__(self, hs, filepaths): - BaseMediaResource.__init__(self, hs, filepaths) + def __init__(self, hs, media_repo): + Resource.__init__(self) self.client = SpiderHttpClient(hs) if hasattr(hs.config, "url_preview_url_blacklist"): self.url_preview_url_blacklist = hs.config.url_preview_url_blacklist @@ -61,6 +60,13 @@ class PreviewUrlResource(BaseMediaResource): self.downloads = {} + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.version_string = hs.version_string + self.filepaths = media_repo.filepaths + self.max_spider_size = hs.config.max_spider_size + self.server_name = hs.hostname + def render_GET(self, request): self._async_render_GET(request) return NOT_DONE_YET @@ -156,7 +162,7 @@ class PreviewUrlResource(BaseMediaResource): logger.debug("got media_info of '%s'" % media_info) if self._is_media(media_info['media_type']): - dims = yield self._generate_local_thumbnails( + dims = yield self.media_repo._generate_local_thumbnails( media_info['filesystem_id'], media_info ) @@ -291,7 +297,7 @@ class PreviewUrlResource(BaseMediaResource): if self._is_media(image_info['media_type']): # TODO: make sure we don't choke on white-on-transparent images - dims = yield self._generate_local_thumbnails( + dims = yield self.media_repo._generate_local_thumbnails( image_info['filesystem_id'], image_info ) if dims: diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py index 40ef22459c..43c568b769 100644 --- a/synapse/rest/media/v1/thumbnail_resource.py +++ b/synapse/rest/media/v1/thumbnail_resource.py @@ -14,7 +14,8 @@ # limitations under the License. 
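ThumbnailResource keeps its two serving strategies through the refactor; only the generation calls now route via media_repo. Assuming the GET handler's dispatch (which is unchanged and not part of this diff) works roughly like this, the flow is: parse the requested geometry, then either cut an exact thumbnail on demand when dynamic_thumbnails is on, or serve the closest precomputed one:

    from synapse.http.servlet import parse_integer, parse_string

    def _dispatch_thumbnail(self, request, server_name, media_id):
        width = parse_integer(request, "width")
        height = parse_integer(request, "height")
        method = parse_string(request, "method", "scale")
        m_type = parse_string(request, "type", "image/png")

        if self.dynamic_thumbnails:
            # Generate a thumbnail of exactly the requested size if
            # one doesn't already exist.
            return self._select_or_generate_local_thumbnail(
                request, media_id, width, height, method, m_type
            )
        # Otherwise pick the best match among the precomputed sizes.
        return self._respond_local_thumbnail(
            request, media_id, width, height, method, m_type
        )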
-from .base_resource import BaseMediaResource, parse_media_id +from .base_resource import parse_media_id, respond_404, respond_with_file +from twisted.web.resource import Resource from synapse.http.servlet import parse_string, parse_integer from synapse.http.server import request_handler @@ -26,9 +27,19 @@ import logging logger = logging.getLogger(__name__) -class ThumbnailResource(BaseMediaResource): +class ThumbnailResource(Resource): isLeaf = True + def __init__(self, hs, media_repo): + Resource.__init__(self) + + self.store = hs.get_datastore() + self.filepaths = media_repo.filepaths + self.media_repo = media_repo + self.dynamic_thumbnails = hs.config.dynamic_thumbnails + self.server_name = hs.hostname + self.version_string = hs.version_string + def render_GET(self, request): self._async_render_GET(request) return NOT_DONE_YET @@ -69,12 +80,12 @@ class ThumbnailResource(BaseMediaResource): media_info = yield self.store.get_local_media(media_id) if not media_info: - self._respond_404(request) + respond_404(request) return # if media_info["media_type"] == "image/svg+xml": # file_path = self.filepaths.local_media_filepath(media_id) - # yield self._respond_with_file(request, media_info["media_type"], file_path) + # yield respond_with_file(request, media_info["media_type"], file_path) # return thumbnail_infos = yield self.store.get_local_media_thumbnails(media_id) @@ -91,7 +102,7 @@ class ThumbnailResource(BaseMediaResource): file_path = self.filepaths.local_media_thumbnail( media_id, t_width, t_height, t_type, t_method, ) - yield self._respond_with_file(request, t_type, file_path) + yield respond_with_file(request, t_type, file_path) else: yield self._respond_default_thumbnail( @@ -105,12 +116,12 @@ class ThumbnailResource(BaseMediaResource): media_info = yield self.store.get_local_media(media_id) if not media_info: - self._respond_404(request) + respond_404(request) return # if media_info["media_type"] == "image/svg+xml": # file_path = self.filepaths.local_media_filepath(media_id) - # yield self._respond_with_file(request, media_info["media_type"], file_path) + # yield respond_with_file(request, media_info["media_type"], file_path) # return thumbnail_infos = yield self.store.get_local_media_thumbnails(media_id) @@ -124,18 +135,18 @@ class ThumbnailResource(BaseMediaResource): file_path = self.filepaths.local_media_thumbnail( media_id, desired_width, desired_height, desired_type, desired_method, ) - yield self._respond_with_file(request, desired_type, file_path) + yield respond_with_file(request, desired_type, file_path) return logger.debug("We don't have a local thumbnail of that size. Generating") # Okay, so we generate one. 
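"scale" and "crop" are the two generation methods: scale fits the whole image inside the requested box while preserving aspect ratio (so one dimension may come up short), whereas crop fills the box exactly and trims the overflow. A quick illustration of the geometry a scale request resolves to (this helper is illustrative, not the Thumbnailer API):

    def aspect_fit(m_width, m_height, r_width, r_height):
        # Largest box with the source's aspect ratio that still fits
        # inside the requested r_width x r_height rectangle.
        if m_width * r_height > m_height * r_width:
            return r_width, (r_width * m_height) // m_width
        return (r_height * m_width) // m_height, r_height

    # A 1000x500 source asked for a 320x240 thumbnail:
    print(aspect_fit(1000, 500, 320, 240))  # scale -> (320, 160)
    # A crop of the same request returns exactly 320x240, trimming
    # the sides that don't fit.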
- file_path = yield self._generate_local_exact_thumbnail( + file_path = yield self.media_repo.generate_local_exact_thumbnail( media_id, desired_width, desired_height, desired_method, desired_type ) if file_path: - yield self._respond_with_file(request, desired_type, file_path) + yield respond_with_file(request, desired_type, file_path) else: yield self._respond_default_thumbnail( request, media_info, desired_width, desired_height, @@ -146,11 +157,11 @@ class ThumbnailResource(BaseMediaResource): def _select_or_generate_remote_thumbnail(self, request, server_name, media_id, desired_width, desired_height, desired_method, desired_type): - media_info = yield self._get_remote_media(server_name, media_id) + media_info = yield self.media_repo.get_remote_media(server_name, media_id) # if media_info["media_type"] == "image/svg+xml": # file_path = self.filepaths.remote_media_filepath(server_name, media_id) - # yield self._respond_with_file(request, media_info["media_type"], file_path) + # yield respond_with_file(request, media_info["media_type"], file_path) # return thumbnail_infos = yield self.store.get_remote_media_thumbnails( @@ -170,19 +181,19 @@ class ThumbnailResource(BaseMediaResource): server_name, file_id, desired_width, desired_height, desired_type, desired_method, ) - yield self._respond_with_file(request, desired_type, file_path) + yield respond_with_file(request, desired_type, file_path) return logger.debug("We don't have a local thumbnail of that size. Generating") # Okay, so we generate one. - file_path = yield self._generate_remote_exact_thumbnail( + file_path = yield self.media_repo.generate_remote_exact_thumbnail( server_name, file_id, media_id, desired_width, desired_height, desired_method, desired_type ) if file_path: - yield self._respond_with_file(request, desired_type, file_path) + yield respond_with_file(request, desired_type, file_path) else: yield self._respond_default_thumbnail( request, media_info, desired_width, desired_height, @@ -194,11 +205,11 @@ class ThumbnailResource(BaseMediaResource): height, method, m_type): # TODO: Don't download the whole remote file # We should proxy the thumbnail from the remote server instead. 
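That TODO matters for bandwidth: on a cache miss the server fetches the entire original over federation before it can thumbnail it, rather than proxying the remote server's already-generated thumbnail. The current cache-then-fetch flow, as in _get_remote_media_impl above (comments added):

    from twisted.internet import defer

    @defer.inlineCallbacks
    def _get_remote_media_impl(self, server_name, media_id):
        # Serve from the local cache of remote media if present;
        # otherwise download the full file, which also caches it.
        media_info = yield self.store.get_cached_remote_media(
            server_name, media_id
        )
        if not media_info:
            media_info = yield self._download_remote_file(
                server_name, media_id
            )
        defer.returnValue(media_info)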
- media_info = yield self._get_remote_media(server_name, media_id) + media_info = yield self.media_repo.get_remote_media(server_name, media_id) # if media_info["media_type"] == "image/svg+xml": # file_path = self.filepaths.remote_media_filepath(server_name, media_id) - # yield self._respond_with_file(request, media_info["media_type"], file_path) + # yield respond_with_file(request, media_info["media_type"], file_path) # return thumbnail_infos = yield self.store.get_remote_media_thumbnails( @@ -219,7 +230,7 @@ class ThumbnailResource(BaseMediaResource): file_path = self.filepaths.remote_media_thumbnail( server_name, file_id, t_width, t_height, t_type, t_method, ) - yield self._respond_with_file(request, t_type, file_path, t_length) + yield respond_with_file(request, t_type, file_path, t_length) else: yield self._respond_default_thumbnail( request, media_info, width, height, method, m_type, @@ -245,7 +256,7 @@ class ThumbnailResource(BaseMediaResource): "_default", "_default", ) if not thumbnail_infos: - self._respond_404(request) + respond_404(request) return thumbnail_info = self._select_thumbnail( @@ -261,7 +272,7 @@ class ThumbnailResource(BaseMediaResource): file_path = self.filepaths.default_thumbnail( top_level_type, sub_type, t_width, t_height, t_type, t_method, ) - yield self.respond_with_file(request, t_type, file_path, t_length) + yield respond_with_file(request, t_type, file_path, t_length) def _select_thumbnail(self, desired_width, desired_height, desired_method, desired_type, thumbnail_infos): diff --git a/synapse/rest/media/v1/upload_resource.py b/synapse/rest/media/v1/upload_resource.py index 9c7ad4ae85..299e1f6e56 100644 --- a/synapse/rest/media/v1/upload_resource.py +++ b/synapse/rest/media/v1/upload_resource.py @@ -15,20 +15,33 @@ from synapse.http.server import respond_with_json, request_handler -from synapse.util.stringutils import random_string from synapse.api.errors import SynapseError from twisted.web.server import NOT_DONE_YET from twisted.internet import defer -from .base_resource import BaseMediaResource +from twisted.web.resource import Resource import logging logger = logging.getLogger(__name__) -class UploadResource(BaseMediaResource): +class UploadResource(Resource): + isLeaf = True + + def __init__(self, hs, media_repo): + Resource.__init__(self) + + self.media_repo = media_repo + self.filepaths = media_repo.filepaths + self.store = hs.get_datastore() + self.clock = hs.get_clock() + self.server_name = hs.hostname + self.auth = hs.get_auth() + self.max_upload_size = hs.config.max_upload_size + self.version_string = hs.version_string + def render_POST(self, request): self._async_render_POST(request) return NOT_DONE_YET @@ -37,36 +50,6 @@ class UploadResource(BaseMediaResource): respond_with_json(request, 200, {}, send_cors=True) return NOT_DONE_YET - @defer.inlineCallbacks - def create_content(self, media_type, upload_name, content, content_length, - auth_user): - media_id = random_string(24) - - fname = self.filepaths.local_media_filepath(media_id) - self._makedirs(fname) - - # This shouldn't block for very long because the content will have - # already been uploaded at this point. 
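With create_content moving out (removed below, reintroduced on MediaRepository), UploadResource becomes a thin HTTP shim: validate the request, then hand the raw bytes to media_repo.create_content, which owns ID allocation, disk layout, metadata and thumbnailing. An abbreviated sketch of the resulting handler; the exact header and error handling in the real _async_render_POST differs in detail:

    from twisted.internet import defer
    from synapse.api.errors import SynapseError
    from synapse.http.server import respond_with_json

    @defer.inlineCallbacks
    def _async_render_POST(self, request):
        requester = yield self.auth.get_user_by_req(request)

        content_length = request.getHeader("Content-Length")
        if content_length is None:
            raise SynapseError(400, "Request must specify a Content-Length")
        if int(content_length) > self.max_upload_size:
            raise SynapseError(413, "Upload request body is too large")

        media_type = request.getHeader("Content-Type")
        upload_name = None  # optionally supplied by the client

        content_uri = yield self.media_repo.create_content(
            media_type, upload_name, request.content.read(),
            int(content_length), requester.user,
        )
        respond_with_json(
            request, 200, {"content_uri": content_uri}, send_cors=True
        )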
- with open(fname, "wb") as f: - f.write(content) - - yield self.store.store_local_media( - media_id=media_id, - media_type=media_type, - time_now_ms=self.clock.time_msec(), - upload_name=upload_name, - media_length=content_length, - user_id=auth_user, - ) - media_info = { - "media_type": media_type, - "media_length": content_length, - } - - yield self._generate_local_thumbnails(media_id, media_info) - - defer.returnValue("mxc://%s/%s" % (self.server_name, media_id)) - @request_handler @defer.inlineCallbacks def _async_render_POST(self, request): @@ -108,7 +91,7 @@ class UploadResource(BaseMediaResource): # disposition = headers.getRawHeaders("Content-Disposition")[0] # TODO(markjh): parse content-dispostion - content_uri = yield self.create_content( + content_uri = yield self.media_repo.create_content( media_type, upload_name, request.content.read(), content_length, requester.user ) From 0c93df89b65e19952d497885d15417939aa2a2d6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 Apr 2016 11:31:43 +0100 Subject: [PATCH 10/22] Move MediaRepository to media_repository module --- synapse/rest/media/v1/_base.py | 110 +++++ synapse/rest/media/v1/base_resource.py | 486 -------------------- synapse/rest/media/v1/download_resource.py | 2 +- synapse/rest/media/v1/media_repository.py | 385 +++++++++++++++- synapse/rest/media/v1/thumbnail_resource.py | 2 +- 5 files changed, 496 insertions(+), 489 deletions(-) create mode 100644 synapse/rest/media/v1/_base.py delete mode 100644 synapse/rest/media/v1/base_resource.py diff --git a/synapse/rest/media/v1/_base.py b/synapse/rest/media/v1/_base.py new file mode 100644 index 0000000000..b9600f2167 --- /dev/null +++ b/synapse/rest/media/v1/_base.py @@ -0,0 +1,110 @@ +# -*- coding: utf-8 -*- +# Copyright 2014-2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.http.server import respond_with_json, finish_request +from synapse.api.errors import ( + cs_error, Codes, SynapseError +) + +from twisted.internet import defer +from twisted.protocols.basic import FileSender + +from synapse.util.stringutils import is_ascii + +import os + +import logging +import urllib +import urlparse + +logger = logging.getLogger(__name__) + + +def parse_media_id(request): + try: + # This allows users to append e.g. /test.png to the URL. Useful for + # clients that parse the URL to see content type. 
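Concretely, for a request to .../download/example.org/abc123/test.png the remaining path segments arrive as request.postpath, and the optional trailing segment becomes the suggested filename (hypothetical values):

    # request.postpath                        -> parse_media_id(request)
    # ['example.org', 'abc123']               -> ('example.org', 'abc123', None)
    # ['example.org', 'abc123', 'test.png']   -> ('example.org', 'abc123', u'test.png')
    # ['example.org', 'abc123', 'caf%C3%A9']  -> ('example.org', 'abc123', u'caf\xe9')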
+ server_name, media_id = request.postpath[:2] + file_name = None + if len(request.postpath) > 2: + try: + file_name = urlparse.unquote(request.postpath[-1]).decode("utf-8") + except UnicodeDecodeError: + pass + return server_name, media_id, file_name + except: + raise SynapseError( + 404, + "Invalid media id token %r" % (request.postpath,), + Codes.UNKNOWN, + ) + + +def respond_404(request): + respond_with_json( + request, 404, + cs_error( + "Not found %r" % (request.postpath,), + code=Codes.NOT_FOUND, + ), + send_cors=True + ) + + +@defer.inlineCallbacks +def respond_with_file(request, media_type, file_path, + file_size=None, upload_name=None): + logger.debug("Responding with %r", file_path) + + if os.path.isfile(file_path): + request.setHeader(b"Content-Type", media_type.encode("UTF-8")) + if upload_name: + if is_ascii(upload_name): + request.setHeader( + b"Content-Disposition", + b"inline; filename=%s" % ( + urllib.quote(upload_name.encode("utf-8")), + ), + ) + else: + request.setHeader( + b"Content-Disposition", + b"inline; filename*=utf-8''%s" % ( + urllib.quote(upload_name.encode("utf-8")), + ), + ) + + # cache for at least a day. + # XXX: we might want to turn this off for data we don't want to + # recommend caching as it's sensitive or private - or at least + # select private. don't bother setting Expires as all our + # clients are smart enough to be happy with Cache-Control + request.setHeader( + b"Cache-Control", b"public,max-age=86400,s-maxage=86400" + ) + if file_size is None: + stat = os.stat(file_path) + file_size = stat.st_size + + request.setHeader( + b"Content-Length", b"%d" % (file_size,) + ) + + with open(file_path, "rb") as f: + yield FileSender().beginFileTransfer(f, request) + + finish_request(request) + else: + respond_404(request) diff --git a/synapse/rest/media/v1/base_resource.py b/synapse/rest/media/v1/base_resource.py deleted file mode 100644 index ade4e28034..0000000000 --- a/synapse/rest/media/v1/base_resource.py +++ /dev/null @@ -1,486 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014-2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from .thumbnailer import Thumbnailer - -from synapse.http.matrixfederationclient import MatrixFederationHttpClient -from synapse.http.server import respond_with_json, finish_request -from synapse.util.stringutils import random_string -from synapse.api.errors import ( - cs_error, Codes, SynapseError -) - -from twisted.internet import defer, threads -from twisted.protocols.basic import FileSender - -from synapse.util.async import ObservableDeferred -from synapse.util.stringutils import is_ascii -from synapse.util.logcontext import preserve_context_over_fn - -import os - -import cgi -import logging -import urllib -import urlparse - -logger = logging.getLogger(__name__) - - -def parse_media_id(request): - try: - # This allows users to append e.g. /test.png to the URL. Useful for - # clients that parse the URL to see content type. 
- server_name, media_id = request.postpath[:2] - file_name = None - if len(request.postpath) > 2: - try: - file_name = urlparse.unquote(request.postpath[-1]).decode("utf-8") - except UnicodeDecodeError: - pass - return server_name, media_id, file_name - except: - raise SynapseError( - 404, - "Invalid media id token %r" % (request.postpath,), - Codes.UNKNOWN, - ) - - -def respond_404(request): - respond_with_json( - request, 404, - cs_error( - "Not found %r" % (request.postpath,), - code=Codes.NOT_FOUND, - ), - send_cors=True - ) - - -@defer.inlineCallbacks -def respond_with_file(request, media_type, file_path, - file_size=None, upload_name=None): - logger.debug("Responding with %r", file_path) - - if os.path.isfile(file_path): - request.setHeader(b"Content-Type", media_type.encode("UTF-8")) - if upload_name: - if is_ascii(upload_name): - request.setHeader( - b"Content-Disposition", - b"inline; filename=%s" % ( - urllib.quote(upload_name.encode("utf-8")), - ), - ) - else: - request.setHeader( - b"Content-Disposition", - b"inline; filename*=utf-8''%s" % ( - urllib.quote(upload_name.encode("utf-8")), - ), - ) - - # cache for at least a day. - # XXX: we might want to turn this off for data we don't want to - # recommend caching as it's sensitive or private - or at least - # select private. don't bother setting Expires as all our - # clients are smart enough to be happy with Cache-Control - request.setHeader( - b"Cache-Control", b"public,max-age=86400,s-maxage=86400" - ) - if file_size is None: - stat = os.stat(file_path) - file_size = stat.st_size - - request.setHeader( - b"Content-Length", b"%d" % (file_size,) - ) - - with open(file_path, "rb") as f: - yield FileSender().beginFileTransfer(f, request) - - finish_request(request) - else: - respond_404(request) - - -class MediaRepository(object): - def __init__(self, hs, filepaths): - self.auth = hs.get_auth() - self.client = MatrixFederationHttpClient(hs) - self.clock = hs.get_clock() - self.server_name = hs.hostname - self.store = hs.get_datastore() - self.max_upload_size = hs.config.max_upload_size - self.max_image_pixels = hs.config.max_image_pixels - self.filepaths = filepaths - self.downloads = {} - self.dynamic_thumbnails = hs.config.dynamic_thumbnails - self.thumbnail_requirements = hs.config.thumbnail_requirements - - @staticmethod - def _makedirs(filepath): - dirname = os.path.dirname(filepath) - if not os.path.exists(dirname): - os.makedirs(dirname) - - @defer.inlineCallbacks - def create_content(self, media_type, upload_name, content, content_length, - auth_user): - media_id = random_string(24) - - fname = self.filepaths.local_media_filepath(media_id) - self._makedirs(fname) - - # This shouldn't block for very long because the content will have - # already been uploaded at this point. 
- with open(fname, "wb") as f: - f.write(content) - - yield self.store.store_local_media( - media_id=media_id, - media_type=media_type, - time_now_ms=self.clock.time_msec(), - upload_name=upload_name, - media_length=content_length, - user_id=auth_user, - ) - media_info = { - "media_type": media_type, - "media_length": content_length, - } - - yield self._generate_local_thumbnails(media_id, media_info) - - defer.returnValue("mxc://%s/%s" % (self.server_name, media_id)) - - def get_remote_media(self, server_name, media_id): - key = (server_name, media_id) - download = self.downloads.get(key) - if download is None: - download = self._get_remote_media_impl(server_name, media_id) - download = ObservableDeferred( - download, - consumeErrors=True - ) - self.downloads[key] = download - - @download.addBoth - def callback(media_info): - del self.downloads[key] - return media_info - return download.observe() - - @defer.inlineCallbacks - def _get_remote_media_impl(self, server_name, media_id): - media_info = yield self.store.get_cached_remote_media( - server_name, media_id - ) - if not media_info: - media_info = yield self._download_remote_file( - server_name, media_id - ) - defer.returnValue(media_info) - - @defer.inlineCallbacks - def _download_remote_file(self, server_name, media_id): - file_id = random_string(24) - - fname = self.filepaths.remote_media_filepath( - server_name, file_id - ) - self._makedirs(fname) - - try: - with open(fname, "wb") as f: - request_path = "/".join(( - "/_matrix/media/v1/download", server_name, media_id, - )) - length, headers = yield self.client.get_file( - server_name, request_path, output_stream=f, - max_size=self.max_upload_size, - ) - media_type = headers["Content-Type"][0] - time_now_ms = self.clock.time_msec() - - content_disposition = headers.get("Content-Disposition", None) - if content_disposition: - _, params = cgi.parse_header(content_disposition[0],) - upload_name = None - - # First check if there is a valid UTF-8 filename - upload_name_utf8 = params.get("filename*", None) - if upload_name_utf8: - if upload_name_utf8.lower().startswith("utf-8''"): - upload_name = upload_name_utf8[7:] - - # If there isn't check for an ascii name. 
- if not upload_name: - upload_name_ascii = params.get("filename", None) - if upload_name_ascii and is_ascii(upload_name_ascii): - upload_name = upload_name_ascii - - if upload_name: - upload_name = urlparse.unquote(upload_name) - try: - upload_name = upload_name.decode("utf-8") - except UnicodeDecodeError: - upload_name = None - else: - upload_name = None - - yield self.store.store_cached_remote_media( - origin=server_name, - media_id=media_id, - media_type=media_type, - time_now_ms=self.clock.time_msec(), - upload_name=upload_name, - media_length=length, - filesystem_id=file_id, - ) - except: - os.remove(fname) - raise - - media_info = { - "media_type": media_type, - "media_length": length, - "upload_name": upload_name, - "created_ts": time_now_ms, - "filesystem_id": file_id, - } - - yield self._generate_remote_thumbnails( - server_name, media_id, media_info - ) - - defer.returnValue(media_info) - - def _get_thumbnail_requirements(self, media_type): - return self.thumbnail_requirements.get(media_type, ()) - - def _generate_thumbnail(self, input_path, t_path, t_width, t_height, - t_method, t_type): - thumbnailer = Thumbnailer(input_path) - m_width = thumbnailer.width - m_height = thumbnailer.height - - if m_width * m_height >= self.max_image_pixels: - logger.info( - "Image too large to thumbnail %r x %r > %r", - m_width, m_height, self.max_image_pixels - ) - return - - if t_method == "crop": - t_len = thumbnailer.crop(t_path, t_width, t_height, t_type) - elif t_method == "scale": - t_len = thumbnailer.scale(t_path, t_width, t_height, t_type) - else: - t_len = None - - return t_len - - @defer.inlineCallbacks - def generate_local_exact_thumbnail(self, media_id, t_width, t_height, - t_method, t_type): - input_path = self.filepaths.local_media_filepath(media_id) - - t_path = self.filepaths.local_media_thumbnail( - media_id, t_width, t_height, t_type, t_method - ) - self._makedirs(t_path) - - t_len = yield preserve_context_over_fn( - threads.deferToThread, - self._generate_thumbnail, - input_path, t_path, t_width, t_height, t_method, t_type - ) - - if t_len: - yield self.store.store_local_thumbnail( - media_id, t_width, t_height, t_type, t_method, t_len - ) - - defer.returnValue(t_path) - - @defer.inlineCallbacks - def generate_remote_exact_thumbnail(self, server_name, file_id, media_id, - t_width, t_height, t_method, t_type): - input_path = self.filepaths.remote_media_filepath(server_name, file_id) - - t_path = self.filepaths.remote_media_thumbnail( - server_name, file_id, t_width, t_height, t_type, t_method - ) - self._makedirs(t_path) - - t_len = yield preserve_context_over_fn( - threads.deferToThread, - self._generate_thumbnail, - input_path, t_path, t_width, t_height, t_method, t_type - ) - - if t_len: - yield self.store.store_remote_media_thumbnail( - server_name, media_id, file_id, - t_width, t_height, t_type, t_method, t_len - ) - - defer.returnValue(t_path) - - @defer.inlineCallbacks - def _generate_local_thumbnails(self, media_id, media_info): - media_type = media_info["media_type"] - requirements = self._get_thumbnail_requirements(media_type) - if not requirements: - return - - input_path = self.filepaths.local_media_filepath(media_id) - thumbnailer = Thumbnailer(input_path) - m_width = thumbnailer.width - m_height = thumbnailer.height - - if m_width * m_height >= self.max_image_pixels: - logger.info( - "Image too large to thumbnail %r x %r > %r", - m_width, m_height, self.max_image_pixels - ) - return - - local_thumbnails = [] - - def generate_thumbnails(): - scales = set() - crops 
= set() - for r_width, r_height, r_method, r_type in requirements: - if r_method == "scale": - t_width, t_height = thumbnailer.aspect(r_width, r_height) - scales.add(( - min(m_width, t_width), min(m_height, t_height), r_type, - )) - elif r_method == "crop": - crops.add((r_width, r_height, r_type)) - - for t_width, t_height, t_type in scales: - t_method = "scale" - t_path = self.filepaths.local_media_thumbnail( - media_id, t_width, t_height, t_type, t_method - ) - self._makedirs(t_path) - t_len = thumbnailer.scale(t_path, t_width, t_height, t_type) - - local_thumbnails.append(( - media_id, t_width, t_height, t_type, t_method, t_len - )) - - for t_width, t_height, t_type in crops: - if (t_width, t_height, t_type) in scales: - # If the aspect ratio of the cropped thumbnail matches a purely - # scaled one then there is no point in calculating a separate - # thumbnail. - continue - t_method = "crop" - t_path = self.filepaths.local_media_thumbnail( - media_id, t_width, t_height, t_type, t_method - ) - self._makedirs(t_path) - t_len = thumbnailer.crop(t_path, t_width, t_height, t_type) - local_thumbnails.append(( - media_id, t_width, t_height, t_type, t_method, t_len - )) - - yield preserve_context_over_fn(threads.deferToThread, generate_thumbnails) - - for l in local_thumbnails: - yield self.store.store_local_thumbnail(*l) - - defer.returnValue({ - "width": m_width, - "height": m_height, - }) - - @defer.inlineCallbacks - def _generate_remote_thumbnails(self, server_name, media_id, media_info): - media_type = media_info["media_type"] - file_id = media_info["filesystem_id"] - requirements = self._get_thumbnail_requirements(media_type) - if not requirements: - return - - remote_thumbnails = [] - - input_path = self.filepaths.remote_media_filepath(server_name, file_id) - thumbnailer = Thumbnailer(input_path) - m_width = thumbnailer.width - m_height = thumbnailer.height - - def generate_thumbnails(): - if m_width * m_height >= self.max_image_pixels: - logger.info( - "Image too large to thumbnail %r x %r > %r", - m_width, m_height, self.max_image_pixels - ) - return - - scales = set() - crops = set() - for r_width, r_height, r_method, r_type in requirements: - if r_method == "scale": - t_width, t_height = thumbnailer.aspect(r_width, r_height) - scales.add(( - min(m_width, t_width), min(m_height, t_height), r_type, - )) - elif r_method == "crop": - crops.add((r_width, r_height, r_type)) - - for t_width, t_height, t_type in scales: - t_method = "scale" - t_path = self.filepaths.remote_media_thumbnail( - server_name, file_id, t_width, t_height, t_type, t_method - ) - self._makedirs(t_path) - t_len = thumbnailer.scale(t_path, t_width, t_height, t_type) - remote_thumbnails.append([ - server_name, media_id, file_id, - t_width, t_height, t_type, t_method, t_len - ]) - - for t_width, t_height, t_type in crops: - if (t_width, t_height, t_type) in scales: - # If the aspect ratio of the cropped thumbnail matches a purely - # scaled one then there is no point in calculating a separate - # thumbnail. 
- continue - t_method = "crop" - t_path = self.filepaths.remote_media_thumbnail( - server_name, file_id, t_width, t_height, t_type, t_method - ) - self._makedirs(t_path) - t_len = thumbnailer.crop(t_path, t_width, t_height, t_type) - remote_thumbnails.append([ - server_name, media_id, file_id, - t_width, t_height, t_type, t_method, t_len - ]) - - yield preserve_context_over_fn(threads.deferToThread, generate_thumbnails) - - for r in remote_thumbnails: - yield self.store.store_remote_media_thumbnail(*r) - - defer.returnValue({ - "width": m_width, - "height": m_height, - }) diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py index 97f4e9b54b..510884262c 100644 --- a/synapse/rest/media/v1/download_resource.py +++ b/synapse/rest/media/v1/download_resource.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .base_resource import parse_media_id, respond_with_file, respond_404 +from ._base import parse_media_id, respond_with_file, respond_404 from twisted.web.resource import Resource from synapse.http.server import request_handler diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index e8fe3302b2..d96bf9afe2 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .base_resource import MediaRepository from .upload_resource import UploadResource from .download_resource import DownloadResource from .thumbnail_resource import ThumbnailResource @@ -23,11 +22,395 @@ from .filepath import MediaFilePaths from twisted.web.resource import Resource +from .thumbnailer import Thumbnailer + +from synapse.http.matrixfederationclient import MatrixFederationHttpClient +from synapse.util.stringutils import random_string + +from twisted.internet import defer, threads + +from synapse.util.async import ObservableDeferred +from synapse.util.stringutils import is_ascii +from synapse.util.logcontext import preserve_context_over_fn + +import os + +import cgi import logging +import urlparse logger = logging.getLogger(__name__) +class MediaRepository(object): + def __init__(self, hs, filepaths): + self.auth = hs.get_auth() + self.client = MatrixFederationHttpClient(hs) + self.clock = hs.get_clock() + self.server_name = hs.hostname + self.store = hs.get_datastore() + self.max_upload_size = hs.config.max_upload_size + self.max_image_pixels = hs.config.max_image_pixels + self.filepaths = filepaths + self.downloads = {} + self.dynamic_thumbnails = hs.config.dynamic_thumbnails + self.thumbnail_requirements = hs.config.thumbnail_requirements + + @staticmethod + def _makedirs(filepath): + dirname = os.path.dirname(filepath) + if not os.path.exists(dirname): + os.makedirs(dirname) + + @defer.inlineCallbacks + def create_content(self, media_type, upload_name, content, content_length, + auth_user): + media_id = random_string(24) + + fname = self.filepaths.local_media_filepath(media_id) + self._makedirs(fname) + + # This shouldn't block for very long because the content will have + # already been uploaded at this point. 
+ with open(fname, "wb") as f: + f.write(content) + + yield self.store.store_local_media( + media_id=media_id, + media_type=media_type, + time_now_ms=self.clock.time_msec(), + upload_name=upload_name, + media_length=content_length, + user_id=auth_user, + ) + media_info = { + "media_type": media_type, + "media_length": content_length, + } + + yield self._generate_local_thumbnails(media_id, media_info) + + defer.returnValue("mxc://%s/%s" % (self.server_name, media_id)) + + def get_remote_media(self, server_name, media_id): + key = (server_name, media_id) + download = self.downloads.get(key) + if download is None: + download = self._get_remote_media_impl(server_name, media_id) + download = ObservableDeferred( + download, + consumeErrors=True + ) + self.downloads[key] = download + + @download.addBoth + def callback(media_info): + del self.downloads[key] + return media_info + return download.observe() + + @defer.inlineCallbacks + def _get_remote_media_impl(self, server_name, media_id): + media_info = yield self.store.get_cached_remote_media( + server_name, media_id + ) + if not media_info: + media_info = yield self._download_remote_file( + server_name, media_id + ) + defer.returnValue(media_info) + + @defer.inlineCallbacks + def _download_remote_file(self, server_name, media_id): + file_id = random_string(24) + + fname = self.filepaths.remote_media_filepath( + server_name, file_id + ) + self._makedirs(fname) + + try: + with open(fname, "wb") as f: + request_path = "/".join(( + "/_matrix/media/v1/download", server_name, media_id, + )) + length, headers = yield self.client.get_file( + server_name, request_path, output_stream=f, + max_size=self.max_upload_size, + ) + media_type = headers["Content-Type"][0] + time_now_ms = self.clock.time_msec() + + content_disposition = headers.get("Content-Disposition", None) + if content_disposition: + _, params = cgi.parse_header(content_disposition[0],) + upload_name = None + + # First check if there is a valid UTF-8 filename + upload_name_utf8 = params.get("filename*", None) + if upload_name_utf8: + if upload_name_utf8.lower().startswith("utf-8''"): + upload_name = upload_name_utf8[7:] + + # If there isn't check for an ascii name. 
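The two branches implement the two ways RFC 6266 lets a filename travel: the RFC 5987 extended parameter (filename*, percent-encoded UTF-8 after a "utf-8''" prefix) for non-ASCII names, and the plain filename parameter, trusted only when it is pure ASCII. For example:

    import cgi

    # Extended (RFC 5987) form: percent-encoded UTF-8 after "utf-8''".
    _, params = cgi.parse_header("inline; filename*=utf-8''caf%C3%A9.png")
    print(params["filename*"])  # utf-8''caf%C3%A9.png -> decodes to "café.png"

    # Plain form: only used as-is when it is pure ASCII.
    _, params = cgi.parse_header('inline; filename="cafe.png"')
    print(params["filename"])   # cafe.png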
+ if not upload_name: + upload_name_ascii = params.get("filename", None) + if upload_name_ascii and is_ascii(upload_name_ascii): + upload_name = upload_name_ascii + + if upload_name: + upload_name = urlparse.unquote(upload_name) + try: + upload_name = upload_name.decode("utf-8") + except UnicodeDecodeError: + upload_name = None + else: + upload_name = None + + yield self.store.store_cached_remote_media( + origin=server_name, + media_id=media_id, + media_type=media_type, + time_now_ms=self.clock.time_msec(), + upload_name=upload_name, + media_length=length, + filesystem_id=file_id, + ) + except: + os.remove(fname) + raise + + media_info = { + "media_type": media_type, + "media_length": length, + "upload_name": upload_name, + "created_ts": time_now_ms, + "filesystem_id": file_id, + } + + yield self._generate_remote_thumbnails( + server_name, media_id, media_info + ) + + defer.returnValue(media_info) + + def _get_thumbnail_requirements(self, media_type): + return self.thumbnail_requirements.get(media_type, ()) + + def _generate_thumbnail(self, input_path, t_path, t_width, t_height, + t_method, t_type): + thumbnailer = Thumbnailer(input_path) + m_width = thumbnailer.width + m_height = thumbnailer.height + + if m_width * m_height >= self.max_image_pixels: + logger.info( + "Image too large to thumbnail %r x %r > %r", + m_width, m_height, self.max_image_pixels + ) + return + + if t_method == "crop": + t_len = thumbnailer.crop(t_path, t_width, t_height, t_type) + elif t_method == "scale": + t_len = thumbnailer.scale(t_path, t_width, t_height, t_type) + else: + t_len = None + + return t_len + + @defer.inlineCallbacks + def generate_local_exact_thumbnail(self, media_id, t_width, t_height, + t_method, t_type): + input_path = self.filepaths.local_media_filepath(media_id) + + t_path = self.filepaths.local_media_thumbnail( + media_id, t_width, t_height, t_type, t_method + ) + self._makedirs(t_path) + + t_len = yield preserve_context_over_fn( + threads.deferToThread, + self._generate_thumbnail, + input_path, t_path, t_width, t_height, t_method, t_type + ) + + if t_len: + yield self.store.store_local_thumbnail( + media_id, t_width, t_height, t_type, t_method, t_len + ) + + defer.returnValue(t_path) + + @defer.inlineCallbacks + def generate_remote_exact_thumbnail(self, server_name, file_id, media_id, + t_width, t_height, t_method, t_type): + input_path = self.filepaths.remote_media_filepath(server_name, file_id) + + t_path = self.filepaths.remote_media_thumbnail( + server_name, file_id, t_width, t_height, t_type, t_method + ) + self._makedirs(t_path) + + t_len = yield preserve_context_over_fn( + threads.deferToThread, + self._generate_thumbnail, + input_path, t_path, t_width, t_height, t_method, t_type + ) + + if t_len: + yield self.store.store_remote_media_thumbnail( + server_name, media_id, file_id, + t_width, t_height, t_type, t_method, t_len + ) + + defer.returnValue(t_path) + + @defer.inlineCallbacks + def _generate_local_thumbnails(self, media_id, media_info): + media_type = media_info["media_type"] + requirements = self._get_thumbnail_requirements(media_type) + if not requirements: + return + + input_path = self.filepaths.local_media_filepath(media_id) + thumbnailer = Thumbnailer(input_path) + m_width = thumbnailer.width + m_height = thumbnailer.height + + if m_width * m_height >= self.max_image_pixels: + logger.info( + "Image too large to thumbnail %r x %r > %r", + m_width, m_height, self.max_image_pixels + ) + return + + local_thumbnails = [] + + def generate_thumbnails(): + scales = set() + crops 
= set() + for r_width, r_height, r_method, r_type in requirements: + if r_method == "scale": + t_width, t_height = thumbnailer.aspect(r_width, r_height) + scales.add(( + min(m_width, t_width), min(m_height, t_height), r_type, + )) + elif r_method == "crop": + crops.add((r_width, r_height, r_type)) + + for t_width, t_height, t_type in scales: + t_method = "scale" + t_path = self.filepaths.local_media_thumbnail( + media_id, t_width, t_height, t_type, t_method + ) + self._makedirs(t_path) + t_len = thumbnailer.scale(t_path, t_width, t_height, t_type) + + local_thumbnails.append(( + media_id, t_width, t_height, t_type, t_method, t_len + )) + + for t_width, t_height, t_type in crops: + if (t_width, t_height, t_type) in scales: + # If the aspect ratio of the cropped thumbnail matches a purely + # scaled one then there is no point in calculating a separate + # thumbnail. + continue + t_method = "crop" + t_path = self.filepaths.local_media_thumbnail( + media_id, t_width, t_height, t_type, t_method + ) + self._makedirs(t_path) + t_len = thumbnailer.crop(t_path, t_width, t_height, t_type) + local_thumbnails.append(( + media_id, t_width, t_height, t_type, t_method, t_len + )) + + yield preserve_context_over_fn(threads.deferToThread, generate_thumbnails) + + for l in local_thumbnails: + yield self.store.store_local_thumbnail(*l) + + defer.returnValue({ + "width": m_width, + "height": m_height, + }) + + @defer.inlineCallbacks + def _generate_remote_thumbnails(self, server_name, media_id, media_info): + media_type = media_info["media_type"] + file_id = media_info["filesystem_id"] + requirements = self._get_thumbnail_requirements(media_type) + if not requirements: + return + + remote_thumbnails = [] + + input_path = self.filepaths.remote_media_filepath(server_name, file_id) + thumbnailer = Thumbnailer(input_path) + m_width = thumbnailer.width + m_height = thumbnailer.height + + def generate_thumbnails(): + if m_width * m_height >= self.max_image_pixels: + logger.info( + "Image too large to thumbnail %r x %r > %r", + m_width, m_height, self.max_image_pixels + ) + return + + scales = set() + crops = set() + for r_width, r_height, r_method, r_type in requirements: + if r_method == "scale": + t_width, t_height = thumbnailer.aspect(r_width, r_height) + scales.add(( + min(m_width, t_width), min(m_height, t_height), r_type, + )) + elif r_method == "crop": + crops.add((r_width, r_height, r_type)) + + for t_width, t_height, t_type in scales: + t_method = "scale" + t_path = self.filepaths.remote_media_thumbnail( + server_name, file_id, t_width, t_height, t_type, t_method + ) + self._makedirs(t_path) + t_len = thumbnailer.scale(t_path, t_width, t_height, t_type) + remote_thumbnails.append([ + server_name, media_id, file_id, + t_width, t_height, t_type, t_method, t_len + ]) + + for t_width, t_height, t_type in crops: + if (t_width, t_height, t_type) in scales: + # If the aspect ratio of the cropped thumbnail matches a purely + # scaled one then there is no point in calculating a separate + # thumbnail. 
+ continue + t_method = "crop" + t_path = self.filepaths.remote_media_thumbnail( + server_name, file_id, t_width, t_height, t_type, t_method + ) + self._makedirs(t_path) + t_len = thumbnailer.crop(t_path, t_width, t_height, t_type) + remote_thumbnails.append([ + server_name, media_id, file_id, + t_width, t_height, t_type, t_method, t_len + ]) + + yield preserve_context_over_fn(threads.deferToThread, generate_thumbnails) + + for r in remote_thumbnails: + yield self.store.store_remote_media_thumbnail(*r) + + defer.returnValue({ + "width": m_width, + "height": m_height, + }) + + class MediaRepositoryResource(Resource): """File uploading and downloading. diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py index 43c568b769..234dd4261c 100644 --- a/synapse/rest/media/v1/thumbnail_resource.py +++ b/synapse/rest/media/v1/thumbnail_resource.py @@ -14,7 +14,7 @@ # limitations under the License. -from .base_resource import parse_media_id, respond_404, respond_with_file +from ._base import parse_media_id, respond_404, respond_with_file from twisted.web.resource import Resource from synapse.http.servlet import parse_string, parse_integer from synapse.http.server import request_handler From fb76a81ff7615da46c043a0ee1e8b980756efe00 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 Apr 2016 14:45:05 +0100 Subject: [PATCH 11/22] Reorder imports --- synapse/rest/media/v1/preview_url_resource.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index fecdf8ed86..122b34faea 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -45,7 +45,15 @@ class PreviewUrlResource(Resource): def __init__(self, hs, media_repo): Resource.__init__(self) + + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.version_string = hs.version_string + self.filepaths = media_repo.filepaths + self.max_spider_size = hs.config.max_spider_size + self.server_name = hs.hostname self.client = SpiderHttpClient(hs) + if hasattr(hs.config, "url_preview_url_blacklist"): self.url_preview_url_blacklist = hs.config.url_preview_url_blacklist @@ -60,13 +68,6 @@ class PreviewUrlResource(Resource): self.downloads = {} - self.auth = hs.get_auth() - self.clock = hs.get_clock() - self.version_string = hs.version_string - self.filepaths = media_repo.filepaths - self.max_spider_size = hs.config.max_spider_size - self.server_name = hs.hostname - def render_GET(self, request): self._async_render_GET(request) return NOT_DONE_YET From 9181e2f4c78acba89644ac21eed5ce7c9fc872c5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 Apr 2016 14:48:24 +0100 Subject: [PATCH 12/22] Add store to PreviewUrlResource --- synapse/rest/media/v1/preview_url_resource.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 122b34faea..70087e959a 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -52,6 +52,7 @@ class PreviewUrlResource(Resource): self.filepaths = media_repo.filepaths self.max_spider_size = hs.config.max_spider_size self.server_name = hs.hostname + self.store = hs.get_datastore() self.client = SpiderHttpClient(hs) if hasattr(hs.config, "url_preview_url_blacklist"): From a7001c311b76fbdcefc00b753fa50b1bdd3dc4cf Mon Sep 17 00:00:00 2001 From: Erik 
Johnston Date: Tue, 19 Apr 2016 14:49:31 +0100 Subject: [PATCH 13/22] _make_dirs was moved to MediaRepository --- synapse/rest/media/v1/preview_url_resource.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 70087e959a..3d93d928e4 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -370,7 +370,7 @@ class PreviewUrlResource(Resource): file_id = random_string(24) fname = self.filepaths.local_media_filepath(file_id) - self._makedirs(fname) + self.media_repo._makedirs(fname) try: with open(fname, "wb") as f: From e8884e5e9cad42445e14b9f119e2a4f69334f726 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 Apr 2016 14:51:34 +0100 Subject: [PATCH 14/22] Add self.media_repo to PreviewUrlResource --- synapse/rest/media/v1/preview_url_resource.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 3d93d928e4..69327ac493 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -54,6 +54,7 @@ class PreviewUrlResource(Resource): self.server_name = hs.hostname self.store = hs.get_datastore() self.client = SpiderHttpClient(hs) + self.media_repo = media_repo if hasattr(hs.config, "url_preview_url_blacklist"): self.url_preview_url_blacklist = hs.config.url_preview_url_blacklist From e99365f6015af6dc0c2c107f47bda3760ff1153e Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 19 Apr 2016 15:22:14 +0100 Subject: [PATCH 15/22] Replicate get_invited_rooms_for_user --- synapse/replication/slave/storage/events.py | 9 +++++++-- tests/replication/slave/storage/test_events.py | 12 ++++++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index cfc728a038..82f171c257 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -68,6 +68,9 @@ class SlavedEventStore(BaseSlavedStore): _get_current_state_for_key = StateStore.__dict__[ "_get_current_state_for_key" ] + get_invited_rooms_for_user = RoomMemberStore.__dict__[ + "get_invited_rooms_for_user" + ] get_event = DataStore.get_event.__func__ get_current_state = DataStore.get_current_state.__func__ @@ -82,6 +85,7 @@ class SlavedEventStore(BaseSlavedStore): get_room_events_stream_for_room = ( DataStore.get_room_events_stream_for_room.__func__ ) + _set_before_and_after = DataStore._set_before_and_after _get_events = DataStore._get_events.__func__ @@ -147,11 +151,11 @@ class SlavedEventStore(BaseSlavedStore): internal = json.loads(row[1]) event_json = json.loads(row[2]) event = FrozenEvent(event_json, internal_metadata_dict=internal) - self._invalidate_caches_for_event( + self.invalidate_caches_for_event( event, backfilled, reset_state=position in state_resets ) - def _invalidate_caches_for_event(self, event, backfilled, reset_state): + def invalidate_caches_for_event(self, event, backfilled, reset_state): if reset_state: self._get_current_state_for_key.invalidate_all() self.get_rooms_for_user.invalidate_all() @@ -182,6 +186,7 @@ class SlavedEventStore(BaseSlavedStore): # self._membership_stream_cache.entity_has_changed( # event.state_key, event.internal_metadata.stream_ordering # ) + self.get_invited_rooms_for_user.invalidate((event.state_key,)) if not event.is_state(): return 
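
The `RoomMemberStore.__dict__["get_invited_rooms_for_user"]` assignment above deliberately indexes the class `__dict__`, so it picks up the raw class attribute (the caching descriptor) rather than the plain function that ordinary attribute lookup would return. The slaved store therefore inherits both the cache and its invalidation handle, which `invalidate_caches_for_event` calls as membership rows are replicated. A rough, self-contained sketch of the idiom, with a toy `cached` decorator standing in for Synapse's real cache descriptors (all names below are illustrative, not Synapse's actual machinery):

    def cached(func):
        # Toy stand-in for Synapse's @cached machinery: memoise on the
        # first positional argument and expose an invalidation handle.
        cache = {}

        def wrapper(self, key):
            if key not in cache:
                cache[key] = func(self, key)
            return cache[key]

        wrapper.invalidate = lambda key: cache.pop(key, None)
        return wrapper


    class MasterStore(object):
        @cached
        def get_invited_rooms_for_user(self, user_id):
            return []  # pretend this queried the invites table


    class SlavedStore(object):
        # Indexing __dict__ borrows the wrapper itself, cache and all.
        # (Plain uncached methods are borrowed as DataStore.method.__func__
        # in the patch above, which unwraps a Python 2 unbound method.)
        get_invited_rooms_for_user = MasterStore.__dict__[
            "get_invited_rooms_for_user"
        ]

        def on_invite_replicated(self, user_id):
            # Mirror the master's cache invalidation when replication
            # reports a new membership event for this user.
            self.get_invited_rooms_for_user.invalidate(user_id)

The test added below exercises exactly this path: persist an invite on the master, replicate it, and check that the slave's cached `get_invited_rooms_for_user` view picks it up.
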
diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index baa4a26eb5..88b8d08110 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -251,6 +251,18 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): redacted = FrozenEvent(msg_dict, msg.internal_metadata.get_dict()) yield self.check("get_event", [msg.event_id], redacted) + @defer.inlineCallbacks + def test_invites(self): + yield self.check("get_invited_rooms_for_user", [USER_ID_2], []) + event = yield self.persist( + type="m.room.member", key=USER_ID_2, membership="invite" + ) + yield self.replicate() + yield self.check("get_invited_rooms_for_user", [USER_ID_2], [RoomsForUser( + ROOM_ID, USER_ID, "invite", event.event_id, + event.internal_metadata.stream_ordering + )]) + event_id = 0 @defer.inlineCallbacks From f505575f69f90eb027acb093819c083ed49a8008 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 Apr 2016 15:39:08 +0100 Subject: [PATCH 16/22] Make InsecureInterceptableContextFactory work with SpiderEndpoint --- synapse/http/client.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/http/client.py b/synapse/http/client.py index 6c89b20984..902ae7a203 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -462,5 +462,8 @@ class InsecureInterceptableContextFactory(ssl.ContextFactory): self._context = SSL.Context(SSL.SSLv23_METHOD) self._context.set_verify(VERIFY_NONE, lambda *_: None) - def getContext(self, hostname, port): + def getContext(self, hostname=None, port=None): return self._context + + def creatorForNetloc(self, hostname, port): + return self From 5bbd424ee0c983d305014df618fa1917ecd10d91 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 19 Apr 2016 17:11:44 +0100 Subject: [PATCH 17/22] Add a slaved receipts store --- synapse/replication/slave/storage/receipts.py | 61 +++++++++++++++++++ tests/replication/slave/storage/_base.py | 4 +- .../replication/slave/storage/test_events.py | 3 + .../slave/storage/test_receipts.py | 39 ++++++++++++ 4 files changed, 104 insertions(+), 3 deletions(-) create mode 100644 synapse/replication/slave/storage/receipts.py create mode 100644 tests/replication/slave/storage/test_receipts.py diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py new file mode 100644 index 0000000000..b55d5dfd08 --- /dev/null +++ b/synapse/replication/slave/storage/receipts.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker + +from synapse.storage import DataStore +from synapse.storage.receipts import ReceiptsStore + +# So, um, we want to borrow a load of functions intended for reading from +# a DataStore, but we don't want to take functions that either write to the +# DataStore or are cached and don't have cache invalidation logic. 
+# +# Rather than write duplicate versions of those functions, or lift them to +# a common base class, we going to grab the underlying __func__ object from +# the method descriptor on the DataStore and chuck them into our class. + + +class SlavedReceiptsStore(BaseSlavedStore): + + def __init__(self, db_conn, hs): + super(SlavedReceiptsStore, self).__init__(db_conn, hs) + + self._receipts_id_gen = SlavedIdTracker( + db_conn, "receipts_linearized", "stream_id" + ) + + get_receipts_for_user = ReceiptsStore.__dict__["get_receipts_for_user"] + + get_max_receipt_stream_id = DataStore.get_max_receipt_stream_id.__func__ + get_all_updated_receipts = DataStore.get_all_updated_receipts.__func__ + + def stream_positions(self): + result = super(SlavedReceiptsStore, self).stream_positions() + result["receipts"] = self._receipts_id_gen.get_current_token() + return result + + def process_replication(self, result): + stream = result.get("receipts") + if stream: + self._receipts_id_gen.advance(stream["position"]) + for row in stream["rows"]: + room_id, receipt_type, user_id = row[1:4] + self.invalidate_caches_for_receipt(room_id, receipt_type, user_id) + + return super(SlavedReceiptsStore, self).process_replication(result) + + def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id): + self.get_receipts_for_user.invalidate((user_id, receipt_type)) diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py index 983caafe8a..1f13cd0bc0 100644 --- a/tests/replication/slave/storage/_base.py +++ b/tests/replication/slave/storage/_base.py @@ -15,8 +15,6 @@ from twisted.internet import defer from tests import unittest -from synapse.replication.slave.storage.events import SlavedEventStore - from mock import Mock, NonCallableMock from tests.utils import setup_test_homeserver from synapse.replication.resource import ReplicationResource @@ -38,7 +36,7 @@ class BaseSlavedStoreTestCase(unittest.TestCase): self.replication = ReplicationResource(self.hs) self.master_store = self.hs.get_datastore() - self.slaved_store = SlavedEventStore(self.hs.get_db_conn(), self.hs) + self.slaved_store = self.STORE_TYPE(self.hs.get_db_conn(), self.hs) self.event_id = 0 @defer.inlineCallbacks diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index baa4a26eb5..66f166047e 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -16,6 +16,7 @@ from ._base import BaseSlavedStoreTestCase from synapse.events import FrozenEvent, _EventInternalMetadata from synapse.events.snapshot import EventContext +from synapse.replication.slave.storage.events import SlavedEventStore from synapse.storage.roommember import RoomsForUser from twisted.internet import defer @@ -43,6 +44,8 @@ def patch__eq__(cls): class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): + STORE_TYPE = SlavedEventStore + def setUp(self): # Patch up the equality operator for events so that we can check # whether lists of events match using assertEquals diff --git a/tests/replication/slave/storage/test_receipts.py b/tests/replication/slave/storage/test_receipts.py new file mode 100644 index 0000000000..6624fe4eea --- /dev/null +++ b/tests/replication/slave/storage/test_receipts.py @@ -0,0 +1,39 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStoreTestCase + +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore + +from twisted.internet import defer + +USER_ID = "@feeling:blue" +ROOM_ID = "!room:blue" +EVENT_ID = "$event:blue" + + +class SlavedReceiptTestCase(BaseSlavedStoreTestCase): + + STORE_TYPE = SlavedReceiptsStore + + @defer.inlineCallbacks + def test_receipt(self): + yield self.check("get_receipts_for_user", [USER_ID, "m.read"], {}) + yield self.master_store.insert_receipt( + ROOM_ID, "m.read", USER_ID, [EVENT_ID], {} + ) + yield self.replicate() + yield self.check("get_receipts_for_user", [USER_ID, "m.read"], { + ROOM_ID: EVENT_ID + }) From 61c7edfd34abdb9eaa7c8d3dd3dbef95b60de5de Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 Apr 2016 17:22:03 +0100 Subject: [PATCH 18/22] Add cache to _get_state_groups_from_groups --- synapse/storage/state.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index c5d2a3a6df..5b743db67a 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -174,6 +174,12 @@ class StateStore(SQLBaseStore): return [r[0] for r in results] return self.runInteraction("get_current_state_for_key", f) + @cached(num_args=2, lru=True, max_entries=1000) + def _get_state_group_from_group(self, group, types): + raise NotImplementedError() + + @cachedList(cached_method_name="_get_state_group_from_group", + list_name="groups", num_args=2, inlineCallbacks=True) def _get_state_groups_from_groups(self, groups, types): """Returns dictionary state_group -> (dict of (type, state_key) -> event id) """ @@ -201,18 +207,23 @@ class StateStore(SQLBaseStore): txn.execute(sql, args) rows = self.cursor_to_dict(txn) - results = {} + results = {group: {} for group in groups} for row in rows: key = (row["type"], row["state_key"]) - results.setdefault(row["state_group"], {})[key] = row["event_id"] + results[row["state_group"]][key] = row["event_id"] return results + results = {} + chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)] for chunk in chunks: - return self.runInteraction( + res = yield self.runInteraction( "_get_state_groups_from_groups", f, chunk ) + results.update(res) + + defer.returnValue(results) @defer.inlineCallbacks def get_state_for_events(self, event_ids, types): @@ -359,6 +370,8 @@ class StateStore(SQLBaseStore): a `state_key` of None matches all state_keys. If `types` is None then all events are returned. 
""" + if types: + types = frozenset(types) results = {} missing_groups = [] if types is not None: From 4cf4320593f9d3448e28819e094b42eadab6967d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 20 Apr 2016 11:06:02 +0100 Subject: [PATCH 19/22] Add some logging to state resolve_events --- synapse/state.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/synapse/state.py b/synapse/state.py index 58211f5feb..cca9167e5b 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -230,6 +230,8 @@ class StateHandler(object): (cache.state_group, state, prev_states) ) + logger.info("Resolving state for %s with %d groups", room_id, len(state_groups)) + new_state, prev_states = self._resolve_events( state_groups.values(), event_type, state_key ) @@ -246,6 +248,9 @@ class StateHandler(object): defer.returnValue((None, new_state, prev_states)) def resolve_events(self, state_sets, event): + logger.info( + "Resolving state for %s with %d groups", event.room_id, len(state_sets) + ) if event.is_state(): return self._resolve_events( state_sets, event.type, event.state_key From 5bbc3215887a653796a09178fbb69c38e241259d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 20 Apr 2016 11:39:54 +0100 Subject: [PATCH 20/22] Always use state cache entry if it exists Also check if the resolved state matches an existing state group. --- synapse/state.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/synapse/state.py b/synapse/state.py index cca9167e5b..d0f76dc4f5 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -214,7 +214,7 @@ class StateHandler(object): if self._state_cache is not None: cache = self._state_cache.get(group_names, None) - if cache and cache.state_group: + if cache: cache.ts = self.clock.time_msec() event_dict = yield self.store.get_events(cache.state.values()) @@ -236,16 +236,23 @@ class StateHandler(object): state_groups.values(), event_type, state_key ) + state_group = None + new_state_event_ids = frozenset(e.event_id for e in new_state.values()) + for sg, events in state_groups.items(): + if new_state_event_ids == frozenset(e.event_id for e in events): + state_group = sg + break + if self._state_cache is not None: cache = _StateCacheEntry( state={key: event.event_id for key, event in new_state.items()}, - state_group=None, + state_group=state_group, ts=self.clock.time_msec() ) self._state_cache[group_names] = cache - defer.returnValue((None, new_state, prev_states)) + defer.returnValue((state_group, new_state, prev_states)) def resolve_events(self, state_sets, event): logger.info( From 59698906eb6495f15339f39d59d2ff26bef0ab37 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 Apr 2016 15:36:13 +0100 Subject: [PATCH 21/22] Make jenkins install lxml --- jenkins-postgres.sh | 1 + jenkins-sqlite.sh | 1 + 2 files changed, 2 insertions(+) diff --git a/jenkins-postgres.sh b/jenkins-postgres.sh index 9ac86d2593..9a78480dd8 100755 --- a/jenkins-postgres.sh +++ b/jenkins-postgres.sh @@ -26,6 +26,7 @@ tox --notest -e py27 TOX_BIN=$WORKSPACE/.tox/py27/bin $TOX_BIN/pip install psycopg2 +$TOX_BIN/pip install lxml : ${GIT_BRANCH:="origin/$(git rev-parse --abbrev-ref HEAD)"} diff --git a/jenkins-sqlite.sh b/jenkins-sqlite.sh index 345d01936c..5500f8d8d6 100755 --- a/jenkins-sqlite.sh +++ b/jenkins-sqlite.sh @@ -24,6 +24,7 @@ rm .coverage* || echo "No coverage files to remove" tox --notest -e py27 TOX_BIN=$WORKSPACE/.tox/py27/bin +$TOX_BIN/pip install lxml : ${GIT_BRANCH:="origin/$(git rev-parse --abbrev-ref HEAD)"} From 
02a27a6c4fd7f561426e17e04fec1931c4392d73 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 21 Apr 2016 16:00:28 +0100 Subject: [PATCH 22/22] pip install new python dependencies in jenkins.sh --- jenkins-postgres.sh | 1 + jenkins-sqlite.sh | 1 + jenkins.sh | 86 --------------------------------------------- 3 files changed, 2 insertions(+), 86 deletions(-) delete mode 100755 jenkins.sh diff --git a/jenkins-postgres.sh b/jenkins-postgres.sh index 9a78480dd8..ae6b111591 100755 --- a/jenkins-postgres.sh +++ b/jenkins-postgres.sh @@ -25,6 +25,7 @@ rm .coverage* || echo "No coverage files to remove" tox --notest -e py27 TOX_BIN=$WORKSPACE/.tox/py27/bin +python synapse/python_dependencies.py | xargs -n1 $TOX_BIN/pip install $TOX_BIN/pip install psycopg2 $TOX_BIN/pip install lxml diff --git a/jenkins-sqlite.sh b/jenkins-sqlite.sh index 5500f8d8d6..9398d9db15 100755 --- a/jenkins-sqlite.sh +++ b/jenkins-sqlite.sh @@ -24,6 +24,7 @@ rm .coverage* || echo "No coverage files to remove" tox --notest -e py27 TOX_BIN=$WORKSPACE/.tox/py27/bin +python synapse/python_dependencies.py | xargs -n1 $TOX_BIN/pip install $TOX_BIN/pip install lxml : ${GIT_BRANCH:="origin/$(git rev-parse --abbrev-ref HEAD)"} diff --git a/jenkins.sh b/jenkins.sh deleted file mode 100755 index b826d510c9..0000000000 --- a/jenkins.sh +++ /dev/null @@ -1,86 +0,0 @@ -#!/bin/bash - -set -eux - -: ${WORKSPACE:="$(pwd)"} - -export PYTHONDONTWRITEBYTECODE=yep -export SYNAPSE_CACHE_FACTOR=1 - -# Output test results as junit xml -export TRIAL_FLAGS="--reporter=subunit" -export TOXSUFFIX="| subunit-1to2 | subunit2junitxml --no-passthrough --output-to=results.xml" -# Write coverage reports to a separate file for each process -export COVERAGE_OPTS="-p" -export DUMP_COVERAGE_COMMAND="coverage help" - -# Output flake8 violations to violations.flake8.log -# Don't exit with non-0 status code on Jenkins, -# so that the build steps continue and a later step can decided whether to -# UNSTABLE or FAILURE this build. -export PEP8SUFFIX="--output-file=violations.flake8.log || echo flake8 finished with status code \$?" - -rm .coverage* || echo "No coverage files to remove" - -tox - -: ${GIT_BRANCH:="origin/$(git rev-parse --abbrev-ref HEAD)"} - -TOX_BIN=$WORKSPACE/.tox/py27/bin - -if [[ ! 
-e .sytest-base ]]; then - git clone https://github.com/matrix-org/sytest.git .sytest-base --mirror -else - (cd .sytest-base; git fetch -p) -fi - -rm -rf sytest -git clone .sytest-base sytest --shared -cd sytest - -git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop) - -: ${PERL5LIB:=$WORKSPACE/perl5/lib/perl5} -: ${PERL_MB_OPT:=--install_base=$WORKSPACE/perl5} -: ${PERL_MM_OPT:=INSTALL_BASE=$WORKSPACE/perl5} -export PERL5LIB PERL_MB_OPT PERL_MM_OPT - -./install-deps.pl - -: ${PORT_BASE:=8000} - -echo >&2 "Running sytest with SQLite3"; -./run-tests.pl --coverage -O tap --synapse-directory $WORKSPACE \ - --python $TOX_BIN/python --all --port-base $PORT_BASE > results-sqlite3.tap - -RUN_POSTGRES="" - -for port in $(($PORT_BASE + 1)) $(($PORT_BASE + 2)); do - if psql synapse_jenkins_$port <<< ""; then - RUN_POSTGRES="$RUN_POSTGRES:$port" - cat > localhost-$port/database.yaml << EOF -name: psycopg2 -args: - database: synapse_jenkins_$port -EOF - fi -done - -# Run if both postgresql databases exist -if test "$RUN_POSTGRES" = ":$(($PORT_BASE + 1)):$(($PORT_BASE + 2))"; then - echo >&2 "Running sytest with PostgreSQL"; - $TOX_BIN/pip install psycopg2 - ./run-tests.pl --coverage -O tap --synapse-directory $WORKSPACE \ - --python $TOX_BIN/python --all --port-base $PORT_BASE > results-postgresql.tap -else - echo >&2 "Skipping running sytest with PostgreSQL, $RUN_POSTGRES" -fi - -cd .. -cp sytest/.coverage.* . - -# Combine the coverage reports -echo "Combining:" .coverage.* -$TOX_BIN/python -m coverage combine -# Output coverage to coverage.xml -$TOX_BIN/coverage xml -o coverage.xml
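
One subtlety in patch 20 above deserves a note: after state resolution the handler now searches the input groups for one whose events exactly match the resolved state, and records that group id in the cache entry instead of `None`. The comparison works on frozensets of event ids, so ordering and duplicates are irrelevant. A condensed sketch of that matching step (argument shapes follow the patch: `state_groups` maps group id to a list of events, `new_state` maps `(type, state_key)` to an event):

    def find_matching_state_group(state_groups, new_state):
        # The resolved state matches an existing group iff both contain
        # exactly the same set of event ids.
        new_state_event_ids = frozenset(e.event_id for e in new_state.values())
        for group, events in state_groups.items():
            if new_state_event_ids == frozenset(e.event_id for e in events):
                return group
        return None  # no match; the caller caches state_group=None

Together with the relaxed `if cache:` check (previously `if cache and cache.state_group:`), this lets subsequent lookups reuse cached resolutions even when resolution had to merge several groups.
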
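
Finally, the `python synapse/python_dependencies.py | xargs -n1 $TOX_BIN/pip install` lines that replace the deleted jenkins.sh only work if the module prints one pip-installable requirement per line when executed directly. A minimal sketch of that contract (the requirement strings here are placeholders, not Synapse's actual dependency list):

    REQUIREMENTS = [
        # Placeholder entries for illustration; the real module keeps the
        # canonical dependency table used at install time.
        "twisted>=15.1.0",
        "pyyaml",
        "unpaddedbase64>=1.0.1",
    ]


    def requirements():
        return list(REQUIREMENTS)


    if __name__ == "__main__":
        for requirement in requirements():
            print(requirement)

`xargs -n1` then hands each line to pip individually, so the tox virtualenv tracks new dependencies without the jenkins scripts maintaining a second copy of the list. lxml and psycopg2 remain explicit extra installs, presumably because they are optional extras rather than core requirements.
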