# -*- coding: utf-8 -*-
# Copyright 2018, 2019 New Vector Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from itertools import chain

from twisted.internet import defer
from twisted.internet.defer import DeferredLock

from synapse.api.constants import EventTypes, Membership
from synapse.storage import PostgresEngine
from synapse.storage.state_deltas import StateDeltasStore
from synapse.util.caches.descriptors import cached

logger = logging.getLogger(__name__)

# these fields track absolutes (e.g. total number of rooms on the server)
# You can think of these as Prometheus Gauges.
# You can draw these stats on a line graph.
# Example: number of users in a room
ABSOLUTE_STATS_FIELDS = {
    "room": (
        "current_state_events",
        "joined_members",
        "invited_members",
        "left_members",
        "banned_members",
        "local_users_in_room",
    ),
    "user": ("joined_rooms",),
}

# these fields are per-timeslice and so should be reset to 0 upon a new slice
# You can draw these stats on a histogram.
# Example: number of events sent locally during a time slice
PER_SLICE_FIELDS = {
    "room": ("total_events", "total_event_bytes"),
    "user": ("invites_sent", "rooms_created", "total_events", "total_event_bytes"),
}
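
# Illustrative example of the distinction: "joined_members" is absolute,
# i.e. after each update it holds the room's current member count. By
# contrast, "total_events" is per-slice: it only counts events sent during
# the current time slice, and resets to 0 when a new slice begins.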

TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}

# these are the tables (& ID columns) which contain our actual subjects
TYPE_TO_ORIGIN_TABLE = {"room": ("rooms", "room_id"), "user": ("users", "name")}


class StatsStore(StateDeltasStore):
    def __init__(self, db_conn, hs):
        super(StatsStore, self).__init__(db_conn, hs)

        self.server_name = hs.hostname
        self.clock = self.hs.get_clock()
        self.stats_enabled = hs.config.stats_enabled
        self.stats_bucket_size = hs.config.stats_bucket_size

        self.stats_delta_processing_lock = DeferredLock()

        self.register_background_update_handler(
            "populate_stats_process_rooms", self._populate_stats_process_rooms
        )
        self.register_background_update_handler(
            "populate_stats_process_users", self._populate_stats_process_users
        )
        # we no longer need to perform clean-up, but we keep the option of
        # reintroducing it in the future, so documentation still encourages
        # the use of these (now no-op) handlers.
        self.register_noop_background_update("populate_stats_cleanup")
        self.register_noop_background_update("populate_stats_prepare")

    def quantise_stats_time(self, ts):
        """
        Quantises a timestamp to be a multiple of the bucket size.

        Args:
            ts (int): the timestamp to quantise, in milliseconds since the Unix
                Epoch

        Returns:
            int: a timestamp which
              - is divisible by the bucket size;
              - is no later than `ts`; and
              - is the largest such timestamp.
        """
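        # Worked example, assuming a bucket size of 86400000 ms (one day):
        # quantise_stats_time(1500000060000) == 1499990400000, i.e. the
        # timestamp is rounded down to the start of the day-long bucket
        # containing it.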
        return (ts // self.stats_bucket_size) * self.stats_bucket_size

    @defer.inlineCallbacks
    def _populate_stats_process_users(self, progress, batch_size):
        """
        This is a background update which regenerates statistics for users.
        """
        if not self.stats_enabled:
            yield self._end_background_update("populate_stats_process_users")
            return 1

        last_user_id = progress.get("last_user_id", "")

        def _get_next_batch(txn):
            sql = """
                    SELECT DISTINCT name FROM users
                    WHERE name > ?
                    ORDER BY name ASC
                    LIMIT ?
                """
            txn.execute(sql, (last_user_id, batch_size))
            return [r for r, in txn]

        users_to_work_on = yield self.runInteraction(
            "_populate_stats_process_users", _get_next_batch
        )

        # No more users -- mark the background update as complete.
        if not users_to_work_on:
            yield self._end_background_update("populate_stats_process_users")
            return 1

        for user_id in users_to_work_on:
            yield self._calculate_and_set_initial_state_for_user(user_id)
            progress["last_user_id"] = user_id

        yield self.runInteraction(
            "populate_stats_process_users",
            self._background_update_progress_txn,
            "populate_stats_process_users",
            progress,
        )

        return len(users_to_work_on)

    @defer.inlineCallbacks
    def _populate_stats_process_rooms(self, progress, batch_size):
        """
        This is a background update which regenerates statistics for rooms.
        """
        if not self.stats_enabled:
            yield self._end_background_update("populate_stats_process_rooms")
            return 1

        last_room_id = progress.get("last_room_id", "")

        def _get_next_batch(txn):
            sql = """
                    SELECT DISTINCT room_id FROM current_state_events
                    WHERE room_id > ?
                    ORDER BY room_id ASC
                    LIMIT ?
                """
            txn.execute(sql, (last_room_id, batch_size))
            return [r for r, in txn]

        rooms_to_work_on = yield self.runInteraction(
            "populate_stats_rooms_get_batch", _get_next_batch
        )

        # No more rooms -- mark the background update as complete.
        if not rooms_to_work_on:
            yield self._end_background_update("populate_stats_process_rooms")
            return 1

        for room_id in rooms_to_work_on:
            yield self._calculate_and_set_initial_state_for_room(room_id)
            progress["last_room_id"] = room_id

        yield self.runInteraction(
            "_populate_stats_process_rooms",
            self._background_update_progress_txn,
            "populate_stats_process_rooms",
            progress,
        )

        return len(rooms_to_work_on)

    def get_stats_positions(self):
        """
        Returns the stats processor positions.
        """
        return self._simple_select_one_onecol(
            table="stats_incremental_position",
            keyvalues={},
            retcol="stream_id",
            desc="stats_incremental_position",
        )

    def update_room_state(self, room_id, fields):
        """
        Args:
            room_id (str)
            fields (dict[str, Any])
|         """
 | ||
| 
 | ||
        # Some of the fields may contain null bytes, which PostgreSQL rejects,
        # so we replace such fields with None (i.e. SQL NULL).
        for col in (
            "join_rules",
            "history_visibility",
            "encryption",
            "name",
            "topic",
            "avatar",
            "canonical_alias",
        ):
            field = fields.get(col)
            if field and "\0" in field:
                fields[col] = None

        return self._simple_upsert(
            table="room_stats_state",
            keyvalues={"room_id": room_id},
            values=fields,
            desc="update_room_state",
        )

    def get_statistics_for_subject(self, stats_type, stats_id, start, size=100):
        """
        Get statistics for a given subject.

        Args:
            stats_type (str): The type of subject
            stats_id (str): The ID of the subject (e.g. room_id or user_id)
            start (int): Pagination start. Number of entries, not timestamp.
            size (int): How many entries to return.

        Returns:
            Deferred[list[dict]], where the dict has the keys of
            ABSOLUTE_STATS_FIELDS[stats_type], plus "bucket_size" and "end_ts".
|         """
 | ||
|         return self.runInteraction(
 | ||
|             "get_statistics_for_subject",
 | ||
|             self._get_statistics_for_subject_txn,
 | ||
|             stats_type,
 | ||
|             stats_id,
 | ||
|             start,
 | ||
|             size,
 | ||
|         )
 | ||
| 
 | ||
|     def _get_statistics_for_subject_txn(
 | ||
|         self, txn, stats_type, stats_id, start, size=100
 | ||
|     ):
 | ||
|         """
 | ||
|         Transaction-bound version of L{get_statistics_for_subject}.
 | ||
|         """
 | ||
| 
 | ||
|         table, id_col = TYPE_TO_TABLE[stats_type]
 | ||
|         selected_columns = list(
 | ||
|             ABSOLUTE_STATS_FIELDS[stats_type] + PER_SLICE_FIELDS[stats_type]
 | ||
|         )
 | ||
| 
 | ||
|         slice_list = self._simple_select_list_paginate_txn(
 | ||
|             txn,
 | ||
|             table + "_historical",
 | ||
|             {id_col: stats_id},
 | ||
|             "end_ts",
 | ||
|             start,
 | ||
|             size,
 | ||
|             retcols=selected_columns + ["bucket_size", "end_ts"],
 | ||
|             order_direction="DESC",
 | ||
|         )
 | ||
| 
 | ||
|         return slice_list
 | ||
| 
 | ||
|     def get_room_stats_state(self, room_id):
 | ||
|         """
 | ||
|         Returns the current room_stats_state for a room.
 | ||
| 
 | ||
|         Args:
 | ||
|             room_id (str): The ID of the room to return state for.
 | ||
| 
 | ||
        Returns:
            dict: Dictionary containing the keys "name", "topic",
                "canonical_alias", "avatar", "join_rules" and
                "history_visibility".
|         """
 | ||
|         return self._simple_select_one(
 | ||
|             "room_stats_state",
 | ||
|             {"room_id": room_id},
 | ||
|             retcols=(
 | ||
|                 "name",
 | ||
|                 "topic",
 | ||
|                 "canonical_alias",
 | ||
|                 "avatar",
 | ||
|                 "join_rules",
 | ||
|                 "history_visibility",
 | ||
|             ),
 | ||
|         )
 | ||
| 
 | ||
|     @cached()
 | ||
|     def get_earliest_token_for_stats(self, stats_type, id):
 | ||
|         """
 | ||
|         Fetch the "earliest token". This is used by the room stats delta
 | ||
|         processor to ignore deltas that have been processed between the
 | ||
|         start of the background task and any particular room's stats
 | ||
|         being calculated.
 | ||
| 
 | ||
|         Returns:
 | ||
|             Deferred[int]
 | ||
|         """
 | ||
|         table, id_col = TYPE_TO_TABLE[stats_type]
 | ||
| 
 | ||
|         return self._simple_select_one_onecol(
 | ||
|             "%s_current" % (table,),
 | ||
|             keyvalues={id_col: id},
 | ||
|             retcol="completed_delta_stream_id",
 | ||
|             allow_none=True,
 | ||
|         )
 | ||
| 
 | ||
    def bulk_update_stats_delta(self, ts, updates, stream_id):
        """Bulk updates the stats tables for a given stream_id and updates the
        stats incremental position.

        Args:
            ts (int): Current timestamp in ms
            updates (dict[str, dict[str, dict[str, Counter]]]): The updates to
                commit as a mapping stats_type -> stats_id -> field -> delta.
            stream_id (int): Current position.

        Returns:
            Deferred
        """

        def _bulk_update_stats_delta_txn(txn):
            for stats_type, stats_updates in updates.items():
                for stats_id, fields in stats_updates.items():
                    self._update_stats_delta_txn(
                        txn,
                        ts=ts,
                        stats_type=stats_type,
                        stats_id=stats_id,
                        fields=fields,
                        complete_with_stream_id=stream_id,
                    )

            self._simple_update_one_txn(
                txn,
                table="stats_incremental_position",
                keyvalues={},
                updatevalues={"stream_id": stream_id},
            )

        return self.runInteraction(
            "bulk_update_stats_delta", _bulk_update_stats_delta_txn
        )

    def update_stats_delta(
        self,
        ts,
        stats_type,
        stats_id,
        fields,
        complete_with_stream_id,
        absolute_field_overrides=None,
    ):
        """
        Updates the statistics for a subject, with a delta (difference/relative
        change).

        Args:
            ts (int): timestamp of the change
            stats_type (str): "room" or "user" – the kind of subject
            stats_id (str): the subject's ID (room ID or user ID)
            fields (dict[str, int]): Deltas of stats values.
            complete_with_stream_id (int, optional):
                If supplied, converts an incomplete row into a complete row,
                with the supplied stream_id marked as the stream_id where the
                row was completed.
            absolute_field_overrides (dict[str, int]): Current stats values
                (i.e. not deltas) of absolute fields.
                Does not work with per-slice fields.
        """
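        # Example (hypothetical values): to record one newly joined member
        # for a room without marking the row complete, a caller might use:
        #     self.update_stats_delta(
        #         ts=now_ms, stats_type="room", stats_id=room_id,
        #         fields={"joined_members": +1},
        #         complete_with_stream_id=None,
        #     )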

        return self.runInteraction(
            "update_stats_delta",
            self._update_stats_delta_txn,
            ts,
            stats_type,
            stats_id,
            fields,
            complete_with_stream_id=complete_with_stream_id,
            absolute_field_overrides=absolute_field_overrides,
        )

    def _update_stats_delta_txn(
        self,
        txn,
        ts,
        stats_type,
        stats_id,
        fields,
        complete_with_stream_id,
        absolute_field_overrides=None,
    ):
        if absolute_field_overrides is None:
            absolute_field_overrides = {}

        table, id_col = TYPE_TO_TABLE[stats_type]

        quantised_ts = self.quantise_stats_time(int(ts))
        end_ts = quantised_ts + self.stats_bucket_size

        # Let's be paranoid and check that all the given field names are known
        abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
        slice_field_names = PER_SLICE_FIELDS[stats_type]
        for field in chain(fields.keys(), absolute_field_overrides.keys()):
            if field not in abs_field_names and field not in slice_field_names:
                # guard against potential SQL injection dodginess
                raise ValueError(
                    "%s is not a recognised field"
                    " for stats type %s" % (field, stats_type)
                )

        # Per slice fields do not get added to the _current table

        # This calculates the deltas (`field = field + ?` values)
        # for absolute fields,
        # * defaulting to 0 if not specified
        #     (required for the INSERT part of upserting to work)
        # * omitting overrides specified in `absolute_field_overrides`
        deltas_of_absolute_fields = {
            key: fields.get(key, 0)
            for key in abs_field_names
            if key not in absolute_field_overrides
        }
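        # For example (hypothetical values): with fields={"joined_members": 1}
        # and absolute_field_overrides={"current_state_events": 50}, this
        # yields {"joined_members": 1, "invited_members": 0, ...}: every
        # absolute field except "current_state_events", which is overwritten
        # outright rather than incremented.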

        # Keep the delta stream ID field up to date
        absolute_field_overrides = absolute_field_overrides.copy()
        absolute_field_overrides["completed_delta_stream_id"] = complete_with_stream_id

        # first upsert the `_current` table
        self._upsert_with_additive_relatives_txn(
            txn=txn,
            table=table + "_current",
            keyvalues={id_col: stats_id},
            absolutes=absolute_field_overrides,
            additive_relatives=deltas_of_absolute_fields,
        )

        per_slice_additive_relatives = {
            key: fields.get(key, 0) for key in slice_field_names
        }
        self._upsert_copy_from_table_with_additive_relatives_txn(
            txn=txn,
            into_table=table + "_historical",
            keyvalues={id_col: stats_id},
            extra_dst_insvalues={"bucket_size": self.stats_bucket_size},
            extra_dst_keyvalues={"end_ts": end_ts},
            additive_relatives=per_slice_additive_relatives,
            src_table=table + "_current",
            copy_columns=abs_field_names,
        )

    def _upsert_with_additive_relatives_txn(
        self, txn, table, keyvalues, absolutes, additive_relatives
    ):
        """Used to update values in the stats tables.

        This is basically a slightly convoluted upsert that *adds* to any
        existing rows.

        Args:
            txn
            table (str): Table name
            keyvalues (dict[str, any]): Row-identifying key values
            absolutes (dict[str, any]): Absolute (set) fields
            additive_relatives (dict[str, int]): Fields that will be added onto
                if existing row present.
        """
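        # On the native-upsert path below, the generated SQL takes roughly
        # this shape (illustrative, for table="room_stats_current"):
        #     INSERT INTO room_stats_current (room_id, joined_members, ...)
        #     VALUES (?, ?, ...)
        #     ON CONFLICT (room_id) DO UPDATE SET
        #         joined_members = EXCLUDED.joined_members
        #             + room_stats_current.joined_members, ...
        # i.e. absolutes are set to the new value, while additive relatives
        # are added to the value already in the row.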
        if self.database_engine.can_native_upsert:
            absolute_updates = [
                "%(field)s = EXCLUDED.%(field)s" % {"field": field}
                for field in absolutes.keys()
            ]

            relative_updates = [
                "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s"
                % {"table": table, "field": field}
                for field in additive_relatives.keys()
            ]

            insert_cols = []
            qargs = []

            for (key, val) in chain(
                keyvalues.items(), absolutes.items(), additive_relatives.items()
            ):
                insert_cols.append(key)
                qargs.append(val)

            sql = """
                INSERT INTO %(table)s (%(insert_cols_cs)s)
                VALUES (%(insert_vals_qs)s)
                ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
            """ % {
                "table": table,
                "insert_cols_cs": ", ".join(insert_cols),
                "insert_vals_qs": ", ".join(
                    ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
                ),
                "key_columns": ", ".join(keyvalues),
                "updates": ", ".join(chain(absolute_updates, relative_updates)),
            }

            txn.execute(sql, qargs)
        else:
            self.database_engine.lock_table(txn, table)
            retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
            current_row = self._simple_select_one_txn(
                txn, table, keyvalues, retcols, allow_none=True
            )
            if current_row is None:
                merged_dict = {**keyvalues, **absolutes, **additive_relatives}
                self._simple_insert_txn(txn, table, merged_dict)
            else:
                for (key, val) in additive_relatives.items():
                    current_row[key] += val
                current_row.update(absolutes)
                self._simple_update_one_txn(txn, table, keyvalues, current_row)

    def _upsert_copy_from_table_with_additive_relatives_txn(
        self,
        txn,
        into_table,
        keyvalues,
        extra_dst_keyvalues,
        extra_dst_insvalues,
        additive_relatives,
        src_table,
        copy_columns,
    ):
        """Updates the historic stats table with latest updates.

        This involves copying "absolute" fields from the `_current` table, and
        adding relative fields to any existing values.

        Args:
            txn: Transaction
            into_table (str): The destination table to UPSERT the row into
            keyvalues (dict[str, any]): Row-identifying key values
            extra_dst_keyvalues (dict[str, any]): Additional keyvalues
                for `into_table`.
            extra_dst_insvalues (dict[str, any]): Additional values to insert
                on new row creation for `into_table`.
            additive_relatives (dict[str, any]): Fields that will be added onto
                if existing row present. (Must be disjoint from copy_columns.)
            src_table (str): The source table to copy from
            copy_columns (iterable[str]): The list of columns to copy
        """
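        # Sketch of the native-upsert SQL built below (illustrative, for
        # into_table="room_stats_historical"): it copies the absolute columns
        # out of the `_current` table in a single statement, e.g.
        #     INSERT INTO room_stats_historical (room_id, joined_members,
        #         total_events, end_ts, ...)
        #     SELECT room_id, joined_members, ?, ?, ... FROM room_stats_current
        #     WHERE room_id = ?
        #     ON CONFLICT (room_id, end_ts) DO UPDATE SET
        #         joined_members = EXCLUDED.joined_members,
        #         total_events = EXCLUDED.total_events
        #             + room_stats_historical.total_events, ...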
        if self.database_engine.can_native_upsert:
            ins_columns = chain(
                keyvalues,
                copy_columns,
                additive_relatives,
                extra_dst_keyvalues,
                extra_dst_insvalues,
            )
            sel_exprs = chain(
                keyvalues,
                copy_columns,
                (
                    "?"
                    for _ in chain(
                        additive_relatives, extra_dst_keyvalues, extra_dst_insvalues
                    )
                ),
            )
            keyvalues_where = ("%s = ?" % f for f in keyvalues)

            sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
            sets_ar = (
                "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f)
                for f in additive_relatives
            )

            sql = """
                INSERT INTO %(into_table)s (%(ins_columns)s)
                SELECT %(sel_exprs)s
                FROM %(src_table)s
                WHERE %(keyvalues_where)s
                ON CONFLICT (%(keyvalues)s)
                DO UPDATE SET %(sets)s
            """ % {
                "into_table": into_table,
                "ins_columns": ", ".join(ins_columns),
                "sel_exprs": ", ".join(sel_exprs),
                "keyvalues_where": " AND ".join(keyvalues_where),
                "src_table": src_table,
                "keyvalues": ", ".join(
                    chain(keyvalues.keys(), extra_dst_keyvalues.keys())
                ),
                "sets": ", ".join(chain(sets_cc, sets_ar)),
            }

            qargs = list(
                chain(
                    additive_relatives.values(),
                    extra_dst_keyvalues.values(),
                    extra_dst_insvalues.values(),
                    keyvalues.values(),
                )
            )
            txn.execute(sql, qargs)
        else:
            self.database_engine.lock_table(txn, into_table)
            src_row = self._simple_select_one_txn(
                txn, src_table, keyvalues, copy_columns
            )
            all_dest_keyvalues = {**keyvalues, **extra_dst_keyvalues}
            dest_current_row = self._simple_select_one_txn(
                txn,
                into_table,
                keyvalues=all_dest_keyvalues,
                retcols=list(chain(additive_relatives.keys(), copy_columns)),
                allow_none=True,
            )

            if dest_current_row is None:
                merged_dict = {
                    **keyvalues,
                    **extra_dst_keyvalues,
                    **extra_dst_insvalues,
                    **src_row,
                    **additive_relatives,
                }
                self._simple_insert_txn(txn, into_table, merged_dict)
            else:
                for (key, val) in additive_relatives.items():
                    src_row[key] = dest_current_row[key] + val
                self._simple_update_txn(txn, into_table, all_dest_keyvalues, src_row)

    def get_changes_room_total_events_and_bytes(self, min_pos, max_pos):
        """Fetches the counts of events in the given range of stream IDs.

        Args:
            min_pos (int)
            max_pos (int)

        Returns:
            Deferred[tuple[dict[str, dict[str, int]], dict[str, dict[str, int]]]]:
            A pair of mappings: room ID -> field changes, and (local) user ID
            -> field changes.
        """

        return self.runInteraction(
            "stats_incremental_total_events_and_bytes",
            self.get_changes_room_total_events_and_bytes_txn,
            min_pos,
            max_pos,
        )

    def get_changes_room_total_events_and_bytes_txn(self, txn, low_pos, high_pos):
        """Gets the total_events and total_event_bytes counts for rooms and
        senders, in a range of stream_orderings (including backfilled events).

        Args:
            txn
            low_pos (int): Low stream ordering
            high_pos (int): High stream ordering

        Returns:
            tuple[dict[str, dict[str, int]], dict[str, dict[str, int]]]: The
            room and user deltas for total_events/total_event_bytes in the
            format of `stats_id` -> fields
        """

        if low_pos >= high_pos:
            # nothing to do here.
            return {}, {}

        if isinstance(self.database_engine, PostgresEngine):
            new_bytes_expression = "OCTET_LENGTH(json)"
        else:
            new_bytes_expression = "LENGTH(CAST(json AS BLOB))"

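        # Backfilled events are stored with negative stream orderings, so the
        # second range in the WHERE clause below picks those up as well.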
|         sql = """
 | ||
|             SELECT events.room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes
 | ||
|             FROM events INNER JOIN event_json USING (event_id)
 | ||
|             WHERE (? < stream_ordering AND stream_ordering <= ?)
 | ||
|                 OR (? <= stream_ordering AND stream_ordering <= ?)
 | ||
|             GROUP BY events.room_id
 | ||
|         """ % (
 | ||
|             new_bytes_expression,
 | ||
|         )
 | ||
| 
 | ||
|         txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos))
 | ||
| 
 | ||
|         room_deltas = {
 | ||
|             room_id: {"total_events": new_events, "total_event_bytes": new_bytes}
 | ||
|             for room_id, new_events, new_bytes in txn
 | ||
|         }
 | ||
| 
 | ||
|         sql = """
 | ||
|             SELECT events.sender, COUNT(*) AS new_events, SUM(%s) AS new_bytes
 | ||
|             FROM events INNER JOIN event_json USING (event_id)
 | ||
|             WHERE (? < stream_ordering AND stream_ordering <= ?)
 | ||
|                 OR (? <= stream_ordering AND stream_ordering <= ?)
 | ||
|             GROUP BY events.sender
 | ||
|         """ % (
 | ||
|             new_bytes_expression,
 | ||
|         )
 | ||
| 
 | ||
|         txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos))
 | ||
| 
 | ||
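        # User stats only track local users, so events from remote senders
        # are filtered out here.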
        user_deltas = {
            user_id: {"total_events": new_events, "total_event_bytes": new_bytes}
            for user_id, new_events, new_bytes in txn
            if self.hs.is_mine_id(user_id)
        }

        return room_deltas, user_deltas

    @defer.inlineCallbacks
    def _calculate_and_set_initial_state_for_room(self, room_id):
|         """Calculate and insert an entry into room_stats_current.
 | ||
| 
 | ||
|         Args:
 | ||
|             room_id (str)
 | ||
| 
 | ||
|         Returns:
 | ||
|             Deferred[tuple[dict, dict, int]]: A tuple of room state, membership
 | ||
|             counts and stream position.
 | ||
|         """
 | ||

        def _fetch_current_state_stats(txn):
            pos = self.get_room_max_stream_ordering()

            rows = self._simple_select_many_txn(
                txn,
                table="current_state_events",
                column="type",
                iterable=[
                    EventTypes.Create,
                    EventTypes.JoinRules,
                    EventTypes.RoomHistoryVisibility,
                    EventTypes.Encryption,
                    EventTypes.Name,
                    EventTypes.Topic,
                    EventTypes.RoomAvatar,
                    EventTypes.CanonicalAlias,
                ],
                keyvalues={"room_id": room_id, "state_key": ""},
                retcols=["event_id"],
            )

            event_ids = [row["event_id"] for row in rows]

            txn.execute(
                """
                    SELECT membership, count(*) FROM current_state_events
                    WHERE room_id = ? AND type = 'm.room.member'
                    GROUP BY membership
                """,
                (room_id,),
            )
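            # e.g. {"join": 24, "invite": 2, "leave": 5} (illustrative values)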
            membership_counts = {membership: cnt for membership, cnt in txn}

            txn.execute(
                """
                    SELECT COALESCE(count(*), 0) FROM current_state_events
                    WHERE room_id = ?
                """,
                (room_id,),
            )

            current_state_events_count, = txn.fetchone()

            users_in_room = self.get_users_in_room_txn(txn, room_id)

            return (
                event_ids,
                membership_counts,
                current_state_events_count,
                users_in_room,
                pos,
            )

        (
            event_ids,
            membership_counts,
            current_state_events_count,
            users_in_room,
            pos,
        ) = yield self.runInteraction(
            "get_initial_state_for_room", _fetch_current_state_stats
        )

        state_event_map = yield self.get_events(event_ids, get_prev_content=False)

        room_state = {
            "join_rules": None,
            "history_visibility": None,
            "encryption": None,
            "name": None,
            "topic": None,
            "avatar": None,
            "canonical_alias": None,
            "is_federatable": True,
        }

        for event in state_event_map.values():
            if event.type == EventTypes.JoinRules:
                room_state["join_rules"] = event.content.get("join_rule")
            elif event.type == EventTypes.RoomHistoryVisibility:
                room_state["history_visibility"] = event.content.get(
                    "history_visibility"
                )
            elif event.type == EventTypes.Encryption:
                room_state["encryption"] = event.content.get("algorithm")
            elif event.type == EventTypes.Name:
                room_state["name"] = event.content.get("name")
            elif event.type == EventTypes.Topic:
                room_state["topic"] = event.content.get("topic")
            elif event.type == EventTypes.RoomAvatar:
                room_state["avatar"] = event.content.get("url")
            elif event.type == EventTypes.CanonicalAlias:
                room_state["canonical_alias"] = event.content.get("alias")
            elif event.type == EventTypes.Create:
                room_state["is_federatable"] = (
                    event.content.get("m.federate", True) is True
                )

        yield self.update_room_state(room_id, room_state)

        local_users_in_room = [u for u in users_in_room if self.hs.is_mine_id(u)]

        yield self.update_stats_delta(
            ts=self.clock.time_msec(),
            stats_type="room",
            stats_id=room_id,
            fields={},
            complete_with_stream_id=pos,
            absolute_field_overrides={
                "current_state_events": current_state_events_count,
                "joined_members": membership_counts.get(Membership.JOIN, 0),
                "invited_members": membership_counts.get(Membership.INVITE, 0),
                "left_members": membership_counts.get(Membership.LEAVE, 0),
                "banned_members": membership_counts.get(Membership.BAN, 0),
                "local_users_in_room": len(local_users_in_room),
            },
        )

    @defer.inlineCallbacks
    def _calculate_and_set_initial_state_for_user(self, user_id):
        """Calculate and insert an entry into user_stats_current.

        Args:
            user_id (str)
        """
        def _calculate_and_set_initial_state_for_user_txn(txn):
            pos = self._get_max_stream_id_in_current_state_deltas_txn(txn)

            txn.execute(
                """
                SELECT COUNT(distinct room_id) FROM current_state_events
                    WHERE type = 'm.room.member' AND state_key = ?
                        AND membership = 'join'
                """,
                (user_id,),
            )
            count, = txn.fetchone()
            return count, pos

        joined_rooms, pos = yield self.runInteraction(
            "calculate_and_set_initial_state_for_user",
            _calculate_and_set_initial_state_for_user_txn,
        )

        yield self.update_stats_delta(
            ts=self.clock.time_msec(),
            stats_type="user",
            stats_id=user_id,
            fields={},
            complete_with_stream_id=pos,
            absolute_field_overrides={"joined_rooms": joined_rooms},
        )