Merge branch 'release-v1.3.0' of github.com:matrix-org/synapse into matrix-org-hotfixes
commit a7efbc5416
@@ -16,6 +16,7 @@ _trial_temp*/
 /*.log
 /*.log.config
 /*.pid
+/.python-version
 /*.signing.key
 /env/
 /homeserver*.yaml
@@ -0,0 +1 @@
+Synapse will no longer serve any media repo admin endpoints when `enable_media_repo` is set to False in the configuration. If a media repo worker is used, the admin APIs relating to the media repo will be served from it instead.
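For context: `enable_media_repo` is a homeserver.yaml flag (the full stanza appears in the sample-config hunk further down). A minimal sketch of the master-side setting this entry refers to, assuming a separate media worker handles `/_matrix/media/` and the media admin APIs::

    # homeserver.yaml on the master, when the media repo runs in a worker
    enable_media_repo: false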
@@ -0,0 +1 @@
+Handle pusher being deleted during processing rather than logging an exception.
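The gist of the change, distilled from the pusher hunks below: the storage update now reports whether the pusher row still exists, and the processing loop stops itself instead of raising::

    pusher_still_exists = yield self.store.update_pusher_last_stream_ordering(
        self.app_id, self.pushkey, self.user_id, self.last_stream_ordering
    )
    if not pusher_still_exists:
        # deleted while we were processing: stop quietly
        self.on_stop()
        return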
@@ -0,0 +1 @@
+Add a lower bound to well-known lookup cache time to avoid repeated lookups.
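Concretely (constants from the new well-known resolver module below), a successful lookup's cache lifetime is now clamped from both sides::

    cache_period = min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD)  # 48 * 3600
    cache_period = max(cache_period, WELL_KNOWN_MIN_CACHE_PERIOD)  # 5 * 60

so, for example, a response carrying `Cache-Control: max-age=10` is still cached for 300 seconds. This is also why the test change at the end of this diff bumps its max-age header from 10 to 1000: a 10-second TTL no longer survives the clamp.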
@@ -565,6 +565,13 @@ log_config: "CONFDIR/SERVERNAME.log.config"
 
 
+## Media Store ##
+
+# Enable the media store service in the Synapse master. Uncomment the
+# following if you are using a separate media store worker.
+#
+#enable_media_repo: false
+
 # Directory where uploaded images and attachments are stored.
 #
 media_store_path: "DATADIR/media_store"
 
@@ -206,6 +206,13 @@ Handles the media repository. It can handle all endpoints starting with::
 
     /_matrix/media/
 
+And the following regular expressions matching media-specific administration
+APIs::
+
+    ^/_synapse/admin/v1/purge_media_cache$
+    ^/_synapse/admin/v1/room/.*/media$
+    ^/_synapse/admin/v1/quarantine_media/.*$
+
 You should also set ``enable_media_repo: False`` in the shared configuration
 file to stop the main synapse running background jobs related to managing the
 media repository.
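For context, a minimal sketch of such a media worker's configuration, in the style of the rest of the workers documentation (the port number is illustrative, not taken from this diff)::

    worker_app: synapse.app.media_repository
    worker_listeners:
      - type: http
        port: 8085
        resources:
          - names: [media]

combined with ``enable_media_repo: false`` in the shared configuration, as described above.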
@@ -26,6 +26,7 @@ from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
+from synapse.http.server import JsonResource
 from synapse.http.site import SynapseSite
 from synapse.logging.context import LoggingContext
 from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
@@ -35,6 +36,7 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.admin import register_servlets_for_media_repo
 from synapse.rest.media.v0.content_repository import ContentRepoResource
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
@@ -71,6 +73,12 @@ class MediaRepositoryServer(HomeServer):
                     resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
                 elif name == "media":
                     media_repo = self.get_media_repository_resource()
+
+                    # We need to serve the admin servlets for media on the
+                    # worker.
+                    admin_resource = JsonResource(self, canonical_json=False)
+                    register_servlets_for_media_repo(self, admin_resource)
+
                     resources.update(
                         {
                             MEDIA_PREFIX: media_repo,
@@ -78,6 +86,7 @@ class MediaRepositoryServer(HomeServer):
                             CONTENT_REPO_PREFIX: ContentRepoResource(
                                 self, self.config.uploads_path
                             ),
+                            "/_synapse/admin": admin_resource,
                         }
                     )
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 import os
 from collections import namedtuple
@@ -87,6 +88,18 @@ def parse_thumbnail_requirements(thumbnail_sizes):
 
 class ContentRepositoryConfig(Config):
     def read_config(self, config, **kwargs):
+
+        # Only enable the media repo if either the media repo is enabled or the
+        # current worker app is the media repo.
+        if (
+            self.enable_media_repo is False
+            and config.worker_app != "synapse.app.media_repository"
+        ):
+            self.can_load_media_repo = False
+            return
+        else:
+            self.can_load_media_repo = True
+
         self.max_upload_size = self.parse_size(config.get("max_upload_size", "10M"))
         self.max_image_pixels = self.parse_size(config.get("max_image_pixels", "32M"))
         self.max_spider_size = self.parse_size(config.get("max_spider_size", "10M"))
@@ -202,6 +215,13 @@ class ContentRepositoryConfig(Config):
         return (
             r"""
+        ## Media Store ##
+
+        # Enable the media store service in the Synapse master. Uncomment the
+        # following if you are using a separate media store worker.
+        #
+        #enable_media_repo: false
+
         # Directory where uploaded images and attachments are stored.
         #
         media_store_path: "%(media_store)s"
@@ -12,10 +12,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import json
 import logging
-import random
-import time
 
 import attr
 from netaddr import IPAddress
@@ -24,31 +22,16 @@ from zope.interface import implementer
 from twisted.internet import defer
 from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
 from twisted.internet.interfaces import IStreamClientEndpoint
-from twisted.web.client import URI, Agent, HTTPConnectionPool, RedirectAgent, readBody
-from twisted.web.http import stringToDatetime
+from twisted.web.client import URI, Agent, HTTPConnectionPool
 from twisted.web.http_headers import Headers
 from twisted.web.iweb import IAgent
 
 from synapse.http.federation.srv_resolver import SrvResolver, pick_server_from_list
+from synapse.http.federation.well_known_resolver import WellKnownResolver
 from synapse.logging.context import make_deferred_yieldable
 from synapse.util import Clock
-from synapse.util.caches.ttlcache import TTLCache
-from synapse.util.metrics import Measure
-
-# period to cache .well-known results for by default
-WELL_KNOWN_DEFAULT_CACHE_PERIOD = 24 * 3600
-
-# jitter to add to the .well-known default cache ttl
-WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 10 * 60
-
-# period to cache failure to fetch .well-known for
-WELL_KNOWN_INVALID_CACHE_PERIOD = 1 * 3600
-
-# cap for .well-known cache period
-WELL_KNOWN_MAX_CACHE_PERIOD = 48 * 3600
 
 logger = logging.getLogger(__name__)
-well_known_cache = TTLCache("well-known")
 
 
 @implementer(IAgent)
@@ -78,7 +61,7 @@ class MatrixFederationAgent(object):
         reactor,
         tls_client_options_factory,
         _srv_resolver=None,
-        _well_known_cache=well_known_cache,
+        _well_known_cache=None,
     ):
         self._reactor = reactor
         self._clock = Clock(reactor)
@@ -93,20 +76,15 @@ class MatrixFederationAgent(object):
         self._pool.maxPersistentPerHost = 5
         self._pool.cachedConnectionTimeout = 2 * 60
 
-        _well_known_agent = RedirectAgent(
-            Agent(
+        self._well_known_resolver = WellKnownResolver(
+            self._reactor,
+            agent=Agent(
                 self._reactor,
                 pool=self._pool,
                 contextFactory=tls_client_options_factory,
-            )
+            ),
+            well_known_cache=_well_known_cache,
         )
-        self._well_known_agent = _well_known_agent
-
-        # our cache of .well-known lookup results, mapping from server name
-        # to delegated name. The values can be:
-        # `bytes`: a valid server-name
-        # `None`: there is no (valid) .well-known here
-        self._well_known_cache = _well_known_cache
 
     @defer.inlineCallbacks
     def request(self, method, uri, headers=None, bodyProducer=None):
@@ -217,7 +195,10 @@ class MatrixFederationAgent(object):
 
         if lookup_well_known:
             # try a .well-known lookup
-            well_known_server = yield self._get_well_known(parsed_uri.host)
+            well_known_result = yield self._well_known_resolver.get_well_known(
+                parsed_uri.host
+            )
+            well_known_server = well_known_result.delegated_server
 
             if well_known_server:
                 # if we found a .well-known, start again, but don't do another
@@ -280,85 +261,6 @@ class MatrixFederationAgent(object):
             target_port=port,
         )
 
-    @defer.inlineCallbacks
-    def _get_well_known(self, server_name):
-        """Attempt to fetch and parse a .well-known file for the given server
-
-        Args:
-            server_name (bytes): name of the server, from the requested url
-
-        Returns:
-            Deferred[bytes|None]: either the new server name, from the .well-known, or
-                None if there was no .well-known file.
-        """
-        try:
-            result = self._well_known_cache[server_name]
-        except KeyError:
-            # TODO: should we linearise so that we don't end up doing two .well-known
-            # requests for the same server in parallel?
-            with Measure(self._clock, "get_well_known"):
-                result, cache_period = yield self._do_get_well_known(server_name)
-
-            if cache_period > 0:
-                self._well_known_cache.set(server_name, result, cache_period)
-
-        return result
-
-    @defer.inlineCallbacks
-    def _do_get_well_known(self, server_name):
-        """Actually fetch and parse a .well-known, without checking the cache
-
-        Args:
-            server_name (bytes): name of the server, from the requested url
-
-        Returns:
-            Deferred[Tuple[bytes|None|object],int]:
-                result, cache period, where result is one of:
-                 - the new server name from the .well-known (as a `bytes`)
-                 - None if there was no .well-known file.
-                 - INVALID_WELL_KNOWN if the .well-known was invalid
-        """
-        uri = b"https://%s/.well-known/matrix/server" % (server_name,)
-        uri_str = uri.decode("ascii")
-        logger.info("Fetching %s", uri_str)
-        try:
-            response = yield make_deferred_yieldable(
-                self._well_known_agent.request(b"GET", uri)
-            )
-            body = yield make_deferred_yieldable(readBody(response))
-            if response.code != 200:
-                raise Exception("Non-200 response %s" % (response.code,))
-
-            parsed_body = json.loads(body.decode("utf-8"))
-            logger.info("Response from .well-known: %s", parsed_body)
-            if not isinstance(parsed_body, dict):
-                raise Exception("not a dict")
-            if "m.server" not in parsed_body:
-                raise Exception("Missing key 'm.server'")
-        except Exception as e:
-            logger.info("Error fetching %s: %s", uri_str, e)
-
-            # add some randomness to the TTL to avoid a stampeding herd every hour
-            # after startup
-            cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD
-            cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
-            return (None, cache_period)
-
-        result = parsed_body["m.server"].encode("ascii")
-
-        cache_period = _cache_period_from_headers(
-            response.headers, time_now=self._reactor.seconds
-        )
-        if cache_period is None:
-            cache_period = WELL_KNOWN_DEFAULT_CACHE_PERIOD
-            # add some randomness to the TTL to avoid a stampeding herd every 24 hours
-            # after startup
-            cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
-        else:
-            cache_period = min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD)
-
-        return (result, cache_period)
-
-
 @implementer(IStreamClientEndpoint)
 class LoggingHostnameEndpoint(object):
@@ -374,44 +276,6 @@ class LoggingHostnameEndpoint(object):
         return self.ep.connect(protocol_factory)
 
 
-def _cache_period_from_headers(headers, time_now=time.time):
-    cache_controls = _parse_cache_control(headers)
-
-    if b"no-store" in cache_controls:
-        return 0
-
-    if b"max-age" in cache_controls:
-        try:
-            max_age = int(cache_controls[b"max-age"])
-            return max_age
-        except ValueError:
-            pass
-
-    expires = headers.getRawHeaders(b"expires")
-    if expires is not None:
-        try:
-            expires_date = stringToDatetime(expires[-1])
-            return expires_date - time_now()
-        except ValueError:
-            # RFC7234 says 'A cache recipient MUST interpret invalid date formats,
-            # especially the value "0", as representing a time in the past (i.e.,
-            # "already expired").
-            return 0
-
-    return None
-
-
-def _parse_cache_control(headers):
-    cache_controls = {}
-    for hdr in headers.getRawHeaders(b"cache-control", []):
-        for directive in hdr.split(b","):
-            splits = [x.strip() for x in directive.split(b"=", 1)]
-            k = splits[0].lower()
-            v = splits[1] if len(splits) > 1 else None
-            cache_controls[k] = v
-    return cache_controls
-
-
 @attr.s
 class _RoutingResult(object):
     """The result returned by `_route_matrix_uri`.
@@ -0,0 +1,187 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import logging
+import random
+import time
+
+import attr
+
+from twisted.internet import defer
+from twisted.web.client import RedirectAgent, readBody
+from twisted.web.http import stringToDatetime
+
+from synapse.logging.context import make_deferred_yieldable
+from synapse.util import Clock
+from synapse.util.caches.ttlcache import TTLCache
+from synapse.util.metrics import Measure
+
+# period to cache .well-known results for by default
+WELL_KNOWN_DEFAULT_CACHE_PERIOD = 24 * 3600
+
+# jitter to add to the .well-known default cache ttl
+WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 10 * 60
+
+# period to cache failure to fetch .well-known for
+WELL_KNOWN_INVALID_CACHE_PERIOD = 1 * 3600
+
+# cap for .well-known cache period
+WELL_KNOWN_MAX_CACHE_PERIOD = 48 * 3600
+
+# lower bound for .well-known cache period
+WELL_KNOWN_MIN_CACHE_PERIOD = 5 * 60
+
+logger = logging.getLogger(__name__)
+
+
+_well_known_cache = TTLCache("well-known")
+
+
+@attr.s(slots=True, frozen=True)
+class WellKnownLookupResult(object):
+    delegated_server = attr.ib()
+
+
+class WellKnownResolver(object):
+    """Handles well-known lookups for matrix servers.
+    """
+
+    def __init__(self, reactor, agent, well_known_cache=None):
+        self._reactor = reactor
+        self._clock = Clock(reactor)
+
+        if well_known_cache is None:
+            well_known_cache = _well_known_cache
+
+        self._well_known_cache = well_known_cache
+        self._well_known_agent = RedirectAgent(agent)
+
+    @defer.inlineCallbacks
+    def get_well_known(self, server_name):
+        """Attempt to fetch and parse a .well-known file for the given server
+
+        Args:
+            server_name (bytes): name of the server, from the requested url
+
+        Returns:
+            Deferred[WellKnownLookupResult]: The result of the lookup
+        """
+        try:
+            result = self._well_known_cache[server_name]
+        except KeyError:
+            # TODO: should we linearise so that we don't end up doing two .well-known
+            # requests for the same server in parallel?
+            with Measure(self._clock, "get_well_known"):
+                result, cache_period = yield self._do_get_well_known(server_name)
+
+            if cache_period > 0:
+                self._well_known_cache.set(server_name, result, cache_period)
+
+        return WellKnownLookupResult(delegated_server=result)
+
+    @defer.inlineCallbacks
+    def _do_get_well_known(self, server_name):
+        """Actually fetch and parse a .well-known, without checking the cache
+
+        Args:
+            server_name (bytes): name of the server, from the requested url
+
+        Returns:
+            Deferred[Tuple[bytes|None|object],int]:
+                result, cache period, where result is one of:
+                 - the new server name from the .well-known (as a `bytes`)
+                 - None if there was no .well-known file.
+                 - INVALID_WELL_KNOWN if the .well-known was invalid
+        """
+        uri = b"https://%s/.well-known/matrix/server" % (server_name,)
+        uri_str = uri.decode("ascii")
+        logger.info("Fetching %s", uri_str)
+        try:
+            response = yield make_deferred_yieldable(
+                self._well_known_agent.request(b"GET", uri)
+            )
+            body = yield make_deferred_yieldable(readBody(response))
+            if response.code != 200:
+                raise Exception("Non-200 response %s" % (response.code,))
+
+            parsed_body = json.loads(body.decode("utf-8"))
+            logger.info("Response from .well-known: %s", parsed_body)
+            if not isinstance(parsed_body, dict):
+                raise Exception("not a dict")
+            if "m.server" not in parsed_body:
+                raise Exception("Missing key 'm.server'")
+        except Exception as e:
+            logger.info("Error fetching %s: %s", uri_str, e)
+
+            # add some randomness to the TTL to avoid a stampeding herd every hour
+            # after startup
+            cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD
+            cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
+            return (None, cache_period)
+
+        result = parsed_body["m.server"].encode("ascii")
+
+        cache_period = _cache_period_from_headers(
+            response.headers, time_now=self._reactor.seconds
+        )
+        if cache_period is None:
+            cache_period = WELL_KNOWN_DEFAULT_CACHE_PERIOD
+            # add some randomness to the TTL to avoid a stampeding herd every 24 hours
+            # after startup
+            cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
+        else:
+            cache_period = min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD)
+            cache_period = max(cache_period, WELL_KNOWN_MIN_CACHE_PERIOD)
+
+        return (result, cache_period)
+
+
+def _cache_period_from_headers(headers, time_now=time.time):
+    cache_controls = _parse_cache_control(headers)
+
+    if b"no-store" in cache_controls:
+        return 0
+
+    if b"max-age" in cache_controls:
+        try:
+            max_age = int(cache_controls[b"max-age"])
+            return max_age
+        except ValueError:
+            pass
+
+    expires = headers.getRawHeaders(b"expires")
+    if expires is not None:
+        try:
+            expires_date = stringToDatetime(expires[-1])
+            return expires_date - time_now()
+        except ValueError:
+            # RFC7234 says 'A cache recipient MUST interpret invalid date formats,
+            # especially the value "0", as representing a time in the past (i.e.,
+            # "already expired").
+            return 0
+
+    return None
+
+
+def _parse_cache_control(headers):
+    cache_controls = {}
+    for hdr in headers.getRawHeaders(b"cache-control", []):
+        for directive in hdr.split(b","):
+            splits = [x.strip() for x in directive.split(b"=", 1)]
+            k = splits[0].lower()
+            v = splits[1] if len(splits) > 1 else None
+            cache_controls[k] = v
+    return cache_controls
@@ -234,13 +234,19 @@ class EmailPusher(object):
                 return
 
             self.last_stream_ordering = last_stream_ordering
-            yield self.store.update_pusher_last_stream_ordering_and_success(
-                self.app_id,
-                self.email,
-                self.user_id,
-                last_stream_ordering,
-                self.clock.time_msec(),
+            pusher_still_exists = (
+                yield self.store.update_pusher_last_stream_ordering_and_success(
+                    self.app_id,
+                    self.email,
+                    self.user_id,
+                    last_stream_ordering,
+                    self.clock.time_msec(),
+                )
             )
+            if not pusher_still_exists:
+                # The pusher has been deleted while we were processing, so
+                # lets just stop and return.
+                self.on_stop()
 
     def seconds_until(self, ts_msec):
         secs = (ts_msec - self.clock.time_msec()) / 1000
@@ -203,13 +203,21 @@ class HttpPusher(object):
                 http_push_processed_counter.inc()
                 self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
                 self.last_stream_ordering = push_action["stream_ordering"]
-                yield self.store.update_pusher_last_stream_ordering_and_success(
-                    self.app_id,
-                    self.pushkey,
-                    self.user_id,
-                    self.last_stream_ordering,
-                    self.clock.time_msec(),
+                pusher_still_exists = (
+                    yield self.store.update_pusher_last_stream_ordering_and_success(
+                        self.app_id,
+                        self.pushkey,
+                        self.user_id,
+                        self.last_stream_ordering,
+                        self.clock.time_msec(),
+                    )
                 )
+                if not pusher_still_exists:
+                    # The pusher has been deleted while we were processing, so
+                    # lets just stop and return.
+                    self.on_stop()
+                    return
 
                 if self.failing_since:
                     self.failing_since = None
                     yield self.store.update_pusher_failing_since(
@@ -238,12 +246,17 @@ class HttpPusher(object):
                 )
                 self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
                 self.last_stream_ordering = push_action["stream_ordering"]
-                yield self.store.update_pusher_last_stream_ordering(
+                pusher_still_exists = yield self.store.update_pusher_last_stream_ordering(
                     self.app_id,
                     self.pushkey,
                     self.user_id,
                     self.last_stream_ordering,
                 )
+                if not pusher_still_exists:
+                    # The pusher has been deleted while we were processing, so
+                    # lets just stop and return.
+                    self.on_stop()
+                    return
 
                 self.failing_since = None
                 yield self.store.update_pusher_failing_since(
@@ -27,7 +27,7 @@ from twisted.internet import defer
 
 import synapse
 from synapse.api.constants import Membership, UserTypes
-from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError
+from synapse.api.errors import Codes, NotFoundError, SynapseError
 from synapse.http.server import JsonResource
 from synapse.http.servlet import (
     RestServlet,
@@ -36,7 +36,12 @@ from synapse.http.servlet import (
     parse_json_object_from_request,
     parse_string,
 )
-from synapse.rest.admin._base import assert_requester_is_admin, assert_user_is_admin
+from synapse.rest.admin._base import (
+    assert_requester_is_admin,
+    assert_user_is_admin,
+    historical_admin_path_patterns,
+)
+from synapse.rest.admin.media import register_servlets_for_media_repo
 from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
 from synapse.types import UserID, create_requester
 from synapse.util.versionstring import get_version_string
@@ -44,28 +49,6 @@ from synapse.util.versionstring import get_version_string
 logger = logging.getLogger(__name__)
 
 
-def historical_admin_path_patterns(path_regex):
-    """Returns the list of patterns for an admin endpoint, including historical ones
-
-    This is a backwards-compatibility hack. Previously, the Admin API was exposed at
-    various paths under /_matrix/client. This function returns a list of patterns
-    matching those paths (as well as the new one), so that existing scripts which rely
-    on the endpoints being available there are not broken.
-
-    Note that this should only be used for existing endpoints: new ones should just
-    register for the /_synapse/admin path.
-    """
-    return list(
-        re.compile(prefix + path_regex)
-        for prefix in (
-            "^/_synapse/admin/v1",
-            "^/_matrix/client/api/v1/admin",
-            "^/_matrix/client/unstable/admin",
-            "^/_matrix/client/r0/admin",
-        )
-    )
-
-
 class UsersRestServlet(RestServlet):
     PATTERNS = historical_admin_path_patterns("/users/(?P<user_id>[^/]*)")
@@ -255,25 +238,6 @@ class WhoisRestServlet(RestServlet):
         return (200, ret)
 
 
-class PurgeMediaCacheRestServlet(RestServlet):
-    PATTERNS = historical_admin_path_patterns("/purge_media_cache")
-
-    def __init__(self, hs):
-        self.media_repository = hs.get_media_repository()
-        self.auth = hs.get_auth()
-
-    @defer.inlineCallbacks
-    def on_POST(self, request):
-        yield assert_requester_is_admin(self.auth, request)
-
-        before_ts = parse_integer(request, "before_ts", required=True)
-        logger.info("before_ts: %r", before_ts)
-
-        ret = yield self.media_repository.delete_old_remote_media(before_ts)
-
-        return (200, ret)
-
-
 class PurgeHistoryRestServlet(RestServlet):
     PATTERNS = historical_admin_path_patterns(
         "/purge_history/(?P<room_id>[^/]*)(/(?P<event_id>[^/]+))?"
@@ -542,50 +506,6 @@ class ShutdownRoomRestServlet(RestServlet):
         )
 
 
-class QuarantineMediaInRoom(RestServlet):
-    """Quarantines all media in a room so that no one can download it via
-    this server.
-    """
-
-    PATTERNS = historical_admin_path_patterns("/quarantine_media/(?P<room_id>[^/]+)")
-
-    def __init__(self, hs):
-        self.store = hs.get_datastore()
-        self.auth = hs.get_auth()
-
-    @defer.inlineCallbacks
-    def on_POST(self, request, room_id):
-        requester = yield self.auth.get_user_by_req(request)
-        yield assert_user_is_admin(self.auth, requester.user)
-
-        num_quarantined = yield self.store.quarantine_media_ids_in_room(
-            room_id, requester.user.to_string()
-        )
-
-        return (200, {"num_quarantined": num_quarantined})
-
-
-class ListMediaInRoom(RestServlet):
-    """Lists all of the media in a given room.
-    """
-
-    PATTERNS = historical_admin_path_patterns("/room/(?P<room_id>[^/]+)/media")
-
-    def __init__(self, hs):
-        self.store = hs.get_datastore()
-
-    @defer.inlineCallbacks
-    def on_GET(self, request, room_id):
-        requester = yield self.auth.get_user_by_req(request)
-        is_admin = yield self.auth.is_server_admin(requester.user)
-        if not is_admin:
-            raise AuthError(403, "You are not a server admin")
-
-        local_mxcs, remote_mxcs = yield self.store.get_media_mxcs_in_room(room_id)
-
-        return (200, {"local": local_mxcs, "remote": remote_mxcs})
-
-
 class ResetPasswordRestServlet(RestServlet):
     """Post request to allow an administrator reset password for a user.
     This needs user to have administrator access in Synapse.
@@ -825,7 +745,6 @@ def register_servlets(hs, http_server):
 def register_servlets_for_client_rest_resource(hs, http_server):
     """Register only the servlets which need to be exposed on /_matrix/client/xxx"""
     WhoisRestServlet(hs).register(http_server)
-    PurgeMediaCacheRestServlet(hs).register(http_server)
     PurgeHistoryStatusRestServlet(hs).register(http_server)
     DeactivateAccountRestServlet(hs).register(http_server)
     PurgeHistoryRestServlet(hs).register(http_server)
@@ -834,10 +753,13 @@ def register_servlets_for_client_rest_resource(hs, http_server):
     GetUsersPaginatedRestServlet(hs).register(http_server)
     SearchUsersRestServlet(hs).register(http_server)
     ShutdownRoomRestServlet(hs).register(http_server)
-    QuarantineMediaInRoom(hs).register(http_server)
-    ListMediaInRoom(hs).register(http_server)
     UserRegisterServlet(hs).register(http_server)
     DeleteGroupAdminRestServlet(hs).register(http_server)
     AccountValidityRenewServlet(hs).register(http_server)
 
+    # Load the media repo ones if we're using them.
+    if hs.config.can_load_media_repo:
+        register_servlets_for_media_repo(hs, http_server)
+
     # don't add more things here: new servlets should only be exposed on
     # /_synapse/admin so should not go here. Instead register them in AdminRestResource.
@@ -12,11 +12,36 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import re
+
 from twisted.internet import defer
 
 from synapse.api.errors import AuthError
 
 
+def historical_admin_path_patterns(path_regex):
+    """Returns the list of patterns for an admin endpoint, including historical ones
+
+    This is a backwards-compatibility hack. Previously, the Admin API was exposed at
+    various paths under /_matrix/client. This function returns a list of patterns
+    matching those paths (as well as the new one), so that existing scripts which rely
+    on the endpoints being available there are not broken.
+
+    Note that this should only be used for existing endpoints: new ones should just
+    register for the /_synapse/admin path.
+    """
+    return list(
+        re.compile(prefix + path_regex)
+        for prefix in (
+            "^/_synapse/admin/v1",
+            "^/_matrix/client/api/v1/admin",
+            "^/_matrix/client/unstable/admin",
+            "^/_matrix/client/r0/admin",
+        )
+    )
+
+
 @defer.inlineCallbacks
 def assert_requester_is_admin(auth, request):
     """Verify that the requester is an admin user
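For illustration, the patterns this helper generates for one of the moved endpoints::

    >>> [p.pattern for p in historical_admin_path_patterns("/purge_media_cache")]
    ['^/_synapse/admin/v1/purge_media_cache',
     '^/_matrix/client/api/v1/admin/purge_media_cache',
     '^/_matrix/client/unstable/admin/purge_media_cache',
     '^/_matrix/client/r0/admin/purge_media_cache']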
@@ -0,0 +1,101 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018-2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.errors import AuthError
+from synapse.http.servlet import RestServlet, parse_integer
+from synapse.rest.admin._base import (
+    assert_requester_is_admin,
+    assert_user_is_admin,
+    historical_admin_path_patterns,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class QuarantineMediaInRoom(RestServlet):
+    """Quarantines all media in a room so that no one can download it via
+    this server.
+    """
+
+    PATTERNS = historical_admin_path_patterns("/quarantine_media/(?P<room_id>[^/]+)")
+
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+        self.auth = hs.get_auth()
+
+    @defer.inlineCallbacks
+    def on_POST(self, request, room_id):
+        requester = yield self.auth.get_user_by_req(request)
+        yield assert_user_is_admin(self.auth, requester.user)
+
+        num_quarantined = yield self.store.quarantine_media_ids_in_room(
+            room_id, requester.user.to_string()
+        )
+
+        return (200, {"num_quarantined": num_quarantined})
+
+
+class ListMediaInRoom(RestServlet):
+    """Lists all of the media in a given room.
+    """
+
+    PATTERNS = historical_admin_path_patterns("/room/(?P<room_id>[^/]+)/media")
+
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+
+    @defer.inlineCallbacks
+    def on_GET(self, request, room_id):
+        requester = yield self.auth.get_user_by_req(request)
+        is_admin = yield self.auth.is_server_admin(requester.user)
+        if not is_admin:
+            raise AuthError(403, "You are not a server admin")
+
+        local_mxcs, remote_mxcs = yield self.store.get_media_mxcs_in_room(room_id)
+
+        return (200, {"local": local_mxcs, "remote": remote_mxcs})
+
+
+class PurgeMediaCacheRestServlet(RestServlet):
+    PATTERNS = historical_admin_path_patterns("/purge_media_cache")
+
+    def __init__(self, hs):
+        self.media_repository = hs.get_media_repository()
+        self.auth = hs.get_auth()
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        yield assert_requester_is_admin(self.auth, request)
+
+        before_ts = parse_integer(request, "before_ts", required=True)
+        logger.info("before_ts: %r", before_ts)
+
+        ret = yield self.media_repository.delete_old_remote_media(before_ts)
+
+        return (200, ret)
+
+
+def register_servlets_for_media_repo(hs, http_server):
+    """
+    Media repo specific APIs.
+    """
+    PurgeMediaCacheRestServlet(hs).register(http_server)
+    QuarantineMediaInRoom(hs).register(http_server)
+    ListMediaInRoom(hs).register(http_server)
@@ -33,6 +33,7 @@ from synapse.api.errors import (
     RequestSendFailed,
     SynapseError,
 )
+from synapse.config._base import ConfigError
 from synapse.logging.context import defer_to_thread
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.async_helpers import Linearizer
@@ -753,8 +754,11 @@ class MediaRepositoryResource(Resource):
     """
 
     def __init__(self, hs):
-        Resource.__init__(self)
+        # If we're not configured to use it, raise if we somehow got here.
+        if not hs.config.can_load_media_repo:
+            raise ConfigError("Synapse is not configured to use a media repo.")
+
+        super().__init__()
         media_repo = hs.get_media_repository()
 
         self.putChild(b"upload", UploadResource(hs, media_repo))
@@ -308,22 +308,36 @@ class PusherStore(PusherWorkerStore):
     def update_pusher_last_stream_ordering_and_success(
         self, app_id, pushkey, user_id, last_stream_ordering, last_success
     ):
-        yield self._simple_update_one(
-            "pushers",
-            {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
-            {
+        """Update the last stream ordering position we've processed up to for
+        the given pusher.
+
+        Args:
+            app_id (str)
+            pushkey (str)
+            last_stream_ordering (int)
+            last_success (int)
+
+        Returns:
+            Deferred[bool]: True if the pusher still exists; False if it has been deleted.
+        """
+        updated = yield self._simple_update(
+            table="pushers",
+            keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
+            updatevalues={
                 "last_stream_ordering": last_stream_ordering,
                 "last_success": last_success,
             },
             desc="update_pusher_last_stream_ordering_and_success",
         )
 
+        return bool(updated)
+
     @defer.inlineCallbacks
     def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since):
-        yield self._simple_update_one(
-            "pushers",
-            {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
-            {"failing_since": failing_since},
+        yield self._simple_update(
+            table="pushers",
+            keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
+            updatevalues={"failing_since": failing_since},
            desc="update_pusher_failing_since",
         )
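The bool return works because `_simple_update` (unlike `_simple_update_one`, which treats anything other than exactly one matched row as an error) tolerates matching zero rows. A sketch of the SQL involved, assuming `_simple_update` reports the affected row count::

    UPDATE pushers
    SET last_stream_ordering = ?, last_success = ?
    WHERE app_id = ? AND pushkey = ? AND user_name = ?

Zero rows updated means the pusher row is gone, so `bool(updated)` is False and the caller knows to stop rather than log an exception.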
@@ -25,17 +25,19 @@ from twisted.internet._sslverify import ClientTLSOptions, OpenSSLCertificateOpti
 from twisted.internet.protocol import Factory
 from twisted.protocols.tls import TLSMemoryBIOFactory
 from twisted.web._newclient import ResponseNeverReceived
+from twisted.web.client import Agent
 from twisted.web.http import HTTPChannel
 from twisted.web.http_headers import Headers
 from twisted.web.iweb import IPolicyForHTTPS
 
 from synapse.config.homeserver import HomeServerConfig
 from synapse.crypto.context_factory import ClientTLSOptionsFactory
-from synapse.http.federation.matrix_federation_agent import (
-    MatrixFederationAgent,
+from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
+from synapse.http.federation.srv_resolver import Server
+from synapse.http.federation.well_known_resolver import (
+    WellKnownResolver,
     _cache_period_from_headers,
 )
-from synapse.http.federation.srv_resolver import Server
 from synapse.logging.context import LoggingContext
 from synapse.util.caches.ttlcache import TTLCache
@@ -79,9 +81,10 @@ class MatrixFederationAgentTests(TestCase):
         self._config = config = HomeServerConfig()
         config.parse_config_dict(config_dict, "", "")
 
+        self.tls_factory = ClientTLSOptionsFactory(config)
         self.agent = MatrixFederationAgent(
             reactor=self.reactor,
-            tls_client_options_factory=ClientTLSOptionsFactory(config),
+            tls_client_options_factory=self.tls_factory,
             _srv_resolver=self.mock_resolver,
             _well_known_cache=self.well_known_cache,
         )
@@ -928,20 +931,16 @@ class MatrixFederationAgentTests(TestCase):
         self.reactor.pump((0.1,))
         self.successResultOf(test_d)
 
-    @defer.inlineCallbacks
-    def do_get_well_known(self, serv):
-        try:
-            result = yield self.agent._get_well_known(serv)
-            logger.info("Result from well-known fetch: %s", result)
-        except Exception as e:
-            logger.warning("Error fetching well-known: %s", e)
-            raise
-        return result
-
     def test_well_known_cache(self):
+        well_known_resolver = WellKnownResolver(
+            self.reactor,
+            Agent(self.reactor, contextFactory=self.tls_factory),
+            well_known_cache=self.well_known_cache,
+        )
+
         self.reactor.lookups["testserv"] = "1.2.3.4"
 
-        fetch_d = self.do_get_well_known(b"testserv")
+        fetch_d = well_known_resolver.get_well_known(b"testserv")
 
         # there should be an attempt to connect on port 443 for the .well-known
         clients = self.reactor.tcpClients
@@ -953,26 +952,26 @@ class MatrixFederationAgentTests(TestCase):
         well_known_server = self._handle_well_known_connection(
             client_factory,
             expected_sni=b"testserv",
-            response_headers={b"Cache-Control": b"max-age=10"},
+            response_headers={b"Cache-Control": b"max-age=1000"},
             content=b'{ "m.server": "target-server" }',
         )
 
         r = self.successResultOf(fetch_d)
-        self.assertEqual(r, b"target-server")
+        self.assertEqual(r.delegated_server, b"target-server")
 
         # close the tcp connection
         well_known_server.loseConnection()
 
         # repeat the request: it should hit the cache
-        fetch_d = self.do_get_well_known(b"testserv")
+        fetch_d = well_known_resolver.get_well_known(b"testserv")
         r = self.successResultOf(fetch_d)
-        self.assertEqual(r, b"target-server")
+        self.assertEqual(r.delegated_server, b"target-server")
 
         # expire the cache
-        self.reactor.pump((10.0,))
+        self.reactor.pump((1000.0,))
 
         # now it should connect again
-        fetch_d = self.do_get_well_known(b"testserv")
+        fetch_d = well_known_resolver.get_well_known(b"testserv")
 
         self.assertEqual(len(clients), 1)
         (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
@@ -986,7 +985,7 @@ class MatrixFederationAgentTests(TestCase):
         )
 
         r = self.successResultOf(fetch_d)
-        self.assertEqual(r, b"other-server")
+        self.assertEqual(r.delegated_server, b"other-server")
 
 
 class TestCachePeriodFromHeaders(TestCase):