241 lines
		
	
	
		
			8.4 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			241 lines
		
	
	
		
			8.4 KiB
		
	
	
	
		
			Python
		
	
	
| # -*- coding: utf-8 -*-
 | |
| # Copyright 2014-2016 OpenMarket Ltd
 | |
| # Copyright 2020-2021 The Matrix.org Foundation C.I.C.
 | |
| #
 | |
| # Licensed under the Apache License, Version 2.0 (the "License");
 | |
| # you may not use this file except in compliance with the License.
 | |
| # You may obtain a copy of the License at
 | |
| #
 | |
| #     http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| # Unless required by applicable law or agreed to in writing, software
 | |
| # distributed under the License is distributed on an "AS IS" BASIS,
 | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| # See the License for the specific language governing permissions and
 | |
| # limitations under the License.
 | |
| 
 | |
| import functools
 | |
| import os
 | |
| import re
 | |
| from typing import Callable, List
 | |
| 
 | |
| NEW_FORMAT_ID_RE = re.compile(r"^\d\d\d\d-\d\d-\d\d")
 | |
| 
 | |
| 
 | |
| def _wrap_in_base_path(func: "Callable[..., str]") -> "Callable[..., str]":
 | |
|     """Takes a function that returns a relative path and turns it into an
 | |
|     absolute path based on the location of the primary media store
 | |
|     """
 | |
| 
 | |
|     @functools.wraps(func)
 | |
|     def _wrapped(self, *args, **kwargs):
 | |
|         path = func(self, *args, **kwargs)
 | |
|         return os.path.join(self.base_path, path)
 | |
| 
 | |
|     return _wrapped
 | |
| 
 | |
| 
 | |
| class MediaFilePaths:
 | |
|     """Describes where files are stored on disk.
 | |
| 
 | |
|     Most of the functions have a `*_rel` variant which returns a file path that
 | |
|     is relative to the base media store path. This is mainly used when we want
 | |
|     to write to the backup media store (when one is configured)
 | |
|     """
 | |
| 
 | |
|     def __init__(self, primary_base_path: str):
 | |
|         self.base_path = primary_base_path
 | |
| 
 | |
|     def default_thumbnail_rel(
 | |
|         self,
 | |
|         default_top_level: str,
 | |
|         default_sub_type: str,
 | |
|         width: int,
 | |
|         height: int,
 | |
|         content_type: str,
 | |
|         method: str,
 | |
|     ) -> str:
 | |
|         top_level_type, sub_type = content_type.split("/")
 | |
|         file_name = "%i-%i-%s-%s-%s" % (width, height, top_level_type, sub_type, method)
 | |
|         return os.path.join(
 | |
|             "default_thumbnails", default_top_level, default_sub_type, file_name
 | |
|         )
 | |
| 
 | |
|     default_thumbnail = _wrap_in_base_path(default_thumbnail_rel)
 | |
| 
 | |
|     def local_media_filepath_rel(self, media_id: str) -> str:
 | |
|         return os.path.join("local_content", media_id[0:2], media_id[2:4], media_id[4:])
 | |
| 
 | |
|     local_media_filepath = _wrap_in_base_path(local_media_filepath_rel)
 | |
| 
 | |
|     def local_media_thumbnail_rel(
 | |
|         self, media_id: str, width: int, height: int, content_type: str, method: str
 | |
|     ) -> str:
 | |
|         top_level_type, sub_type = content_type.split("/")
 | |
|         file_name = "%i-%i-%s-%s-%s" % (width, height, top_level_type, sub_type, method)
 | |
|         return os.path.join(
 | |
|             "local_thumbnails", media_id[0:2], media_id[2:4], media_id[4:], file_name
 | |
|         )
 | |
| 
 | |
|     local_media_thumbnail = _wrap_in_base_path(local_media_thumbnail_rel)
 | |
| 
 | |
|     def local_media_thumbnail_dir(self, media_id: str) -> str:
 | |
|         """
 | |
|         Retrieve the local store path of thumbnails of a given media_id
 | |
| 
 | |
|         Args:
 | |
|             media_id: The media ID to query.
 | |
|         Returns:
 | |
|             Path of local_thumbnails from media_id
 | |
|         """
 | |
|         return os.path.join(
 | |
|             self.base_path,
 | |
|             "local_thumbnails",
 | |
|             media_id[0:2],
 | |
|             media_id[2:4],
 | |
|             media_id[4:],
 | |
|         )
 | |
| 
 | |
|     def remote_media_filepath_rel(self, server_name: str, file_id: str) -> str:
 | |
|         return os.path.join(
 | |
|             "remote_content", server_name, file_id[0:2], file_id[2:4], file_id[4:]
 | |
|         )
 | |
| 
 | |
|     remote_media_filepath = _wrap_in_base_path(remote_media_filepath_rel)
 | |
| 
 | |
|     def remote_media_thumbnail_rel(
 | |
|         self,
 | |
|         server_name: str,
 | |
|         file_id: str,
 | |
|         width: int,
 | |
|         height: int,
 | |
|         content_type: str,
 | |
|         method: str,
 | |
|     ) -> str:
 | |
|         top_level_type, sub_type = content_type.split("/")
 | |
|         file_name = "%i-%i-%s-%s-%s" % (width, height, top_level_type, sub_type, method)
 | |
|         return os.path.join(
 | |
|             "remote_thumbnail",
 | |
|             server_name,
 | |
|             file_id[0:2],
 | |
|             file_id[2:4],
 | |
|             file_id[4:],
 | |
|             file_name,
 | |
|         )
 | |
| 
 | |
|     remote_media_thumbnail = _wrap_in_base_path(remote_media_thumbnail_rel)
 | |
| 
 | |
|     # Legacy path that was used to store thumbnails previously.
 | |
|     # Should be removed after some time, when most of the thumbnails are stored
 | |
|     # using the new path.
 | |
|     def remote_media_thumbnail_rel_legacy(
 | |
|         self, server_name: str, file_id: str, width: int, height: int, content_type: str
 | |
|     ):
 | |
|         top_level_type, sub_type = content_type.split("/")
 | |
|         file_name = "%i-%i-%s-%s" % (width, height, top_level_type, sub_type)
 | |
|         return os.path.join(
 | |
|             "remote_thumbnail",
 | |
|             server_name,
 | |
|             file_id[0:2],
 | |
|             file_id[2:4],
 | |
|             file_id[4:],
 | |
|             file_name,
 | |
|         )
 | |
| 
 | |
|     def remote_media_thumbnail_dir(self, server_name: str, file_id: str) -> str:
 | |
|         return os.path.join(
 | |
|             self.base_path,
 | |
|             "remote_thumbnail",
 | |
|             server_name,
 | |
|             file_id[0:2],
 | |
|             file_id[2:4],
 | |
|             file_id[4:],
 | |
|         )
 | |
| 
 | |
|     def url_cache_filepath_rel(self, media_id: str) -> str:
 | |
|         if NEW_FORMAT_ID_RE.match(media_id):
 | |
|             # Media id is of the form <DATE><RANDOM_STRING>
 | |
|             # E.g.: 2017-09-28-fsdRDt24DS234dsf
 | |
|             return os.path.join("url_cache", media_id[:10], media_id[11:])
 | |
|         else:
 | |
|             return os.path.join("url_cache", media_id[0:2], media_id[2:4], media_id[4:])
 | |
| 
 | |
|     url_cache_filepath = _wrap_in_base_path(url_cache_filepath_rel)
 | |
| 
 | |
|     def url_cache_filepath_dirs_to_delete(self, media_id: str) -> List[str]:
 | |
|         "The dirs to try and remove if we delete the media_id file"
 | |
|         if NEW_FORMAT_ID_RE.match(media_id):
 | |
|             return [os.path.join(self.base_path, "url_cache", media_id[:10])]
 | |
|         else:
 | |
|             return [
 | |
|                 os.path.join(self.base_path, "url_cache", media_id[0:2], media_id[2:4]),
 | |
|                 os.path.join(self.base_path, "url_cache", media_id[0:2]),
 | |
|             ]
 | |
| 
 | |
|     def url_cache_thumbnail_rel(
 | |
|         self, media_id: str, width: int, height: int, content_type: str, method: str
 | |
|     ) -> str:
 | |
|         # Media id is of the form <DATE><RANDOM_STRING>
 | |
|         # E.g.: 2017-09-28-fsdRDt24DS234dsf
 | |
| 
 | |
|         top_level_type, sub_type = content_type.split("/")
 | |
|         file_name = "%i-%i-%s-%s-%s" % (width, height, top_level_type, sub_type, method)
 | |
| 
 | |
|         if NEW_FORMAT_ID_RE.match(media_id):
 | |
|             return os.path.join(
 | |
|                 "url_cache_thumbnails", media_id[:10], media_id[11:], file_name
 | |
|             )
 | |
|         else:
 | |
|             return os.path.join(
 | |
|                 "url_cache_thumbnails",
 | |
|                 media_id[0:2],
 | |
|                 media_id[2:4],
 | |
|                 media_id[4:],
 | |
|                 file_name,
 | |
|             )
 | |
| 
 | |
|     url_cache_thumbnail = _wrap_in_base_path(url_cache_thumbnail_rel)
 | |
| 
 | |
|     def url_cache_thumbnail_directory(self, media_id: str) -> str:
 | |
|         # Media id is of the form <DATE><RANDOM_STRING>
 | |
|         # E.g.: 2017-09-28-fsdRDt24DS234dsf
 | |
| 
 | |
|         if NEW_FORMAT_ID_RE.match(media_id):
 | |
|             return os.path.join(
 | |
|                 self.base_path, "url_cache_thumbnails", media_id[:10], media_id[11:]
 | |
|             )
 | |
|         else:
 | |
|             return os.path.join(
 | |
|                 self.base_path,
 | |
|                 "url_cache_thumbnails",
 | |
|                 media_id[0:2],
 | |
|                 media_id[2:4],
 | |
|                 media_id[4:],
 | |
|             )
 | |
| 
 | |
|     def url_cache_thumbnail_dirs_to_delete(self, media_id: str) -> List[str]:
 | |
|         "The dirs to try and remove if we delete the media_id thumbnails"
 | |
|         # Media id is of the form <DATE><RANDOM_STRING>
 | |
|         # E.g.: 2017-09-28-fsdRDt24DS234dsf
 | |
|         if NEW_FORMAT_ID_RE.match(media_id):
 | |
|             return [
 | |
|                 os.path.join(
 | |
|                     self.base_path, "url_cache_thumbnails", media_id[:10], media_id[11:]
 | |
|                 ),
 | |
|                 os.path.join(self.base_path, "url_cache_thumbnails", media_id[:10]),
 | |
|             ]
 | |
|         else:
 | |
|             return [
 | |
|                 os.path.join(
 | |
|                     self.base_path,
 | |
|                     "url_cache_thumbnails",
 | |
|                     media_id[0:2],
 | |
|                     media_id[2:4],
 | |
|                     media_id[4:],
 | |
|                 ),
 | |
|                 os.path.join(
 | |
|                     self.base_path, "url_cache_thumbnails", media_id[0:2], media_id[2:4]
 | |
|                 ),
 | |
|                 os.path.join(self.base_path, "url_cache_thumbnails", media_id[0:2]),
 | |
|             ]
 |