226 lines
		
	
	
		
			8.8 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			226 lines
		
	
	
		
			8.8 KiB
		
	
	
	
		
			Python
		
	
	
# -*- coding: utf-8 -*-
 | 
						|
# Copyright 2016 OpenMarket Ltd
 | 
						|
#
 | 
						|
# Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
# you may not use this file except in compliance with the License.
 | 
						|
# You may obtain a copy of the License at
 | 
						|
#
 | 
						|
#     http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
# Unless required by applicable law or agreed to in writing, software
 | 
						|
# distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
# See the License for the specific language governing permissions and
 | 
						|
# limitations under the License.
 | 
						|
from ._base import BaseSlavedStore
 | 
						|
from ._slaved_id_tracker import SlavedIdTracker
 | 
						|
 | 
						|
from synapse.api.constants import EventTypes
 | 
						|
from synapse.events import FrozenEvent
 | 
						|
from synapse.storage import DataStore
 | 
						|
from synapse.storage.room import RoomStore
 | 
						|
from synapse.storage.roommember import RoomMemberStore
 | 
						|
from synapse.storage.event_federation import EventFederationStore
 | 
						|
from synapse.storage.event_push_actions import EventPushActionsStore
 | 
						|
from synapse.storage.state import StateStore
 | 
						|
from synapse.util.caches.stream_change_cache import StreamChangeCache
 | 
						|
 | 
						|
import ujson as json
 | 
						|
 | 
						|
# So, um, we want to borrow a load of functions intended for reading from
 | 
						|
# a DataStore, but we don't want to take functions that either write to the
 | 
						|
# DataStore or are cached and don't have cache invalidation logic.
 | 
						|
#
 | 
						|
# Rather than write duplicate versions of those functions, or lift them to
 | 
						|
# a common base class, we're going to grab the underlying __func__ object from
 | 
						|
# the method descriptor on the DataStore and chuck them into our class.
 | 
						|
 | 
						|
 | 
						|
class SlavedEventStore(BaseSlavedStore):
    """Event store that reuses DataStore's read paths and follows replication.

    The read accessors are not re-implemented here: the underlying function
    (or cached descriptor) for each one is lifted off the DataStore classes
    and attached to this class. ``process_replication`` then keeps the
    associated caches and stream positions up to date from the "events",
    "backfill", "state_resets" and "*_ex_outliers" streams of a replication
    result.
    """

    def __init__(self, db_conn, hs):
        """Set up the stream id trackers and the per-room stream change cache.

        Args:
            db_conn: database connection, handed to the base class and used
                to read the current ``events.stream_ordering`` positions.
            hs: passed straight through to the base class constructor.
        """
        super(SlavedEventStore, self).__init__(db_conn, hs)

        # Forward (live) events and backfilled events are tracked separately;
        # the backfill tracker runs with step=-1.
        self._stream_id_gen = SlavedIdTracker(
            db_conn, "events", "stream_ordering",
        )
        self._backfill_id_gen = SlavedIdTracker(
            db_conn, "events", "stream_ordering", step=-1
        )

        # Prefill the room -> stream_ordering change cache up to the current
        # position of the forward events stream.
        # NOTE(review): _get_cache_dict is not defined in this class;
        # presumably inherited from BaseSlavedStore - confirm.
        events_max = self._stream_id_gen.get_current_token()
        event_cache_prefill, min_event_val = self._get_cache_dict(
            db_conn, "events",
            entity_column="room_id",
            stream_column="stream_ordering",
            max_value=events_max,
        )
        self._events_stream_cache = StreamChangeCache(
            "EventsRoomStreamChangeCache", min_event_val,
            prefilled_cache=event_cache_prefill,
        )

    # Cached functions can't be accessed through a class instance so we need
    # to reach inside the __dict__ to extract them.
    get_room_name_and_aliases = RoomStore.__dict__["get_room_name_and_aliases"]
    get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"]
    get_users_in_room = RoomMemberStore.__dict__["get_users_in_room"]
    get_latest_event_ids_in_room = EventFederationStore.__dict__[
        "get_latest_event_ids_in_room"
    ]
    _get_current_state_for_key = StateStore.__dict__[
        "_get_current_state_for_key"
    ]
    get_invited_rooms_for_user = RoomMemberStore.__dict__[
        "get_invited_rooms_for_user"
    ]
    get_unread_event_push_actions_by_room_for_user = (
        EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"]
    )

    # Plain methods: take the raw function from the class-level method object
    # so that it binds to instances of this class instead.
    get_unread_push_actions_for_user_in_range = (
        DataStore.get_unread_push_actions_for_user_in_range.__func__
    )
    get_push_action_users_in_range = (
        DataStore.get_push_action_users_in_range.__func__
    )
    get_event = DataStore.get_event.__func__
    get_events = DataStore.get_events.__func__
    get_current_state = DataStore.get_current_state.__func__
    get_current_state_for_key = DataStore.get_current_state_for_key.__func__
    get_rooms_for_user_where_membership_is = (
        DataStore.get_rooms_for_user_where_membership_is.__func__
    )
    get_membership_changes_for_user = (
        DataStore.get_membership_changes_for_user.__func__
    )
    get_room_events_max_id = DataStore.get_room_events_max_id.__func__
    get_room_events_stream_for_room = (
        DataStore.get_room_events_stream_for_room.__func__
    )

    # Unlike the methods around it, this attribute is taken without
    # ``.__func__``. A bare function stored in a class body is bound to
    # ``self`` on instance access, so it is re-wrapped to keep the call
    # signature it has on DataStore.
    # NOTE(review): assumes DataStore declares this as a @staticmethod -
    # confirm against the storage module.
    _set_before_and_after = staticmethod(DataStore._set_before_and_after)

    _get_events = DataStore._get_events.__func__
    _get_events_from_cache = DataStore._get_events_from_cache.__func__

    _invalidate_get_event_cache = DataStore._invalidate_get_event_cache.__func__
    _parse_events_txn = DataStore._parse_events_txn.__func__
    _get_events_txn = DataStore._get_events_txn.__func__
    _get_event_txn = DataStore._get_event_txn.__func__
    _enqueue_events = DataStore._enqueue_events.__func__
    _do_fetch = DataStore._do_fetch.__func__
    _fetch_events_txn = DataStore._fetch_events_txn.__func__
    _fetch_event_rows = DataStore._fetch_event_rows.__func__
    _get_event_from_row = DataStore._get_event_from_row.__func__
    _get_event_from_row_txn = DataStore._get_event_from_row_txn.__func__
    _get_rooms_for_user_where_membership_is_txn = (
        DataStore._get_rooms_for_user_where_membership_is_txn.__func__
    )
    _get_members_rows_txn = DataStore._get_members_rows_txn.__func__

    def stream_positions(self):
        """Return the base class's stream positions plus "events"/"backfill".

        The backfill token is negated before being reported, mirroring the
        negation applied to incoming backfill positions in
        ``process_replication``.
        """
        result = super(SlavedEventStore, self).stream_positions()
        result["events"] = self._stream_id_gen.get_current_token()
        result["backfill"] = -self._backfill_id_gen.get_current_token()
        return result

    def process_replication(self, result):
        """Apply the event-related streams of a replication result.

        Args:
            result: mapping of stream name to a dict with "rows" (and, for
                the "events" and "backfill" streams, "position"). Missing or
                empty streams are skipped.

        Returns:
            Whatever the base class's ``process_replication`` returns.
        """
        # First column of each state_resets row; compared against the
        # position (first column) of each event row further down.
        state_resets = {
            r[0] for r in result.get("state_resets", {"rows": []})["rows"]
        }

        stream = result.get("events")
        if stream:
            self._stream_id_gen.advance(stream["position"])
            for row in stream["rows"]:
                self._process_replication_row(
                    row, backfilled=False, state_resets=state_resets
                )

        stream = result.get("backfill")
        if stream:
            # Backfill positions are negated on the way in (and negated
            # again on the way out in stream_positions).
            self._backfill_id_gen.advance(-stream["position"])
            for row in stream["rows"]:
                self._process_replication_row(
                    row, backfilled=True, state_resets=state_resets
                )

        # Both ex-outlier streams are handled identically: the second column
        # of each row is an event id whose cached copy must be dropped.
        for stream_name in ("forward_ex_outliers", "backward_ex_outliers"):
            stream = result.get(stream_name)
            if stream:
                for row in stream["rows"]:
                    self._invalidate_get_event_cache(row[1])

        return super(SlavedEventStore, self).process_replication(result)

    def _process_replication_row(self, row, backfilled, state_resets):
        """Rebuild the event carried by one replication row and invalidate.

        Args:
            row: sequence of (position, internal metadata JSON string,
                event JSON string).
            backfilled: True when the row came from the "backfill" stream.
            state_resets: set of positions; membership of this row's position
                triggers the state-reset invalidations.
        """
        position = row[0]
        internal = json.loads(row[1])
        event_json = json.loads(row[2])
        event = FrozenEvent(event_json, internal_metadata_dict=internal)
        self.invalidate_caches_for_event(
            event, backfilled, reset_state=position in state_resets
        )

    def invalidate_caches_for_event(self, event, backfilled, reset_state):
        """Invalidate every borrowed cache that ``event`` may have made stale.

        Args:
            event: the event that was just received.
            backfilled: True if the event arrived via backfill; such events
                do not touch the stream change cache or current-state caches.
            reset_state: True if state was reset at this event's position, in
                which case state-derived caches are flushed wholesale.
        """
        if reset_state:
            self._get_current_state_for_key.invalidate_all()
            self.get_rooms_for_user.invalidate_all()
            self.get_users_in_room.invalidate((event.room_id,))
            # self.get_joined_hosts_for_room.invalidate((event.room_id,))
            self.get_room_name_and_aliases.invalidate((event.room_id,))

        self._invalidate_get_event_cache(event.event_id)

        self.get_latest_event_ids_in_room.invalidate((event.room_id,))

        self.get_unread_event_push_actions_by_room_for_user.invalidate_many(
            (event.room_id,)
        )

        if not backfilled:
            # Only live events record a change for the room in the stream
            # change cache.
            self._events_stream_cache.entity_has_changed(
                event.room_id, event.internal_metadata.stream_ordering
            )

        if event.type == EventTypes.Redaction:
            # The redacted event's cached copy is stale too.
            self._invalidate_get_event_cache(event.redacts)

        if event.type == EventTypes.Member:
            self.get_rooms_for_user.invalidate((event.state_key,))
            # self.get_joined_hosts_for_room.invalidate((event.room_id,))
            self.get_users_in_room.invalidate((event.room_id,))
            # self._membership_stream_cache.entity_has_changed(
            #    event.state_key, event.internal_metadata.stream_ordering
            # )
            self.get_invited_rooms_for_user.invalidate((event.state_key,))

        # Everything below concerns current-state caches only.
        if not event.is_state():
            return

        if backfilled:
            return

        # Outliers are skipped unless they are invites from a remote server.
        if (not event.internal_metadata.is_invite_from_remote()
                and event.internal_metadata.is_outlier()):
            return

        self._get_current_state_for_key.invalidate((
            event.room_id, event.type, event.state_key
        ))

        if event.type in (EventTypes.Name, EventTypes.Aliases):
            self.get_room_name_and_aliases.invalidate(
                (event.room_id,)
            )
 |