Compare commits
5 Commits
f86962cb6b
...
7b66a1f0d9
| Author | SHA1 | Date |
|---|---|---|
|
|
7b66a1f0d9 | |
|
|
e913823a22 | |
|
|
059e91bdce | |
|
|
8c75667ad7 | |
|
|
443162e577 |
|
|
@ -0,0 +1 @@
|
||||||
|
Add prometheus metrics for the number of active pushers.
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
Merge worker apps together.
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
Add prometheus metrics for the number of active pushers.
|
||||||
|
|
@ -276,6 +276,7 @@ def start(hs, listeners=None):
|
||||||
# It is now safe to start your Synapse.
|
# It is now safe to start your Synapse.
|
||||||
hs.start_listening(listeners)
|
hs.start_listening(listeners)
|
||||||
hs.get_datastore().db.start_profiling()
|
hs.get_datastore().db.start_profiling()
|
||||||
|
hs.get_pusherpool().start()
|
||||||
|
|
||||||
setup_sentry(hs)
|
setup_sentry(hs)
|
||||||
setup_sdnotify(hs)
|
setup_sdnotify(hs)
|
||||||
|
|
|
||||||
|
|
@ -408,7 +408,6 @@ def setup(config_options):
|
||||||
|
|
||||||
_base.start(hs, config.listeners)
|
_base.start(hs, config.listeners)
|
||||||
|
|
||||||
hs.get_pusherpool().start()
|
|
||||||
hs.get_datastore().db.updates.start_doing_background_updates()
|
hs.get_datastore().db.updates.start_doing_background_updates()
|
||||||
except Exception:
|
except Exception:
|
||||||
# Print the exception and bail out.
|
# Print the exception and bail out.
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,7 @@ import os
|
||||||
import platform
|
import platform
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
from typing import Dict, Union
|
from typing import Callable, Dict, Iterable, Optional, Tuple, Union
|
||||||
|
|
||||||
import six
|
import six
|
||||||
|
|
||||||
|
|
@ -59,10 +59,12 @@ class RegistryProxy(object):
|
||||||
@attr.s(hash=True)
|
@attr.s(hash=True)
|
||||||
class LaterGauge(object):
|
class LaterGauge(object):
|
||||||
|
|
||||||
name = attr.ib()
|
name = attr.ib(type=str)
|
||||||
desc = attr.ib()
|
desc = attr.ib(type=str)
|
||||||
labels = attr.ib(hash=False)
|
labels = attr.ib(hash=False, type=Optional[Iterable[str]])
|
||||||
caller = attr.ib()
|
# callback: should either return a value (if there are no labels for this metric),
|
||||||
|
# or dict mapping from a label tuple to a value
|
||||||
|
caller = attr.ib(type=Callable[[], Union[Dict[Tuple[str, ...], float], float]])
|
||||||
|
|
||||||
def collect(self):
|
def collect(self):
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,7 @@ import logging
|
||||||
import threading
|
import threading
|
||||||
from asyncio import iscoroutine
|
from asyncio import iscoroutine
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
|
from typing import Dict, Set
|
||||||
|
|
||||||
import six
|
import six
|
||||||
|
|
||||||
|
|
@ -80,13 +81,13 @@ _background_process_db_sched_duration = Counter(
|
||||||
# map from description to a counter, so that we can name our logcontexts
|
# map from description to a counter, so that we can name our logcontexts
|
||||||
# incrementally. (It actually duplicates _background_process_start_count, but
|
# incrementally. (It actually duplicates _background_process_start_count, but
|
||||||
# it's much simpler to do so than to try to combine them.)
|
# it's much simpler to do so than to try to combine them.)
|
||||||
_background_process_counts = {} # type: dict[str, int]
|
_background_process_counts = {} # type: Dict[str, int]
|
||||||
|
|
||||||
# map from description to the currently running background processes.
|
# map from description to the currently running background processes.
|
||||||
#
|
#
|
||||||
# it's kept as a dict of sets rather than a big set so that we can keep track
|
# it's kept as a dict of sets rather than a big set so that we can keep track
|
||||||
# of process descriptions that no longer have any active processes.
|
# of process descriptions that no longer have any active processes.
|
||||||
_background_processes = {} # type: dict[str, set[_BackgroundProcess]]
|
_background_processes = {} # type: Dict[str, Set[_BackgroundProcess]]
|
||||||
|
|
||||||
# A lock that covers the above dicts
|
# A lock that covers the above dicts
|
||||||
_bg_metrics_lock = threading.Lock()
|
_bg_metrics_lock = threading.Lock()
|
||||||
|
|
|
||||||
|
|
@ -15,11 +15,17 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
from collections import defaultdict
|
||||||
|
from threading import Lock
|
||||||
|
from typing import Dict, Tuple, Union
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
|
from synapse.metrics import LaterGauge
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.push import PusherConfigException
|
from synapse.push import PusherConfigException
|
||||||
|
from synapse.push.emailpusher import EmailPusher
|
||||||
|
from synapse.push.httppusher import HttpPusher
|
||||||
from synapse.push.pusher import PusherFactory
|
from synapse.push.pusher import PusherFactory
|
||||||
from synapse.util.async_helpers import concurrently_execute
|
from synapse.util.async_helpers import concurrently_execute
|
||||||
|
|
||||||
|
|
@ -47,7 +53,29 @@ class PusherPool:
|
||||||
self._should_start_pushers = _hs.config.start_pushers
|
self._should_start_pushers = _hs.config.start_pushers
|
||||||
self.store = self.hs.get_datastore()
|
self.store = self.hs.get_datastore()
|
||||||
self.clock = self.hs.get_clock()
|
self.clock = self.hs.get_clock()
|
||||||
self.pushers = {}
|
|
||||||
|
# map from user id to app_id:pushkey to pusher
|
||||||
|
self.pushers = {} # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]]
|
||||||
|
|
||||||
|
# a lock for the pushers dict, since `count_pushers` is called from an different
|
||||||
|
# and we otherwise get concurrent modification errors
|
||||||
|
self._pushers_lock = Lock()
|
||||||
|
|
||||||
|
def count_pushers():
|
||||||
|
results = defaultdict(int) # type: Dict[Tuple[str, str], int]
|
||||||
|
with self._pushers_lock:
|
||||||
|
for pushers in self.pushers.values():
|
||||||
|
for pusher in pushers.values():
|
||||||
|
k = (type(pusher).__name__, pusher.app_id)
|
||||||
|
results[k] += 1
|
||||||
|
return results
|
||||||
|
|
||||||
|
LaterGauge(
|
||||||
|
name="synapse_pushers",
|
||||||
|
desc="the number of active pushers",
|
||||||
|
labels=["kind", "app_id"],
|
||||||
|
caller=count_pushers,
|
||||||
|
)
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
"""Starts the pushers off in a background process.
|
"""Starts the pushers off in a background process.
|
||||||
|
|
@ -271,11 +299,12 @@ class PusherPool:
|
||||||
return
|
return
|
||||||
|
|
||||||
appid_pushkey = "%s:%s" % (pusherdict["app_id"], pusherdict["pushkey"])
|
appid_pushkey = "%s:%s" % (pusherdict["app_id"], pusherdict["pushkey"])
|
||||||
byuser = self.pushers.setdefault(pusherdict["user_name"], {})
|
|
||||||
|
|
||||||
if appid_pushkey in byuser:
|
with self._pushers_lock:
|
||||||
byuser[appid_pushkey].on_stop()
|
byuser = self.pushers.setdefault(pusherdict["user_name"], {})
|
||||||
byuser[appid_pushkey] = p
|
if appid_pushkey in byuser:
|
||||||
|
byuser[appid_pushkey].on_stop()
|
||||||
|
byuser[appid_pushkey] = p
|
||||||
|
|
||||||
# Check if there *may* be push to process. We do this as this check is a
|
# Check if there *may* be push to process. We do this as this check is a
|
||||||
# lot cheaper to do than actually fetching the exact rows we need to
|
# lot cheaper to do than actually fetching the exact rows we need to
|
||||||
|
|
@ -304,7 +333,9 @@ class PusherPool:
|
||||||
if appid_pushkey in byuser:
|
if appid_pushkey in byuser:
|
||||||
logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
|
logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
|
||||||
byuser[appid_pushkey].on_stop()
|
byuser[appid_pushkey].on_stop()
|
||||||
del byuser[appid_pushkey]
|
with self._pushers_lock:
|
||||||
|
del byuser[appid_pushkey]
|
||||||
|
|
||||||
yield self.store.delete_pusher_by_app_id_pushkey_user_id(
|
yield self.store.delete_pusher_by_app_id_pushkey_user_id(
|
||||||
app_id, pushkey, user_id
|
app_id, pushkey, user_id
|
||||||
)
|
)
|
||||||
|
|
|
||||||
2
tox.ini
2
tox.ini
|
|
@ -191,7 +191,9 @@ commands = mypy \
|
||||||
synapse/handlers/sync.py \
|
synapse/handlers/sync.py \
|
||||||
synapse/handlers/ui_auth \
|
synapse/handlers/ui_auth \
|
||||||
synapse/logging/ \
|
synapse/logging/ \
|
||||||
|
synapse/metrics \
|
||||||
synapse/module_api \
|
synapse/module_api \
|
||||||
|
synapse/push/pusherpool.py \
|
||||||
synapse/replication \
|
synapse/replication \
|
||||||
synapse/rest \
|
synapse/rest \
|
||||||
synapse/spam_checker_api \
|
synapse/spam_checker_api \
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue