MatrixSynapse/synapse/storage/__init__.py

312 lines
11 KiB
Python
Raw Normal View History

2014-08-12 16:10:52 +02:00
# -*- coding: utf-8 -*-
2016-01-07 05:26:29 +01:00
# Copyright 2014-2016 OpenMarket Ltd
2014-08-12 16:10:52 +02:00
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
2014-08-14 19:40:50 +02:00
from twisted.internet import defer
from .appservice import (
ApplicationServiceStore, ApplicationServiceTransactionStore
)
from ._base import Cache
2014-08-12 16:10:52 +02:00
from .directory import DirectoryStore
2015-03-20 14:52:56 +01:00
from .events import EventsStore
2016-02-15 18:10:40 +01:00
from .presence import PresenceStore, UserPresenceState
2014-08-12 16:10:52 +02:00
from .profile import ProfileStore
from .registration import RegistrationStore
from .room import RoomStore
from .roommember import RoomMemberStore
from .stream import StreamStore
from .transactions import TransactionStore
from .keys import KeyStore
from .event_federation import EventFederationStore
from .pusher import PusherStore
2015-01-22 18:38:53 +01:00
from .push_rule import PushRuleStore
from .media_repository import MediaRepositoryStore
from .rejections import RejectionsStore
from .event_push_actions import EventPushActionsStore
2014-10-14 17:59:51 +02:00
from .state import StateStore
from .signatures import SignatureStore
from .filtering import FilteringStore
2015-07-06 19:46:47 +02:00
from .end_to_end_keys import EndToEndKeyStore
2015-07-07 11:55:31 +02:00
from .receipts import ReceiptsStore
2015-10-09 16:48:31 +02:00
from .search import SearchStore
from .tags import TagsStore
from .account_data import AccountDataStore
2015-07-07 11:55:31 +02:00
from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator
2016-02-15 18:10:40 +01:00
from synapse.api.constants import PresenceState
2016-01-29 15:37:59 +01:00
from synapse.util.caches.stream_change_cache import StreamChangeCache
2014-08-19 15:20:03 +02:00
import logging
2014-08-12 16:10:52 +02:00
2014-08-19 15:20:03 +02:00
logger = logging.getLogger(__name__)
# Number of msec of granularity to store the user IP 'last seen' time. Smaller
# times give more inserts into the database even for readonly API hits
# 120 seconds == 2 minutes
2016-02-02 18:18:50 +01:00
LAST_SEEN_GRANULARITY = 120 * 1000
2014-08-19 15:20:03 +02:00
class DataStore(RoomMemberStore, RoomStore,
2015-03-20 14:52:56 +01:00
RegistrationStore, StreamStore, ProfileStore,
2014-10-31 15:00:14 +01:00
PresenceStore, TransactionStore,
DirectoryStore, KeyStore, StateStore, SignatureStore,
2015-02-02 17:05:34 +01:00
ApplicationServiceStore,
EventFederationStore,
MediaRepositoryStore,
RejectionsStore,
FilteringStore,
PusherStore,
2015-03-20 14:52:56 +01:00
PushRuleStore,
2015-03-16 11:16:59 +01:00
ApplicationServiceTransactionStore,
2015-03-20 14:52:56 +01:00
EventsStore,
2015-07-07 11:55:31 +02:00
ReceiptsStore,
2015-07-06 19:46:47 +02:00
EndToEndKeyStore,
2015-10-09 16:48:31 +02:00
SearchStore,
TagsStore,
AccountDataStore,
EventPushActionsStore
):
2014-08-12 16:10:52 +02:00
2016-01-28 15:32:05 +01:00
def __init__(self, db_conn, hs):
self.hs = hs
2016-01-29 15:41:16 +01:00
self.database_engine = hs.database_engine
2014-08-12 16:10:52 +02:00
2016-01-28 15:32:05 +01:00
cur = db_conn.cursor()
try:
cur.execute("SELECT MIN(stream_ordering) FROM events",)
rows = cur.fetchall()
self.min_stream_token = rows[0][0] if rows and rows[0] and rows[0][0] else -1
self.min_stream_token = min(self.min_stream_token, -1)
finally:
cur.close()
self.client_ip_last_seen = Cache(
name="client_ip_last_seen",
keylen=4,
2014-09-24 15:18:08 +02:00
)
self._stream_id_gen = StreamIdGenerator(
db_conn, "events", "stream_ordering"
)
self._receipts_id_gen = StreamIdGenerator(
db_conn, "receipts_linearized", "stream_id"
)
self._account_data_id_gen = StreamIdGenerator(
db_conn, "account_data_max_stream_id", "stream_id"
)
2016-02-15 18:10:40 +01:00
self._presence_id_gen = StreamIdGenerator(
db_conn, "presence_stream", "stream_id"
)
self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id")
self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id")
self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id")
self._pushers_id_gen = IdGenerator(db_conn, "pushers", "id")
self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id")
2016-03-01 14:35:37 +01:00
self._push_rules_stream_id_gen = ChainedIdGenerator(
self._stream_id_gen, db_conn, "push_rules_stream", "stream_id"
)
2016-02-15 18:10:40 +01:00
events_max = self._stream_id_gen.get_max_token()
2016-01-29 15:49:11 +01:00
event_cache_prefill, min_event_val = self._get_cache_dict(
2016-01-29 15:37:59 +01:00
db_conn, "events",
entity_column="room_id",
stream_column="stream_ordering",
max_value=events_max,
)
self._events_stream_cache = StreamChangeCache(
2016-01-29 15:49:11 +01:00
"EventsRoomStreamChangeCache", min_event_val,
2016-01-29 15:37:59 +01:00
prefilled_cache=event_cache_prefill,
)
2016-01-29 17:52:48 +01:00
self._membership_stream_cache = StreamChangeCache(
"MembershipStreamChangeCache", events_max,
)
2016-02-15 18:10:40 +01:00
account_max = self._account_data_id_gen.get_max_token()
2016-01-29 15:37:59 +01:00
self._account_data_stream_cache = StreamChangeCache(
2016-01-29 15:53:59 +01:00
"AccountDataAndTagsChangeCache", account_max,
2016-01-29 15:37:59 +01:00
)
2016-02-15 18:10:40 +01:00
self.__presence_on_startup = self._get_active_presence(db_conn)
presence_cache_prefill, min_presence_val = self._get_cache_dict(
db_conn, "presence_stream",
entity_column="user_id",
stream_column="stream_id",
max_value=self._presence_id_gen.get_max_token(),
)
self.presence_stream_cache = StreamChangeCache(
"PresenceStreamChangeCache", min_presence_val,
prefilled_cache=presence_cache_prefill
)
push_rules_prefill, push_rules_id = self._get_cache_dict(
2016-03-04 17:20:22 +01:00
db_conn, "push_rules_stream",
entity_column="user_id",
stream_column="stream_id",
max_value=self._push_rules_stream_id_gen.get_max_token()[0],
)
self.push_rules_stream_cache = StreamChangeCache(
"PushRulesStreamChangeCache", push_rules_id,
prefilled_cache=push_rules_prefill,
)
super(DataStore, self).__init__(hs)
2016-02-15 18:10:40 +01:00
def take_presence_startup_info(self):
active_on_startup = self.__presence_on_startup
self.__presence_on_startup = None
return active_on_startup
2016-01-29 15:37:59 +01:00
def _get_cache_dict(self, db_conn, table, entity_column, stream_column, max_value):
2016-01-29 16:39:17 +01:00
# Fetch a mapping of room_id -> max stream position for "recent" rooms.
# It doesn't really matter how many we get, the StreamChangeCache will
# do the right thing to ensure it respects the max size of cache.
2016-01-29 15:37:59 +01:00
sql = (
"SELECT %(entity)s, MAX(%(stream)s) FROM %(table)s"
2016-01-29 15:42:01 +01:00
" WHERE %(stream)s > ? - 100000"
2016-01-29 15:37:59 +01:00
" GROUP BY %(entity)s"
) % {
"table": table,
"entity": entity_column,
"stream": stream_column,
}
2016-01-29 15:41:16 +01:00
sql = self.database_engine.convert_param_style(sql)
2016-01-29 15:37:59 +01:00
txn = db_conn.cursor()
txn.execute(sql, (int(max_value),))
rows = txn.fetchall()
2016-02-15 18:10:40 +01:00
txn.close()
2016-01-29 15:37:59 +01:00
2016-01-29 15:49:11 +01:00
cache = {
row[0]: int(row[1])
2016-01-29 15:37:59 +01:00
for row in rows
}
2016-01-29 15:49:11 +01:00
if cache:
min_val = min(cache.values())
else:
min_val = max_value
return cache, min_val
2016-02-15 18:10:40 +01:00
def _get_active_presence(self, db_conn):
"""Fetch non-offline presence from the database so that we can register
the appropriate time outs.
"""
sql = (
2016-02-18 11:11:43 +01:00
"SELECT user_id, state, last_active_ts, last_federation_update_ts,"
" last_user_sync_ts, status_msg, currently_active FROM presence_stream"
2016-02-15 18:10:40 +01:00
" WHERE state != ?"
)
sql = self.database_engine.convert_param_style(sql)
txn = db_conn.cursor()
txn.execute(sql, (PresenceState.OFFLINE,))
rows = self.cursor_to_dict(txn)
2016-02-18 17:39:28 +01:00
txn.close()
2016-02-15 18:10:40 +01:00
for row in rows:
row["currently_active"] = bool(row["currently_active"])
return [UserPresenceState(**row) for row in rows]
@defer.inlineCallbacks
def insert_client_ip(self, user, access_token, ip, user_agent):
now = int(self._clock.time_msec())
key = (user.to_string(), access_token, ip)
try:
last_seen = self.client_ip_last_seen.get(key)
except KeyError:
last_seen = None
2014-08-19 15:20:03 +02:00
# Rate-limited inserts
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
defer.returnValue(None)
self.client_ip_last_seen.prefill(key, now)
2015-05-01 11:46:48 +02:00
# It's safe not to lock here: a) no unique constraint,
# b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely
yield self._simple_upsert(
"user_ips",
2015-03-24 17:17:39 +01:00
keyvalues={
2015-04-14 14:54:09 +02:00
"user_id": user.to_string(),
"access_token": access_token,
"ip": ip,
"user_agent": user_agent,
2015-03-24 17:17:39 +01:00
},
values={
"last_seen": now,
},
desc="insert_client_ip",
2015-05-01 11:46:48 +02:00
lock=False,
)
@defer.inlineCallbacks
def count_daily_users(self):
2015-09-22 14:47:40 +02:00
"""
Counts the number of users who used this homeserver in the last 24 hours.
"""
def _count_users(txn):
txn.execute(
"SELECT COUNT(DISTINCT user_id) AS users"
" FROM user_ips"
" WHERE last_seen > ?",
# This is close enough to a day for our purposes.
(int(self._clock.time_msec()) - (1000 * 60 * 60 * 24),)
)
rows = self.cursor_to_dict(txn)
if rows:
return rows[0]["users"]
return 0
ret = yield self.runInteraction("count_users", _count_users)
defer.returnValue(ret)
2014-09-29 15:59:52 +02:00
def get_user_ip_and_agents(self, user):
return self._simple_select_list(
table="user_ips",
2015-04-14 14:54:09 +02:00
keyvalues={"user_id": user.to_string()},
2014-09-29 15:59:52 +02:00
retcols=[
"access_token", "ip", "user_agent", "last_seen"
2014-09-29 15:59:52 +02:00
],
desc="get_user_ip_and_agents",
2014-09-29 15:59:52 +02:00
)
def are_all_users_on_domain(txn, database_engine, domain):
sql = database_engine.convert_param_style(
"SELECT COUNT(*) FROM users WHERE name NOT LIKE ?"
)
pat = "%:" + domain
txn.execute(sql, (pat,))
num_not_matching = txn.fetchall()[0][0]
if num_not_matching == 0:
return True
2015-04-27 12:49:18 +02:00
return False