Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes

pull/8675/head
Richard van der Hoff 2020-03-19 10:29:20 +00:00
commit 7b66a1f0d9
2 changed files with 19 additions and 9 deletions

1
changelog.d/7106.feature Normal file
View File

@ -0,0 +1 @@
Add prometheus metrics for the number of active pushers.

View File

@ -16,6 +16,7 @@
import logging import logging
from collections import defaultdict from collections import defaultdict
from threading import Lock
from typing import Dict, Tuple, Union from typing import Dict, Tuple, Union
from twisted.internet import defer from twisted.internet import defer
@ -56,8 +57,13 @@ class PusherPool:
# map from user id to app_id:pushkey to pusher # map from user id to app_id:pushkey to pusher
self.pushers = {} # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]] self.pushers = {} # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]]
# a lock for the pushers dict, since `count_pushers` is called from an different
# and we otherwise get concurrent modification errors
self._pushers_lock = Lock()
def count_pushers(): def count_pushers():
results = defaultdict(int) # type: Dict[Tuple[str, str], int] results = defaultdict(int) # type: Dict[Tuple[str, str], int]
with self._pushers_lock:
for pushers in self.pushers.values(): for pushers in self.pushers.values():
for pusher in pushers.values(): for pusher in pushers.values():
k = (type(pusher).__name__, pusher.app_id) k = (type(pusher).__name__, pusher.app_id)
@ -293,8 +299,9 @@ class PusherPool:
return return
appid_pushkey = "%s:%s" % (pusherdict["app_id"], pusherdict["pushkey"]) appid_pushkey = "%s:%s" % (pusherdict["app_id"], pusherdict["pushkey"])
byuser = self.pushers.setdefault(pusherdict["user_name"], {})
with self._pushers_lock:
byuser = self.pushers.setdefault(pusherdict["user_name"], {})
if appid_pushkey in byuser: if appid_pushkey in byuser:
byuser[appid_pushkey].on_stop() byuser[appid_pushkey].on_stop()
byuser[appid_pushkey] = p byuser[appid_pushkey] = p
@ -326,7 +333,9 @@ class PusherPool:
if appid_pushkey in byuser: if appid_pushkey in byuser:
logger.info("Stopping pusher %s / %s", user_id, appid_pushkey) logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
byuser[appid_pushkey].on_stop() byuser[appid_pushkey].on_stop()
with self._pushers_lock:
del byuser[appid_pushkey] del byuser[appid_pushkey]
yield self.store.delete_pusher_by_app_id_pushkey_user_id( yield self.store.delete_pusher_by_app_id_pushkey_user_id(
app_id, pushkey, user_id app_id, pushkey, user_id
) )