411 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			411 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
| # Copyright 2014-2016 OpenMarket Ltd
 | |
| # Copyright 2020-2021 The Matrix.org Foundation C.I.C.
 | |
| #
 | |
| # Licensed under the Apache License, Version 2.0 (the "License");
 | |
| # you may not use this file except in compliance with the License.
 | |
| # You may obtain a copy of the License at
 | |
| #
 | |
| #     http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| # Unless required by applicable law or agreed to in writing, software
 | |
| # distributed under the License is distributed on an "AS IS" BASIS,
 | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| # See the License for the specific language governing permissions and
 | |
| # limitations under the License.
 | |
| 
 | |
| import functools
 | |
| import os
 | |
| import re
 | |
| import string
 | |
| from typing import Any, Callable, List, TypeVar, Union, cast
 | |
| 
 | |
| NEW_FORMAT_ID_RE = re.compile(r"^\d\d\d\d-\d\d-\d\d")
 | |
| 
 | |
| 
 | |
| F = TypeVar("F", bound=Callable[..., str])
 | |
| 
 | |
| 
 | |
| def _wrap_in_base_path(func: F) -> F:
 | |
|     """Takes a function that returns a relative path and turns it into an
 | |
|     absolute path based on the location of the primary media store
 | |
|     """
 | |
| 
 | |
|     @functools.wraps(func)
 | |
|     def _wrapped(self: "MediaFilePaths", *args: Any, **kwargs: Any) -> str:
 | |
|         path = func(self, *args, **kwargs)
 | |
|         return os.path.join(self.base_path, path)
 | |
| 
 | |
|     return cast(F, _wrapped)
 | |
| 
 | |
| 
 | |
| GetPathMethod = TypeVar(
 | |
|     "GetPathMethod", bound=Union[Callable[..., str], Callable[..., List[str]]]
 | |
| )
 | |
| 
 | |
| 
 | |
| def _wrap_with_jail_check(relative: bool) -> Callable[[GetPathMethod], GetPathMethod]:
 | |
|     """Wraps a path-returning method to check that the returned path(s) do not escape
 | |
|     the media store directory.
 | |
| 
 | |
|     The path-returning method may return either a single path, or a list of paths.
 | |
| 
 | |
|     The check is not expected to ever fail, unless `func` is missing a call to
 | |
|     `_validate_path_component`, or `_validate_path_component` is buggy.
 | |
| 
 | |
|     Args:
 | |
|         relative: A boolean indicating whether the wrapped method returns paths relative
 | |
|             to the media store directory.
 | |
| 
 | |
|     Returns:
 | |
|         A method which will wrap a path-returning method, adding a check to ensure that
 | |
|         the returned path(s) lie within the media store directory. The check will raise
 | |
|         a `ValueError` if it fails.
 | |
|     """
 | |
| 
 | |
|     def _wrap_with_jail_check_inner(func: GetPathMethod) -> GetPathMethod:
 | |
|         @functools.wraps(func)
 | |
|         def _wrapped(
 | |
|             self: "MediaFilePaths", *args: Any, **kwargs: Any
 | |
|         ) -> Union[str, List[str]]:
 | |
|             path_or_paths = func(self, *args, **kwargs)
 | |
| 
 | |
|             if isinstance(path_or_paths, list):
 | |
|                 paths_to_check = path_or_paths
 | |
|             else:
 | |
|                 paths_to_check = [path_or_paths]
 | |
| 
 | |
|             for path in paths_to_check:
 | |
|                 # Construct the path that will ultimately be used.
 | |
|                 # We cannot guess whether `path` is relative to the media store
 | |
|                 # directory, since the media store directory may itself be a relative
 | |
|                 # path.
 | |
|                 if relative:
 | |
|                     path = os.path.join(self.base_path, path)
 | |
|                 normalized_path = os.path.normpath(path)
 | |
| 
 | |
|                 # Now that `normpath` has eliminated `../`s and `./`s from the path,
 | |
|                 # `os.path.commonpath` can be used to check whether it lies within the
 | |
|                 # media store directory.
 | |
|                 if (
 | |
|                     os.path.commonpath([normalized_path, self.normalized_base_path])
 | |
|                     != self.normalized_base_path
 | |
|                 ):
 | |
|                     # The path resolves to outside the media store directory,
 | |
|                     # or `self.base_path` is `.`, which is an unlikely configuration.
 | |
|                     raise ValueError(f"Invalid media store path: {path!r}")
 | |
| 
 | |
|                 # Note that `os.path.normpath`/`abspath` has a subtle caveat:
 | |
|                 # `a/b/c/../c` will normalize to `a/b/c`, but the former refers to a
 | |
|                 # different path if `a/b/c` is a symlink. That is, the check above is
 | |
|                 # not perfect and may allow a certain restricted subset of untrustworthy
 | |
|                 # paths through. Since the check above is secondary to the main
 | |
|                 # `_validate_path_component` checks, it's less important for it to be
 | |
|                 # perfect.
 | |
|                 #
 | |
|                 # As an alternative, `os.path.realpath` will resolve symlinks, but
 | |
|                 # proves problematic if there are symlinks inside the media store.
 | |
|                 # eg. if `url_store/` is symlinked to elsewhere, its canonical path
 | |
|                 # won't match that of the main media store directory.
 | |
| 
 | |
|             return path_or_paths
 | |
| 
 | |
|         return cast(GetPathMethod, _wrapped)
 | |
| 
 | |
|     return _wrap_with_jail_check_inner
 | |
| 
 | |
| 
 | |
| ALLOWED_CHARACTERS = set(
 | |
|     string.ascii_letters
 | |
|     + string.digits
 | |
|     + "_-"
 | |
|     + ".[]:"  # Domain names, IPv6 addresses and ports in server names
 | |
| )
 | |
| FORBIDDEN_NAMES = {
 | |
|     "",
 | |
|     os.path.curdir,  # "." for the current platform
 | |
|     os.path.pardir,  # ".." for the current platform
 | |
| }
 | |
| 
 | |
| 
 | |
| def _validate_path_component(name: str) -> str:
 | |
|     """Checks that the given string can be safely used as a path component
 | |
| 
 | |
|     Args:
 | |
|         name: The path component to check.
 | |
| 
 | |
|     Returns:
 | |
|         The path component if valid.
 | |
| 
 | |
|     Raises:
 | |
|         ValueError: If `name` cannot be safely used as a path component.
 | |
|     """
 | |
|     if not ALLOWED_CHARACTERS.issuperset(name) or name in FORBIDDEN_NAMES:
 | |
|         raise ValueError(f"Invalid path component: {name!r}")
 | |
| 
 | |
|     return name
 | |
| 
 | |
| 
 | |
| class MediaFilePaths:
 | |
|     """Describes where files are stored on disk.
 | |
| 
 | |
|     Most of the functions have a `*_rel` variant which returns a file path that
 | |
|     is relative to the base media store path. This is mainly used when we want
 | |
|     to write to the backup media store (when one is configured)
 | |
|     """
 | |
| 
 | |
|     def __init__(self, primary_base_path: str):
 | |
|         self.base_path = primary_base_path
 | |
|         self.normalized_base_path = os.path.normpath(self.base_path)
 | |
| 
 | |
|         # Refuse to initialize if paths cannot be validated correctly for the current
 | |
|         # platform.
 | |
|         assert os.path.sep not in ALLOWED_CHARACTERS
 | |
|         assert os.path.altsep not in ALLOWED_CHARACTERS
 | |
|         # On Windows, paths have all sorts of weirdness which `_validate_path_component`
 | |
|         # does not consider. In any case, the remote media store can't work correctly
 | |
|         # for certain homeservers there, since ":"s aren't allowed in paths.
 | |
|         assert os.name == "posix"
 | |
| 
 | |
|     @_wrap_with_jail_check(relative=True)
 | |
|     def local_media_filepath_rel(self, media_id: str) -> str:
 | |
|         return os.path.join(
 | |
|             "local_content",
 | |
|             _validate_path_component(media_id[0:2]),
 | |
|             _validate_path_component(media_id[2:4]),
 | |
|             _validate_path_component(media_id[4:]),
 | |
|         )
 | |
| 
 | |
|     local_media_filepath = _wrap_in_base_path(local_media_filepath_rel)
 | |
| 
 | |
|     @_wrap_with_jail_check(relative=True)
 | |
|     def local_media_thumbnail_rel(
 | |
|         self, media_id: str, width: int, height: int, content_type: str, method: str
 | |
|     ) -> str:
 | |
|         top_level_type, sub_type = content_type.split("/")
 | |
|         file_name = "%i-%i-%s-%s-%s" % (width, height, top_level_type, sub_type, method)
 | |
|         return os.path.join(
 | |
|             "local_thumbnails",
 | |
|             _validate_path_component(media_id[0:2]),
 | |
|             _validate_path_component(media_id[2:4]),
 | |
|             _validate_path_component(media_id[4:]),
 | |
|             _validate_path_component(file_name),
 | |
|         )
 | |
| 
 | |
|     local_media_thumbnail = _wrap_in_base_path(local_media_thumbnail_rel)
 | |
| 
 | |
|     @_wrap_with_jail_check(relative=False)
 | |
|     def local_media_thumbnail_dir(self, media_id: str) -> str:
 | |
|         """
 | |
|         Retrieve the local store path of thumbnails of a given media_id
 | |
| 
 | |
|         Args:
 | |
|             media_id: The media ID to query.
 | |
|         Returns:
 | |
|             Path of local_thumbnails from media_id
 | |
|         """
 | |
|         return os.path.join(
 | |
|             self.base_path,
 | |
|             "local_thumbnails",
 | |
|             _validate_path_component(media_id[0:2]),
 | |
|             _validate_path_component(media_id[2:4]),
 | |
|             _validate_path_component(media_id[4:]),
 | |
|         )
 | |
| 
 | |
|     @_wrap_with_jail_check(relative=True)
 | |
|     def remote_media_filepath_rel(self, server_name: str, file_id: str) -> str:
 | |
|         return os.path.join(
 | |
|             "remote_content",
 | |
|             _validate_path_component(server_name),
 | |
|             _validate_path_component(file_id[0:2]),
 | |
|             _validate_path_component(file_id[2:4]),
 | |
|             _validate_path_component(file_id[4:]),
 | |
|         )
 | |
| 
 | |
|     remote_media_filepath = _wrap_in_base_path(remote_media_filepath_rel)
 | |
| 
 | |
|     @_wrap_with_jail_check(relative=True)
 | |
|     def remote_media_thumbnail_rel(
 | |
|         self,
 | |
|         server_name: str,
 | |
|         file_id: str,
 | |
|         width: int,
 | |
|         height: int,
 | |
|         content_type: str,
 | |
|         method: str,
 | |
|     ) -> str:
 | |
|         top_level_type, sub_type = content_type.split("/")
 | |
|         file_name = "%i-%i-%s-%s-%s" % (width, height, top_level_type, sub_type, method)
 | |
|         return os.path.join(
 | |
|             "remote_thumbnail",
 | |
|             _validate_path_component(server_name),
 | |
|             _validate_path_component(file_id[0:2]),
 | |
|             _validate_path_component(file_id[2:4]),
 | |
|             _validate_path_component(file_id[4:]),
 | |
|             _validate_path_component(file_name),
 | |
|         )
 | |
| 
 | |
|     remote_media_thumbnail = _wrap_in_base_path(remote_media_thumbnail_rel)
 | |
| 
 | |
|     # Legacy path that was used to store thumbnails previously.
 | |
|     # Should be removed after some time, when most of the thumbnails are stored
 | |
|     # using the new path.
 | |
|     @_wrap_with_jail_check(relative=True)
 | |
|     def remote_media_thumbnail_rel_legacy(
 | |
|         self, server_name: str, file_id: str, width: int, height: int, content_type: str
 | |
|     ) -> str:
 | |
|         top_level_type, sub_type = content_type.split("/")
 | |
|         file_name = "%i-%i-%s-%s" % (width, height, top_level_type, sub_type)
 | |
|         return os.path.join(
 | |
|             "remote_thumbnail",
 | |
|             _validate_path_component(server_name),
 | |
|             _validate_path_component(file_id[0:2]),
 | |
|             _validate_path_component(file_id[2:4]),
 | |
|             _validate_path_component(file_id[4:]),
 | |
|             _validate_path_component(file_name),
 | |
|         )
 | |
| 
 | |
|     @_wrap_with_jail_check(relative=False)
 | |
|     def remote_media_thumbnail_dir(self, server_name: str, file_id: str) -> str:
 | |
|         return os.path.join(
 | |
|             self.base_path,
 | |
|             "remote_thumbnail",
 | |
|             _validate_path_component(server_name),
 | |
|             _validate_path_component(file_id[0:2]),
 | |
|             _validate_path_component(file_id[2:4]),
 | |
|             _validate_path_component(file_id[4:]),
 | |
|         )
 | |
| 
 | |
|     @_wrap_with_jail_check(relative=True)
 | |
|     def url_cache_filepath_rel(self, media_id: str) -> str:
 | |
|         if NEW_FORMAT_ID_RE.match(media_id):
 | |
|             # Media id is of the form <DATE><RANDOM_STRING>
 | |
|             # E.g.: 2017-09-28-fsdRDt24DS234dsf
 | |
|             return os.path.join(
 | |
|                 "url_cache",
 | |
|                 _validate_path_component(media_id[:10]),
 | |
|                 _validate_path_component(media_id[11:]),
 | |
|             )
 | |
|         else:
 | |
|             return os.path.join(
 | |
|                 "url_cache",
 | |
|                 _validate_path_component(media_id[0:2]),
 | |
|                 _validate_path_component(media_id[2:4]),
 | |
|                 _validate_path_component(media_id[4:]),
 | |
|             )
 | |
| 
 | |
|     url_cache_filepath = _wrap_in_base_path(url_cache_filepath_rel)
 | |
| 
 | |
|     @_wrap_with_jail_check(relative=False)
 | |
|     def url_cache_filepath_dirs_to_delete(self, media_id: str) -> List[str]:
 | |
|         "The dirs to try and remove if we delete the media_id file"
 | |
|         if NEW_FORMAT_ID_RE.match(media_id):
 | |
|             return [
 | |
|                 os.path.join(
 | |
|                     self.base_path, "url_cache", _validate_path_component(media_id[:10])
 | |
|                 )
 | |
|             ]
 | |
|         else:
 | |
|             return [
 | |
|                 os.path.join(
 | |
|                     self.base_path,
 | |
|                     "url_cache",
 | |
|                     _validate_path_component(media_id[0:2]),
 | |
|                     _validate_path_component(media_id[2:4]),
 | |
|                 ),
 | |
|                 os.path.join(
 | |
|                     self.base_path, "url_cache", _validate_path_component(media_id[0:2])
 | |
|                 ),
 | |
|             ]
 | |
| 
 | |
|     @_wrap_with_jail_check(relative=True)
 | |
|     def url_cache_thumbnail_rel(
 | |
|         self, media_id: str, width: int, height: int, content_type: str, method: str
 | |
|     ) -> str:
 | |
|         # Media id is of the form <DATE><RANDOM_STRING>
 | |
|         # E.g.: 2017-09-28-fsdRDt24DS234dsf
 | |
| 
 | |
|         top_level_type, sub_type = content_type.split("/")
 | |
|         file_name = "%i-%i-%s-%s-%s" % (width, height, top_level_type, sub_type, method)
 | |
| 
 | |
|         if NEW_FORMAT_ID_RE.match(media_id):
 | |
|             return os.path.join(
 | |
|                 "url_cache_thumbnails",
 | |
|                 _validate_path_component(media_id[:10]),
 | |
|                 _validate_path_component(media_id[11:]),
 | |
|                 _validate_path_component(file_name),
 | |
|             )
 | |
|         else:
 | |
|             return os.path.join(
 | |
|                 "url_cache_thumbnails",
 | |
|                 _validate_path_component(media_id[0:2]),
 | |
|                 _validate_path_component(media_id[2:4]),
 | |
|                 _validate_path_component(media_id[4:]),
 | |
|                 _validate_path_component(file_name),
 | |
|             )
 | |
| 
 | |
|     url_cache_thumbnail = _wrap_in_base_path(url_cache_thumbnail_rel)
 | |
| 
 | |
|     @_wrap_with_jail_check(relative=True)
 | |
|     def url_cache_thumbnail_directory_rel(self, media_id: str) -> str:
 | |
|         # Media id is of the form <DATE><RANDOM_STRING>
 | |
|         # E.g.: 2017-09-28-fsdRDt24DS234dsf
 | |
| 
 | |
|         if NEW_FORMAT_ID_RE.match(media_id):
 | |
|             return os.path.join(
 | |
|                 "url_cache_thumbnails",
 | |
|                 _validate_path_component(media_id[:10]),
 | |
|                 _validate_path_component(media_id[11:]),
 | |
|             )
 | |
|         else:
 | |
|             return os.path.join(
 | |
|                 "url_cache_thumbnails",
 | |
|                 _validate_path_component(media_id[0:2]),
 | |
|                 _validate_path_component(media_id[2:4]),
 | |
|                 _validate_path_component(media_id[4:]),
 | |
|             )
 | |
| 
 | |
|     url_cache_thumbnail_directory = _wrap_in_base_path(
 | |
|         url_cache_thumbnail_directory_rel
 | |
|     )
 | |
| 
 | |
|     @_wrap_with_jail_check(relative=False)
 | |
|     def url_cache_thumbnail_dirs_to_delete(self, media_id: str) -> List[str]:
 | |
|         "The dirs to try and remove if we delete the media_id thumbnails"
 | |
|         # Media id is of the form <DATE><RANDOM_STRING>
 | |
|         # E.g.: 2017-09-28-fsdRDt24DS234dsf
 | |
|         if NEW_FORMAT_ID_RE.match(media_id):
 | |
|             return [
 | |
|                 os.path.join(
 | |
|                     self.base_path,
 | |
|                     "url_cache_thumbnails",
 | |
|                     _validate_path_component(media_id[:10]),
 | |
|                     _validate_path_component(media_id[11:]),
 | |
|                 ),
 | |
|                 os.path.join(
 | |
|                     self.base_path,
 | |
|                     "url_cache_thumbnails",
 | |
|                     _validate_path_component(media_id[:10]),
 | |
|                 ),
 | |
|             ]
 | |
|         else:
 | |
|             return [
 | |
|                 os.path.join(
 | |
|                     self.base_path,
 | |
|                     "url_cache_thumbnails",
 | |
|                     _validate_path_component(media_id[0:2]),
 | |
|                     _validate_path_component(media_id[2:4]),
 | |
|                     _validate_path_component(media_id[4:]),
 | |
|                 ),
 | |
|                 os.path.join(
 | |
|                     self.base_path,
 | |
|                     "url_cache_thumbnails",
 | |
|                     _validate_path_component(media_id[0:2]),
 | |
|                     _validate_path_component(media_id[2:4]),
 | |
|                 ),
 | |
|                 os.path.join(
 | |
|                     self.base_path,
 | |
|                     "url_cache_thumbnails",
 | |
|                     _validate_path_component(media_id[0:2]),
 | |
|                 ),
 | |
|             ]
 |