fix race condiftion in calling initialise_reserved_users
parent
7aea00069c
commit
6105c6101f
|
@ -0,0 +1,2 @@
|
||||||
|
Fix race condition in populating reserved users
|
||||||
|
|
|
@ -553,14 +553,6 @@ def run(hs):
|
||||||
generate_monthly_active_users,
|
generate_monthly_active_users,
|
||||||
)
|
)
|
||||||
|
|
||||||
# XXX is this really supposed to be a background process? it looks
|
|
||||||
# like it needs to complete before some of the other stuff runs.
|
|
||||||
run_as_background_process(
|
|
||||||
"initialise_reserved_users",
|
|
||||||
hs.get_datastore().initialise_reserved_users,
|
|
||||||
hs.config.mau_limits_reserved_threepids,
|
|
||||||
)
|
|
||||||
|
|
||||||
start_generate_monthly_active_users()
|
start_generate_monthly_active_users()
|
||||||
if hs.config.limit_usage_by_mau:
|
if hs.config.limit_usage_by_mau:
|
||||||
clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000)
|
clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000)
|
||||||
|
|
|
@ -33,19 +33,28 @@ class MonthlyActiveUsersStore(SQLBaseStore):
|
||||||
self._clock = hs.get_clock()
|
self._clock = hs.get_clock()
|
||||||
self.hs = hs
|
self.hs = hs
|
||||||
self.reserved_users = ()
|
self.reserved_users = ()
|
||||||
|
self.initialise_reserved_users(
|
||||||
|
dbconn.cursor(), hs.config.mau_limits_reserved_threepids
|
||||||
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
def initialise_reserved_users(self, txn, threepids):
|
||||||
def initialise_reserved_users(self, threepids):
|
"""
|
||||||
store = self.hs.get_datastore()
|
Ensures that reserved threepids are accounted for in the MAU table, should
|
||||||
|
be called on start up.
|
||||||
|
|
||||||
|
Arguments:
|
||||||
|
threepids []: List of threepid dicts to reserve
|
||||||
|
"""
|
||||||
reserved_user_list = []
|
reserved_user_list = []
|
||||||
|
|
||||||
# Do not add more reserved users than the total allowable number
|
# Do not add more reserved users than the total allowable number
|
||||||
for tp in threepids[:self.hs.config.max_mau_value]:
|
for tp in threepids[:self.hs.config.max_mau_value]:
|
||||||
user_id = yield store.get_user_id_by_threepid(
|
user_id = self.get_user_id_by_threepid_txn(
|
||||||
|
txn,
|
||||||
tp["medium"], tp["address"]
|
tp["medium"], tp["address"]
|
||||||
)
|
)
|
||||||
if user_id:
|
if user_id:
|
||||||
yield self.upsert_monthly_active_user(user_id)
|
self.upsert_monthly_active_user_txn(txn, user_id)
|
||||||
reserved_user_list.append(user_id)
|
reserved_user_list.append(user_id)
|
||||||
else:
|
else:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
|
@ -55,8 +64,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def reap_monthly_active_users(self):
|
def reap_monthly_active_users(self):
|
||||||
"""
|
"""Cleans out monthly active user table to ensure that no stale
|
||||||
Cleans out monthly active user table to ensure that no stale
|
|
||||||
entries exist.
|
entries exist.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
|
@ -165,19 +173,33 @@ class MonthlyActiveUsersStore(SQLBaseStore):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def upsert_monthly_active_user(self, user_id):
|
def upsert_monthly_active_user(self, user_id):
|
||||||
|
"""Updates or inserts monthly active user member
|
||||||
|
Arguments:
|
||||||
|
user_id (str): user to add/update
|
||||||
|
"""
|
||||||
|
is_insert = yield self.runInteraction(
|
||||||
|
"upsert_monthly_active_user", self.upsert_monthly_active_user_txn,
|
||||||
|
user_id
|
||||||
|
)
|
||||||
|
if is_insert:
|
||||||
|
self.user_last_seen_monthly_active.invalidate((user_id,))
|
||||||
|
self.get_monthly_active_count.invalidate(())
|
||||||
|
|
||||||
|
def upsert_monthly_active_user_txn(self, txn, user_id):
|
||||||
"""
|
"""
|
||||||
Updates or inserts monthly active user member
|
Updates or inserts monthly active user member
|
||||||
Arguments:
|
Arguments:
|
||||||
|
txn (cursor):
|
||||||
user_id (str): user to add/update
|
user_id (str): user to add/update
|
||||||
Deferred[bool]: True if a new entry was created, False if an
|
bool: True if a new entry was created, False if an
|
||||||
existing one was updated.
|
existing one was updated.
|
||||||
"""
|
"""
|
||||||
# Am consciously deciding to lock the table on the basis that is ought
|
# Am consciously deciding to lock the table on the basis that is ought
|
||||||
# never be a big table and alternative approaches (batching multiple
|
# never be a big table and alternative approaches (batching multiple
|
||||||
# upserts into a single txn) introduced a lot of extra complexity.
|
# upserts into a single txn) introduced a lot of extra complexity.
|
||||||
# See https://github.com/matrix-org/synapse/issues/3854 for more
|
# See https://github.com/matrix-org/synapse/issues/3854 for more
|
||||||
is_insert = yield self._simple_upsert(
|
is_insert = self._simple_upsert_txn(
|
||||||
desc="upsert_monthly_active_user",
|
txn,
|
||||||
table="monthly_active_users",
|
table="monthly_active_users",
|
||||||
keyvalues={
|
keyvalues={
|
||||||
"user_id": user_id,
|
"user_id": user_id,
|
||||||
|
@ -186,9 +208,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
|
||||||
"timestamp": int(self._clock.time_msec()),
|
"timestamp": int(self._clock.time_msec()),
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
if is_insert:
|
return is_insert
|
||||||
self.user_last_seen_monthly_active.invalidate((user_id,))
|
|
||||||
self.get_monthly_active_count.invalidate(())
|
|
||||||
|
|
||||||
@cached(num_args=1)
|
@cached(num_args=1)
|
||||||
def user_last_seen_monthly_active(self, user_id):
|
def user_last_seen_monthly_active(self, user_id):
|
||||||
|
|
|
@ -474,17 +474,25 @@ class RegistrationStore(RegistrationWorkerStore,
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_user_id_by_threepid(self, medium, address):
|
def get_user_id_by_threepid(self, medium, address):
|
||||||
ret = yield self._simple_select_one(
|
user_id = yield self.runInteraction(
|
||||||
|
"get_user_id_by_threepid", self.get_user_id_by_threepid_txn,
|
||||||
|
medium, address
|
||||||
|
)
|
||||||
|
defer.returnValue(user_id)
|
||||||
|
|
||||||
|
def get_user_id_by_threepid_txn(self, txn, medium, address):
|
||||||
|
ret = self._simple_select_one_txn(
|
||||||
|
txn,
|
||||||
"user_threepids",
|
"user_threepids",
|
||||||
{
|
{
|
||||||
"medium": medium,
|
"medium": medium,
|
||||||
"address": address
|
"address": address
|
||||||
},
|
},
|
||||||
['user_id'], True, 'get_user_id_by_threepid'
|
['user_id'], True
|
||||||
)
|
)
|
||||||
if ret:
|
if ret:
|
||||||
defer.returnValue(ret['user_id'])
|
return ret['user_id']
|
||||||
defer.returnValue(None)
|
return None
|
||||||
|
|
||||||
def user_delete_threepid(self, user_id, medium, address):
|
def user_delete_threepid(self, user_id, medium, address):
|
||||||
return self._simple_delete(
|
return self._simple_delete(
|
||||||
|
|
|
@ -52,7 +52,10 @@ class MonthlyActiveUsersTestCase(HomeserverTestCase):
|
||||||
now = int(self.hs.get_clock().time_msec())
|
now = int(self.hs.get_clock().time_msec())
|
||||||
self.store.user_add_threepid(user1, "email", user1_email, now, now)
|
self.store.user_add_threepid(user1, "email", user1_email, now, now)
|
||||||
self.store.user_add_threepid(user2, "email", user2_email, now, now)
|
self.store.user_add_threepid(user2, "email", user2_email, now, now)
|
||||||
self.store.initialise_reserved_users(threepids)
|
|
||||||
|
self.store.runInteraction(
|
||||||
|
"initialise", self.store.initialise_reserved_users, threepids
|
||||||
|
)
|
||||||
self.pump()
|
self.pump()
|
||||||
|
|
||||||
active_count = self.store.get_monthly_active_count()
|
active_count = self.store.get_monthly_active_count()
|
||||||
|
@ -199,7 +202,10 @@ class MonthlyActiveUsersTestCase(HomeserverTestCase):
|
||||||
{'medium': 'email', 'address': user2_email},
|
{'medium': 'email', 'address': user2_email},
|
||||||
]
|
]
|
||||||
self.hs.config.mau_limits_reserved_threepids = threepids
|
self.hs.config.mau_limits_reserved_threepids = threepids
|
||||||
self.store.initialise_reserved_users(threepids)
|
self.store.runInteraction(
|
||||||
|
"initialise", self.store.initialise_reserved_users, threepids
|
||||||
|
)
|
||||||
|
|
||||||
self.pump()
|
self.pump()
|
||||||
count = self.store.get_registered_reserved_users_count()
|
count = self.store.get_registered_reserved_users_count()
|
||||||
self.assertEquals(self.get_success(count), 0)
|
self.assertEquals(self.get_success(count), 0)
|
||||||
|
|
Loading…
Reference in New Issue