Fix concurrent modification errors in pusher metrics (#7106)
add a lock to try to make this metric actually workpull/7107/head
parent
8c75667ad7
commit
e913823a22
|
@ -0,0 +1 @@
|
||||||
|
Add prometheus metrics for the number of active pushers.
|
|
@ -16,6 +16,7 @@
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
|
from threading import Lock
|
||||||
from typing import Dict, Tuple, Union
|
from typing import Dict, Tuple, Union
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
@ -56,12 +57,17 @@ class PusherPool:
|
||||||
# map from user id to app_id:pushkey to pusher
|
# map from user id to app_id:pushkey to pusher
|
||||||
self.pushers = {} # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]]
|
self.pushers = {} # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]]
|
||||||
|
|
||||||
|
# a lock for the pushers dict, since `count_pushers` is called from an different
|
||||||
|
# and we otherwise get concurrent modification errors
|
||||||
|
self._pushers_lock = Lock()
|
||||||
|
|
||||||
def count_pushers():
|
def count_pushers():
|
||||||
results = defaultdict(int) # type: Dict[Tuple[str, str], int]
|
results = defaultdict(int) # type: Dict[Tuple[str, str], int]
|
||||||
for pushers in self.pushers.values():
|
with self._pushers_lock:
|
||||||
for pusher in pushers.values():
|
for pushers in self.pushers.values():
|
||||||
k = (type(pusher).__name__, pusher.app_id)
|
for pusher in pushers.values():
|
||||||
results[k] += 1
|
k = (type(pusher).__name__, pusher.app_id)
|
||||||
|
results[k] += 1
|
||||||
return results
|
return results
|
||||||
|
|
||||||
LaterGauge(
|
LaterGauge(
|
||||||
|
@ -293,11 +299,12 @@ class PusherPool:
|
||||||
return
|
return
|
||||||
|
|
||||||
appid_pushkey = "%s:%s" % (pusherdict["app_id"], pusherdict["pushkey"])
|
appid_pushkey = "%s:%s" % (pusherdict["app_id"], pusherdict["pushkey"])
|
||||||
byuser = self.pushers.setdefault(pusherdict["user_name"], {})
|
|
||||||
|
|
||||||
if appid_pushkey in byuser:
|
with self._pushers_lock:
|
||||||
byuser[appid_pushkey].on_stop()
|
byuser = self.pushers.setdefault(pusherdict["user_name"], {})
|
||||||
byuser[appid_pushkey] = p
|
if appid_pushkey in byuser:
|
||||||
|
byuser[appid_pushkey].on_stop()
|
||||||
|
byuser[appid_pushkey] = p
|
||||||
|
|
||||||
# Check if there *may* be push to process. We do this as this check is a
|
# Check if there *may* be push to process. We do this as this check is a
|
||||||
# lot cheaper to do than actually fetching the exact rows we need to
|
# lot cheaper to do than actually fetching the exact rows we need to
|
||||||
|
@ -326,7 +333,9 @@ class PusherPool:
|
||||||
if appid_pushkey in byuser:
|
if appid_pushkey in byuser:
|
||||||
logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
|
logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
|
||||||
byuser[appid_pushkey].on_stop()
|
byuser[appid_pushkey].on_stop()
|
||||||
del byuser[appid_pushkey]
|
with self._pushers_lock:
|
||||||
|
del byuser[appid_pushkey]
|
||||||
|
|
||||||
yield self.store.delete_pusher_by_app_id_pushkey_user_id(
|
yield self.store.delete_pusher_by_app_id_pushkey_user_id(
|
||||||
app_id, pushkey, user_id
|
app_id, pushkey, user_id
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue