Compare commits
5 Commits
f86962cb6b
...
7b66a1f0d9
Author | SHA1 | Date |
---|---|---|
Richard van der Hoff | 7b66a1f0d9 | |
Richard van der Hoff | e913823a22 | |
Richard van der Hoff | 059e91bdce | |
Richard van der Hoff | 8c75667ad7 | |
Richard van der Hoff | 443162e577 |
|
@ -0,0 +1 @@
|
|||
Add prometheus metrics for the number of active pushers.
|
|
@ -0,0 +1 @@
|
|||
Merge worker apps together.
|
|
@ -0,0 +1 @@
|
|||
Add prometheus metrics for the number of active pushers.
|
|
@ -276,6 +276,7 @@ def start(hs, listeners=None):
|
|||
# It is now safe to start your Synapse.
|
||||
hs.start_listening(listeners)
|
||||
hs.get_datastore().db.start_profiling()
|
||||
hs.get_pusherpool().start()
|
||||
|
||||
setup_sentry(hs)
|
||||
setup_sdnotify(hs)
|
||||
|
|
|
@ -408,7 +408,6 @@ def setup(config_options):
|
|||
|
||||
_base.start(hs, config.listeners)
|
||||
|
||||
hs.get_pusherpool().start()
|
||||
hs.get_datastore().db.updates.start_doing_background_updates()
|
||||
except Exception:
|
||||
# Print the exception and bail out.
|
||||
|
|
|
@ -20,7 +20,7 @@ import os
|
|||
import platform
|
||||
import threading
|
||||
import time
|
||||
from typing import Dict, Union
|
||||
from typing import Callable, Dict, Iterable, Optional, Tuple, Union
|
||||
|
||||
import six
|
||||
|
||||
|
@ -59,10 +59,12 @@ class RegistryProxy(object):
|
|||
@attr.s(hash=True)
|
||||
class LaterGauge(object):
|
||||
|
||||
name = attr.ib()
|
||||
desc = attr.ib()
|
||||
labels = attr.ib(hash=False)
|
||||
caller = attr.ib()
|
||||
name = attr.ib(type=str)
|
||||
desc = attr.ib(type=str)
|
||||
labels = attr.ib(hash=False, type=Optional[Iterable[str]])
|
||||
# callback: should either return a value (if there are no labels for this metric),
|
||||
# or dict mapping from a label tuple to a value
|
||||
caller = attr.ib(type=Callable[[], Union[Dict[Tuple[str, ...], float], float]])
|
||||
|
||||
def collect(self):
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@ import logging
|
|||
import threading
|
||||
from asyncio import iscoroutine
|
||||
from functools import wraps
|
||||
from typing import Dict, Set
|
||||
|
||||
import six
|
||||
|
||||
|
@ -80,13 +81,13 @@ _background_process_db_sched_duration = Counter(
|
|||
# map from description to a counter, so that we can name our logcontexts
|
||||
# incrementally. (It actually duplicates _background_process_start_count, but
|
||||
# it's much simpler to do so than to try to combine them.)
|
||||
_background_process_counts = {} # type: dict[str, int]
|
||||
_background_process_counts = {} # type: Dict[str, int]
|
||||
|
||||
# map from description to the currently running background processes.
|
||||
#
|
||||
# it's kept as a dict of sets rather than a big set so that we can keep track
|
||||
# of process descriptions that no longer have any active processes.
|
||||
_background_processes = {} # type: dict[str, set[_BackgroundProcess]]
|
||||
_background_processes = {} # type: Dict[str, Set[_BackgroundProcess]]
|
||||
|
||||
# A lock that covers the above dicts
|
||||
_bg_metrics_lock = threading.Lock()
|
||||
|
|
|
@ -15,11 +15,17 @@
|
|||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from collections import defaultdict
|
||||
from threading import Lock
|
||||
from typing import Dict, Tuple, Union
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.metrics import LaterGauge
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.push import PusherConfigException
|
||||
from synapse.push.emailpusher import EmailPusher
|
||||
from synapse.push.httppusher import HttpPusher
|
||||
from synapse.push.pusher import PusherFactory
|
||||
from synapse.util.async_helpers import concurrently_execute
|
||||
|
||||
|
@ -47,7 +53,29 @@ class PusherPool:
|
|||
self._should_start_pushers = _hs.config.start_pushers
|
||||
self.store = self.hs.get_datastore()
|
||||
self.clock = self.hs.get_clock()
|
||||
self.pushers = {}
|
||||
|
||||
# map from user id to app_id:pushkey to pusher
|
||||
self.pushers = {} # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]]
|
||||
|
||||
# a lock for the pushers dict, since `count_pushers` is called from an different
|
||||
# and we otherwise get concurrent modification errors
|
||||
self._pushers_lock = Lock()
|
||||
|
||||
def count_pushers():
|
||||
results = defaultdict(int) # type: Dict[Tuple[str, str], int]
|
||||
with self._pushers_lock:
|
||||
for pushers in self.pushers.values():
|
||||
for pusher in pushers.values():
|
||||
k = (type(pusher).__name__, pusher.app_id)
|
||||
results[k] += 1
|
||||
return results
|
||||
|
||||
LaterGauge(
|
||||
name="synapse_pushers",
|
||||
desc="the number of active pushers",
|
||||
labels=["kind", "app_id"],
|
||||
caller=count_pushers,
|
||||
)
|
||||
|
||||
def start(self):
|
||||
"""Starts the pushers off in a background process.
|
||||
|
@ -271,11 +299,12 @@ class PusherPool:
|
|||
return
|
||||
|
||||
appid_pushkey = "%s:%s" % (pusherdict["app_id"], pusherdict["pushkey"])
|
||||
byuser = self.pushers.setdefault(pusherdict["user_name"], {})
|
||||
|
||||
if appid_pushkey in byuser:
|
||||
byuser[appid_pushkey].on_stop()
|
||||
byuser[appid_pushkey] = p
|
||||
with self._pushers_lock:
|
||||
byuser = self.pushers.setdefault(pusherdict["user_name"], {})
|
||||
if appid_pushkey in byuser:
|
||||
byuser[appid_pushkey].on_stop()
|
||||
byuser[appid_pushkey] = p
|
||||
|
||||
# Check if there *may* be push to process. We do this as this check is a
|
||||
# lot cheaper to do than actually fetching the exact rows we need to
|
||||
|
@ -304,7 +333,9 @@ class PusherPool:
|
|||
if appid_pushkey in byuser:
|
||||
logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
|
||||
byuser[appid_pushkey].on_stop()
|
||||
del byuser[appid_pushkey]
|
||||
with self._pushers_lock:
|
||||
del byuser[appid_pushkey]
|
||||
|
||||
yield self.store.delete_pusher_by_app_id_pushkey_user_id(
|
||||
app_id, pushkey, user_id
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue