Move storage classes into a main "data store".
This is in preparation for having multiple data stores that offer different functionality, e.g. splitting out state or event storage.pull/6231/head
parent
1ee97cbd01
commit
c66a06ac6b
|
@ -56,8 +56,8 @@ from synapse.rest.client.v1.room import (
|
|||
RoomStateEventRestServlet,
|
||||
)
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.storage.user_directory import UserDirectoryStore
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
|
|
@ -39,8 +39,8 @@ from synapse.replication.tcp.client import ReplicationClientHandler
|
|||
from synapse.rest.admin import register_servlets_for_media_repo
|
||||
from synapse.rest.media.v0.content_repository import ContentRepoResource
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.data_stores.main.media_repository import MediaRepositoryStore
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.storage.media_repository import MediaRepositoryStore
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
|
|
@ -54,8 +54,8 @@ from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
|
|||
from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
|
||||
from synapse.rest.client.v2_alpha import sync
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.data_stores.main.presence import UserPresenceState
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.storage.presence import UserPresenceState
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.stringutils import random_string
|
||||
|
|
|
@ -42,8 +42,8 @@ from synapse.replication.tcp.streams.events import (
|
|||
)
|
||||
from synapse.rest.client.v2_alpha import user_directory
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.storage.user_directory import UserDirectoryStore
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.manhole import manhole
|
||||
|
|
|
@ -30,7 +30,7 @@ from synapse.federation.units import Edu
|
|||
from synapse.handlers.presence import format_user_presence_state
|
||||
from synapse.metrics import sent_transactions_counter
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.storage import UserPresenceState
|
||||
from synapse.storage.presence import UserPresenceState
|
||||
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
|
||||
|
||||
# This is defined in the Matrix spec and enforced by the receiver.
|
||||
|
|
|
@ -16,8 +16,8 @@
|
|||
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
|
||||
from synapse.storage.account_data import AccountDataWorkerStore
|
||||
from synapse.storage.tags import TagsWorkerStore
|
||||
from synapse.storage.data_stores.main.account_data import AccountDataWorkerStore
|
||||
from synapse.storage.data_stores.main.tags import TagsWorkerStore
|
||||
|
||||
|
||||
class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlavedStore):
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.storage.appservice import (
|
||||
from synapse.storage.data_stores.main.appservice import (
|
||||
ApplicationServiceTransactionWorkerStore,
|
||||
ApplicationServiceWorkerStore,
|
||||
)
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.storage.client_ips import LAST_SEEN_GRANULARITY
|
||||
from synapse.storage.data_stores.main.client_ips import LAST_SEEN_GRANULARITY
|
||||
from synapse.util.caches import CACHE_SIZE_FACTOR
|
||||
from synapse.util.caches.descriptors import Cache
|
||||
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
|
||||
from synapse.storage.deviceinbox import DeviceInboxWorkerStore
|
||||
from synapse.storage.data_stores.main.deviceinbox import DeviceInboxWorkerStore
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
|
||||
|
|
|
@ -15,8 +15,8 @@
|
|||
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
|
||||
from synapse.storage.devices import DeviceWorkerStore
|
||||
from synapse.storage.end_to_end_keys import EndToEndKeyWorkerStore
|
||||
from synapse.storage.data_stores.main.devices import DeviceWorkerStore
|
||||
from synapse.storage.data_stores.main.end_to_end_keys import EndToEndKeyWorkerStore
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.storage.directory import DirectoryWorkerStore
|
||||
from synapse.storage.data_stores.main.directory import DirectoryWorkerStore
|
||||
|
||||
from ._base import BaseSlavedStore
|
||||
|
||||
|
|
|
@ -20,15 +20,17 @@ from synapse.replication.tcp.streams.events import (
|
|||
EventsStreamCurrentStateRow,
|
||||
EventsStreamEventRow,
|
||||
)
|
||||
from synapse.storage.event_federation import EventFederationWorkerStore
|
||||
from synapse.storage.event_push_actions import EventPushActionsWorkerStore
|
||||
from synapse.storage.events_worker import EventsWorkerStore
|
||||
from synapse.storage.relations import RelationsWorkerStore
|
||||
from synapse.storage.roommember import RoomMemberWorkerStore
|
||||
from synapse.storage.signatures import SignatureWorkerStore
|
||||
from synapse.storage.state import StateGroupWorkerStore
|
||||
from synapse.storage.stream import StreamWorkerStore
|
||||
from synapse.storage.user_erasure_store import UserErasureWorkerStore
|
||||
from synapse.storage.data_stores.main.event_federation import EventFederationWorkerStore
|
||||
from synapse.storage.data_stores.main.event_push_actions import (
|
||||
EventPushActionsWorkerStore,
|
||||
)
|
||||
from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
|
||||
from synapse.storage.data_stores.main.relations import RelationsWorkerStore
|
||||
from synapse.storage.data_stores.main.roommember import RoomMemberWorkerStore
|
||||
from synapse.storage.data_stores.main.signatures import SignatureWorkerStore
|
||||
from synapse.storage.data_stores.main.state import StateGroupWorkerStore
|
||||
from synapse.storage.data_stores.main.stream import StreamWorkerStore
|
||||
from synapse.storage.data_stores.main.user_erasure_store import UserErasureWorkerStore
|
||||
|
||||
from ._base import BaseSlavedStore
|
||||
from ._slaved_id_tracker import SlavedIdTracker
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.storage.filtering import FilteringStore
|
||||
from synapse.storage.data_stores.main.filtering import FilteringStore
|
||||
|
||||
from ._base import BaseSlavedStore
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.storage import KeyStore
|
||||
from synapse.storage.data_stores.main.keys import KeyStore
|
||||
|
||||
# KeyStore isn't really safe to use from a worker, but for now we do so and hope that
|
||||
# the races it creates aren't too bad.
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
# limitations under the License.
|
||||
|
||||
from synapse.storage import DataStore
|
||||
from synapse.storage.presence import PresenceStore
|
||||
from synapse.storage.data_stores.main.presence import PresenceStore
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
|
||||
from ._base import BaseSlavedStore, __func__
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
# limitations under the License.
|
||||
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
from synapse.storage.profile import ProfileWorkerStore
|
||||
from synapse.storage.data_stores.main.profile import ProfileWorkerStore
|
||||
|
||||
|
||||
class SlavedProfileStore(ProfileWorkerStore, BaseSlavedStore):
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.storage.push_rule import PushRulesWorkerStore
|
||||
from synapse.storage.data_stores.main.push_rule import PushRulesWorkerStore
|
||||
|
||||
from ._slaved_id_tracker import SlavedIdTracker
|
||||
from .events import SlavedEventStore
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.storage.pusher import PusherWorkerStore
|
||||
from synapse.storage.data_stores.main.pusher import PusherWorkerStore
|
||||
|
||||
from ._base import BaseSlavedStore
|
||||
from ._slaved_id_tracker import SlavedIdTracker
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.storage.receipts import ReceiptsWorkerStore
|
||||
from synapse.storage.data_stores.main.receipts import ReceiptsWorkerStore
|
||||
|
||||
from ._base import BaseSlavedStore
|
||||
from ._slaved_id_tracker import SlavedIdTracker
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.storage.registration import RegistrationWorkerStore
|
||||
from synapse.storage.data_stores.main.registration import RegistrationWorkerStore
|
||||
|
||||
from ._base import BaseSlavedStore
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.storage.room import RoomWorkerStore
|
||||
from synapse.storage.data_stores.main.room import RoomWorkerStore
|
||||
|
||||
from ._base import BaseSlavedStore
|
||||
from ._slaved_id_tracker import SlavedIdTracker
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.storage.transactions import TransactionStore
|
||||
from synapse.storage.data_stores.main.transactions import TransactionStore
|
||||
|
||||
from ._base import BaseSlavedStore
|
||||
|
||||
|
|
|
@ -14,509 +14,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import calendar
|
||||
import logging
|
||||
import time
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import PresenceState
|
||||
from synapse.storage.devices import DeviceStore
|
||||
from synapse.storage.user_erasure_store import UserErasureStore
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
|
||||
from .account_data import AccountDataStore
|
||||
from .appservice import ApplicationServiceStore, ApplicationServiceTransactionStore
|
||||
from .client_ips import ClientIpStore
|
||||
from .deviceinbox import DeviceInboxStore
|
||||
from .directory import DirectoryStore
|
||||
from .e2e_room_keys import EndToEndRoomKeyStore
|
||||
from .end_to_end_keys import EndToEndKeyStore
|
||||
from .engines import PostgresEngine
|
||||
from .event_federation import EventFederationStore
|
||||
from .event_push_actions import EventPushActionsStore
|
||||
from .events import EventsStore
|
||||
from .events_bg_updates import EventsBackgroundUpdatesStore
|
||||
from .filtering import FilteringStore
|
||||
from .group_server import GroupServerStore
|
||||
from .keys import KeyStore
|
||||
from .media_repository import MediaRepositoryStore
|
||||
from .monthly_active_users import MonthlyActiveUsersStore
|
||||
from .openid import OpenIdStore
|
||||
from .presence import PresenceStore, UserPresenceState
|
||||
from .profile import ProfileStore
|
||||
from .push_rule import PushRuleStore
|
||||
from .pusher import PusherStore
|
||||
from .receipts import ReceiptsStore
|
||||
from .registration import RegistrationStore
|
||||
from .rejections import RejectionsStore
|
||||
from .relations import RelationsStore
|
||||
from .room import RoomStore
|
||||
from .roommember import RoomMemberStore
|
||||
from .search import SearchStore
|
||||
from .signatures import SignatureStore
|
||||
from .state import StateStore
|
||||
from .stats import StatsStore
|
||||
from .stream import StreamStore
|
||||
from .tags import TagsStore
|
||||
from .transactions import TransactionStore
|
||||
from .user_directory import UserDirectoryStore
|
||||
from .util.id_generators import ChainedIdGenerator, IdGenerator, StreamIdGenerator
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DataStore(
|
||||
EventsBackgroundUpdatesStore,
|
||||
RoomMemberStore,
|
||||
RoomStore,
|
||||
RegistrationStore,
|
||||
StreamStore,
|
||||
ProfileStore,
|
||||
PresenceStore,
|
||||
TransactionStore,
|
||||
DirectoryStore,
|
||||
KeyStore,
|
||||
StateStore,
|
||||
SignatureStore,
|
||||
ApplicationServiceStore,
|
||||
EventsStore,
|
||||
EventFederationStore,
|
||||
MediaRepositoryStore,
|
||||
RejectionsStore,
|
||||
FilteringStore,
|
||||
PusherStore,
|
||||
PushRuleStore,
|
||||
ApplicationServiceTransactionStore,
|
||||
ReceiptsStore,
|
||||
EndToEndKeyStore,
|
||||
EndToEndRoomKeyStore,
|
||||
SearchStore,
|
||||
TagsStore,
|
||||
AccountDataStore,
|
||||
EventPushActionsStore,
|
||||
OpenIdStore,
|
||||
ClientIpStore,
|
||||
DeviceStore,
|
||||
DeviceInboxStore,
|
||||
UserDirectoryStore,
|
||||
GroupServerStore,
|
||||
UserErasureStore,
|
||||
MonthlyActiveUsersStore,
|
||||
StatsStore,
|
||||
RelationsStore,
|
||||
):
|
||||
def __init__(self, db_conn, hs):
|
||||
self.hs = hs
|
||||
self._clock = hs.get_clock()
|
||||
self.database_engine = hs.database_engine
|
||||
|
||||
self._stream_id_gen = StreamIdGenerator(
|
||||
db_conn,
|
||||
"events",
|
||||
"stream_ordering",
|
||||
extra_tables=[("local_invites", "stream_id")],
|
||||
)
|
||||
self._backfill_id_gen = StreamIdGenerator(
|
||||
db_conn,
|
||||
"events",
|
||||
"stream_ordering",
|
||||
step=-1,
|
||||
extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
|
||||
)
|
||||
self._presence_id_gen = StreamIdGenerator(
|
||||
db_conn, "presence_stream", "stream_id"
|
||||
)
|
||||
self._device_inbox_id_gen = StreamIdGenerator(
|
||||
db_conn, "device_max_stream_id", "stream_id"
|
||||
)
|
||||
self._public_room_id_gen = StreamIdGenerator(
|
||||
db_conn, "public_room_list_stream", "stream_id"
|
||||
)
|
||||
self._device_list_id_gen = StreamIdGenerator(
|
||||
db_conn, "device_lists_stream", "stream_id"
|
||||
)
|
||||
|
||||
self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
|
||||
self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
|
||||
self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
|
||||
self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id")
|
||||
self._push_rules_stream_id_gen = ChainedIdGenerator(
|
||||
self._stream_id_gen, db_conn, "push_rules_stream", "stream_id"
|
||||
)
|
||||
self._pushers_id_gen = StreamIdGenerator(
|
||||
db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")]
|
||||
)
|
||||
self._group_updates_id_gen = StreamIdGenerator(
|
||||
db_conn, "local_group_updates", "stream_id"
|
||||
)
|
||||
|
||||
if isinstance(self.database_engine, PostgresEngine):
|
||||
self._cache_id_gen = StreamIdGenerator(
|
||||
db_conn, "cache_invalidation_stream", "stream_id"
|
||||
)
|
||||
else:
|
||||
self._cache_id_gen = None
|
||||
|
||||
self._presence_on_startup = self._get_active_presence(db_conn)
|
||||
|
||||
presence_cache_prefill, min_presence_val = self._get_cache_dict(
|
||||
db_conn,
|
||||
"presence_stream",
|
||||
entity_column="user_id",
|
||||
stream_column="stream_id",
|
||||
max_value=self._presence_id_gen.get_current_token(),
|
||||
)
|
||||
self.presence_stream_cache = StreamChangeCache(
|
||||
"PresenceStreamChangeCache",
|
||||
min_presence_val,
|
||||
prefilled_cache=presence_cache_prefill,
|
||||
)
|
||||
|
||||
max_device_inbox_id = self._device_inbox_id_gen.get_current_token()
|
||||
device_inbox_prefill, min_device_inbox_id = self._get_cache_dict(
|
||||
db_conn,
|
||||
"device_inbox",
|
||||
entity_column="user_id",
|
||||
stream_column="stream_id",
|
||||
max_value=max_device_inbox_id,
|
||||
limit=1000,
|
||||
)
|
||||
self._device_inbox_stream_cache = StreamChangeCache(
|
||||
"DeviceInboxStreamChangeCache",
|
||||
min_device_inbox_id,
|
||||
prefilled_cache=device_inbox_prefill,
|
||||
)
|
||||
# The federation outbox and the local device inbox uses the same
|
||||
# stream_id generator.
|
||||
device_outbox_prefill, min_device_outbox_id = self._get_cache_dict(
|
||||
db_conn,
|
||||
"device_federation_outbox",
|
||||
entity_column="destination",
|
||||
stream_column="stream_id",
|
||||
max_value=max_device_inbox_id,
|
||||
limit=1000,
|
||||
)
|
||||
self._device_federation_outbox_stream_cache = StreamChangeCache(
|
||||
"DeviceFederationOutboxStreamChangeCache",
|
||||
min_device_outbox_id,
|
||||
prefilled_cache=device_outbox_prefill,
|
||||
)
|
||||
|
||||
device_list_max = self._device_list_id_gen.get_current_token()
|
||||
self._device_list_stream_cache = StreamChangeCache(
|
||||
"DeviceListStreamChangeCache", device_list_max
|
||||
)
|
||||
self._device_list_federation_stream_cache = StreamChangeCache(
|
||||
"DeviceListFederationStreamChangeCache", device_list_max
|
||||
)
|
||||
|
||||
events_max = self._stream_id_gen.get_current_token()
|
||||
curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict(
|
||||
db_conn,
|
||||
"current_state_delta_stream",
|
||||
entity_column="room_id",
|
||||
stream_column="stream_id",
|
||||
max_value=events_max, # As we share the stream id with events token
|
||||
limit=1000,
|
||||
)
|
||||
self._curr_state_delta_stream_cache = StreamChangeCache(
|
||||
"_curr_state_delta_stream_cache",
|
||||
min_curr_state_delta_id,
|
||||
prefilled_cache=curr_state_delta_prefill,
|
||||
)
|
||||
|
||||
_group_updates_prefill, min_group_updates_id = self._get_cache_dict(
|
||||
db_conn,
|
||||
"local_group_updates",
|
||||
entity_column="user_id",
|
||||
stream_column="stream_id",
|
||||
max_value=self._group_updates_id_gen.get_current_token(),
|
||||
limit=1000,
|
||||
)
|
||||
self._group_updates_stream_cache = StreamChangeCache(
|
||||
"_group_updates_stream_cache",
|
||||
min_group_updates_id,
|
||||
prefilled_cache=_group_updates_prefill,
|
||||
)
|
||||
|
||||
self._stream_order_on_start = self.get_room_max_stream_ordering()
|
||||
self._min_stream_order_on_start = self.get_room_min_stream_ordering()
|
||||
|
||||
# Used in _generate_user_daily_visits to keep track of progress
|
||||
self._last_user_visit_update = self._get_start_of_day()
|
||||
|
||||
super(DataStore, self).__init__(db_conn, hs)
|
||||
|
||||
def take_presence_startup_info(self):
|
||||
active_on_startup = self._presence_on_startup
|
||||
self._presence_on_startup = None
|
||||
return active_on_startup
|
||||
|
||||
def _get_active_presence(self, db_conn):
|
||||
"""Fetch non-offline presence from the database so that we can register
|
||||
the appropriate time outs.
|
||||
"""
|
||||
|
||||
sql = (
|
||||
"SELECT user_id, state, last_active_ts, last_federation_update_ts,"
|
||||
" last_user_sync_ts, status_msg, currently_active FROM presence_stream"
|
||||
" WHERE state != ?"
|
||||
)
|
||||
sql = self.database_engine.convert_param_style(sql)
|
||||
|
||||
txn = db_conn.cursor()
|
||||
txn.execute(sql, (PresenceState.OFFLINE,))
|
||||
rows = self.cursor_to_dict(txn)
|
||||
txn.close()
|
||||
|
||||
for row in rows:
|
||||
row["currently_active"] = bool(row["currently_active"])
|
||||
|
||||
return [UserPresenceState(**row) for row in rows]
|
||||
|
||||
def count_daily_users(self):
|
||||
"""
|
||||
Counts the number of users who used this homeserver in the last 24 hours.
|
||||
"""
|
||||
yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
|
||||
return self.runInteraction("count_daily_users", self._count_users, yesterday)
|
||||
|
||||
def count_monthly_users(self):
|
||||
"""
|
||||
Counts the number of users who used this homeserver in the last 30 days.
|
||||
Note this method is intended for phonehome metrics only and is different
|
||||
from the mau figure in synapse.storage.monthly_active_users which,
|
||||
amongst other things, includes a 3 day grace period before a user counts.
|
||||
"""
|
||||
thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
|
||||
return self.runInteraction(
|
||||
"count_monthly_users", self._count_users, thirty_days_ago
|
||||
)
|
||||
|
||||
def _count_users(self, txn, time_from):
|
||||
"""
|
||||
Returns number of users seen in the past time_from period
|
||||
"""
|
||||
sql = """
|
||||
SELECT COALESCE(count(*), 0) FROM (
|
||||
SELECT user_id FROM user_ips
|
||||
WHERE last_seen > ?
|
||||
GROUP BY user_id
|
||||
) u
|
||||
"""
|
||||
txn.execute(sql, (time_from,))
|
||||
count, = txn.fetchone()
|
||||
return count
|
||||
|
||||
def count_r30_users(self):
|
||||
"""
|
||||
Counts the number of 30 day retained users, defined as:-
|
||||
* Users who have created their accounts more than 30 days ago
|
||||
* Where last seen at most 30 days ago
|
||||
* Where account creation and last_seen are > 30 days apart
|
||||
|
||||
Returns counts globaly for a given user as well as breaking
|
||||
by platform
|
||||
"""
|
||||
|
||||
def _count_r30_users(txn):
|
||||
thirty_days_in_secs = 86400 * 30
|
||||
now = int(self._clock.time())
|
||||
thirty_days_ago_in_secs = now - thirty_days_in_secs
|
||||
|
||||
sql = """
|
||||
SELECT platform, COALESCE(count(*), 0) FROM (
|
||||
SELECT
|
||||
users.name, platform, users.creation_ts * 1000,
|
||||
MAX(uip.last_seen)
|
||||
FROM users
|
||||
INNER JOIN (
|
||||
SELECT
|
||||
user_id,
|
||||
last_seen,
|
||||
CASE
|
||||
WHEN user_agent LIKE '%%Android%%' THEN 'android'
|
||||
WHEN user_agent LIKE '%%iOS%%' THEN 'ios'
|
||||
WHEN user_agent LIKE '%%Electron%%' THEN 'electron'
|
||||
WHEN user_agent LIKE '%%Mozilla%%' THEN 'web'
|
||||
WHEN user_agent LIKE '%%Gecko%%' THEN 'web'
|
||||
ELSE 'unknown'
|
||||
END
|
||||
AS platform
|
||||
FROM user_ips
|
||||
) uip
|
||||
ON users.name = uip.user_id
|
||||
AND users.appservice_id is NULL
|
||||
AND users.creation_ts < ?
|
||||
AND uip.last_seen/1000 > ?
|
||||
AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
|
||||
GROUP BY users.name, platform, users.creation_ts
|
||||
) u GROUP BY platform
|
||||
"""
|
||||
|
||||
results = {}
|
||||
txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
|
||||
|
||||
for row in txn:
|
||||
if row[0] == "unknown":
|
||||
pass
|
||||
results[row[0]] = row[1]
|
||||
|
||||
sql = """
|
||||
SELECT COALESCE(count(*), 0) FROM (
|
||||
SELECT users.name, users.creation_ts * 1000,
|
||||
MAX(uip.last_seen)
|
||||
FROM users
|
||||
INNER JOIN (
|
||||
SELECT
|
||||
user_id,
|
||||
last_seen
|
||||
FROM user_ips
|
||||
) uip
|
||||
ON users.name = uip.user_id
|
||||
AND appservice_id is NULL
|
||||
AND users.creation_ts < ?
|
||||
AND uip.last_seen/1000 > ?
|
||||
AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
|
||||
GROUP BY users.name, users.creation_ts
|
||||
) u
|
||||
"""
|
||||
|
||||
txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
|
||||
|
||||
count, = txn.fetchone()
|
||||
results["all"] = count
|
||||
|
||||
return results
|
||||
|
||||
return self.runInteraction("count_r30_users", _count_r30_users)
|
||||
|
||||
def _get_start_of_day(self):
|
||||
"""
|
||||
Returns millisecond unixtime for start of UTC day.
|
||||
"""
|
||||
now = time.gmtime()
|
||||
today_start = calendar.timegm((now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0))
|
||||
return today_start * 1000
|
||||
|
||||
def generate_user_daily_visits(self):
|
||||
"""
|
||||
Generates daily visit data for use in cohort/ retention analysis
|
||||
"""
|
||||
|
||||
def _generate_user_daily_visits(txn):
|
||||
logger.info("Calling _generate_user_daily_visits")
|
||||
today_start = self._get_start_of_day()
|
||||
a_day_in_milliseconds = 24 * 60 * 60 * 1000
|
||||
now = self.clock.time_msec()
|
||||
|
||||
sql = """
|
||||
INSERT INTO user_daily_visits (user_id, device_id, timestamp)
|
||||
SELECT u.user_id, u.device_id, ?
|
||||
FROM user_ips AS u
|
||||
LEFT JOIN (
|
||||
SELECT user_id, device_id, timestamp FROM user_daily_visits
|
||||
WHERE timestamp = ?
|
||||
) udv
|
||||
ON u.user_id = udv.user_id AND u.device_id=udv.device_id
|
||||
INNER JOIN users ON users.name=u.user_id
|
||||
WHERE last_seen > ? AND last_seen <= ?
|
||||
AND udv.timestamp IS NULL AND users.is_guest=0
|
||||
AND users.appservice_id IS NULL
|
||||
GROUP BY u.user_id, u.device_id
|
||||
"""
|
||||
|
||||
# This means that the day has rolled over but there could still
|
||||
# be entries from the previous day. There is an edge case
|
||||
# where if the user logs in at 23:59 and overwrites their
|
||||
# last_seen at 00:01 then they will not be counted in the
|
||||
# previous day's stats - it is important that the query is run
|
||||
# often to minimise this case.
|
||||
if today_start > self._last_user_visit_update:
|
||||
yesterday_start = today_start - a_day_in_milliseconds
|
||||
txn.execute(
|
||||
sql,
|
||||
(
|
||||
yesterday_start,
|
||||
yesterday_start,
|
||||
self._last_user_visit_update,
|
||||
today_start,
|
||||
),
|
||||
)
|
||||
self._last_user_visit_update = today_start
|
||||
|
||||
txn.execute(
|
||||
sql, (today_start, today_start, self._last_user_visit_update, now)
|
||||
)
|
||||
# Update _last_user_visit_update to now. The reason to do this
|
||||
# rather just clamping to the beginning of the day is to limit
|
||||
# the size of the join - meaning that the query can be run more
|
||||
# frequently
|
||||
self._last_user_visit_update = now
|
||||
|
||||
return self.runInteraction(
|
||||
"generate_user_daily_visits", _generate_user_daily_visits
|
||||
)
|
||||
|
||||
def get_users(self):
|
||||
"""Function to reterive a list of users in users table.
|
||||
|
||||
Args:
|
||||
Returns:
|
||||
defer.Deferred: resolves to list[dict[str, Any]]
|
||||
"""
|
||||
return self._simple_select_list(
|
||||
table="users",
|
||||
keyvalues={},
|
||||
retcols=["name", "password_hash", "is_guest", "admin", "user_type"],
|
||||
desc="get_users",
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_users_paginate(self, order, start, limit):
|
||||
"""Function to reterive a paginated list of users from
|
||||
users list. This will return a json object, which contains
|
||||
list of users and the total number of users in users table.
|
||||
|
||||
Args:
|
||||
order (str): column name to order the select by this column
|
||||
start (int): start number to begin the query from
|
||||
limit (int): number of rows to reterive
|
||||
Returns:
|
||||
defer.Deferred: resolves to json object {list[dict[str, Any]], count}
|
||||
"""
|
||||
users = yield self.runInteraction(
|
||||
"get_users_paginate",
|
||||
self._simple_select_list_paginate_txn,
|
||||
table="users",
|
||||
keyvalues={"is_guest": False},
|
||||
orderby=order,
|
||||
start=start,
|
||||
limit=limit,
|
||||
retcols=["name", "password_hash", "is_guest", "admin", "user_type"],
|
||||
)
|
||||
count = yield self.runInteraction("get_users_paginate", self.get_user_count_txn)
|
||||
retval = {"users": users, "total": count}
|
||||
return retval
|
||||
|
||||
def search_users(self, term):
|
||||
"""Function to search users list for one or more users with
|
||||
the matched term.
|
||||
|
||||
Args:
|
||||
term (str): search term
|
||||
col (str): column to query term should be matched to
|
||||
Returns:
|
||||
defer.Deferred: resolves to list[dict[str, Any]]
|
||||
"""
|
||||
return self._simple_search_list(
|
||||
table="users",
|
||||
term=term,
|
||||
col="name",
|
||||
retcols=["name", "password_hash", "is_guest", "admin", "user_type"],
|
||||
desc="search_users",
|
||||
)
|
||||
from synapse.storage.data_stores.main import DataStore # noqa: F401
|
||||
|
||||
|
||||
def are_all_users_on_domain(txn, database_engine, domain):
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
|
@ -0,0 +1,524 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014-2016 OpenMarket Ltd
|
||||
# Copyright 2018 New Vector Ltd
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import calendar
|
||||
import logging
|
||||
import time
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import PresenceState
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.storage.util.id_generators import (
|
||||
ChainedIdGenerator,
|
||||
IdGenerator,
|
||||
StreamIdGenerator,
|
||||
)
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
|
||||
from .account_data import AccountDataStore
|
||||
from .appservice import ApplicationServiceStore, ApplicationServiceTransactionStore
|
||||
from .client_ips import ClientIpStore
|
||||
from .deviceinbox import DeviceInboxStore
|
||||
from .devices import DeviceStore
|
||||
from .directory import DirectoryStore
|
||||
from .e2e_room_keys import EndToEndRoomKeyStore
|
||||
from .end_to_end_keys import EndToEndKeyStore
|
||||
from .event_federation import EventFederationStore
|
||||
from .event_push_actions import EventPushActionsStore
|
||||
from .events import EventsStore
|
||||
from .events_bg_updates import EventsBackgroundUpdatesStore
|
||||
from .filtering import FilteringStore
|
||||
from .group_server import GroupServerStore
|
||||
from .keys import KeyStore
|
||||
from .media_repository import MediaRepositoryStore
|
||||
from .monthly_active_users import MonthlyActiveUsersStore
|
||||
from .openid import OpenIdStore
|
||||
from .presence import PresenceStore, UserPresenceState
|
||||
from .profile import ProfileStore
|
||||
from .push_rule import PushRuleStore
|
||||
from .pusher import PusherStore
|
||||
from .receipts import ReceiptsStore
|
||||
from .registration import RegistrationStore
|
||||
from .rejections import RejectionsStore
|
||||
from .relations import RelationsStore
|
||||
from .room import RoomStore
|
||||
from .roommember import RoomMemberStore
|
||||
from .search import SearchStore
|
||||
from .signatures import SignatureStore
|
||||
from .state import StateStore
|
||||
from .stats import StatsStore
|
||||
from .stream import StreamStore
|
||||
from .tags import TagsStore
|
||||
from .transactions import TransactionStore
|
||||
from .user_directory import UserDirectoryStore
|
||||
from .user_erasure_store import UserErasureStore
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DataStore(
|
||||
EventsBackgroundUpdatesStore,
|
||||
RoomMemberStore,
|
||||
RoomStore,
|
||||
RegistrationStore,
|
||||
StreamStore,
|
||||
ProfileStore,
|
||||
PresenceStore,
|
||||
TransactionStore,
|
||||
DirectoryStore,
|
||||
KeyStore,
|
||||
StateStore,
|
||||
SignatureStore,
|
||||
ApplicationServiceStore,
|
||||
EventsStore,
|
||||
EventFederationStore,
|
||||
MediaRepositoryStore,
|
||||
RejectionsStore,
|
||||
FilteringStore,
|
||||
PusherStore,
|
||||
PushRuleStore,
|
||||
ApplicationServiceTransactionStore,
|
||||
ReceiptsStore,
|
||||
EndToEndKeyStore,
|
||||
EndToEndRoomKeyStore,
|
||||
SearchStore,
|
||||
TagsStore,
|
||||
AccountDataStore,
|
||||
EventPushActionsStore,
|
||||
OpenIdStore,
|
||||
ClientIpStore,
|
||||
DeviceStore,
|
||||
DeviceInboxStore,
|
||||
UserDirectoryStore,
|
||||
GroupServerStore,
|
||||
UserErasureStore,
|
||||
MonthlyActiveUsersStore,
|
||||
StatsStore,
|
||||
RelationsStore,
|
||||
):
|
||||
def __init__(self, db_conn, hs):
|
||||
self.hs = hs
|
||||
self._clock = hs.get_clock()
|
||||
self.database_engine = hs.database_engine
|
||||
|
||||
self._stream_id_gen = StreamIdGenerator(
|
||||
db_conn,
|
||||
"events",
|
||||
"stream_ordering",
|
||||
extra_tables=[("local_invites", "stream_id")],
|
||||
)
|
||||
self._backfill_id_gen = StreamIdGenerator(
|
||||
db_conn,
|
||||
"events",
|
||||
"stream_ordering",
|
||||
step=-1,
|
||||
extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
|
||||
)
|
||||
self._presence_id_gen = StreamIdGenerator(
|
||||
db_conn, "presence_stream", "stream_id"
|
||||
)
|
||||
self._device_inbox_id_gen = StreamIdGenerator(
|
||||
db_conn, "device_max_stream_id", "stream_id"
|
||||
)
|
||||
self._public_room_id_gen = StreamIdGenerator(
|
||||
db_conn, "public_room_list_stream", "stream_id"
|
||||
)
|
||||
self._device_list_id_gen = StreamIdGenerator(
|
||||
db_conn, "device_lists_stream", "stream_id"
|
||||
)
|
||||
|
||||
self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
|
||||
self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
|
||||
self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
|
||||
self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id")
|
||||
self._push_rules_stream_id_gen = ChainedIdGenerator(
|
||||
self._stream_id_gen, db_conn, "push_rules_stream", "stream_id"
|
||||
)
|
||||
self._pushers_id_gen = StreamIdGenerator(
|
||||
db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")]
|
||||
)
|
||||
self._group_updates_id_gen = StreamIdGenerator(
|
||||
db_conn, "local_group_updates", "stream_id"
|
||||
)
|
||||
|
||||
if isinstance(self.database_engine, PostgresEngine):
|
||||
self._cache_id_gen = StreamIdGenerator(
|
||||
db_conn, "cache_invalidation_stream", "stream_id"
|
||||
)
|
||||
else:
|
||||
self._cache_id_gen = None
|
||||
|
||||
self._presence_on_startup = self._get_active_presence(db_conn)
|
||||
|
||||
presence_cache_prefill, min_presence_val = self._get_cache_dict(
|
||||
db_conn,
|
||||
"presence_stream",
|
||||
entity_column="user_id",
|
||||
stream_column="stream_id",
|
||||
max_value=self._presence_id_gen.get_current_token(),
|
||||
)
|
||||
self.presence_stream_cache = StreamChangeCache(
|
||||
"PresenceStreamChangeCache",
|
||||
min_presence_val,
|
||||
prefilled_cache=presence_cache_prefill,
|
||||
)
|
||||
|
||||
max_device_inbox_id = self._device_inbox_id_gen.get_current_token()
|
||||
device_inbox_prefill, min_device_inbox_id = self._get_cache_dict(
|
||||
db_conn,
|
||||
"device_inbox",
|
||||
entity_column="user_id",
|
||||
stream_column="stream_id",
|
||||
max_value=max_device_inbox_id,
|
||||
limit=1000,
|
||||
)
|
||||
self._device_inbox_stream_cache = StreamChangeCache(
|
||||
"DeviceInboxStreamChangeCache",
|
||||
min_device_inbox_id,
|
||||
prefilled_cache=device_inbox_prefill,
|
||||
)
|
||||
# The federation outbox and the local device inbox uses the same
|
||||
# stream_id generator.
|
||||
device_outbox_prefill, min_device_outbox_id = self._get_cache_dict(
|
||||
db_conn,
|
||||
"device_federation_outbox",
|
||||
entity_column="destination",
|
||||
stream_column="stream_id",
|
||||
max_value=max_device_inbox_id,
|
||||
limit=1000,
|
||||
)
|
||||
self._device_federation_outbox_stream_cache = StreamChangeCache(
|
||||
"DeviceFederationOutboxStreamChangeCache",
|
||||
min_device_outbox_id,
|
||||
prefilled_cache=device_outbox_prefill,
|
||||
)
|
||||
|
||||
device_list_max = self._device_list_id_gen.get_current_token()
|
||||
self._device_list_stream_cache = StreamChangeCache(
|
||||
"DeviceListStreamChangeCache", device_list_max
|
||||
)
|
||||
self._device_list_federation_stream_cache = StreamChangeCache(
|
||||
"DeviceListFederationStreamChangeCache", device_list_max
|
||||
)
|
||||
|
||||
events_max = self._stream_id_gen.get_current_token()
|
||||
curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict(
|
||||
db_conn,
|
||||
"current_state_delta_stream",
|
||||
entity_column="room_id",
|
||||
stream_column="stream_id",
|
||||
max_value=events_max, # As we share the stream id with events token
|
||||
limit=1000,
|
||||
)
|
||||
self._curr_state_delta_stream_cache = StreamChangeCache(
|
||||
"_curr_state_delta_stream_cache",
|
||||
min_curr_state_delta_id,
|
||||
prefilled_cache=curr_state_delta_prefill,
|
||||
)
|
||||
|
||||
_group_updates_prefill, min_group_updates_id = self._get_cache_dict(
|
||||
db_conn,
|
||||
"local_group_updates",
|
||||
entity_column="user_id",
|
||||
stream_column="stream_id",
|
||||
max_value=self._group_updates_id_gen.get_current_token(),
|
||||
limit=1000,
|
||||
)
|
||||
self._group_updates_stream_cache = StreamChangeCache(
|
||||
"_group_updates_stream_cache",
|
||||
min_group_updates_id,
|
||||
prefilled_cache=_group_updates_prefill,
|
||||
)
|
||||
|
||||
self._stream_order_on_start = self.get_room_max_stream_ordering()
|
||||
self._min_stream_order_on_start = self.get_room_min_stream_ordering()
|
||||
|
||||
# Used in _generate_user_daily_visits to keep track of progress
|
||||
self._last_user_visit_update = self._get_start_of_day()
|
||||
|
||||
super(DataStore, self).__init__(db_conn, hs)
|
||||
|
||||
def take_presence_startup_info(self):
|
||||
active_on_startup = self._presence_on_startup
|
||||
self._presence_on_startup = None
|
||||
return active_on_startup
|
||||
|
||||
def _get_active_presence(self, db_conn):
|
||||
"""Fetch non-offline presence from the database so that we can register
|
||||
the appropriate time outs.
|
||||
"""
|
||||
|
||||
sql = (
|
||||
"SELECT user_id, state, last_active_ts, last_federation_update_ts,"
|
||||
" last_user_sync_ts, status_msg, currently_active FROM presence_stream"
|
||||
" WHERE state != ?"
|
||||
)
|
||||
sql = self.database_engine.convert_param_style(sql)
|
||||
|
||||
txn = db_conn.cursor()
|
||||
txn.execute(sql, (PresenceState.OFFLINE,))
|
||||
rows = self.cursor_to_dict(txn)
|
||||
txn.close()
|
||||
|
||||
for row in rows:
|
||||
row["currently_active"] = bool(row["currently_active"])
|
||||
|
||||
return [UserPresenceState(**row) for row in rows]
|
||||
|
||||
def count_daily_users(self):
|
||||
"""
|
||||
Counts the number of users who used this homeserver in the last 24 hours.
|
||||
"""
|
||||
yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
|
||||
return self.runInteraction("count_daily_users", self._count_users, yesterday)
|
||||
|
||||
def count_monthly_users(self):
|
||||
"""
|
||||
Counts the number of users who used this homeserver in the last 30 days.
|
||||
Note this method is intended for phonehome metrics only and is different
|
||||
from the mau figure in synapse.storage.monthly_active_users which,
|
||||
amongst other things, includes a 3 day grace period before a user counts.
|
||||
"""
|
||||
thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
|
||||
return self.runInteraction(
|
||||
"count_monthly_users", self._count_users, thirty_days_ago
|
||||
)
|
||||
|
||||
def _count_users(self, txn, time_from):
|
||||
"""
|
||||
Returns number of users seen in the past time_from period
|
||||
"""
|
||||
sql = """
|
||||
SELECT COALESCE(count(*), 0) FROM (
|
||||
SELECT user_id FROM user_ips
|
||||
WHERE last_seen > ?
|
||||
GROUP BY user_id
|
||||
) u
|
||||
"""
|
||||
txn.execute(sql, (time_from,))
|
||||
count, = txn.fetchone()
|
||||
return count
|
||||
|
||||
def count_r30_users(self):
|
||||
"""
|
||||
Counts the number of 30 day retained users, defined as:-
|
||||
* Users who have created their accounts more than 30 days ago
|
||||
* Where last seen at most 30 days ago
|
||||
* Where account creation and last_seen are > 30 days apart
|
||||
|
||||
Returns counts globaly for a given user as well as breaking
|
||||
by platform
|
||||
"""
|
||||
|
||||
def _count_r30_users(txn):
|
||||
thirty_days_in_secs = 86400 * 30
|
||||
now = int(self._clock.time())
|
||||
thirty_days_ago_in_secs = now - thirty_days_in_secs
|
||||
|
||||
sql = """
|
||||
SELECT platform, COALESCE(count(*), 0) FROM (
|
||||
SELECT
|
||||
users.name, platform, users.creation_ts * 1000,
|
||||
MAX(uip.last_seen)
|
||||
FROM users
|
||||
INNER JOIN (
|
||||
SELECT
|
||||
user_id,
|
||||
last_seen,
|
||||
CASE
|
||||
WHEN user_agent LIKE '%%Android%%' THEN 'android'
|
||||
WHEN user_agent LIKE '%%iOS%%' THEN 'ios'
|
||||
WHEN user_agent LIKE '%%Electron%%' THEN 'electron'
|
||||
WHEN user_agent LIKE '%%Mozilla%%' THEN 'web'
|
||||
WHEN user_agent LIKE '%%Gecko%%' THEN 'web'
|
||||
ELSE 'unknown'
|
||||
END
|
||||
AS platform
|
||||
FROM user_ips
|
||||
) uip
|
||||
ON users.name = uip.user_id
|
||||
AND users.appservice_id is NULL
|
||||
AND users.creation_ts < ?
|
||||
AND uip.last_seen/1000 > ?
|
||||
AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
|
||||
GROUP BY users.name, platform, users.creation_ts
|
||||
) u GROUP BY platform
|
||||
"""
|
||||
|
||||
results = {}
|
||||
txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
|
||||
|
||||
for row in txn:
|
||||
if row[0] == "unknown":
|
||||
pass
|
||||
results[row[0]] = row[1]
|
||||
|
||||
sql = """
|
||||
SELECT COALESCE(count(*), 0) FROM (
|
||||
SELECT users.name, users.creation_ts * 1000,
|
||||
MAX(uip.last_seen)
|
||||
FROM users
|
||||
INNER JOIN (
|
||||
SELECT
|
||||
user_id,
|
||||
last_seen
|
||||
FROM user_ips
|
||||
) uip
|
||||
ON users.name = uip.user_id
|
||||
AND appservice_id is NULL
|
||||
AND users.creation_ts < ?
|
||||
AND uip.last_seen/1000 > ?
|
||||
AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
|
||||
GROUP BY users.name, users.creation_ts
|
||||
) u
|
||||
"""
|
||||
|
||||
txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
|
||||
|
||||
count, = txn.fetchone()
|
||||
results["all"] = count
|
||||
|
||||
return results
|
||||
|
||||
return self.runInteraction("count_r30_users", _count_r30_users)
|
||||
|
||||
def _get_start_of_day(self):
|
||||
"""
|
||||
Returns millisecond unixtime for start of UTC day.
|
||||
"""
|
||||
now = time.gmtime()
|
||||
today_start = calendar.timegm((now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0))
|
||||
return today_start * 1000
|
||||
|
||||
def generate_user_daily_visits(self):
|
||||
"""
|
||||
Generates daily visit data for use in cohort/ retention analysis
|
||||
"""
|
||||
|
||||
def _generate_user_daily_visits(txn):
|
||||
logger.info("Calling _generate_user_daily_visits")
|
||||
today_start = self._get_start_of_day()
|
||||
a_day_in_milliseconds = 24 * 60 * 60 * 1000
|
||||
now = self.clock.time_msec()
|
||||
|
||||
sql = """
|
||||
INSERT INTO user_daily_visits (user_id, device_id, timestamp)
|
||||
SELECT u.user_id, u.device_id, ?
|
||||
FROM user_ips AS u
|
||||
LEFT JOIN (
|
||||
SELECT user_id, device_id, timestamp FROM user_daily_visits
|
||||
WHERE timestamp = ?
|
||||
) udv
|
||||
ON u.user_id = udv.user_id AND u.device_id=udv.device_id
|
||||
INNER JOIN users ON users.name=u.user_id
|
||||
WHERE last_seen > ? AND last_seen <= ?
|
||||
AND udv.timestamp IS NULL AND users.is_guest=0
|
||||
AND users.appservice_id IS NULL
|
||||
GROUP BY u.user_id, u.device_id
|
||||
"""
|
||||
|
||||
# This means that the day has rolled over but there could still
|
||||
# be entries from the previous day. There is an edge case
|
||||
# where if the user logs in at 23:59 and overwrites their
|
||||
# last_seen at 00:01 then they will not be counted in the
|
||||
# previous day's stats - it is important that the query is run
|
||||
# often to minimise this case.
|
||||
if today_start > self._last_user_visit_update:
|
||||
yesterday_start = today_start - a_day_in_milliseconds
|
||||
txn.execute(
|
||||
sql,
|
||||
(
|
||||
yesterday_start,
|
||||
yesterday_start,
|
||||
self._last_user_visit_update,
|
||||
today_start,
|
||||
),
|
||||
)
|
||||
self._last_user_visit_update = today_start
|
||||
|
||||
txn.execute(
|
||||
sql, (today_start, today_start, self._last_user_visit_update, now)
|
||||
)
|
||||
# Update _last_user_visit_update to now. The reason to do this
|
||||
# rather just clamping to the beginning of the day is to limit
|
||||
# the size of the join - meaning that the query can be run more
|
||||
# frequently
|
||||
self._last_user_visit_update = now
|
||||
|
||||
return self.runInteraction(
|
||||
"generate_user_daily_visits", _generate_user_daily_visits
|
||||
)
|
||||
|
||||
def get_users(self):
|
||||
"""Function to reterive a list of users in users table.
|
||||
|
||||
Args:
|
||||
Returns:
|
||||
defer.Deferred: resolves to list[dict[str, Any]]
|
||||
"""
|
||||
return self._simple_select_list(
|
||||
table="users",
|
||||
keyvalues={},
|
||||
retcols=["name", "password_hash", "is_guest", "admin", "user_type"],
|
||||
desc="get_users",
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_users_paginate(self, order, start, limit):
|
||||
"""Function to reterive a paginated list of users from
|
||||
users list. This will return a json object, which contains
|
||||
list of users and the total number of users in users table.
|
||||
|
||||
Args:
|
||||
order (str): column name to order the select by this column
|
||||
start (int): start number to begin the query from
|
||||
limit (int): number of rows to reterive
|
||||
Returns:
|
||||
defer.Deferred: resolves to json object {list[dict[str, Any]], count}
|
||||
"""
|
||||
users = yield self.runInteraction(
|
||||
"get_users_paginate",
|
||||
self._simple_select_list_paginate_txn,
|
||||
table="users",
|
||||
keyvalues={"is_guest": False},
|
||||
orderby=order,
|
||||
start=start,
|
||||
limit=limit,
|
||||
retcols=["name", "password_hash", "is_guest", "admin", "user_type"],
|
||||
)
|
||||
count = yield self.runInteraction("get_users_paginate", self.get_user_count_txn)
|
||||
retval = {"users": users, "total": count}
|
||||
return retval
|
||||
|
||||
def search_users(self, term):
|
||||
"""Function to search users list for one or more users with
|
||||
the matched term.
|
||||
|
||||
Args:
|
||||
term (str): search term
|
||||
col (str): column to query term should be matched to
|
||||
Returns:
|
||||
defer.Deferred: resolves to list[dict[str, Any]]
|
||||
"""
|
||||
return self._simple_search_list(
|
||||
table="users",
|
||||
term=term,
|
||||
col="name",
|
||||
retcols=["name", "password_hash", "is_guest", "admin", "user_type"],
|
||||
desc="search_users",
|
||||
)
|
|
@ -22,9 +22,8 @@ from twisted.internet import defer
|
|||
|
||||
from synapse.appservice import AppServiceTransaction
|
||||
from synapse.config.appservice import load_appservices
|
||||
from synapse.storage.events_worker import EventsWorkerStore
|
||||
|
||||
from ._base import SQLBaseStore
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -20,11 +20,10 @@ from six import iteritems
|
|||
from twisted.internet import defer
|
||||
|
||||
from synapse.metrics.background_process_metrics import wrap_as_background_process
|
||||
from synapse.storage import background_updates
|
||||
from synapse.storage._base import Cache
|
||||
from synapse.util.caches import CACHE_SIZE_FACTOR
|
||||
|
||||
from . import background_updates
|
||||
from ._base import Cache
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Number of msec of granularity to store the user IP 'last seen' time. Smaller
|
|
@ -18,10 +18,9 @@ from collections import namedtuple
|
|||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.util.caches.descriptors import cached
|
||||
|
||||
from ._base import SQLBaseStore
|
||||
|
||||
RoomAliasMapping = namedtuple("RoomAliasMapping", ("room_id", "room_alias", "servers"))
|
||||
|
||||
|
|
@ -19,8 +19,7 @@ from twisted.internet import defer
|
|||
|
||||
from synapse.api.errors import StoreError
|
||||
from synapse.logging.opentracing import log_kv, trace
|
||||
|
||||
from ._base import SQLBaseStore
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
|
||||
|
||||
class EndToEndRoomKeyStore(SQLBaseStore):
|
|
@ -19,10 +19,9 @@ from canonicaljson import encode_canonical_json
|
|||
from twisted.internet import defer
|
||||
|
||||
from synapse.logging.opentracing import log_kv, set_tag, trace
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||
from synapse.util.caches.descriptors import cached
|
||||
|
||||
from ._base import SQLBaseStore, db_to_json
|
||||
|
||||
|
||||
class EndToEndKeyWorkerStore(SQLBaseStore):
|
||||
@trace
|
|
@ -26,8 +26,8 @@ from twisted.internet import defer
|
|||
from synapse.api.errors import StoreError
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
|
||||
from synapse.storage.events_worker import EventsWorkerStore
|
||||
from synapse.storage.signatures import SignatureWorkerStore
|
||||
from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
|
||||
from synapse.storage.data_stores.main.signatures import SignatureWorkerStore
|
||||
from synapse.util.caches.descriptors import cached
|
||||
|
||||
logger = logging.getLogger(__name__)
|
|
@ -41,9 +41,9 @@ from synapse.metrics.background_process_metrics import run_as_background_process
|
|||
from synapse.state import StateResolutionStore
|
||||
from synapse.storage._base import make_in_list_sql_clause
|
||||
from synapse.storage.background_updates import BackgroundUpdateStore
|
||||
from synapse.storage.event_federation import EventFederationStore
|
||||
from synapse.storage.events_worker import EventsWorkerStore
|
||||
from synapse.storage.state import StateGroupWorkerStore
|
||||
from synapse.storage.data_stores.main.event_federation import EventFederationStore
|
||||
from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
|
||||
from synapse.storage.data_stores.main.state import StateGroupWorkerStore
|
||||
from synapse.types import RoomStreamToken, get_domain_from_id
|
||||
from synapse.util import batch_iter
|
||||
from synapse.util.async_helpers import ObservableDeferred
|
|
@ -16,10 +16,9 @@
|
|||
from canonicaljson import encode_canonical_json
|
||||
|
||||
from synapse.api.errors import Codes, SynapseError
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
||||
|
||||
from ._base import SQLBaseStore, db_to_json
|
||||
|
||||
|
||||
class FilteringStore(SQLBaseStore):
|
||||
@cachedInlineCallbacks(num_args=2)
|
|
@ -19,8 +19,7 @@ from canonicaljson import json
|
|||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.errors import SynapseError
|
||||
|
||||
from ._base import SQLBaseStore
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
|
||||
# The category ID for the "default" category. We don't store as null in the
|
||||
# database to avoid the fun of null != null
|
|
@ -0,0 +1,214 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014-2016 OpenMarket Ltd
|
||||
# Copyright 2019 New Vector Ltd.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import itertools
|
||||
import logging
|
||||
|
||||
import six
|
||||
|
||||
from signedjson.key import decode_verify_key_bytes
|
||||
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.keys import FetchKeyResult
|
||||
from synapse.util import batch_iter
|
||||
from synapse.util.caches.descriptors import cached, cachedList
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
|
||||
# despite being deprecated and removed in favor of memoryview
|
||||
if six.PY2:
|
||||
db_binary_type = six.moves.builtins.buffer
|
||||
else:
|
||||
db_binary_type = memoryview
|
||||
|
||||
|
||||
class KeyStore(SQLBaseStore):
|
||||
"""Persistence for signature verification keys
|
||||
"""
|
||||
|
||||
@cached()
|
||||
def _get_server_verify_key(self, server_name_and_key_id):
|
||||
raise NotImplementedError()
|
||||
|
||||
@cachedList(
|
||||
cached_method_name="_get_server_verify_key", list_name="server_name_and_key_ids"
|
||||
)
|
||||
def get_server_verify_keys(self, server_name_and_key_ids):
|
||||
"""
|
||||
Args:
|
||||
server_name_and_key_ids (iterable[Tuple[str, str]]):
|
||||
iterable of (server_name, key-id) tuples to fetch keys for
|
||||
|
||||
Returns:
|
||||
Deferred: resolves to dict[Tuple[str, str], FetchKeyResult|None]:
|
||||
map from (server_name, key_id) -> FetchKeyResult, or None if the key is
|
||||
unknown
|
||||
"""
|
||||
keys = {}
|
||||
|
||||
def _get_keys(txn, batch):
|
||||
"""Processes a batch of keys to fetch, and adds the result to `keys`."""
|
||||
|
||||
# batch_iter always returns tuples so it's safe to do len(batch)
|
||||
sql = (
|
||||
"SELECT server_name, key_id, verify_key, ts_valid_until_ms "
|
||||
"FROM server_signature_keys WHERE 1=0"
|
||||
) + " OR (server_name=? AND key_id=?)" * len(batch)
|
||||
|
||||
txn.execute(sql, tuple(itertools.chain.from_iterable(batch)))
|
||||
|
||||
for row in txn:
|
||||
server_name, key_id, key_bytes, ts_valid_until_ms = row
|
||||
|
||||
if ts_valid_until_ms is None:
|
||||
# Old keys may be stored with a ts_valid_until_ms of null,
|
||||
# in which case we treat this as if it was set to `0`, i.e.
|
||||
# it won't match key requests that define a minimum
|
||||
# `ts_valid_until_ms`.
|
||||
ts_valid_until_ms = 0
|
||||
|
||||
res = FetchKeyResult(
|
||||
verify_key=decode_verify_key_bytes(key_id, bytes(key_bytes)),
|
||||
valid_until_ts=ts_valid_until_ms,
|
||||
)
|
||||
keys[(server_name, key_id)] = res
|
||||
|
||||
def _txn(txn):
|
||||
for batch in batch_iter(server_name_and_key_ids, 50):
|
||||
_get_keys(txn, batch)
|
||||
return keys
|
||||
|
||||
return self.runInteraction("get_server_verify_keys", _txn)
|
||||
|
||||
def store_server_verify_keys(self, from_server, ts_added_ms, verify_keys):
|
||||
"""Stores NACL verification keys for remote servers.
|
||||
Args:
|
||||
from_server (str): Where the verification keys were looked up
|
||||
ts_added_ms (int): The time to record that the key was added
|
||||
verify_keys (iterable[tuple[str, str, FetchKeyResult]]):
|
||||
keys to be stored. Each entry is a triplet of
|
||||
(server_name, key_id, key).
|
||||
"""
|
||||
key_values = []
|
||||
value_values = []
|
||||
invalidations = []
|
||||
for server_name, key_id, fetch_result in verify_keys:
|
||||
key_values.append((server_name, key_id))
|
||||
value_values.append(
|
||||
(
|
||||
from_server,
|
||||
ts_added_ms,
|
||||
fetch_result.valid_until_ts,
|
||||
db_binary_type(fetch_result.verify_key.encode()),
|
||||
)
|
||||
)
|
||||
# invalidate takes a tuple corresponding to the params of
|
||||
# _get_server_verify_key. _get_server_verify_key only takes one
|
||||
# param, which is itself the 2-tuple (server_name, key_id).
|
||||
invalidations.append((server_name, key_id))
|
||||
|
||||
def _invalidate(res):
|
||||
f = self._get_server_verify_key.invalidate
|
||||
for i in invalidations:
|
||||
f((i,))
|
||||
return res
|
||||
|
||||
return self.runInteraction(
|
||||
"store_server_verify_keys",
|
||||
self._simple_upsert_many_txn,
|
||||
table="server_signature_keys",
|
||||
key_names=("server_name", "key_id"),
|
||||
key_values=key_values,
|
||||
value_names=(
|
||||
"from_server",
|
||||
"ts_added_ms",
|
||||
"ts_valid_until_ms",
|
||||
"verify_key",
|
||||
),
|
||||
value_values=value_values,
|
||||
).addCallback(_invalidate)
|
||||
|
||||
def store_server_keys_json(
|
||||
self, server_name, key_id, from_server, ts_now_ms, ts_expires_ms, key_json_bytes
|
||||
):
|
||||
"""Stores the JSON bytes for a set of keys from a server
|
||||
The JSON should be signed by the originating server, the intermediate
|
||||
server, and by this server. Updates the value for the
|
||||
(server_name, key_id, from_server) triplet if one already existed.
|
||||
Args:
|
||||
server_name (str): The name of the server.
|
||||
key_id (str): The identifer of the key this JSON is for.
|
||||
from_server (str): The server this JSON was fetched from.
|
||||
ts_now_ms (int): The time now in milliseconds.
|
||||
ts_valid_until_ms (int): The time when this json stops being valid.
|
||||
key_json (bytes): The encoded JSON.
|
||||
"""
|
||||
return self._simple_upsert(
|
||||
table="server_keys_json",
|
||||
keyvalues={
|
||||
"server_name": server_name,
|
||||
"key_id": key_id,
|
||||
"from_server": from_server,
|
||||
},
|
||||
values={
|
||||
"server_name": server_name,
|
||||
"key_id": key_id,
|
||||
"from_server": from_server,
|
||||
"ts_added_ms": ts_now_ms,
|
||||
"ts_valid_until_ms": ts_expires_ms,
|
||||
"key_json": db_binary_type(key_json_bytes),
|
||||
},
|
||||
desc="store_server_keys_json",
|
||||
)
|
||||
|
||||
def get_server_keys_json(self, server_keys):
|
||||
"""Retrive the key json for a list of server_keys and key ids.
|
||||
If no keys are found for a given server, key_id and source then
|
||||
that server, key_id, and source triplet entry will be an empty list.
|
||||
The JSON is returned as a byte array so that it can be efficiently
|
||||
used in an HTTP response.
|
||||
Args:
|
||||
server_keys (list): List of (server_name, key_id, source) triplets.
|
||||
Returns:
|
||||
Deferred[dict[Tuple[str, str, str|None], list[dict]]]:
|
||||
Dict mapping (server_name, key_id, source) triplets to lists of dicts
|
||||
"""
|
||||
|
||||
def _get_server_keys_json_txn(txn):
|
||||
results = {}
|
||||
for server_name, key_id, from_server in server_keys:
|
||||
keyvalues = {"server_name": server_name}
|
||||
if key_id is not None:
|
||||
keyvalues["key_id"] = key_id
|
||||
if from_server is not None:
|
||||
keyvalues["from_server"] = from_server
|
||||
rows = self._simple_select_list_txn(
|
||||
txn,
|
||||
"server_keys_json",
|
||||
keyvalues=keyvalues,
|
||||
retcols=(
|
||||
"key_id",
|
||||
"from_server",
|
||||
"ts_added_ms",
|
||||
"ts_valid_until_ms",
|
||||
"key_json",
|
||||
),
|
||||
)
|
||||
results[(server_name, key_id, from_server)] = rows
|
||||
return results
|
||||
|
||||
return self.runInteraction("get_server_keys_json", _get_server_keys_json_txn)
|
|
@ -16,10 +16,9 @@ import logging
|
|||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.util.caches.descriptors import cached
|
||||
|
||||
from ._base import SQLBaseStore
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Number of msec of granularity to store the monthly_active_user timestamp
|
|
@ -1,4 +1,4 @@
|
|||
from ._base import SQLBaseStore
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
|
||||
|
||||
class OpenIdStore(SQLBaseStore):
|
|
@ -0,0 +1,150 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014-2016 OpenMarket Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
|
||||
from synapse.storage.presence import UserPresenceState
|
||||
from synapse.util import batch_iter
|
||||
from synapse.util.caches.descriptors import cached, cachedList
|
||||
|
||||
|
||||
class PresenceStore(SQLBaseStore):
|
||||
@defer.inlineCallbacks
|
||||
def update_presence(self, presence_states):
|
||||
stream_ordering_manager = self._presence_id_gen.get_next_mult(
|
||||
len(presence_states)
|
||||
)
|
||||
|
||||
with stream_ordering_manager as stream_orderings:
|
||||
yield self.runInteraction(
|
||||
"update_presence",
|
||||
self._update_presence_txn,
|
||||
stream_orderings,
|
||||
presence_states,
|
||||
)
|
||||
|
||||
return stream_orderings[-1], self._presence_id_gen.get_current_token()
|
||||
|
||||
def _update_presence_txn(self, txn, stream_orderings, presence_states):
|
||||
for stream_id, state in zip(stream_orderings, presence_states):
|
||||
txn.call_after(
|
||||
self.presence_stream_cache.entity_has_changed, state.user_id, stream_id
|
||||
)
|
||||
txn.call_after(self._get_presence_for_user.invalidate, (state.user_id,))
|
||||
|
||||
# Actually insert new rows
|
||||
self._simple_insert_many_txn(
|
||||
txn,
|
||||
table="presence_stream",
|
||||
values=[
|
||||
{
|
||||
"stream_id": stream_id,
|
||||
"user_id": state.user_id,
|
||||
"state": state.state,
|
||||
"last_active_ts": state.last_active_ts,
|
||||
"last_federation_update_ts": state.last_federation_update_ts,
|
||||
"last_user_sync_ts": state.last_user_sync_ts,
|
||||
"status_msg": state.status_msg,
|
||||
"currently_active": state.currently_active,
|
||||
}
|
||||
for state in presence_states
|
||||
],
|
||||
)
|
||||
|
||||
# Delete old rows to stop database from getting really big
|
||||
sql = "DELETE FROM presence_stream WHERE stream_id < ? AND "
|
||||
|
||||
for states in batch_iter(presence_states, 50):
|
||||
clause, args = make_in_list_sql_clause(
|
||||
self.database_engine, "user_id", [s.user_id for s in states]
|
||||
)
|
||||
txn.execute(sql + clause, [stream_id] + list(args))
|
||||
|
||||
def get_all_presence_updates(self, last_id, current_id):
|
||||
if last_id == current_id:
|
||||
return defer.succeed([])
|
||||
|
||||
def get_all_presence_updates_txn(txn):
|
||||
sql = (
|
||||
"SELECT stream_id, user_id, state, last_active_ts,"
|
||||
" last_federation_update_ts, last_user_sync_ts, status_msg,"
|
||||
" currently_active"
|
||||
" FROM presence_stream"
|
||||
" WHERE ? < stream_id AND stream_id <= ?"
|
||||
)
|
||||
txn.execute(sql, (last_id, current_id))
|
||||
return txn.fetchall()
|
||||
|
||||
return self.runInteraction(
|
||||
"get_all_presence_updates", get_all_presence_updates_txn
|
||||
)
|
||||
|
||||
@cached()
|
||||
def _get_presence_for_user(self, user_id):
|
||||
raise NotImplementedError()
|
||||
|
||||
@cachedList(
|
||||
cached_method_name="_get_presence_for_user",
|
||||
list_name="user_ids",
|
||||
num_args=1,
|
||||
inlineCallbacks=True,
|
||||
)
|
||||
def get_presence_for_users(self, user_ids):
|
||||
rows = yield self._simple_select_many_batch(
|
||||
table="presence_stream",
|
||||
column="user_id",
|
||||
iterable=user_ids,
|
||||
keyvalues={},
|
||||
retcols=(
|
||||
"user_id",
|
||||
"state",
|
||||
"last_active_ts",
|
||||
"last_federation_update_ts",
|
||||
"last_user_sync_ts",
|
||||
"status_msg",
|
||||
"currently_active",
|
||||
),
|
||||
desc="get_presence_for_users",
|
||||
)
|
||||
|
||||
for row in rows:
|
||||
row["currently_active"] = bool(row["currently_active"])
|
||||
|
||||
return {row["user_id"]: UserPresenceState(**row) for row in rows}
|
||||
|
||||
def get_current_presence_token(self):
|
||||
return self._presence_id_gen.get_current_token()
|
||||
|
||||
def allow_presence_visible(self, observed_localpart, observer_userid):
|
||||
return self._simple_insert(
|
||||
table="presence_allow_inbound",
|
||||
values={
|
||||
"observed_user_id": observed_localpart,
|
||||
"observer_user_id": observer_userid,
|
||||
},
|
||||
desc="allow_presence_visible",
|
||||
or_ignore=True,
|
||||
)
|
||||
|
||||
def disallow_presence_visible(self, observed_localpart, observer_userid):
|
||||
return self._simple_delete_one(
|
||||
table="presence_allow_inbound",
|
||||
keyvalues={
|
||||
"observed_user_id": observed_localpart,
|
||||
"observer_user_id": observer_userid,
|
||||
},
|
||||
desc="disallow_presence_visible",
|
||||
)
|
|
@ -16,9 +16,8 @@
|
|||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.errors import StoreError
|
||||
from synapse.storage.roommember import ProfileInfo
|
||||
|
||||
from ._base import SQLBaseStore
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.data_stores.main.roommember import ProfileInfo
|
||||
|
||||
|
||||
class ProfileWorkerStore(SQLBaseStore):
|
|
@ -0,0 +1,713 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014-2016 OpenMarket Ltd
|
||||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import abc
|
||||
import logging
|
||||
|
||||
from canonicaljson import json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.push.baserules import list_with_base_rules
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.data_stores.main.appservice import ApplicationServiceWorkerStore
|
||||
from synapse.storage.data_stores.main.pusher import PusherWorkerStore
|
||||
from synapse.storage.data_stores.main.receipts import ReceiptsWorkerStore
|
||||
from synapse.storage.data_stores.main.roommember import RoomMemberWorkerStore
|
||||
from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException
|
||||
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _load_rules(rawrules, enabled_map):
|
||||
ruleslist = []
|
||||
for rawrule in rawrules:
|
||||
rule = dict(rawrule)
|
||||
rule["conditions"] = json.loads(rawrule["conditions"])
|
||||
rule["actions"] = json.loads(rawrule["actions"])
|
||||
ruleslist.append(rule)
|
||||
|
||||
# We're going to be mutating this a lot, so do a deep copy
|
||||
rules = list(list_with_base_rules(ruleslist))
|
||||
|
||||
for i, rule in enumerate(rules):
|
||||
rule_id = rule["rule_id"]
|
||||
if rule_id in enabled_map:
|
||||
if rule.get("enabled", True) != bool(enabled_map[rule_id]):
|
||||
# Rules are cached across users.
|
||||
rule = dict(rule)
|
||||
rule["enabled"] = bool(enabled_map[rule_id])
|
||||
rules[i] = rule
|
||||
|
||||
return rules
|
||||
|
||||
|
||||
class PushRulesWorkerStore(
|
||||
ApplicationServiceWorkerStore,
|
||||
ReceiptsWorkerStore,
|
||||
PusherWorkerStore,
|
||||
RoomMemberWorkerStore,
|
||||
SQLBaseStore,
|
||||
):
|
||||
"""This is an abstract base class where subclasses must implement
|
||||
`get_max_push_rules_stream_id` which can be called in the initializer.
|
||||
"""
|
||||
|
||||
# This ABCMeta metaclass ensures that we cannot be instantiated without
|
||||
# the abstract methods being implemented.
|
||||
__metaclass__ = abc.ABCMeta
|
||||
|
||||
def __init__(self, db_conn, hs):
|
||||
super(PushRulesWorkerStore, self).__init__(db_conn, hs)
|
||||
|
||||
push_rules_prefill, push_rules_id = self._get_cache_dict(
|
||||
db_conn,
|
||||
"push_rules_stream",
|
||||
entity_column="user_id",
|
||||
stream_column="stream_id",
|
||||
max_value=self.get_max_push_rules_stream_id(),
|
||||
)
|
||||
|
||||
self.push_rules_stream_cache = StreamChangeCache(
|
||||
"PushRulesStreamChangeCache",
|
||||
push_rules_id,
|
||||
prefilled_cache=push_rules_prefill,
|
||||
)
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_max_push_rules_stream_id(self):
|
||||
"""Get the position of the push rules stream.
|
||||
|
||||
Returns:
|
||||
int
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@cachedInlineCallbacks(max_entries=5000)
|
||||
def get_push_rules_for_user(self, user_id):
|
||||
rows = yield self._simple_select_list(
|
||||
table="push_rules",
|
||||
keyvalues={"user_name": user_id},
|
||||
retcols=(
|
||||
"user_name",
|
||||
"rule_id",
|
||||
"priority_class",
|
||||
"priority",
|
||||
"conditions",
|
||||
"actions",
|
||||
),
|
||||
desc="get_push_rules_enabled_for_user",
|
||||
)
|
||||
|
||||
rows.sort(key=lambda row: (-int(row["priority_class"]), -int(row["priority"])))
|
||||
|
||||
enabled_map = yield self.get_push_rules_enabled_for_user(user_id)
|
||||
|
||||
rules = _load_rules(rows, enabled_map)
|
||||
|
||||
return rules
|
||||
|
||||
@cachedInlineCallbacks(max_entries=5000)
|
||||
def get_push_rules_enabled_for_user(self, user_id):
|
||||
results = yield self._simple_select_list(
|
||||
table="push_rules_enable",
|
||||
keyvalues={"user_name": user_id},
|
||||
retcols=("user_name", "rule_id", "enabled"),
|
||||
desc="get_push_rules_enabled_for_user",
|
||||
)
|
||||
return {r["rule_id"]: False if r["enabled"] == 0 else True for r in results}
|
||||
|
||||
def have_push_rules_changed_for_user(self, user_id, last_id):
|
||||
if not self.push_rules_stream_cache.has_entity_changed(user_id, last_id):
|
||||
return defer.succeed(False)
|
||||
else:
|
||||
|
||||
def have_push_rules_changed_txn(txn):
|
||||
sql = (
|
||||
"SELECT COUNT(stream_id) FROM push_rules_stream"
|
||||
" WHERE user_id = ? AND ? < stream_id"
|
||||
)
|
||||
txn.execute(sql, (user_id, last_id))
|
||||
count, = txn.fetchone()
|
||||
return bool(count)
|
||||
|
||||
return self.runInteraction(
|
||||
"have_push_rules_changed", have_push_rules_changed_txn
|
||||
)
|
||||
|
||||
@cachedList(
|
||||
cached_method_name="get_push_rules_for_user",
|
||||
list_name="user_ids",
|
||||
num_args=1,
|
||||
inlineCallbacks=True,
|
||||
)
|
||||
def bulk_get_push_rules(self, user_ids):
|
||||
if not user_ids:
|
||||
return {}
|
||||
|
||||
results = {user_id: [] for user_id in user_ids}
|
||||
|
||||
rows = yield self._simple_select_many_batch(
|
||||
table="push_rules",
|
||||
column="user_name",
|
||||
iterable=user_ids,
|
||||
retcols=("*",),
|
||||
desc="bulk_get_push_rules",
|
||||
)
|
||||
|
||||
rows.sort(key=lambda row: (-int(row["priority_class"]), -int(row["priority"])))
|
||||
|
||||
for row in rows:
|
||||
results.setdefault(row["user_name"], []).append(row)
|
||||
|
||||
enabled_map_by_user = yield self.bulk_get_push_rules_enabled(user_ids)
|
||||
|
||||
for user_id, rules in results.items():
|
||||
results[user_id] = _load_rules(rules, enabled_map_by_user.get(user_id, {}))
|
||||
|
||||
return results
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def copy_push_rule_from_room_to_room(self, new_room_id, user_id, rule):
|
||||
"""Copy a single push rule from one room to another for a specific user.
|
||||
|
||||
Args:
|
||||
new_room_id (str): ID of the new room.
|
||||
user_id (str): ID of user the push rule belongs to.
|
||||
rule (Dict): A push rule.
|
||||
"""
|
||||
# Create new rule id
|
||||
rule_id_scope = "/".join(rule["rule_id"].split("/")[:-1])
|
||||
new_rule_id = rule_id_scope + "/" + new_room_id
|
||||
|
||||
# Change room id in each condition
|
||||
for condition in rule.get("conditions", []):
|
||||
if condition.get("key") == "room_id":
|
||||
condition["pattern"] = new_room_id
|
||||
|
||||
# Add the rule for the new room
|
||||
yield self.add_push_rule(
|
||||
user_id=user_id,
|
||||
rule_id=new_rule_id,
|
||||
priority_class=rule["priority_class"],
|
||||
conditions=rule["conditions"],
|
||||
actions=rule["actions"],
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def copy_push_rules_from_room_to_room_for_user(
|
||||
self, old_room_id, new_room_id, user_id
|
||||
):
|
||||
"""Copy all of the push rules from one room to another for a specific
|
||||
user.
|
||||
|
||||
Args:
|
||||
old_room_id (str): ID of the old room.
|
||||
new_room_id (str): ID of the new room.
|
||||
user_id (str): ID of user to copy push rules for.
|
||||
"""
|
||||
# Retrieve push rules for this user
|
||||
user_push_rules = yield self.get_push_rules_for_user(user_id)
|
||||
|
||||
# Get rules relating to the old room and copy them to the new room
|
||||
for rule in user_push_rules:
|
||||
conditions = rule.get("conditions", [])
|
||||
if any(
|
||||
(c.get("key") == "room_id" and c.get("pattern") == old_room_id)
|
||||
for c in conditions
|
||||
):
|
||||
yield self.copy_push_rule_from_room_to_room(new_room_id, user_id, rule)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def bulk_get_push_rules_for_room(self, event, context):
|
||||
state_group = context.state_group
|
||||
if not state_group:
|
||||
# If state_group is None it means it has yet to be assigned a
|
||||
# state group, i.e. we need to make sure that calls with a state_group
|
||||
# of None don't hit previous cached calls with a None state_group.
|
||||
# To do this we set the state_group to a new object as object() != object()
|
||||
state_group = object()
|
||||
|
||||
current_state_ids = yield context.get_current_state_ids(self)
|
||||
result = yield self._bulk_get_push_rules_for_room(
|
||||
event.room_id, state_group, current_state_ids, event=event
|
||||
)
|
||||
return result
|
||||
|
||||
@cachedInlineCallbacks(num_args=2, cache_context=True)
|
||||
def _bulk_get_push_rules_for_room(
|
||||
self, room_id, state_group, current_state_ids, cache_context, event=None
|
||||
):
|
||||
# We don't use `state_group`, its there so that we can cache based
|
||||
# on it. However, its important that its never None, since two current_state's
|
||||
# with a state_group of None are likely to be different.
|
||||
# See bulk_get_push_rules_for_room for how we work around this.
|
||||
assert state_group is not None
|
||||
|
||||
# We also will want to generate notifs for other people in the room so
|
||||
# their unread countss are correct in the event stream, but to avoid
|
||||
# generating them for bot / AS users etc, we only do so for people who've
|
||||
# sent a read receipt into the room.
|
||||
|
||||
users_in_room = yield self._get_joined_users_from_context(
|
||||
room_id,
|
||||
state_group,
|
||||
current_state_ids,
|
||||
on_invalidate=cache_context.invalidate,
|
||||
event=event,
|
||||
)
|
||||
|
||||
# We ignore app service users for now. This is so that we don't fill
|
||||
# up the `get_if_users_have_pushers` cache with AS entries that we
|
||||
# know don't have pushers, nor even read receipts.
|
||||
local_users_in_room = set(
|
||||
u
|
||||
for u in users_in_room
|
||||
if self.hs.is_mine_id(u)
|
||||
and not self.get_if_app_services_interested_in_user(u)
|
||||
)
|
||||
|
||||
# users in the room who have pushers need to get push rules run because
|
||||
# that's how their pushers work
|
||||
if_users_with_pushers = yield self.get_if_users_have_pushers(
|
||||
local_users_in_room, on_invalidate=cache_context.invalidate
|
||||
)
|
||||
user_ids = set(
|
||||
uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher
|
||||
)
|
||||
|
||||
users_with_receipts = yield self.get_users_with_read_receipts_in_room(
|
||||
room_id, on_invalidate=cache_context.invalidate
|
||||
)
|
||||
|
||||
# any users with pushers must be ours: they have pushers
|
||||
for uid in users_with_receipts:
|
||||
if uid in local_users_in_room:
|
||||
user_ids.add(uid)
|
||||
|
||||
rules_by_user = yield self.bulk_get_push_rules(
|
||||
user_ids, on_invalidate=cache_context.invalidate
|
||||
)
|
||||
|
||||
rules_by_user = {k: v for k, v in rules_by_user.items() if v is not None}
|
||||
|
||||
return rules_by_user
|
||||
|
||||
@cachedList(
|
||||
cached_method_name="get_push_rules_enabled_for_user",
|
||||
list_name="user_ids",
|
||||
num_args=1,
|
||||
inlineCallbacks=True,
|
||||
)
|
||||
def bulk_get_push_rules_enabled(self, user_ids):
|
||||
if not user_ids:
|
||||
return {}
|
||||
|
||||
results = {user_id: {} for user_id in user_ids}
|
||||
|
||||
rows = yield self._simple_select_many_batch(
|
||||
table="push_rules_enable",
|
||||
column="user_name",
|
||||
iterable=user_ids,
|
||||
retcols=("user_name", "rule_id", "enabled"),
|
||||
desc="bulk_get_push_rules_enabled",
|
||||
)
|
||||
for row in rows:
|
||||
enabled = bool(row["enabled"])
|
||||
results.setdefault(row["user_name"], {})[row["rule_id"]] = enabled
|
||||
return results
|
||||
|
||||
|
||||
class PushRuleStore(PushRulesWorkerStore):
|
||||
@defer.inlineCallbacks
|
||||
def add_push_rule(
|
||||
self,
|
||||
user_id,
|
||||
rule_id,
|
||||
priority_class,
|
||||
conditions,
|
||||
actions,
|
||||
before=None,
|
||||
after=None,
|
||||
):
|
||||
conditions_json = json.dumps(conditions)
|
||||
actions_json = json.dumps(actions)
|
||||
with self._push_rules_stream_id_gen.get_next() as ids:
|
||||
stream_id, event_stream_ordering = ids
|
||||
if before or after:
|
||||
yield self.runInteraction(
|
||||
"_add_push_rule_relative_txn",
|
||||
self._add_push_rule_relative_txn,
|
||||
stream_id,
|
||||
event_stream_ordering,
|
||||
user_id,
|
||||
rule_id,
|
||||
priority_class,
|
||||
conditions_json,
|
||||
actions_json,
|
||||
before,
|
||||
after,
|
||||
)
|
||||
else:
|
||||
yield self.runInteraction(
|
||||
"_add_push_rule_highest_priority_txn",
|
||||
self._add_push_rule_highest_priority_txn,
|
||||
stream_id,
|
||||
event_stream_ordering,
|
||||
user_id,
|
||||
rule_id,
|
||||
priority_class,
|
||||
conditions_json,
|
||||
actions_json,
|
||||
)
|
||||
|
||||
def _add_push_rule_relative_txn(
|
||||
self,
|
||||
txn,
|
||||
stream_id,
|
||||
event_stream_ordering,
|
||||
user_id,
|
||||
rule_id,
|
||||
priority_class,
|
||||
conditions_json,
|
||||
actions_json,
|
||||
before,
|
||||
after,
|
||||
):
|
||||
# Lock the table since otherwise we'll have annoying races between the
|
||||
# SELECT here and the UPSERT below.
|
||||
self.database_engine.lock_table(txn, "push_rules")
|
||||
|
||||
relative_to_rule = before or after
|
||||
|
||||
res = self._simple_select_one_txn(
|
||||
txn,
|
||||
table="push_rules",
|
||||
keyvalues={"user_name": user_id, "rule_id": relative_to_rule},
|
||||
retcols=["priority_class", "priority"],
|
||||
allow_none=True,
|
||||
)
|
||||
|
||||
if not res:
|
||||
raise RuleNotFoundException(
|
||||
"before/after rule not found: %s" % (relative_to_rule,)
|
||||
)
|
||||
|
||||
base_priority_class = res["priority_class"]
|
||||
base_rule_priority = res["priority"]
|
||||
|
||||
if base_priority_class != priority_class:
|
||||
raise InconsistentRuleException(
|
||||
"Given priority class does not match class of relative rule"
|
||||
)
|
||||
|
||||
if before:
|
||||
# Higher priority rules are executed first, So adding a rule before
|
||||
# a rule means giving it a higher priority than that rule.
|
||||
new_rule_priority = base_rule_priority + 1
|
||||
else:
|
||||
# We increment the priority of the existing rules to make space for
|
||||
# the new rule. Therefore if we want this rule to appear after
|
||||
# an existing rule we give it the priority of the existing rule,
|
||||
# and then increment the priority of the existing rule.
|
||||
new_rule_priority = base_rule_priority
|
||||
|
||||
sql = (
|
||||
"UPDATE push_rules SET priority = priority + 1"
|
||||
" WHERE user_name = ? AND priority_class = ? AND priority >= ?"
|
||||
)
|
||||
|
||||
txn.execute(sql, (user_id, priority_class, new_rule_priority))
|
||||
|
||||
self._upsert_push_rule_txn(
|
||||
txn,
|
||||
stream_id,
|
||||
event_stream_ordering,
|
||||
user_id,
|
||||
rule_id,
|
||||
priority_class,
|
||||
new_rule_priority,
|
||||
conditions_json,
|
||||
actions_json,
|
||||
)
|
||||
|
||||
def _add_push_rule_highest_priority_txn(
|
||||
self,
|
||||
txn,
|
||||
stream_id,
|
||||
event_stream_ordering,
|
||||
user_id,
|
||||
rule_id,
|
||||
priority_class,
|
||||
conditions_json,
|
||||
actions_json,
|
||||
):
|
||||
# Lock the table since otherwise we'll have annoying races between the
|
||||
# SELECT here and the UPSERT below.
|
||||
self.database_engine.lock_table(txn, "push_rules")
|
||||
|
||||
# find the highest priority rule in that class
|
||||
sql = (
|
||||
"SELECT COUNT(*), MAX(priority) FROM push_rules"
|
||||
" WHERE user_name = ? and priority_class = ?"
|
||||
)
|
||||
txn.execute(sql, (user_id, priority_class))
|
||||
res = txn.fetchall()
|
||||
(how_many, highest_prio) = res[0]
|
||||
|
||||
new_prio = 0
|
||||
if how_many > 0:
|
||||
new_prio = highest_prio + 1
|
||||
|
||||
self._upsert_push_rule_txn(
|
||||
txn,
|
||||
stream_id,
|
||||
event_stream_ordering,
|
||||
user_id,
|
||||
rule_id,
|
||||
priority_class,
|
||||
new_prio,
|
||||
conditions_json,
|
||||
actions_json,
|
||||
)
|
||||
|
||||
def _upsert_push_rule_txn(
|
||||
self,
|
||||
txn,
|
||||
stream_id,
|
||||
event_stream_ordering,
|
||||
user_id,
|
||||
rule_id,
|
||||
priority_class,
|
||||
priority,
|
||||
conditions_json,
|
||||
actions_json,
|
||||
update_stream=True,
|
||||
):
|
||||
"""Specialised version of _simple_upsert_txn that picks a push_rule_id
|
||||
using the _push_rule_id_gen if it needs to insert the rule. It assumes
|
||||
that the "push_rules" table is locked"""
|
||||
|
||||
sql = (
|
||||
"UPDATE push_rules"
|
||||
" SET priority_class = ?, priority = ?, conditions = ?, actions = ?"
|
||||
" WHERE user_name = ? AND rule_id = ?"
|
||||
)
|
||||
|
||||
txn.execute(
|
||||
sql,
|
||||
(priority_class, priority, conditions_json, actions_json, user_id, rule_id),
|
||||
)
|
||||
|
||||
if txn.rowcount == 0:
|
||||
# We didn't update a row with the given rule_id so insert one
|
||||
push_rule_id = self._push_rule_id_gen.get_next()
|
||||
|
||||
self._simple_insert_txn(
|
||||
txn,
|
||||
table="push_rules",
|
||||
values={
|
||||
"id": push_rule_id,
|
||||
"user_name": user_id,
|
||||
"rule_id": rule_id,
|
||||
"priority_class": priority_class,
|
||||
"priority": priority,
|
||||
"conditions": conditions_json,
|
||||
"actions": actions_json,
|
||||
},
|
||||
)
|
||||
|
||||
if update_stream:
|
||||
self._insert_push_rules_update_txn(
|
||||
txn,
|
||||
stream_id,
|
||||
event_stream_ordering,
|
||||
user_id,
|
||||
rule_id,
|
||||
op="ADD",
|
||||
data={
|
||||
"priority_class": priority_class,
|
||||
"priority": priority,
|
||||
"conditions": conditions_json,
|
||||
"actions": actions_json,
|
||||
},
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def delete_push_rule(self, user_id, rule_id):
|
||||
"""
|
||||
Delete a push rule. Args specify the row to be deleted and can be
|
||||
any of the columns in the push_rule table, but below are the
|
||||
standard ones
|
||||
|
||||
Args:
|
||||
user_id (str): The matrix ID of the push rule owner
|
||||
rule_id (str): The rule_id of the rule to be deleted
|
||||
"""
|
||||
|
||||
def delete_push_rule_txn(txn, stream_id, event_stream_ordering):
|
||||
self._simple_delete_one_txn(
|
||||
txn, "push_rules", {"user_name": user_id, "rule_id": rule_id}
|
||||
)
|
||||
|
||||
self._insert_push_rules_update_txn(
|
||||
txn, stream_id, event_stream_ordering, user_id, rule_id, op="DELETE"
|
||||
)
|
||||
|
||||
with self._push_rules_stream_id_gen.get_next() as ids:
|
||||
stream_id, event_stream_ordering = ids
|
||||
yield self.runInteraction(
|
||||
"delete_push_rule",
|
||||
delete_push_rule_txn,
|
||||
stream_id,
|
||||
event_stream_ordering,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def set_push_rule_enabled(self, user_id, rule_id, enabled):
|
||||
with self._push_rules_stream_id_gen.get_next() as ids:
|
||||
stream_id, event_stream_ordering = ids
|
||||
yield self.runInteraction(
|
||||
"_set_push_rule_enabled_txn",
|
||||
self._set_push_rule_enabled_txn,
|
||||
stream_id,
|
||||
event_stream_ordering,
|
||||
user_id,
|
||||
rule_id,
|
||||
enabled,
|
||||
)
|
||||
|
||||
def _set_push_rule_enabled_txn(
|
||||
self, txn, stream_id, event_stream_ordering, user_id, rule_id, enabled
|
||||
):
|
||||
new_id = self._push_rules_enable_id_gen.get_next()
|
||||
self._simple_upsert_txn(
|
||||
txn,
|
||||
"push_rules_enable",
|
||||
{"user_name": user_id, "rule_id": rule_id},
|
||||
{"enabled": 1 if enabled else 0},
|
||||
{"id": new_id},
|
||||
)
|
||||
|
||||
self._insert_push_rules_update_txn(
|
||||
txn,
|
||||
stream_id,
|
||||
event_stream_ordering,
|
||||
user_id,
|
||||
rule_id,
|
||||
op="ENABLE" if enabled else "DISABLE",
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def set_push_rule_actions(self, user_id, rule_id, actions, is_default_rule):
|
||||
actions_json = json.dumps(actions)
|
||||
|
||||
def set_push_rule_actions_txn(txn, stream_id, event_stream_ordering):
|
||||
if is_default_rule:
|
||||
# Add a dummy rule to the rules table with the user specified
|
||||
# actions.
|
||||
priority_class = -1
|
||||
priority = 1
|
||||
self._upsert_push_rule_txn(
|
||||
txn,
|
||||
stream_id,
|
||||
event_stream_ordering,
|
||||
user_id,
|
||||
rule_id,
|
||||
priority_class,
|
||||
priority,
|
||||
"[]",
|
||||
actions_json,
|
||||
update_stream=False,
|
||||
)
|
||||
else:
|
||||
self._simple_update_one_txn(
|
||||
txn,
|
||||
"push_rules",
|
||||
{"user_name": user_id, "rule_id": rule_id},
|
||||
{"actions": actions_json},
|
||||
)
|
||||
|
||||
self._insert_push_rules_update_txn(
|
||||
txn,
|
||||
stream_id,
|
||||
event_stream_ordering,
|
||||
user_id,
|
||||
rule_id,
|
||||
op="ACTIONS",
|
||||
data={"actions": actions_json},
|
||||
)
|
||||
|
||||
with self._push_rules_stream_id_gen.get_next() as ids:
|
||||
stream_id, event_stream_ordering = ids
|
||||
yield self.runInteraction(
|
||||
"set_push_rule_actions",
|
||||
set_push_rule_actions_txn,
|
||||
stream_id,
|
||||
event_stream_ordering,
|
||||
)
|
||||
|
||||
def _insert_push_rules_update_txn(
|
||||
self, txn, stream_id, event_stream_ordering, user_id, rule_id, op, data=None
|
||||
):
|
||||
values = {
|
||||
"stream_id": stream_id,
|
||||
"event_stream_ordering": event_stream_ordering,
|
||||
"user_id": user_id,
|
||||
"rule_id": rule_id,
|
||||
"op": op,
|
||||
}
|
||||
if data is not None:
|
||||
values.update(data)
|
||||
|
||||
self._simple_insert_txn(txn, "push_rules_stream", values=values)
|
||||
|
||||
txn.call_after(self.get_push_rules_for_user.invalidate, (user_id,))
|
||||
txn.call_after(self.get_push_rules_enabled_for_user.invalidate, (user_id,))
|
||||
txn.call_after(
|
||||
self.push_rules_stream_cache.entity_has_changed, user_id, stream_id
|
||||
)
|
||||
|
||||
def get_all_push_rule_updates(self, last_id, current_id, limit):
|
||||
"""Get all the push rules changes that have happend on the server"""
|
||||
if last_id == current_id:
|
||||
return defer.succeed([])
|
||||
|
||||
def get_all_push_rule_updates_txn(txn):
|
||||
sql = (
|
||||
"SELECT stream_id, event_stream_ordering, user_id, rule_id,"
|
||||
" op, priority_class, priority, conditions, actions"
|
||||
" FROM push_rules_stream"
|
||||
" WHERE ? < stream_id AND stream_id <= ?"
|
||||
" ORDER BY stream_id ASC LIMIT ?"
|
||||
)
|
||||
txn.execute(sql, (last_id, current_id, limit))
|
||||
return txn.fetchall()
|
||||
|
||||
return self.runInteraction(
|
||||
"get_all_push_rule_updates", get_all_push_rule_updates_txn
|
||||
)
|
||||
|
||||
def get_push_rules_stream_token(self):
|
||||
"""Get the position of the push rules stream.
|
||||
Returns a pair of a stream id for the push_rules stream and the
|
||||
room stream ordering it corresponds to."""
|
||||
return self._push_rules_stream_id_gen.get_current_token()
|
||||
|
||||
def get_max_push_rules_stream_id(self):
|
||||
return self.get_push_rules_stream_token()[0]
|
|
@ -22,10 +22,9 @@ from canonicaljson import encode_canonical_json, json
|
|||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
|
||||
|
||||
from ._base import SQLBaseStore
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
if six.PY2:
|
|
@ -15,7 +15,7 @@
|
|||
|
||||
import logging
|
||||
|
||||
from ._base import SQLBaseStore
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -0,0 +1,385 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2019 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
|
||||
import attr
|
||||
|
||||
from synapse.api.constants import RelationTypes
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.data_stores.main.stream import generate_pagination_where_clause
|
||||
from synapse.storage.relations import (
|
||||
AggregationPaginationToken,
|
||||
PaginationChunk,
|
||||
RelationPaginationToken,
|
||||
)
|
||||
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RelationsWorkerStore(SQLBaseStore):
|
||||
@cached(tree=True)
|
||||
def get_relations_for_event(
|
||||
self,
|
||||
event_id,
|
||||
relation_type=None,
|
||||
event_type=None,
|
||||
aggregation_key=None,
|
||||
limit=5,
|
||||
direction="b",
|
||||
from_token=None,
|
||||
to_token=None,
|
||||
):
|
||||
"""Get a list of relations for an event, ordered by topological ordering.
|
||||
|
||||
Args:
|
||||
event_id (str): Fetch events that relate to this event ID.
|
||||
relation_type (str|None): Only fetch events with this relation
|
||||
type, if given.
|
||||
event_type (str|None): Only fetch events with this event type, if
|
||||
given.
|
||||
aggregation_key (str|None): Only fetch events with this aggregation
|
||||
key, if given.
|
||||
limit (int): Only fetch the most recent `limit` events.
|
||||
direction (str): Whether to fetch the most recent first (`"b"`) or
|
||||
the oldest first (`"f"`).
|
||||
from_token (RelationPaginationToken|None): Fetch rows from the given
|
||||
token, or from the start if None.
|
||||
to_token (RelationPaginationToken|None): Fetch rows up to the given
|
||||
token, or up to the end if None.
|
||||
|
||||
Returns:
|
||||
Deferred[PaginationChunk]: List of event IDs that match relations
|
||||
requested. The rows are of the form `{"event_id": "..."}`.
|
||||
"""
|
||||
|
||||
where_clause = ["relates_to_id = ?"]
|
||||
where_args = [event_id]
|
||||
|
||||
if relation_type is not None:
|
||||
where_clause.append("relation_type = ?")
|
||||
where_args.append(relation_type)
|
||||
|
||||
if event_type is not None:
|
||||
where_clause.append("type = ?")
|
||||
where_args.append(event_type)
|
||||
|
||||
if aggregation_key:
|
||||
where_clause.append("aggregation_key = ?")
|
||||
where_args.append(aggregation_key)
|
||||
|
||||
pagination_clause = generate_pagination_where_clause(
|
||||
direction=direction,
|
||||
column_names=("topological_ordering", "stream_ordering"),
|
||||
from_token=attr.astuple(from_token) if from_token else None,
|
||||
to_token=attr.astuple(to_token) if to_token else None,
|
||||
engine=self.database_engine,
|
||||
)
|
||||
|
||||
if pagination_clause:
|
||||
where_clause.append(pagination_clause)
|
||||
|
||||
if direction == "b":
|
||||
order = "DESC"
|
||||
else:
|
||||
order = "ASC"
|
||||
|
||||
sql = """
|
||||
SELECT event_id, topological_ordering, stream_ordering
|
||||
FROM event_relations
|
||||
INNER JOIN events USING (event_id)
|
||||
WHERE %s
|
||||
ORDER BY topological_ordering %s, stream_ordering %s
|
||||
LIMIT ?
|
||||
""" % (
|
||||
" AND ".join(where_clause),
|
||||
order,
|
||||
order,
|
||||
)
|
||||
|
||||
def _get_recent_references_for_event_txn(txn):
|
||||
txn.execute(sql, where_args + [limit + 1])
|
||||
|
||||
last_topo_id = None
|
||||
last_stream_id = None
|
||||
events = []
|
||||
for row in txn:
|
||||
events.append({"event_id": row[0]})
|
||||
last_topo_id = row[1]
|
||||
last_stream_id = row[2]
|
||||
|
||||
next_batch = None
|
||||
if len(events) > limit and last_topo_id and last_stream_id:
|
||||
next_batch = RelationPaginationToken(last_topo_id, last_stream_id)
|
||||
|
||||
return PaginationChunk(
|
||||
chunk=list(events[:limit]), next_batch=next_batch, prev_batch=from_token
|
||||
)
|
||||
|
||||
return self.runInteraction(
|
||||
"get_recent_references_for_event", _get_recent_references_for_event_txn
|
||||
)
|
||||
|
||||
@cached(tree=True)
|
||||
def get_aggregation_groups_for_event(
|
||||
self,
|
||||
event_id,
|
||||
event_type=None,
|
||||
limit=5,
|
||||
direction="b",
|
||||
from_token=None,
|
||||
to_token=None,
|
||||
):
|
||||
"""Get a list of annotations on the event, grouped by event type and
|
||||
aggregation key, sorted by count.
|
||||
|
||||
This is used e.g. to get the what and how many reactions have happend
|
||||
on an event.
|
||||
|
||||
Args:
|
||||
event_id (str): Fetch events that relate to this event ID.
|
||||
event_type (str|None): Only fetch events with this event type, if
|
||||
given.
|
||||
limit (int): Only fetch the `limit` groups.
|
||||
direction (str): Whether to fetch the highest count first (`"b"`) or
|
||||
the lowest count first (`"f"`).
|
||||
from_token (AggregationPaginationToken|None): Fetch rows from the
|
||||
given token, or from the start if None.
|
||||
to_token (AggregationPaginationToken|None): Fetch rows up to the
|
||||
given token, or up to the end if None.
|
||||
|
||||
|
||||
Returns:
|
||||
Deferred[PaginationChunk]: List of groups of annotations that
|
||||
match. Each row is a dict with `type`, `key` and `count` fields.
|
||||
"""
|
||||
|
||||
where_clause = ["relates_to_id = ?", "relation_type = ?"]
|
||||
where_args = [event_id, RelationTypes.ANNOTATION]
|
||||
|
||||
if event_type:
|
||||
where_clause.append("type = ?")
|
||||
where_args.append(event_type)
|
||||
|
||||
having_clause = generate_pagination_where_clause(
|
||||
direction=direction,
|
||||
column_names=("COUNT(*)", "MAX(stream_ordering)"),
|
||||
from_token=attr.astuple(from_token) if from_token else None,
|
||||
to_token=attr.astuple(to_token) if to_token else None,
|
||||
engine=self.database_engine,
|
||||
)
|
||||
|
||||
if direction == "b":
|
||||
order = "DESC"
|
||||
else:
|
||||
order = "ASC"
|
||||
|
||||
if having_clause:
|
||||
having_clause = "HAVING " + having_clause
|
||||
else:
|
||||
having_clause = ""
|
||||
|
||||
sql = """
|
||||
SELECT type, aggregation_key, COUNT(DISTINCT sender), MAX(stream_ordering)
|
||||
FROM event_relations
|
||||
INNER JOIN events USING (event_id)
|
||||
WHERE {where_clause}
|
||||
GROUP BY relation_type, type, aggregation_key
|
||||
{having_clause}
|
||||
ORDER BY COUNT(*) {order}, MAX(stream_ordering) {order}
|
||||
LIMIT ?
|
||||
""".format(
|
||||
where_clause=" AND ".join(where_clause),
|
||||
order=order,
|
||||
having_clause=having_clause,
|
||||
)
|
||||
|
||||
def _get_aggregation_groups_for_event_txn(txn):
|
||||
txn.execute(sql, where_args + [limit + 1])
|
||||
|
||||
next_batch = None
|
||||
events = []
|
||||
for row in txn:
|
||||
events.append({"type": row[0], "key": row[1], "count": row[2]})
|
||||
next_batch = AggregationPaginationToken(row[2], row[3])
|
||||
|
||||
if len(events) <= limit:
|
||||
next_batch = None
|
||||
|
||||
return PaginationChunk(
|
||||
chunk=list(events[:limit]), next_batch=next_batch, prev_batch=from_token
|
||||
)
|
||||
|
||||
return self.runInteraction(
|
||||
"get_aggregation_groups_for_event", _get_aggregation_groups_for_event_txn
|
||||
)
|
||||
|
||||
@cachedInlineCallbacks()
|
||||
def get_applicable_edit(self, event_id):
|
||||
"""Get the most recent edit (if any) that has happened for the given
|
||||
event.
|
||||
|
||||
Correctly handles checking whether edits were allowed to happen.
|
||||
|
||||
Args:
|
||||
event_id (str): The original event ID
|
||||
|
||||
Returns:
|
||||
Deferred[EventBase|None]: Returns the most recent edit, if any.
|
||||
"""
|
||||
|
||||
# We only allow edits for `m.room.message` events that have the same sender
|
||||
# and event type. We can't assert these things during regular event auth so
|
||||
# we have to do the checks post hoc.
|
||||
|
||||
# Fetches latest edit that has the same type and sender as the
|
||||
# original, and is an `m.room.message`.
|
||||
sql = """
|
||||
SELECT edit.event_id FROM events AS edit
|
||||
INNER JOIN event_relations USING (event_id)
|
||||
INNER JOIN events AS original ON
|
||||
original.event_id = relates_to_id
|
||||
AND edit.type = original.type
|
||||
AND edit.sender = original.sender
|
||||
WHERE
|
||||
relates_to_id = ?
|
||||
AND relation_type = ?
|
||||
AND edit.type = 'm.room.message'
|
||||
ORDER by edit.origin_server_ts DESC, edit.event_id DESC
|
||||
LIMIT 1
|
||||
"""
|
||||
|
||||
def _get_applicable_edit_txn(txn):
|
||||
txn.execute(sql, (event_id, RelationTypes.REPLACE))
|
||||
row = txn.fetchone()
|
||||
if row:
|
||||
return row[0]
|
||||
|
||||
edit_id = yield self.runInteraction(
|
||||
"get_applicable_edit", _get_applicable_edit_txn
|
||||
)
|
||||
|
||||
if not edit_id:
|
||||
return
|
||||
|
||||
edit_event = yield self.get_event(edit_id, allow_none=True)
|
||||
return edit_event
|
||||
|
||||
def has_user_annotated_event(self, parent_id, event_type, aggregation_key, sender):
|
||||
"""Check if a user has already annotated an event with the same key
|
||||
(e.g. already liked an event).
|
||||
|
||||
Args:
|
||||
parent_id (str): The event being annotated
|
||||
event_type (str): The event type of the annotation
|
||||
aggregation_key (str): The aggregation key of the annotation
|
||||
sender (str): The sender of the annotation
|
||||
|
||||
Returns:
|
||||
Deferred[bool]
|
||||
"""
|
||||
|
||||
sql = """
|
||||
SELECT 1 FROM event_relations
|
||||
INNER JOIN events USING (event_id)
|
||||
WHERE
|
||||
relates_to_id = ?
|
||||
AND relation_type = ?
|
||||
AND type = ?
|
||||
AND sender = ?
|
||||
AND aggregation_key = ?
|
||||
LIMIT 1;
|
||||
"""
|
||||
|
||||
def _get_if_user_has_annotated_event(txn):
|
||||
txn.execute(
|
||||
sql,
|
||||
(
|
||||
parent_id,
|
||||
RelationTypes.ANNOTATION,
|
||||
event_type,
|
||||
sender,
|
||||
aggregation_key,
|
||||
),
|
||||
)
|
||||
|
||||
return bool(txn.fetchone())
|
||||
|
||||
return self.runInteraction(
|
||||
"get_if_user_has_annotated_event", _get_if_user_has_annotated_event
|
||||
)
|
||||
|
||||
|
||||
class RelationsStore(RelationsWorkerStore):
|
||||
def _handle_event_relations(self, txn, event):
|
||||
"""Handles inserting relation data during peristence of events
|
||||
|
||||
Args:
|
||||
txn
|
||||
event (EventBase)
|
||||
"""
|
||||
relation = event.content.get("m.relates_to")
|
||||
if not relation:
|
||||
# No relations
|
||||
return
|
||||
|
||||
rel_type = relation.get("rel_type")
|
||||
if rel_type not in (
|
||||
RelationTypes.ANNOTATION,
|
||||
RelationTypes.REFERENCE,
|
||||
RelationTypes.REPLACE,
|
||||
):
|
||||
# Unknown relation type
|
||||
return
|
||||
|
||||
parent_id = relation.get("event_id")
|
||||
if not parent_id:
|
||||
# Invalid relation
|
||||
return
|
||||
|
||||
aggregation_key = relation.get("key")
|
||||
|
||||
self._simple_insert_txn(
|
||||
txn,
|
||||
table="event_relations",
|
||||
values={
|
||||
"event_id": event.event_id,
|
||||
"relates_to_id": parent_id,
|
||||
"relation_type": rel_type,
|
||||
"aggregation_key": aggregation_key,
|
||||
},
|
||||
)
|
||||
|
||||
txn.call_after(self.get_relations_for_event.invalidate_many, (parent_id,))
|
||||
txn.call_after(
|
||||
self.get_aggregation_groups_for_event.invalidate_many, (parent_id,)
|
||||
)
|
||||
|
||||
if rel_type == RelationTypes.REPLACE:
|
||||
txn.call_after(self.get_applicable_edit.invalidate, (parent_id,))
|
||||
|
||||
def _handle_redaction(self, txn, redacted_event_id):
|
||||
"""Handles receiving a redaction and checking whether we need to remove
|
||||
any redacted relations from the database.
|
||||
|
||||
Args:
|
||||
txn
|
||||
redacted_event_id (str): The event that was redacted.
|
||||
"""
|
||||
|
||||
self._simple_delete_txn(
|
||||
txn, table="event_relations", keyvalues={"event_id": redacted_event_id}
|
||||
)
|
|
@ -25,7 +25,7 @@ from twisted.internet import defer
|
|||
|
||||
from synapse.api.errors import StoreError
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.search import SearchStore
|
||||
from synapse.storage.data_stores.main.search import SearchStore
|
||||
from synapse.types import ThirdPartyInstanceID
|
||||
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
|
||||
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,21 @@
|
|||
/* Copyright 2015, 2016 OpenMarket Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
CREATE TABLE IF NOT EXISTS background_updates(
|
||||
update_name TEXT NOT NULL, -- The name of the background update.
|
||||
progress_json TEXT NOT NULL, -- The current progress of the update as JSON.
|
||||
CONSTRAINT background_updates_uniqueness UNIQUE (update_name)
|
||||
);
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue