Compare commits

...

5 Commits

Author SHA1 Message Date
Richard van der Hoff 7b66a1f0d9 Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes 2020-03-19 10:29:20 +00:00
Richard van der Hoff e913823a22
Fix concurrent modification errors in pusher metrics (#7106)
add a lock to try to make this metric actually work
2020-03-19 10:28:49 +00:00
Richard van der Hoff 059e91bdce Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes 2020-03-19 10:03:10 +00:00
Richard van der Hoff 8c75667ad7
Add prometheus metrics for the number of active pushers (#7103) 2020-03-19 10:00:24 +00:00
Richard van der Hoff 443162e577
Move pusherpool startup into _base.setup (#7104)
This should be safe to do on all workers/masters because it is guarded by
a config option which will ensure it is only actually done on the worker
assigned as a pusher.
2020-03-19 09:48:45 +00:00
9 changed files with 53 additions and 14 deletions

1
changelog.d/7103.feature Normal file
View File

@@ -0,0 +1 @@
Add prometheus metrics for the number of active pushers.

1
changelog.d/7104.misc Normal file
View File

@@ -0,0 +1 @@
Merge worker apps together.

1
changelog.d/7106.feature Normal file
View File

@@ -0,0 +1 @@
Add prometheus metrics for the number of active pushers.

View File

@@ -276,6 +276,7 @@ def start(hs, listeners=None):
# It is now safe to start your Synapse. # It is now safe to start your Synapse.
hs.start_listening(listeners) hs.start_listening(listeners)
hs.get_datastore().db.start_profiling() hs.get_datastore().db.start_profiling()
hs.get_pusherpool().start()
setup_sentry(hs) setup_sentry(hs)
setup_sdnotify(hs) setup_sdnotify(hs)

View File

@@ -408,7 +408,6 @@ def setup(config_options):
_base.start(hs, config.listeners) _base.start(hs, config.listeners)
hs.get_pusherpool().start()
hs.get_datastore().db.updates.start_doing_background_updates() hs.get_datastore().db.updates.start_doing_background_updates()
except Exception: except Exception:
# Print the exception and bail out. # Print the exception and bail out.

View File

@@ -20,7 +20,7 @@ import os
import platform import platform
import threading import threading
import time import time
from typing import Dict, Union from typing import Callable, Dict, Iterable, Optional, Tuple, Union
import six import six
@@ -59,10 +59,12 @@ class RegistryProxy(object):
@attr.s(hash=True) @attr.s(hash=True)
class LaterGauge(object): class LaterGauge(object):
name = attr.ib() name = attr.ib(type=str)
desc = attr.ib() desc = attr.ib(type=str)
labels = attr.ib(hash=False) labels = attr.ib(hash=False, type=Optional[Iterable[str]])
caller = attr.ib() # callback: should either return a value (if there are no labels for this metric),
# or dict mapping from a label tuple to a value
caller = attr.ib(type=Callable[[], Union[Dict[Tuple[str, ...], float], float]])
def collect(self): def collect(self):

View File

@@ -17,6 +17,7 @@ import logging
import threading import threading
from asyncio import iscoroutine from asyncio import iscoroutine
from functools import wraps from functools import wraps
from typing import Dict, Set
import six import six
@@ -80,13 +81,13 @@ _background_process_db_sched_duration = Counter(
# map from description to a counter, so that we can name our logcontexts # map from description to a counter, so that we can name our logcontexts
# incrementally. (It actually duplicates _background_process_start_count, but # incrementally. (It actually duplicates _background_process_start_count, but
# it's much simpler to do so than to try to combine them.) # it's much simpler to do so than to try to combine them.)
_background_process_counts = {} # type: dict[str, int] _background_process_counts = {} # type: Dict[str, int]
# map from description to the currently running background processes. # map from description to the currently running background processes.
# #
# it's kept as a dict of sets rather than a big set so that we can keep track # it's kept as a dict of sets rather than a big set so that we can keep track
# of process descriptions that no longer have any active processes. # of process descriptions that no longer have any active processes.
_background_processes = {} # type: dict[str, set[_BackgroundProcess]] _background_processes = {} # type: Dict[str, Set[_BackgroundProcess]]
# A lock that covers the above dicts # A lock that covers the above dicts
_bg_metrics_lock = threading.Lock() _bg_metrics_lock = threading.Lock()

View File

@@ -15,11 +15,17 @@
# limitations under the License. # limitations under the License.
import logging import logging
from collections import defaultdict
from threading import Lock
from typing import Dict, Tuple, Union
from twisted.internet import defer from twisted.internet import defer
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import PusherConfigException from synapse.push import PusherConfigException
from synapse.push.emailpusher import EmailPusher
from synapse.push.httppusher import HttpPusher
from synapse.push.pusher import PusherFactory from synapse.push.pusher import PusherFactory
from synapse.util.async_helpers import concurrently_execute from synapse.util.async_helpers import concurrently_execute
@@ -47,7 +53,29 @@ class PusherPool:
self._should_start_pushers = _hs.config.start_pushers self._should_start_pushers = _hs.config.start_pushers
self.store = self.hs.get_datastore() self.store = self.hs.get_datastore()
self.clock = self.hs.get_clock() self.clock = self.hs.get_clock()
self.pushers = {}
# map from user id to app_id:pushkey to pusher
self.pushers = {} # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]]
# a lock for the pushers dict, since `count_pushers` is called from a different
# thread, and we otherwise get concurrent modification errors
self._pushers_lock = Lock()
def count_pushers():
results = defaultdict(int) # type: Dict[Tuple[str, str], int]
with self._pushers_lock:
for pushers in self.pushers.values():
for pusher in pushers.values():
k = (type(pusher).__name__, pusher.app_id)
results[k] += 1
return results
LaterGauge(
name="synapse_pushers",
desc="the number of active pushers",
labels=["kind", "app_id"],
caller=count_pushers,
)
def start(self): def start(self):
"""Starts the pushers off in a background process. """Starts the pushers off in a background process.
@@ -271,11 +299,12 @@ class PusherPool:
return return
appid_pushkey = "%s:%s" % (pusherdict["app_id"], pusherdict["pushkey"]) appid_pushkey = "%s:%s" % (pusherdict["app_id"], pusherdict["pushkey"])
byuser = self.pushers.setdefault(pusherdict["user_name"], {})
if appid_pushkey in byuser: with self._pushers_lock:
byuser[appid_pushkey].on_stop() byuser = self.pushers.setdefault(pusherdict["user_name"], {})
byuser[appid_pushkey] = p if appid_pushkey in byuser:
byuser[appid_pushkey].on_stop()
byuser[appid_pushkey] = p
# Check if there *may* be push to process. We do this as this check is a # Check if there *may* be push to process. We do this as this check is a
# lot cheaper to do than actually fetching the exact rows we need to # lot cheaper to do than actually fetching the exact rows we need to
@@ -304,7 +333,9 @@ class PusherPool:
if appid_pushkey in byuser: if appid_pushkey in byuser:
logger.info("Stopping pusher %s / %s", user_id, appid_pushkey) logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
byuser[appid_pushkey].on_stop() byuser[appid_pushkey].on_stop()
del byuser[appid_pushkey] with self._pushers_lock:
del byuser[appid_pushkey]
yield self.store.delete_pusher_by_app_id_pushkey_user_id( yield self.store.delete_pusher_by_app_id_pushkey_user_id(
app_id, pushkey, user_id app_id, pushkey, user_id
) )

View File

@@ -191,7 +191,9 @@ commands = mypy \
synapse/handlers/sync.py \ synapse/handlers/sync.py \
synapse/handlers/ui_auth \ synapse/handlers/ui_auth \
synapse/logging/ \ synapse/logging/ \
synapse/metrics \
synapse/module_api \ synapse/module_api \
synapse/push/pusherpool.py \
synapse/replication \ synapse/replication \
synapse/rest \ synapse/rest \
synapse/spam_checker_api \ synapse/spam_checker_api \