Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes

pull/3979/head
Erik Johnston 2018-09-14 18:25:55 +01:00
commit b5eef203f4
20 changed files with 288 additions and 81 deletions

.circleci/config.yml

@ -53,7 +53,7 @@ jobs:
steps:
- checkout
- run: docker pull matrixdotorg/sytest-synapsepy3
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs hawkowl/sytestpy3
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs matrixdotorg/sytest-synapsepy3
- store_artifacts:
path: ~/project/logs
destination: logs
@ -76,7 +76,7 @@ jobs:
- checkout
- run: bash .circleci/merge_base_branch.sh
- run: docker pull matrixdotorg/sytest-synapsepy3
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs hawkowl/sytestpy3
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs matrixdotorg/sytest-synapsepy3
- store_artifacts:
path: ~/project/logs
destination: logs
@ -101,6 +101,8 @@ workflows:
jobs:
- sytestpy2
- sytestpy2postgres
- sytestpy3
- sytestpy3postgres
- sytestpy2merged:
filters:
branches:
@ -109,6 +111,11 @@ workflows:
filters:
branches:
ignore: /develop|master/
# Currently broken while the Python 3 port is incomplete
# - sytestpy3
# - sytestpy3postgres
- sytestpy3merged:
filters:
branches:
ignore: /develop|master/
- sytestpy3postgresmerged:
filters:
branches:
ignore: /develop|master/

.travis.yml

@ -25,6 +25,9 @@ matrix:
services:
- postgresql
- python: 3.5
env: TOX_ENV=py35
- python: 3.6
env: TOX_ENV=py36

changelog.d/3576.feature Normal file

@ -0,0 +1 @@
Python 3.5+ is now supported.

changelog.d/3860.misc Normal file

@ -0,0 +1 @@
Fix typo in replication stream exception.

changelog.d/3871.misc Normal file

@ -0,0 +1 @@
Add in-flight real-time metrics for Measure blocks.

changelog.d/3872.misc Normal file

@ -0,0 +1 @@
Disable buffering and automatic retrying in treq requests to prevent timeouts.

changelog.d/3874.bugfix Normal file (empty)

synapse/handlers/pagination.py

@ -269,14 +269,7 @@ class PaginationHandler(object):
if state_ids:
state = yield self.store.get_events(list(state_ids.values()))
if state:
state = yield filter_events_for_client(
self.store,
user_id,
state.values(),
is_peeking=(member_event_id is None),
)
state = state.values()
time_now = self.clock.time_msec()

synapse/http/__init__.py

@ -38,12 +38,12 @@ def cancelled_to_request_timed_out_error(value, timeout):
return value
ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
ACCESS_TOKEN_RE = re.compile(r'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
def redact_uri(uri):
"""Strips access tokens from the uri replaces with <redacted>"""
return ACCESS_TOKEN_RE.sub(
br'\1<redacted>\3',
r'\1<redacted>\3',
uri
)
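The pattern and replacement move from bytes (br'...') to text (r'...') because request URIs are handled as str on Python 3. A standalone sketch of the redaction behaviour, mirroring the patched helper with an invented URI:

import re

ACCESS_TOKEN_RE = re.compile(r'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')

def redact_uri(uri):
    # Only the token value is masked; the rest of the query string survives.
    return ACCESS_TOKEN_RE.sub(r'\1<redacted>\3', uri)

assert (redact_uri("/_matrix/client/r0/sync?access_token=s3cr3t&since=xyz")
        == "/_matrix/client/r0/sync?access_token=<redacted>&since=xyz")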

synapse/http/client.py

@ -93,7 +93,7 @@ class SimpleHttpClient(object):
outgoing_requests_counter.labels(method).inc()
# log request but strip `access_token` (AS requests for example include this)
logger.info("Sending request %s %s", method, redact_uri(uri.encode('ascii')))
logger.info("Sending request %s %s", method, redact_uri(uri))
try:
request_deferred = treq.request(
@ -108,14 +108,14 @@ class SimpleHttpClient(object):
incoming_responses_counter.labels(method, response.code).inc()
logger.info(
"Received response to %s %s: %s",
method, redact_uri(uri.encode('ascii')), response.code
method, redact_uri(uri), response.code
)
defer.returnValue(response)
except Exception as e:
incoming_responses_counter.labels(method, "ERR").inc()
logger.info(
"Error sending request to %s %s: %s %s",
method, redact_uri(uri.encode('ascii')), type(e).__name__, e.args[0]
method, redact_uri(uri), type(e).__name__, e.args[0]
)
raise
@ -348,7 +348,8 @@ class SimpleHttpClient(object):
resp_headers = dict(response.headers.getAllRawHeaders())
if 'Content-Length' in resp_headers and resp_headers['Content-Length'] > max_size:
if (b'Content-Length' in resp_headers and
int(resp_headers[b'Content-Length']) > max_size):
logger.warn("Requested URL is too large > %r bytes" % (self.max_size,))
raise SynapseError(
502,
@ -381,7 +382,12 @@ class SimpleHttpClient(object):
)
defer.returnValue(
(length, resp_headers, response.request.absoluteURI, response.code),
(
length,
resp_headers,
response.request.absoluteURI.decode('ascii'),
response.code,
),
)
@ -466,9 +472,9 @@ class SpiderEndpointFactory(object):
def endpointForURI(self, uri):
logger.info("Getting endpoint for %s", uri.toBytes())
if uri.scheme == "http":
if uri.scheme == b"http":
endpoint_factory = HostnameEndpoint
elif uri.scheme == "https":
elif uri.scheme == b"https":
tlsCreator = self.policyForHTTPS.creatorForNetloc(uri.host, uri.port)
def endpoint_factory(reactor, host, port, **kw):
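The b'Content-Length' and b"http"/b"https" changes are two faces of the same Python 3 issue: Twisted exposes raw header names and parsed URI components as bytes, so comparisons against str values silently fail. A minimal illustration, with an invented header value:

from twisted.web.http_headers import Headers

resp_headers = dict(Headers({b"Content-Length": [b"1048576"]}).getAllRawHeaders())

assert "Content-Length" not in resp_headers  # the str key never matches on Python 3
assert b"Content-Length" in resp_headers     # the bytes key does
assert int(resp_headers[b"Content-Length"][0]) == 1048576  # values are lists of bytes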

synapse/http/matrixfederationclient.py

@ -43,6 +43,7 @@ from synapse.api.errors import (
from synapse.http.endpoint import matrix_federation_endpoint
from synapse.util import logcontext
from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
outbound_logger = logging.getLogger("synapse.http.outbound")
@ -91,6 +92,7 @@ class MatrixFederationHttpClient(object):
self.server_name = hs.hostname
reactor = hs.get_reactor()
pool = HTTPConnectionPool(reactor)
pool.retryAutomatically = False
pool.maxPersistentPerHost = 5
pool.cachedConnectionTimeout = 2 * 60
self.agent = Agent.usingEndpointFactory(
@ -221,12 +223,15 @@ class MatrixFederationHttpClient(object):
headers=Headers(headers_dict),
data=data,
agent=self.agent,
reactor=self.hs.get_reactor()
reactor=self.hs.get_reactor(),
unbuffered=True
)
request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor())
response = yield make_deferred_yieldable(
request_deferred,
)
with Measure(self.clock, "outbound_request"):
response = yield make_deferred_yieldable(
request_deferred,
)
log_result = "%d %s" % (response.code, response.phrase,)
break
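Three related changes land here: the connection pool stops retrying requests automatically, treq is asked not to buffer the response body (unbuffered=True), and the wait for the response is wrapped in a Measure block so it shows up in the new in-flight metrics. A hedged sketch of the request pattern, with an invented URL and timeout:

import treq
from twisted.internet import reactor
from twisted.web.client import Agent, HTTPConnectionPool

pool = HTTPConnectionPool(reactor)
pool.retryAutomatically = False  # a silent retry would eat into the timeout budget
agent = Agent(reactor, pool=pool)

# unbuffered=True hands the response over once headers arrive, rather than
# after the whole body has been buffered, so addTimeout fires predictably.
d = treq.request("GET", "https://example.com/", agent=agent, unbuffered=True)
d.addTimeout(60, reactor)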

synapse/http/site.py

@ -85,7 +85,10 @@ class SynapseRequest(Request):
return "%s-%i" % (self.method, self.request_seq)
def get_redacted_uri(self):
return redact_uri(self.uri)
uri = self.uri
if isinstance(uri, bytes):
uri = self.uri.decode('ascii')
return redact_uri(uri)
def get_user_agent(self):
return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]

synapse/metrics/__init__.py

@ -18,8 +18,11 @@ import gc
import logging
import os
import platform
import threading
import time
import six
import attr
from prometheus_client import Counter, Gauge, Histogram
from prometheus_client.core import REGISTRY, GaugeMetricFamily
@ -68,7 +71,7 @@ class LaterGauge(object):
return
if isinstance(calls, dict):
for k, v in calls.items():
for k, v in six.iteritems(calls):
g.add_metric(k, v)
else:
g.add_metric([], calls)
@ -87,6 +90,109 @@ class LaterGauge(object):
all_gauges[self.name] = self
class InFlightGauge(object):
"""Tracks number of things (e.g. requests, Measure blocks, etc) in flight
at any given time.
Each InFlightGauge will create a metric called `<name>_total` that counts
the number of in flight blocks, as well as a metric for each item in the
given `sub_metrics` as `<name>_<sub_metric>` which will get updated by the
callbacks.
Args:
name (str)
desc (str)
labels (list[str])
sub_metrics (list[str]): A list of sub metrics that the callbacks
will update.
"""
def __init__(self, name, desc, labels, sub_metrics):
self.name = name
self.desc = desc
self.labels = labels
self.sub_metrics = sub_metrics
# Create a class which has the sub_metrics values as attributes, which
# default to 0 on initialization. Used to pass to registered callbacks.
self._metrics_class = attr.make_class(
"_MetricsEntry",
attrs={x: attr.ib(0) for x in sub_metrics},
slots=True,
)
# Counts number of in flight blocks for a given set of label values
self._registrations = {}
# Protects access to _registrations
self._lock = threading.Lock()
self._register_with_collector()
def register(self, key, callback):
"""Registers that we've entered a new block with labels `key`.
`callback` gets called each time the metrics are collected. The same
value must also be given to `unregister`.
`callback` gets called with an object that has an attribute per
sub_metric, which should be updated with the necessary values. Note that
the metrics object is shared between all callbacks registered with the
same key.
Note that `callback` may be called on a separate thread.
"""
with self._lock:
self._registrations.setdefault(key, set()).add(callback)
def unregister(self, key, callback):
"""Registers that we've exited a block with labels `key`.
"""
with self._lock:
self._registrations.setdefault(key, set()).discard(callback)
def collect(self):
"""Called by prometheus client when it reads metrics.
Note: may be called by a separate thread.
"""
in_flight = GaugeMetricFamily(self.name + "_total", self.desc, labels=self.labels)
metrics_by_key = {}
# We copy so that we don't mutate the list while iterating
with self._lock:
keys = list(self._registrations)
for key in keys:
with self._lock:
callbacks = set(self._registrations[key])
in_flight.add_metric(key, len(callbacks))
metrics = self._metrics_class()
metrics_by_key[key] = metrics
for callback in callbacks:
callback(metrics)
yield in_flight
for name in self.sub_metrics:
gauge = GaugeMetricFamily("_".join([self.name, name]), "", labels=self.labels)
for key, metrics in six.iteritems(metrics_by_key):
gauge.add_metric(key, getattr(metrics, name))
yield gauge
def _register_with_collector(self):
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering" % (self.name,))
REGISTRY.unregister(all_gauges.pop(self.name))
REGISTRY.register(self)
all_gauges[self.name] = self
#
# Detailed CPU metrics
#
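To make the InFlightGauge callback contract above concrete, here is a usage sketch; the gauge name, label and callback are invented, and the real caller is Measure in synapse/util/metrics.py further down this commit:

from synapse.metrics import InFlightGauge

lookups_in_flight = InFlightGauge(
    "example_lookups", "Lookups currently in flight",
    labels=["server"],
    sub_metrics=["elapsed_max"],
)

def on_collect(metrics):
    # Runs at scrape time, possibly on another thread; mutate the shared
    # per-key metrics object rather than returning anything.
    metrics.elapsed_max = max(metrics.elapsed_max, 1.5)

key = ("example.com",)                          # one value per label
lookups_in_flight.register(key, on_collect)     # entering the block
# ... work happens; Prometheus may call collect() at any point here ...
lookups_in_flight.unregister(key, on_collect)   # leaving the block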

synapse/push/httppusher.py

@ -15,6 +15,8 @@
# limitations under the License.
import logging
import six
from prometheus_client import Counter
from twisted.internet import defer
@ -26,6 +28,9 @@ from synapse.util.metrics import Measure
from . import push_rule_evaluator, push_tools
if six.PY3:
long = int
logger = logging.getLogger(__name__)
http_push_processed_counter = Counter("synapse_http_httppusher_http_pushes_processed", "")
@ -96,7 +101,7 @@ class HttpPusher(object):
@defer.inlineCallbacks
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0)
yield self._process()
@defer.inlineCallbacks
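The `or 0` guards against self.max_stream_ordering still being None: Python 2 ordered None below any integer, while Python 3 raises a TypeError when comparing the two. In brief:

current = None
# Python 2: max(5, None) == 5.  Python 3: TypeError: '>' not supported
# between instances of 'NoneType' and 'int'.  Hence the `or 0` fallback:
assert max(5, current or 0) == 5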

synapse/push/mailer.py

@ -17,10 +17,11 @@ import email.mime.multipart
import email.utils
import logging
import time
import urllib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from six.moves import urllib
import bleach
import jinja2
@ -474,7 +475,7 @@ class Mailer(object):
# XXX: make r0 once API is stable
return "%s_matrix/client/unstable/pushers/remove?%s" % (
self.hs.config.public_baseurl,
urllib.urlencode(params),
urllib.parse.urlencode(params),
)
@ -561,7 +562,7 @@ def _create_mxc_to_http_filter(config):
return "%s_matrix/media/v1/thumbnail/%s?%s%s" % (
config.public_baseurl,
serverAndMediaId,
urllib.urlencode(params),
urllib.parse.urlencode(params),
fragment or "",
)
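six.moves.urllib papers over the stdlib reshuffle: Python 2's urllib.urlencode became urllib.parse.urlencode on Python 3, and the six alias resolves to whichever exists. For instance:

from six.moves import urllib

# The same attribute path works on both interpreters:
assert urllib.parse.urlencode({"access_token": "abc"}) == "access_token=abc"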

synapse/replication/slave/storage/devices.py

@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import six
from synapse.storage import DataStore
from synapse.storage.end_to_end_keys import EndToEndKeyStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
@ -21,6 +23,13 @@ from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
def __func__(inp):
if six.PY3:
return inp
else:
return inp.__func__
class SlavedDeviceStore(BaseSlavedStore):
def __init__(self, db_conn, hs):
super(SlavedDeviceStore, self).__init__(db_conn, hs)
@ -38,14 +47,14 @@ class SlavedDeviceStore(BaseSlavedStore):
"DeviceListFederationStreamChangeCache", device_list_max,
)
get_device_stream_token = DataStore.get_device_stream_token.__func__
get_user_whose_devices_changed = DataStore.get_user_whose_devices_changed.__func__
get_devices_by_remote = DataStore.get_devices_by_remote.__func__
_get_devices_by_remote_txn = DataStore._get_devices_by_remote_txn.__func__
_get_e2e_device_keys_txn = DataStore._get_e2e_device_keys_txn.__func__
mark_as_sent_devices_by_remote = DataStore.mark_as_sent_devices_by_remote.__func__
get_device_stream_token = __func__(DataStore.get_device_stream_token)
get_user_whose_devices_changed = __func__(DataStore.get_user_whose_devices_changed)
get_devices_by_remote = __func__(DataStore.get_devices_by_remote)
_get_devices_by_remote_txn = __func__(DataStore._get_devices_by_remote_txn)
_get_e2e_device_keys_txn = __func__(DataStore._get_e2e_device_keys_txn)
mark_as_sent_devices_by_remote = __func__(DataStore.mark_as_sent_devices_by_remote)
_mark_as_sent_devices_by_remote_txn = (
DataStore._mark_as_sent_devices_by_remote_txn.__func__
__func__(DataStore._mark_as_sent_devices_by_remote_txn)
)
count_e2e_one_time_keys = EndToEndKeyStore.__dict__["count_e2e_one_time_keys"]
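The __func__ shim is needed because Python 3 dropped unbound methods: looking a method up on the class already yields the plain function, which has no __func__ attribute. A minimal demonstration with an invented class:

import six

class ExampleStore(object):
    def get_token(self):
        return 1

if six.PY2:
    plain = ExampleStore.get_token.__func__  # unwrap the unbound method
else:
    plain = ExampleStore.get_token           # already a plain function

assert plain(ExampleStore()) == 1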

synapse/replication/tcp/streams.py

@ -196,7 +196,7 @@ class Stream(object):
)
if len(rows) >= MAX_EVENTS_BEHIND:
raise Exception("stream %s has fallen behined" % (self.NAME))
raise Exception("stream %s has fallen behind" % (self.NAME))
else:
rows = yield self.update_function(
from_token, current_token,

synapse/util/metrics.py

@ -20,6 +20,7 @@ from prometheus_client import Counter
from twisted.internet import defer
from synapse.metrics import InFlightGauge
from synapse.util.logcontext import LoggingContext
logger = logging.getLogger(__name__)
@ -45,6 +46,13 @@ block_db_txn_duration = Counter(
block_db_sched_duration = Counter(
"synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"])
# Tracks the number of blocks currently active
in_flight = InFlightGauge(
"synapse_util_metrics_block_in_flight", "",
labels=["block_name"],
sub_metrics=["real_time_max", "real_time_sum"],
)
def measure_func(name):
def wrapper(func):
@ -82,10 +90,14 @@ class Measure(object):
self.start_usage = self.start_context.get_resource_usage()
in_flight.register((self.name,), self._update_in_flight)
def __exit__(self, exc_type, exc_val, exc_tb):
if isinstance(exc_type, Exception) or not self.start_context:
return
in_flight.unregister((self.name,), self._update_in_flight)
duration = self.clock.time() - self.start
block_counter.labels(self.name).inc()
@ -120,3 +132,13 @@ class Measure(object):
if self.created_context:
self.start_context.__exit__(exc_type, exc_val, exc_tb)
def _update_in_flight(self, metrics):
"""Gets called when processing in flight metrics
"""
duration = self.clock.time() - self.start
metrics.real_time_max = max(metrics.real_time_max, duration)
metrics.real_time_sum += duration
# TODO: Add other in flight metrics.
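Taken together, every Measure block is now visible in Prometheus while it is still executing, via the synapse_util_metrics_block_in_flight gauges registered above. A hedged usage sketch; the block name is invented, and Measure only needs something with a time() method:

from twisted.internet import reactor

from synapse.util import Clock
from synapse.util.metrics import Measure

clock = Clock(reactor)
with Measure(clock, "generate_sync_response"):
    pass  # stand-in for real work

# While the block runs, a scrape reports, per block_name label:
#   synapse_util_metrics_block_in_flight_total
#   synapse_util_metrics_block_in_flight_real_time_max
#   synapse_util_metrics_block_in_flight_real_time_sum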

tests/test_metrics.py Normal file

@ -0,0 +1,81 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.metrics import InFlightGauge
from tests import unittest
class TestMauLimit(unittest.TestCase):
def test_basic(self):
gauge = InFlightGauge(
"test1", "",
labels=["test_label"],
sub_metrics=["foo", "bar"],
)
def handle1(metrics):
metrics.foo += 2
metrics.bar = max(metrics.bar, 5)
def handle2(metrics):
metrics.foo += 3
metrics.bar = max(metrics.bar, 7)
gauge.register(("key1",), handle1)
self.assert_dict({
"test1_total": {("key1",): 1},
"test1_foo": {("key1",): 2},
"test1_bar": {("key1",): 5},
}, self.get_metrics_from_gauge(gauge))
gauge.unregister(("key1",), handle1)
self.assert_dict({
"test1_total": {("key1",): 0},
"test1_foo": {("key1",): 0},
"test1_bar": {("key1",): 0},
}, self.get_metrics_from_gauge(gauge))
gauge.register(("key1",), handle1)
gauge.register(("key2",), handle2)
self.assert_dict({
"test1_total": {("key1",): 1, ("key2",): 1},
"test1_foo": {("key1",): 2, ("key2",): 3},
"test1_bar": {("key1",): 5, ("key2",): 7},
}, self.get_metrics_from_gauge(gauge))
gauge.unregister(("key2",), handle2)
gauge.register(("key1",), handle2)
self.assert_dict({
"test1_total": {("key1",): 2, ("key2",): 0},
"test1_foo": {("key1",): 5, ("key2",): 0},
"test1_bar": {("key1",): 7, ("key2",): 0},
}, self.get_metrics_from_gauge(gauge))
def get_metrics_from_gauge(self, gauge):
results = {}
for r in gauge.collect():
results[r.name] = {
tuple(labels[x] for x in gauge.labels): value
for _, labels, value in r.samples
}
return results

tox.ini

@ -64,49 +64,11 @@ setenv =
{[base]setenv}
SYNAPSE_POSTGRES = 1
[testenv:py35]
usedevelop=true
[testenv:py36]
usedevelop=true
commands =
/usr/bin/find "{toxinidir}" -name '*.pyc' -delete
coverage run {env:COVERAGE_OPTS:} --source="{toxinidir}/synapse" \
"{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests/config \
tests/api/test_filtering.py \
tests/api/test_ratelimiting.py \
tests/appservice \
tests/crypto \
tests/events \
tests/handlers/test_appservice.py \
tests/handlers/test_auth.py \
tests/handlers/test_device.py \
tests/handlers/test_directory.py \
tests/handlers/test_e2e_keys.py \
tests/handlers/test_presence.py \
tests/handlers/test_profile.py \
tests/handlers/test_register.py \
tests/replication/slave/storage/test_account_data.py \
tests/replication/slave/storage/test_receipts.py \
tests/storage/test_appservice.py \
tests/storage/test_background_update.py \
tests/storage/test_base.py \
tests/storage/test__base.py \
tests/storage/test_client_ips.py \
tests/storage/test_devices.py \
tests/storage/test_end_to_end_keys.py \
tests/storage/test_event_push_actions.py \
tests/storage/test_keys.py \
tests/storage/test_presence.py \
tests/storage/test_profile.py \
tests/storage/test_registration.py \
tests/storage/test_room.py \
tests/storage/test_user_directory.py \
tests/test_distributor.py \
tests/test_dns.py \
tests/test_preview.py \
tests/test_test_utils.py \
tests/test_types.py \
tests/util} \
{env:TOXSUFFIX:}
{env:DUMP_COVERAGE_COMMAND:coverage report -m}
[testenv:packaging]
deps =