439 lines
17 KiB
Python
439 lines
17 KiB
Python
# Copyright 2018 New Vector
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
import logging
|
|
from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Tuple, cast
|
|
|
|
from synapse.metrics.background_process_metrics import wrap_as_background_process
|
|
from synapse.storage.database import (
|
|
DatabasePool,
|
|
LoggingDatabaseConnection,
|
|
LoggingTransaction,
|
|
make_in_list_sql_clause,
|
|
)
|
|
from synapse.storage.databases.main.registration import RegistrationWorkerStore
|
|
from synapse.util.caches.descriptors import cached
|
|
from synapse.util.threepids import canonicalise_email
|
|
|
|
if TYPE_CHECKING:
|
|
from synapse.server import HomeServer
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Number of msec of granularity to store the monthly_active_user timestamp
|
|
# This means it is not necessary to update the table on every request
|
|
LAST_SEEN_GRANULARITY = 60 * 60 * 1000
|
|
|
|
|
|
class MonthlyActiveUsersWorkerStore(RegistrationWorkerStore):
|
|
def __init__(
|
|
self,
|
|
database: DatabasePool,
|
|
db_conn: LoggingDatabaseConnection,
|
|
hs: "HomeServer",
|
|
):
|
|
super().__init__(database, db_conn, hs)
|
|
self._clock = hs.get_clock()
|
|
self.hs = hs
|
|
|
|
if hs.config.redis.redis_enabled:
|
|
# If we're using Redis, we can shift this update process off to
|
|
# the background worker
|
|
self._update_on_this_worker = hs.config.worker.run_background_tasks
|
|
else:
|
|
# If we're NOT using Redis, this must be handled by the master
|
|
self._update_on_this_worker = hs.get_instance_name() == "master"
|
|
|
|
self._limit_usage_by_mau = hs.config.server.limit_usage_by_mau
|
|
self._max_mau_value = hs.config.server.max_mau_value
|
|
|
|
self._mau_stats_only = hs.config.server.mau_stats_only
|
|
|
|
if self._update_on_this_worker:
|
|
# Do not add more reserved users than the total allowable number
|
|
self.db_pool.new_transaction(
|
|
db_conn,
|
|
"initialise_mau_threepids",
|
|
[],
|
|
[],
|
|
[],
|
|
self._initialise_reserved_users,
|
|
hs.config.server.mau_limits_reserved_threepids[: self._max_mau_value],
|
|
)
|
|
|
|
@cached(num_args=0)
|
|
async def get_monthly_active_count(self) -> int:
|
|
"""Generates current count of monthly active users
|
|
|
|
Returns:
|
|
Number of current monthly active users
|
|
"""
|
|
|
|
def _count_users(txn: LoggingTransaction) -> int:
|
|
# Exclude app service users
|
|
sql = """
|
|
SELECT COUNT(*)
|
|
FROM monthly_active_users
|
|
LEFT JOIN users
|
|
ON monthly_active_users.user_id=users.name
|
|
WHERE (users.appservice_id IS NULL OR users.appservice_id = '');
|
|
"""
|
|
txn.execute(sql)
|
|
(count,) = cast(Tuple[int], txn.fetchone())
|
|
return count
|
|
|
|
return await self.db_pool.runInteraction("count_users", _count_users)
|
|
|
|
@cached(num_args=0)
|
|
async def get_monthly_active_count_by_service(self) -> Mapping[str, int]:
|
|
"""Generates current count of monthly active users broken down by service.
|
|
A service is typically an appservice but also includes native matrix users.
|
|
Since the `monthly_active_users` table is populated from the `user_ips` table
|
|
`config.appservice.track_appservice_user_ips` must be set to `true` for this
|
|
method to return anything other than native matrix users.
|
|
|
|
Returns:
|
|
A mapping between app_service_id and the number of occurrences.
|
|
|
|
"""
|
|
|
|
def _count_users_by_service(txn: LoggingTransaction) -> Dict[str, int]:
|
|
sql = """
|
|
SELECT COALESCE(appservice_id, 'native'), COUNT(*)
|
|
FROM monthly_active_users
|
|
LEFT JOIN users ON monthly_active_users.user_id=users.name
|
|
GROUP BY appservice_id;
|
|
"""
|
|
|
|
txn.execute(sql)
|
|
result = cast(List[Tuple[str, int]], txn.fetchall())
|
|
return dict(result)
|
|
|
|
return await self.db_pool.runInteraction(
|
|
"count_users_by_service", _count_users_by_service
|
|
)
|
|
|
|
async def get_monthly_active_users_by_service(
|
|
self, start_timestamp: Optional[int] = None, end_timestamp: Optional[int] = None
|
|
) -> List[Tuple[str, str]]:
|
|
"""Generates list of monthly active users and their services.
|
|
Please see "get_monthly_active_count_by_service" docstring for more details
|
|
about services.
|
|
|
|
Arguments:
|
|
start_timestamp: If specified, only include users that were first active
|
|
at or after this point
|
|
end_timestamp: If specified, only include users that were first active
|
|
at or before this point
|
|
|
|
Returns:
|
|
A list of tuples (appservice_id, user_id). "native" is emitted as the
|
|
appservice for users that don't come from appservices (i.e. native Matrix
|
|
users).
|
|
|
|
"""
|
|
if start_timestamp is not None and end_timestamp is not None:
|
|
where_clause = 'WHERE "timestamp" >= ? and "timestamp" <= ?'
|
|
query_params = [start_timestamp, end_timestamp]
|
|
elif start_timestamp is not None:
|
|
where_clause = 'WHERE "timestamp" >= ?'
|
|
query_params = [start_timestamp]
|
|
elif end_timestamp is not None:
|
|
where_clause = 'WHERE "timestamp" <= ?'
|
|
query_params = [end_timestamp]
|
|
else:
|
|
where_clause = ""
|
|
query_params = []
|
|
|
|
def _list_users(txn: LoggingTransaction) -> List[Tuple[str, str]]:
|
|
sql = f"""
|
|
SELECT COALESCE(appservice_id, 'native'), user_id
|
|
FROM monthly_active_users
|
|
LEFT JOIN users ON monthly_active_users.user_id=users.name
|
|
{where_clause};
|
|
"""
|
|
|
|
txn.execute(sql, query_params)
|
|
return cast(List[Tuple[str, str]], txn.fetchall())
|
|
|
|
return await self.db_pool.runInteraction("list_users", _list_users)
|
|
|
|
async def get_registered_reserved_users(self) -> List[str]:
|
|
"""Of the reserved threepids defined in config, retrieve those that are associated
|
|
with registered users
|
|
|
|
Returns:
|
|
User IDs of actual users that are reserved
|
|
"""
|
|
users = []
|
|
|
|
for tp in self.hs.config.server.mau_limits_reserved_threepids[
|
|
: self.hs.config.server.max_mau_value
|
|
]:
|
|
user_id = await self.hs.get_datastores().main.get_user_id_by_threepid(
|
|
tp["medium"], canonicalise_email(tp["address"])
|
|
)
|
|
if user_id:
|
|
users.append(user_id)
|
|
|
|
return users
|
|
|
|
@cached(num_args=1)
|
|
async def user_last_seen_monthly_active(self, user_id: str) -> Optional[int]:
|
|
"""
|
|
Checks if a given user is part of the monthly active user group
|
|
|
|
Arguments:
|
|
user_id: user to add/update
|
|
|
|
Return:
|
|
Timestamp since last seen, None if never seen
|
|
"""
|
|
|
|
return await self.db_pool.simple_select_one_onecol(
|
|
table="monthly_active_users",
|
|
keyvalues={"user_id": user_id},
|
|
retcol="timestamp",
|
|
allow_none=True,
|
|
desc="user_last_seen_monthly_active",
|
|
)
|
|
|
|
@wrap_as_background_process("reap_monthly_active_users")
|
|
async def reap_monthly_active_users(self) -> None:
|
|
"""Cleans out monthly active user table to ensure that no stale
|
|
entries exist.
|
|
"""
|
|
|
|
def _reap_users(txn: LoggingTransaction, reserved_users: List[str]) -> None:
|
|
"""
|
|
Args:
|
|
reserved_users: reserved users to preserve
|
|
"""
|
|
|
|
thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
|
|
|
|
in_clause, in_clause_args = make_in_list_sql_clause(
|
|
self.database_engine, "user_id", reserved_users
|
|
)
|
|
|
|
txn.execute(
|
|
"DELETE FROM monthly_active_users WHERE timestamp < ? AND NOT %s"
|
|
% (in_clause,),
|
|
[thirty_days_ago] + in_clause_args,
|
|
)
|
|
|
|
if self._limit_usage_by_mau:
|
|
# If MAU user count still exceeds the MAU threshold, then delete on
|
|
# a least recently active basis.
|
|
# Note it is not possible to write this query using OFFSET due to
|
|
# incompatibilities in how sqlite and postgres support the feature.
|
|
# Sqlite requires 'LIMIT -1 OFFSET ?', the LIMIT must be present,
|
|
# while Postgres does not require 'LIMIT', but also does not support
|
|
# negative LIMIT values. So there is no way to write it that both can
|
|
# support
|
|
|
|
# Limit must be >= 0 for postgres
|
|
num_of_non_reserved_users_to_remove = max(
|
|
self._max_mau_value - len(reserved_users), 0
|
|
)
|
|
|
|
# It is important to filter reserved users twice to guard
|
|
# against the case where the reserved user is present in the
|
|
# SELECT, meaning that a legitimate mau is deleted.
|
|
sql = """
|
|
DELETE FROM monthly_active_users
|
|
WHERE user_id NOT IN (
|
|
SELECT user_id FROM monthly_active_users
|
|
WHERE NOT %s
|
|
ORDER BY timestamp DESC
|
|
LIMIT ?
|
|
)
|
|
AND NOT %s
|
|
""" % (
|
|
in_clause,
|
|
in_clause,
|
|
)
|
|
|
|
query_args = (
|
|
in_clause_args
|
|
+ [num_of_non_reserved_users_to_remove]
|
|
+ in_clause_args
|
|
)
|
|
txn.execute(sql, query_args)
|
|
|
|
# It seems poor to invalidate the whole cache. Postgres supports
|
|
# 'Returning' which would allow me to invalidate only the
|
|
# specific users, but sqlite has no way to do this and instead
|
|
# I would need to SELECT and the DELETE which without locking
|
|
# is racy.
|
|
# Have resolved to invalidate the whole cache for now and do
|
|
# something about it if and when the perf becomes significant
|
|
self._invalidate_all_cache_and_stream(
|
|
txn, self.user_last_seen_monthly_active
|
|
)
|
|
self._invalidate_cache_and_stream(txn, self.get_monthly_active_count, ())
|
|
|
|
reserved_users = await self.get_registered_reserved_users()
|
|
await self.db_pool.runInteraction(
|
|
"reap_monthly_active_users", _reap_users, reserved_users
|
|
)
|
|
|
|
def _initialise_reserved_users(
|
|
self, txn: LoggingTransaction, threepids: List[dict]
|
|
) -> None:
|
|
"""Ensures that reserved threepids are accounted for in the MAU table, should
|
|
be called on start up.
|
|
|
|
Args:
|
|
txn:
|
|
threepids: List of threepid dicts to reserve
|
|
"""
|
|
assert (
|
|
self._update_on_this_worker
|
|
), "This worker is not designated to update MAUs"
|
|
|
|
# XXX what is this function trying to achieve? It upserts into
|
|
# monthly_active_users for each *registered* reserved mau user, but why?
|
|
#
|
|
# - shouldn't there already be an entry for each reserved user (at least
|
|
# if they have been active recently)?
|
|
#
|
|
# - if it's important that the timestamp is kept up to date, why do we only
|
|
# run this at startup?
|
|
|
|
for tp in threepids:
|
|
user_id = self.get_user_id_by_threepid_txn(txn, tp["medium"], tp["address"])
|
|
|
|
if user_id:
|
|
is_support = self.is_support_user_txn(txn, user_id)
|
|
if not is_support:
|
|
# We do this manually here to avoid hitting https://github.com/matrix-org/synapse/issues/6791
|
|
self.db_pool.simple_upsert_txn(
|
|
txn,
|
|
table="monthly_active_users",
|
|
keyvalues={"user_id": user_id},
|
|
values={"timestamp": int(self._clock.time_msec())},
|
|
)
|
|
else:
|
|
logger.warning("mau limit reserved threepid %s not found in db" % tp)
|
|
|
|
async def upsert_monthly_active_user(self, user_id: str) -> None:
|
|
"""Updates or inserts the user into the monthly active user table, which
|
|
is used to track the current MAU usage of the server
|
|
|
|
Args:
|
|
user_id: user to add/update
|
|
"""
|
|
assert (
|
|
self._update_on_this_worker
|
|
), "This worker is not designated to update MAUs"
|
|
|
|
# Support user never to be included in MAU stats. Note I can't easily call this
|
|
# from upsert_monthly_active_user_txn because then I need a _txn form of
|
|
# is_support_user which is complicated because I want to cache the result.
|
|
# Therefore I call it here and ignore the case where
|
|
# upsert_monthly_active_user_txn is called directly from
|
|
# _initialise_reserved_users reasoning that it would be very strange to
|
|
# include a support user in this context.
|
|
|
|
is_support = await self.is_support_user(user_id)
|
|
if is_support:
|
|
return
|
|
|
|
await self.db_pool.runInteraction(
|
|
"upsert_monthly_active_user", self.upsert_monthly_active_user_txn, user_id
|
|
)
|
|
|
|
def upsert_monthly_active_user_txn(
|
|
self, txn: LoggingTransaction, user_id: str
|
|
) -> None:
|
|
"""Updates or inserts monthly active user member
|
|
|
|
We consciously do not call is_support_txn from this method because it
|
|
is not possible to cache the response. is_support_txn will be false in
|
|
almost all cases, so it seems reasonable to call it only for
|
|
upsert_monthly_active_user and to call is_support_txn manually
|
|
for cases where upsert_monthly_active_user_txn is called directly,
|
|
like _initialise_reserved_users
|
|
|
|
In short, don't call this method with support users. (Support users
|
|
should not appear in the MAU stats).
|
|
|
|
Args:
|
|
txn:
|
|
user_id: user to add/update
|
|
"""
|
|
assert (
|
|
self._update_on_this_worker
|
|
), "This worker is not designated to update MAUs"
|
|
|
|
# Am consciously deciding to lock the table on the basis that is ought
|
|
# never be a big table and alternative approaches (batching multiple
|
|
# upserts into a single txn) introduced a lot of extra complexity.
|
|
# See https://github.com/matrix-org/synapse/issues/3854 for more
|
|
self.db_pool.simple_upsert_txn(
|
|
txn,
|
|
table="monthly_active_users",
|
|
keyvalues={"user_id": user_id},
|
|
values={"timestamp": int(self._clock.time_msec())},
|
|
)
|
|
|
|
self._invalidate_cache_and_stream(txn, self.get_monthly_active_count, ())
|
|
self._invalidate_cache_and_stream(
|
|
txn, self.get_monthly_active_count_by_service, ()
|
|
)
|
|
self._invalidate_cache_and_stream(
|
|
txn, self.user_last_seen_monthly_active, (user_id,)
|
|
)
|
|
|
|
async def populate_monthly_active_users(self, user_id: str) -> None:
|
|
"""Checks on the state of monthly active user limits and optionally
|
|
add the user to the monthly active tables
|
|
|
|
Args:
|
|
user_id: the user_id to query
|
|
"""
|
|
assert (
|
|
self._update_on_this_worker
|
|
), "This worker is not designated to update MAUs"
|
|
|
|
if self._limit_usage_by_mau or self._mau_stats_only:
|
|
# Trial users and guests should not be included as part of MAU group
|
|
is_guest = await self.is_guest(user_id)
|
|
if is_guest:
|
|
return
|
|
is_trial = await self.is_trial_user(user_id)
|
|
if is_trial:
|
|
return
|
|
|
|
last_seen_timestamp = await self.user_last_seen_monthly_active(user_id)
|
|
now = self.hs.get_clock().time_msec()
|
|
|
|
# We want to reduce to the total number of db writes, and are happy
|
|
# to trade accuracy of timestamp in order to lighten load. This means
|
|
# We always insert new users (where MAU threshold has not been reached),
|
|
# but only update if we have not previously seen the user for
|
|
# LAST_SEEN_GRANULARITY ms
|
|
if last_seen_timestamp is None:
|
|
# In the case where mau_stats_only is True and limit_usage_by_mau is
|
|
# False, there is no point in checking get_monthly_active_count - it
|
|
# adds no value and will break the logic if max_mau_value is exceeded.
|
|
if not self._limit_usage_by_mau:
|
|
await self.upsert_monthly_active_user(user_id)
|
|
else:
|
|
count = await self.get_monthly_active_count()
|
|
if count < self._max_mau_value:
|
|
await self.upsert_monthly_active_user(user_id)
|
|
elif now - last_seen_timestamp > LAST_SEEN_GRANULARITY:
|
|
await self.upsert_monthly_active_user(user_id)
|