Compare commits

..

No commits in common. "7b66a1f0d9b7ec0aaa9809880aaa05a0d3df00ea" and "f86962cb6b8fc27620b0b8899809380057925032" have entirely different histories.

9 changed files with 14 additions and 53 deletions

View File

@ -1 +0,0 @@
Add prometheus metrics for the number of active pushers.

View File

@ -1 +0,0 @@
Merge worker apps together.

View File

@ -1 +0,0 @@
Add prometheus metrics for the number of active pushers.

View File

@ -276,7 +276,6 @@ def start(hs, listeners=None):
# It is now safe to start your Synapse.
hs.start_listening(listeners)
hs.get_datastore().db.start_profiling()
hs.get_pusherpool().start()
setup_sentry(hs)
setup_sdnotify(hs)

View File

@ -408,6 +408,7 @@ def setup(config_options):
_base.start(hs, config.listeners)
hs.get_pusherpool().start()
hs.get_datastore().db.updates.start_doing_background_updates()
except Exception:
# Print the exception and bail out.

View File

@ -20,7 +20,7 @@ import os
import platform
import threading
import time
from typing import Callable, Dict, Iterable, Optional, Tuple, Union
from typing import Dict, Union
import six
@ -59,12 +59,10 @@ class RegistryProxy(object):
@attr.s(hash=True)
class LaterGauge(object):
name = attr.ib(type=str)
desc = attr.ib(type=str)
labels = attr.ib(hash=False, type=Optional[Iterable[str]])
# callback: should either return a value (if there are no labels for this metric),
# or dict mapping from a label tuple to a value
caller = attr.ib(type=Callable[[], Union[Dict[Tuple[str, ...], float], float]])
name = attr.ib()
desc = attr.ib()
labels = attr.ib(hash=False)
caller = attr.ib()
def collect(self):

View File

@ -17,7 +17,6 @@ import logging
import threading
from asyncio import iscoroutine
from functools import wraps
from typing import Dict, Set
import six
@ -81,13 +80,13 @@ _background_process_db_sched_duration = Counter(
# map from description to a counter, so that we can name our logcontexts
# incrementally. (It actually duplicates _background_process_start_count, but
# it's much simpler to do so than to try to combine them.)
_background_process_counts = {} # type: Dict[str, int]
_background_process_counts = {} # type: dict[str, int]
# map from description to the currently running background processes.
#
# it's kept as a dict of sets rather than a big set so that we can keep track
# of process descriptions that no longer have any active processes.
_background_processes = {} # type: Dict[str, Set[_BackgroundProcess]]
_background_processes = {} # type: dict[str, set[_BackgroundProcess]]
# A lock that covers the above dicts
_bg_metrics_lock = threading.Lock()

View File

@ -15,17 +15,11 @@
# limitations under the License.
import logging
from collections import defaultdict
from threading import Lock
from typing import Dict, Tuple, Union
from twisted.internet import defer
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import PusherConfigException
from synapse.push.emailpusher import EmailPusher
from synapse.push.httppusher import HttpPusher
from synapse.push.pusher import PusherFactory
from synapse.util.async_helpers import concurrently_execute
@ -53,29 +47,7 @@ class PusherPool:
self._should_start_pushers = _hs.config.start_pushers
self.store = self.hs.get_datastore()
self.clock = self.hs.get_clock()
# map from user id to app_id:pushkey to pusher
self.pushers = {} # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]]
# a lock for the pushers dict, since `count_pushers` is called from an different
# and we otherwise get concurrent modification errors
self._pushers_lock = Lock()
def count_pushers():
results = defaultdict(int) # type: Dict[Tuple[str, str], int]
with self._pushers_lock:
for pushers in self.pushers.values():
for pusher in pushers.values():
k = (type(pusher).__name__, pusher.app_id)
results[k] += 1
return results
LaterGauge(
name="synapse_pushers",
desc="the number of active pushers",
labels=["kind", "app_id"],
caller=count_pushers,
)
self.pushers = {}
def start(self):
"""Starts the pushers off in a background process.
@ -299,12 +271,11 @@ class PusherPool:
return
appid_pushkey = "%s:%s" % (pusherdict["app_id"], pusherdict["pushkey"])
byuser = self.pushers.setdefault(pusherdict["user_name"], {})
with self._pushers_lock:
byuser = self.pushers.setdefault(pusherdict["user_name"], {})
if appid_pushkey in byuser:
byuser[appid_pushkey].on_stop()
byuser[appid_pushkey] = p
if appid_pushkey in byuser:
byuser[appid_pushkey].on_stop()
byuser[appid_pushkey] = p
# Check if there *may* be push to process. We do this as this check is a
# lot cheaper to do than actually fetching the exact rows we need to
@ -333,9 +304,7 @@ class PusherPool:
if appid_pushkey in byuser:
logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
byuser[appid_pushkey].on_stop()
with self._pushers_lock:
del byuser[appid_pushkey]
del byuser[appid_pushkey]
yield self.store.delete_pusher_by_app_id_pushkey_user_id(
app_id, pushkey, user_id
)

View File

@ -191,9 +191,7 @@ commands = mypy \
synapse/handlers/sync.py \
synapse/handlers/ui_auth \
synapse/logging/ \
synapse/metrics \
synapse/module_api \
synapse/push/pusherpool.py \
synapse/replication \
synapse/rest \
synapse/spam_checker_api \