MatrixSynapse/synapse/rest/media/v1/media_repository.py

639 lines
23 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
2016-01-07 05:26:29 +01:00
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer, threads
import twisted.internet.error
import twisted.web.http
from twisted.web.resource import Resource
from .upload_resource import UploadResource
from .download_resource import DownloadResource
2014-12-10 16:46:18 +01:00
from .thumbnail_resource import ThumbnailResource
from .identicon_resource import IdenticonResource
from .preview_url_resource import PreviewUrlResource
from .filepath import MediaFilePaths
from .thumbnailer import Thumbnailer
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.util.stringutils import random_string
from synapse.api.errors import SynapseError, HttpResponseException, \
NotFoundError
2016-06-29 15:57:59 +02:00
from synapse.util.async import Linearizer
from synapse.util.stringutils import is_ascii
2017-10-13 11:24:19 +02:00
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.retryutils import NotRetryingDestination
import os
2016-06-29 15:57:59 +02:00
import errno
import shutil
import cgi
import logging
import urlparse
logger = logging.getLogger(__name__)
UPDATE_RECENTLY_ACCESSED_REMOTES_TS = 60 * 1000
class MediaRepository(object):
2016-06-29 15:57:59 +02:00
def __init__(self, hs):
self.auth = hs.get_auth()
self.client = MatrixFederationHttpClient(hs)
self.clock = hs.get_clock()
self.server_name = hs.hostname
self.store = hs.get_datastore()
self.max_upload_size = hs.config.max_upload_size
self.max_image_pixels = hs.config.max_image_pixels
2017-10-12 18:31:24 +02:00
self.primary_base_path = hs.config.media_store_path
self.filepaths = MediaFilePaths(self.primary_base_path)
self.backup_base_path = None
if hs.config.backup_media_store_path:
2017-10-12 18:31:24 +02:00
self.backup_base_path = hs.config.backup_media_store_path
2017-10-12 16:28:24 +02:00
self.synchronous_backup_media_store = hs.config.synchronous_backup_media_store
self.dynamic_thumbnails = hs.config.dynamic_thumbnails
self.thumbnail_requirements = hs.config.thumbnail_requirements
2017-01-09 18:17:10 +01:00
self.remote_media_linearizer = Linearizer(name="media_remote")
2016-06-29 15:57:59 +02:00
self.recently_accessed_remotes = set()
self.clock.looping_call(
self._update_recently_accessed_remotes,
UPDATE_RECENTLY_ACCESSED_REMOTES_TS
)
@defer.inlineCallbacks
def _update_recently_accessed_remotes(self):
media = self.recently_accessed_remotes
self.recently_accessed_remotes = set()
yield self.store.update_cached_last_access_time(
media, self.clock.time_msec()
)
@staticmethod
def _makedirs(filepath):
dirname = os.path.dirname(filepath)
if not os.path.exists(dirname):
os.makedirs(dirname)
2017-10-12 18:31:24 +02:00
@staticmethod
2017-10-12 18:57:31 +02:00
def _write_file_synchronously(source, fname, close_source=False):
2017-10-13 11:39:32 +02:00
"""Write `source` to the path `fname` synchronously. Should be called
from a thread.
Args:
source: A file like object to be written
fname (str): Path to write to
close_source (bool): Whether to close source after writing
"""
2017-10-13 11:25:01 +02:00
MediaRepository._makedirs(fname)
2017-10-12 18:31:24 +02:00
source.seek(0) # Ensure we read from the start of the file
with open(fname, "wb") as f:
shutil.copyfileobj(source, f)
2017-10-12 18:57:31 +02:00
if close_source:
source.close()
2017-10-12 18:37:21 +02:00
2017-10-12 18:31:24 +02:00
@defer.inlineCallbacks
def write_to_file(self, source, path):
"""Write `source` to the on disk media store, and also the backup store
if configured.
2017-10-12 18:37:21 +02:00
Will close source once finished.
2017-10-12 18:31:24 +02:00
Args:
source: A file like object that should be written
2017-10-13 11:39:32 +02:00
path(str): Relative path to write file to
2017-10-12 18:31:24 +02:00
Returns:
2017-10-13 11:39:32 +02:00
Deferred[str]: the file path written to in the primary media store
2017-10-12 18:31:24 +02:00
"""
fname = os.path.join(self.primary_base_path, path)
# Write to the main repository
2017-10-13 11:24:19 +02:00
yield make_deferred_yieldable(
2017-10-12 18:31:24 +02:00
threads.deferToThread,
2017-10-12 18:37:21 +02:00
self._write_file_synchronously, source, fname,
2017-10-12 18:31:24 +02:00
)
# Write to backup repository
2017-10-12 18:31:24 +02:00
yield self.copy_to_backup(source, path)
defer.returnValue(fname)
@defer.inlineCallbacks
def copy_to_backup(self, source, path):
2017-10-12 18:37:21 +02:00
"""Copy file like object source to the backup media store, if configured.
Will close source after its done.
2017-10-13 11:39:32 +02:00
Args:
source: A file like object that should be written
path(str): Relative path to write file to
2017-10-12 18:37:21 +02:00
"""
2017-10-12 18:31:24 +02:00
if self.backup_base_path:
backup_fname = os.path.join(self.backup_base_path, path)
# We can either wait for successful writing to the backup repository
# or write in the background and immediately return
2017-10-12 16:28:24 +02:00
if self.synchronous_backup_media_store:
2017-10-13 11:24:19 +02:00
yield make_deferred_yieldable(
2017-10-12 18:31:24 +02:00
threads.deferToThread,
2017-10-12 18:37:21 +02:00
self._write_file_synchronously, source, backup_fname,
2017-10-12 18:57:31 +02:00
close_source=True,
)
else:
2017-10-12 18:31:24 +02:00
preserve_fn(threads.deferToThread)(
2017-10-12 18:37:21 +02:00
self._write_file_synchronously, source, backup_fname,
2017-10-12 18:57:31 +02:00
close_source=True,
2017-10-12 18:31:24 +02:00
)
2017-10-12 18:37:21 +02:00
else:
source.close()
@defer.inlineCallbacks
def create_content(self, media_type, upload_name, content, content_length,
auth_user):
2017-10-13 11:39:32 +02:00
"""Store uploaded content for a local user and return the mxc URL
Args:
media_type(str): The content type of the file
upload_name(str): The name of the file
content: A file like object that is the content to store
content_length(int): The length of the content
auth_user(str): The user_id of the uploader
Returns:
Deferred[str]: The mxc url of the stored content
"""
media_id = random_string(24)
2017-10-12 18:31:24 +02:00
fname = yield self.write_to_file(
content, self.filepaths.local_media_filepath_rel(media_id)
)
logger.info("Stored local media in file %r", fname)
yield self.store.store_local_media(
media_id=media_id,
media_type=media_type,
time_now_ms=self.clock.time_msec(),
upload_name=upload_name,
media_length=content_length,
user_id=auth_user,
)
media_info = {
"media_type": media_type,
"media_length": content_length,
}
yield self._generate_local_thumbnails(media_id, media_info)
defer.returnValue("mxc://%s/%s" % (self.server_name, media_id))
2016-06-29 15:57:59 +02:00
@defer.inlineCallbacks
def get_remote_media(self, server_name, media_id):
key = (server_name, media_id)
2016-06-29 15:57:59 +02:00
with (yield self.remote_media_linearizer.queue(key)):
media_info = yield self._get_remote_media_impl(server_name, media_id)
defer.returnValue(media_info)
@defer.inlineCallbacks
def _get_remote_media_impl(self, server_name, media_id):
media_info = yield self.store.get_cached_remote_media(
server_name, media_id
)
if not media_info:
media_info = yield self._download_remote_file(
server_name, media_id
)
2017-06-19 18:39:21 +02:00
elif media_info["quarantined_by"]:
raise NotFoundError()
else:
self.recently_accessed_remotes.add((server_name, media_id))
yield self.store.update_cached_last_access_time(
[(server_name, media_id)], self.clock.time_msec()
)
defer.returnValue(media_info)
@defer.inlineCallbacks
def _download_remote_file(self, server_name, media_id):
file_id = random_string(24)
2017-10-12 18:31:24 +02:00
fpath = self.filepaths.remote_media_filepath_rel(
server_name, file_id
)
2017-10-12 18:31:24 +02:00
fname = os.path.join(self.primary_base_path, fpath)
self._makedirs(fname)
try:
with open(fname, "wb") as f:
request_path = "/".join((
"/_matrix/media/v1/download", server_name, media_id,
))
try:
length, headers = yield self.client.get_file(
server_name, request_path, output_stream=f,
max_size=self.max_upload_size, args={
# tell the remote server to 404 if it doesn't
# recognise the server_name, to make sure we don't
# end up with a routing loop.
"allow_remote": "false",
}
)
except twisted.internet.error.DNSLookupError as e:
logger.warn("HTTP error fetching remote media %s/%s: %r",
server_name, media_id, e)
raise NotFoundError()
except HttpResponseException as e:
logger.warn("HTTP error fetching remote media %s/%s: %s",
server_name, media_id, e.response)
if e.code == twisted.web.http.NOT_FOUND:
raise SynapseError.from_http_response_exception(e)
raise SynapseError(502, "Failed to fetch remote media")
except SynapseError:
logger.exception("Failed to fetch remote media %s/%s",
server_name, media_id)
raise
except NotRetryingDestination:
logger.warn("Not retrying destination %r", server_name)
raise SynapseError(502, "Failed to fetch remote media")
except Exception:
logger.exception("Failed to fetch remote media %s/%s",
server_name, media_id)
raise SynapseError(502, "Failed to fetch remote media")
2017-10-12 18:52:30 +02:00
# Will close the file after its done
2017-10-12 18:37:21 +02:00
yield self.copy_to_backup(open(fname), fpath)
2017-10-12 18:31:24 +02:00
media_type = headers["Content-Type"][0]
time_now_ms = self.clock.time_msec()
content_disposition = headers.get("Content-Disposition", None)
if content_disposition:
_, params = cgi.parse_header(content_disposition[0],)
upload_name = None
# First check if there is a valid UTF-8 filename
upload_name_utf8 = params.get("filename*", None)
if upload_name_utf8:
if upload_name_utf8.lower().startswith("utf-8''"):
upload_name = upload_name_utf8[7:]
# If there isn't check for an ascii name.
if not upload_name:
upload_name_ascii = params.get("filename", None)
if upload_name_ascii and is_ascii(upload_name_ascii):
upload_name = upload_name_ascii
if upload_name:
upload_name = urlparse.unquote(upload_name)
try:
upload_name = upload_name.decode("utf-8")
except UnicodeDecodeError:
upload_name = None
else:
upload_name = None
logger.info("Stored remote media in file %r", fname)
yield self.store.store_cached_remote_media(
origin=server_name,
media_id=media_id,
media_type=media_type,
time_now_ms=self.clock.time_msec(),
upload_name=upload_name,
media_length=length,
filesystem_id=file_id,
)
except:
os.remove(fname)
raise
media_info = {
"media_type": media_type,
"media_length": length,
"upload_name": upload_name,
"created_ts": time_now_ms,
"filesystem_id": file_id,
}
yield self._generate_remote_thumbnails(
server_name, media_id, media_info
)
defer.returnValue(media_info)
def _get_thumbnail_requirements(self, media_type):
return self.thumbnail_requirements.get(media_type, ())
def _generate_thumbnail(self, thumbnailer, t_width, t_height,
t_method, t_type):
m_width = thumbnailer.width
m_height = thumbnailer.height
if m_width * m_height >= self.max_image_pixels:
logger.info(
"Image too large to thumbnail %r x %r > %r",
m_width, m_height, self.max_image_pixels
)
return
if t_method == "crop":
t_byte_source = thumbnailer.crop(t_width, t_height, t_type)
elif t_method == "scale":
2017-02-24 22:42:38 +01:00
t_width, t_height = thumbnailer.aspect(t_width, t_height)
t_width = min(m_width, t_width)
t_height = min(m_height, t_height)
t_byte_source = thumbnailer.scale(t_width, t_height, t_type)
else:
t_byte_source = None
return t_byte_source
@defer.inlineCallbacks
def generate_local_exact_thumbnail(self, media_id, t_width, t_height,
t_method, t_type):
input_path = self.filepaths.local_media_filepath(media_id)
thumbnailer = Thumbnailer(input_path)
2017-10-13 11:24:19 +02:00
t_byte_source = yield make_deferred_yieldable(
threads.deferToThread,
self._generate_thumbnail,
thumbnailer, t_width, t_height, t_method, t_type
)
if t_byte_source:
2017-10-12 18:31:24 +02:00
output_path = yield self.write_to_file(
2017-10-12 16:28:24 +02:00
t_byte_source,
2017-10-12 18:31:24 +02:00
self.filepaths.local_media_thumbnail_rel(
media_id, t_width, t_height, t_type, t_method
)
)
logger.info("Stored thumbnail in file %r", output_path)
2017-10-12 18:52:30 +02:00
t_len = os.path.getsize(output_path)
2017-10-13 11:39:59 +02:00
yield self.store.store_local_thumbnail(
2017-10-12 18:39:23 +02:00
media_id, t_width, t_height, t_type, t_method, t_len
)
2017-10-12 16:28:24 +02:00
defer.returnValue(output_path)
@defer.inlineCallbacks
def generate_remote_exact_thumbnail(self, server_name, file_id, media_id,
t_width, t_height, t_method, t_type):
input_path = self.filepaths.remote_media_filepath(server_name, file_id)
thumbnailer = Thumbnailer(input_path)
2017-10-13 11:24:19 +02:00
t_byte_source = yield make_deferred_yieldable(
threads.deferToThread,
self._generate_thumbnail,
thumbnailer, t_width, t_height, t_method, t_type
)
if t_byte_source:
2017-10-12 18:31:24 +02:00
output_path = yield self.write_to_file(
2017-10-12 16:28:24 +02:00
t_byte_source,
2017-10-12 18:31:24 +02:00
self.filepaths.remote_media_thumbnail_rel(
server_name, file_id, t_width, t_height, t_type, t_method
)
)
logger.info("Stored thumbnail in file %r", output_path)
2017-10-12 18:52:30 +02:00
t_len = os.path.getsize(output_path)
yield self.store.store_remote_media_thumbnail(
server_name, media_id, file_id,
2017-10-12 18:39:23 +02:00
t_width, t_height, t_type, t_method, t_len
)
2017-10-12 16:28:24 +02:00
defer.returnValue(output_path)
@defer.inlineCallbacks
def _generate_local_thumbnails(self, media_id, media_info, url_cache=False):
media_type = media_info["media_type"]
requirements = self._get_thumbnail_requirements(media_type)
if not requirements:
return
if url_cache:
input_path = self.filepaths.url_cache_filepath(media_id)
else:
input_path = self.filepaths.local_media_filepath(media_id)
thumbnailer = Thumbnailer(input_path)
m_width = thumbnailer.width
m_height = thumbnailer.height
if m_width * m_height >= self.max_image_pixels:
logger.info(
"Image too large to thumbnail %r x %r > %r",
m_width, m_height, self.max_image_pixels
)
return
local_thumbnails = []
def generate_thumbnails():
for r_width, r_height, r_method, r_type in requirements:
t_byte_source = self._generate_thumbnail(
thumbnailer, r_width, r_height, r_method, r_type,
)
local_thumbnails.append((
r_width, r_height, r_method, r_type, t_byte_source
))
2017-10-13 11:24:19 +02:00
yield make_deferred_yieldable(threads.deferToThread, generate_thumbnails)
for t_width, t_height, t_method, t_type, t_byte_source in local_thumbnails:
2017-10-12 18:31:24 +02:00
if url_cache:
file_path = self.filepaths.url_cache_thumbnail_rel(
media_id, t_width, t_height, t_type, t_method
)
else:
file_path = self.filepaths.local_media_thumbnail_rel(
media_id, t_width, t_height, t_type, t_method
)
2017-10-12 18:52:30 +02:00
output_path = yield self.write_to_file(t_byte_source, file_path)
t_len = os.path.getsize(output_path)
yield self.store.store_local_thumbnail(
2017-10-12 18:39:23 +02:00
media_id, t_width, t_height, t_type, t_method, t_len
)
defer.returnValue({
"width": m_width,
"height": m_height,
})
@defer.inlineCallbacks
def _generate_remote_thumbnails(self, server_name, media_id, media_info):
media_type = media_info["media_type"]
file_id = media_info["filesystem_id"]
requirements = self._get_thumbnail_requirements(media_type)
if not requirements:
return
remote_thumbnails = []
input_path = self.filepaths.remote_media_filepath(server_name, file_id)
thumbnailer = Thumbnailer(input_path)
m_width = thumbnailer.width
m_height = thumbnailer.height
def generate_thumbnails():
if m_width * m_height >= self.max_image_pixels:
logger.info(
"Image too large to thumbnail %r x %r > %r",
m_width, m_height, self.max_image_pixels
)
return
for r_width, r_height, r_method, r_type in requirements:
t_byte_source = self._generate_thumbnail(
thumbnailer, r_width, r_height, r_method, r_type,
)
remote_thumbnails.append((
r_width, r_height, r_method, r_type, t_byte_source
))
2017-10-13 11:24:19 +02:00
yield make_deferred_yieldable(threads.deferToThread, generate_thumbnails)
2017-10-12 16:28:24 +02:00
for t_width, t_height, t_method, t_type, t_byte_source in remote_thumbnails:
2017-10-12 18:31:24 +02:00
file_path = self.filepaths.remote_media_thumbnail_rel(
server_name, file_id, t_width, t_height, t_type, t_method
)
2017-10-12 18:52:30 +02:00
output_path = yield self.write_to_file(t_byte_source, file_path)
t_len = os.path.getsize(output_path)
yield self.store.store_remote_media_thumbnail(
server_name, media_id, file_id,
2017-10-12 18:39:23 +02:00
t_width, t_height, t_type, t_method, t_len
)
defer.returnValue({
"width": m_width,
"height": m_height,
})
2016-06-29 15:57:59 +02:00
@defer.inlineCallbacks
def delete_old_remote_media(self, before_ts):
old_media = yield self.store.get_remote_media_before(before_ts)
deleted = 0
for media in old_media:
origin = media["media_origin"]
media_id = media["media_id"]
file_id = media["filesystem_id"]
key = (origin, media_id)
logger.info("Deleting: %r", key)
2017-10-12 18:31:24 +02:00
# TODO: Should we delete from the backup store
2016-06-29 15:57:59 +02:00
with (yield self.remote_media_linearizer.queue(key)):
full_path = self.filepaths.remote_media_filepath(origin, file_id)
try:
os.remove(full_path)
except OSError as e:
logger.warn("Failed to remove file: %r", full_path)
if e.errno == errno.ENOENT:
pass
else:
continue
thumbnail_dir = self.filepaths.remote_media_thumbnail_dir(
origin, file_id
)
shutil.rmtree(thumbnail_dir, ignore_errors=True)
yield self.store.delete_remote_media(origin, media_id)
deleted += 1
defer.returnValue({"deleted": deleted})
class MediaRepositoryResource(Resource):
"""File uploading and downloading.
Uploads are POSTed to a resource which returns a token which is used to GET
the download::
=> POST /_matrix/media/v1/upload HTTP/1.1
Content-Type: <media-type>
2015-02-07 13:56:21 +01:00
Content-Length: <content-length>
<media>
<= HTTP/1.1 200 OK
Content-Type: application/json
2014-12-15 17:57:53 +01:00
{ "content_uri": "mxc://<server-name>/<media-id>" }
2014-12-15 14:56:43 +01:00
=> GET /_matrix/media/v1/download/<server-name>/<media-id> HTTP/1.1
<= HTTP/1.1 200 OK
Content-Type: <media-type>
Content-Disposition: attachment;filename=<upload-filename>
<media>
2014-12-11 11:41:43 +01:00
Clients can get thumbnails by supplying a desired width and height and
thumbnailing method::
=> GET /_matrix/media/v1/thumbnail/<server_name>
/<media-id>?width=<w>&height=<h>&method=<m> HTTP/1.1
<= HTTP/1.1 200 OK
Content-Type: image/jpeg or image/png
<thumbnail>
2014-12-11 11:41:43 +01:00
The thumbnail methods are "crop" and "scale". "scale" trys to return an
image where either the width or the height is smaller than the requested
size. The client should then scale and letterbox the image if it needs to
fit within a given rectangle. "crop" trys to return an image where the
width and height are close to the requested size and the aspect matches
the requested size. The client should scale the image if it needs to fit
within a given rectangle.
"""
def __init__(self, hs):
Resource.__init__(self)
2016-06-29 15:57:59 +02:00
media_repo = hs.get_media_repository()
self.putChild("upload", UploadResource(hs, media_repo))
self.putChild("download", DownloadResource(hs, media_repo))
self.putChild("thumbnail", ThumbnailResource(hs, media_repo))
self.putChild("identicon", IdenticonResource())
if hs.config.url_preview_enabled:
self.putChild("preview_url", PreviewUrlResource(hs, media_repo))