Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes

pull/8675/head
Richard van der Hoff 2020-03-19 10:03:10 +00:00
commit 059e91bdce
8 changed files with 38 additions and 9 deletions

1
changelog.d/7103.feature Normal file
View File

@ -0,0 +1 @@
Add prometheus metrics for the number of active pushers.

1
changelog.d/7104.misc Normal file
View File

@ -0,0 +1 @@
Merge worker apps together.

View File

@ -276,6 +276,7 @@ def start(hs, listeners=None):
# It is now safe to start your Synapse. # It is now safe to start your Synapse.
hs.start_listening(listeners) hs.start_listening(listeners)
hs.get_datastore().db.start_profiling() hs.get_datastore().db.start_profiling()
hs.get_pusherpool().start()
setup_sentry(hs) setup_sentry(hs)
setup_sdnotify(hs) setup_sdnotify(hs)

View File

@ -408,7 +408,6 @@ def setup(config_options):
_base.start(hs, config.listeners) _base.start(hs, config.listeners)
hs.get_pusherpool().start()
hs.get_datastore().db.updates.start_doing_background_updates() hs.get_datastore().db.updates.start_doing_background_updates()
except Exception: except Exception:
# Print the exception and bail out. # Print the exception and bail out.

View File

@ -20,7 +20,7 @@ import os
import platform import platform
import threading import threading
import time import time
from typing import Dict, Union from typing import Callable, Dict, Iterable, Optional, Tuple, Union
import six import six
@ -59,10 +59,12 @@ class RegistryProxy(object):
@attr.s(hash=True) @attr.s(hash=True)
class LaterGauge(object): class LaterGauge(object):
name = attr.ib() name = attr.ib(type=str)
desc = attr.ib() desc = attr.ib(type=str)
labels = attr.ib(hash=False) labels = attr.ib(hash=False, type=Optional[Iterable[str]])
caller = attr.ib() # callback: should either return a value (if there are no labels for this metric),
# or dict mapping from a label tuple to a value
caller = attr.ib(type=Callable[[], Union[Dict[Tuple[str, ...], float], float]])
def collect(self): def collect(self):

View File

@ -17,6 +17,7 @@ import logging
import threading import threading
from asyncio import iscoroutine from asyncio import iscoroutine
from functools import wraps from functools import wraps
from typing import Dict, Set
import six import six
@ -80,13 +81,13 @@ _background_process_db_sched_duration = Counter(
# map from description to a counter, so that we can name our logcontexts # map from description to a counter, so that we can name our logcontexts
# incrementally. (It actually duplicates _background_process_start_count, but # incrementally. (It actually duplicates _background_process_start_count, but
# it's much simpler to do so than to try to combine them.) # it's much simpler to do so than to try to combine them.)
_background_process_counts = {} # type: dict[str, int] _background_process_counts = {} # type: Dict[str, int]
# map from description to the currently running background processes. # map from description to the currently running background processes.
# #
# it's kept as a dict of sets rather than a big set so that we can keep track # it's kept as a dict of sets rather than a big set so that we can keep track
# of process descriptions that no longer have any active processes. # of process descriptions that no longer have any active processes.
_background_processes = {} # type: dict[str, set[_BackgroundProcess]] _background_processes = {} # type: Dict[str, Set[_BackgroundProcess]]
# A lock that covers the above dicts # A lock that covers the above dicts
_bg_metrics_lock = threading.Lock() _bg_metrics_lock = threading.Lock()

View File

@ -15,11 +15,16 @@
# limitations under the License. # limitations under the License.
import logging import logging
from collections import defaultdict
from typing import Dict, Tuple, Union
from twisted.internet import defer from twisted.internet import defer
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import PusherConfigException from synapse.push import PusherConfigException
from synapse.push.emailpusher import EmailPusher
from synapse.push.httppusher import HttpPusher
from synapse.push.pusher import PusherFactory from synapse.push.pusher import PusherFactory
from synapse.util.async_helpers import concurrently_execute from synapse.util.async_helpers import concurrently_execute
@ -47,7 +52,24 @@ class PusherPool:
self._should_start_pushers = _hs.config.start_pushers self._should_start_pushers = _hs.config.start_pushers
self.store = self.hs.get_datastore() self.store = self.hs.get_datastore()
self.clock = self.hs.get_clock() self.clock = self.hs.get_clock()
self.pushers = {}
# map from user id to app_id:pushkey to pusher
self.pushers = {} # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]]
def count_pushers():
results = defaultdict(int) # type: Dict[Tuple[str, str], int]
for pushers in self.pushers.values():
for pusher in pushers.values():
k = (type(pusher).__name__, pusher.app_id)
results[k] += 1
return results
LaterGauge(
name="synapse_pushers",
desc="the number of active pushers",
labels=["kind", "app_id"],
caller=count_pushers,
)
def start(self): def start(self):
"""Starts the pushers off in a background process. """Starts the pushers off in a background process.

View File

@ -191,7 +191,9 @@ commands = mypy \
synapse/handlers/sync.py \ synapse/handlers/sync.py \
synapse/handlers/ui_auth \ synapse/handlers/ui_auth \
synapse/logging/ \ synapse/logging/ \
synapse/metrics \
synapse/module_api \ synapse/module_api \
synapse/push/pusherpool.py \
synapse/replication \ synapse/replication \
synapse/rest \ synapse/rest \
synapse/spam_checker_api \ synapse/spam_checker_api \