Backgroud update to clean out rooms from current state (#6802)
							parent
							
								
									b660327056
								
							
						
					
					
						commit
						57ad702af0
					
				| 
						 | 
				
			
			@ -0,0 +1 @@
 | 
			
		|||
Add background update to clean out left rooms from current state.
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,19 @@
 | 
			
		|||
/* Copyright 2020 The Matrix.org Foundation C.I.C
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
-- Add background update to go and delete current state events for rooms the
 | 
			
		||||
-- server is no longer in.
 | 
			
		||||
INSERT into background_updates (update_name, progress_json)
 | 
			
		||||
    VALUES ('delete_old_current_state_events', '{}');
 | 
			
		||||
| 
						 | 
				
			
			@ -21,12 +21,13 @@ from six import iteritems
 | 
			
		|||
 | 
			
		||||
from twisted.internet import defer
 | 
			
		||||
 | 
			
		||||
from synapse.api.constants import EventTypes
 | 
			
		||||
from synapse.api.constants import EventTypes, Membership
 | 
			
		||||
from synapse.api.errors import NotFoundError
 | 
			
		||||
from synapse.events import EventBase
 | 
			
		||||
from synapse.events.snapshot import EventContext
 | 
			
		||||
from synapse.storage._base import SQLBaseStore
 | 
			
		||||
from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
 | 
			
		||||
from synapse.storage.data_stores.main.roommember import RoomMemberWorkerStore
 | 
			
		||||
from synapse.storage.database import Database
 | 
			
		||||
from synapse.storage.state import StateFilter
 | 
			
		||||
from synapse.util.caches import intern_string
 | 
			
		||||
| 
						 | 
				
			
			@ -300,14 +301,17 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
 | 
			
		|||
        return set(row["state_group"] for row in rows)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class MainStateBackgroundUpdateStore(SQLBaseStore):
 | 
			
		||||
class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
 | 
			
		||||
 | 
			
		||||
    CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx"
 | 
			
		||||
    EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index"
 | 
			
		||||
    DELETE_CURRENT_STATE_UPDATE_NAME = "delete_old_current_state_events"
 | 
			
		||||
 | 
			
		||||
    def __init__(self, database: Database, db_conn, hs):
 | 
			
		||||
        super(MainStateBackgroundUpdateStore, self).__init__(database, db_conn, hs)
 | 
			
		||||
 | 
			
		||||
        self.server_name = hs.hostname
 | 
			
		||||
 | 
			
		||||
        self.db.updates.register_background_index_update(
 | 
			
		||||
            self.CURRENT_STATE_INDEX_UPDATE_NAME,
 | 
			
		||||
            index_name="current_state_events_member_index",
 | 
			
		||||
| 
						 | 
				
			
			@ -321,6 +325,106 @@ class MainStateBackgroundUpdateStore(SQLBaseStore):
 | 
			
		|||
            table="event_to_state_groups",
 | 
			
		||||
            columns=["state_group"],
 | 
			
		||||
        )
 | 
			
		||||
        self.db.updates.register_background_update_handler(
 | 
			
		||||
            self.DELETE_CURRENT_STATE_UPDATE_NAME, self._background_remove_left_rooms,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    async def _background_remove_left_rooms(self, progress, batch_size):
 | 
			
		||||
        """Background update to delete rows from `current_state_events` and
 | 
			
		||||
        `event_forward_extremities` tables of rooms that the server is no
 | 
			
		||||
        longer joined to.
 | 
			
		||||
        """
 | 
			
		||||
 | 
			
		||||
        last_room_id = progress.get("last_room_id", "")
 | 
			
		||||
 | 
			
		||||
        def _background_remove_left_rooms_txn(txn):
 | 
			
		||||
            sql = """
 | 
			
		||||
                SELECT DISTINCT room_id FROM current_state_events
 | 
			
		||||
                WHERE room_id > ? ORDER BY room_id LIMIT ?
 | 
			
		||||
            """
 | 
			
		||||
 | 
			
		||||
            txn.execute(sql, (last_room_id, batch_size))
 | 
			
		||||
            room_ids = list(row[0] for row in txn)
 | 
			
		||||
            if not room_ids:
 | 
			
		||||
                return True, set()
 | 
			
		||||
 | 
			
		||||
            sql = """
 | 
			
		||||
                SELECT room_id
 | 
			
		||||
                FROM current_state_events
 | 
			
		||||
                WHERE
 | 
			
		||||
                    room_id > ? AND room_id <= ?
 | 
			
		||||
                    AND type = 'm.room.member'
 | 
			
		||||
                    AND membership = 'join'
 | 
			
		||||
                    AND state_key LIKE ?
 | 
			
		||||
                GROUP BY room_id
 | 
			
		||||
            """
 | 
			
		||||
 | 
			
		||||
            txn.execute(sql, (last_room_id, room_ids[-1], "%:" + self.server_name))
 | 
			
		||||
 | 
			
		||||
            joined_room_ids = set(row[0] for row in txn)
 | 
			
		||||
 | 
			
		||||
            left_rooms = set(room_ids) - joined_room_ids
 | 
			
		||||
 | 
			
		||||
            # First we get all users that we still think were joined to the
 | 
			
		||||
            # room. This is so that we can mark those device lists as
 | 
			
		||||
            # potentially stale, since there may have been a period where the
 | 
			
		||||
            # server didn't share a room with the remote user and therefore may
 | 
			
		||||
            # have missed any device updates.
 | 
			
		||||
            rows = self.db.simple_select_many_txn(
 | 
			
		||||
                txn,
 | 
			
		||||
                table="current_state_events",
 | 
			
		||||
                column="room_id",
 | 
			
		||||
                iterable=left_rooms,
 | 
			
		||||
                keyvalues={"type": EventTypes.Member, "membership": Membership.JOIN},
 | 
			
		||||
                retcols=("state_key",),
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            potentially_left_users = set(row["state_key"] for row in rows)
 | 
			
		||||
 | 
			
		||||
            # Now lets actually delete the rooms from the DB.
 | 
			
		||||
            self.db.simple_delete_many_txn(
 | 
			
		||||
                txn,
 | 
			
		||||
                table="current_state_events",
 | 
			
		||||
                column="room_id",
 | 
			
		||||
                iterable=left_rooms,
 | 
			
		||||
                keyvalues={},
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            self.db.simple_delete_many_txn(
 | 
			
		||||
                txn,
 | 
			
		||||
                table="event_forward_extremities",
 | 
			
		||||
                column="room_id",
 | 
			
		||||
                iterable=left_rooms,
 | 
			
		||||
                keyvalues={},
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            self.db.updates._background_update_progress_txn(
 | 
			
		||||
                txn,
 | 
			
		||||
                self.DELETE_CURRENT_STATE_UPDATE_NAME,
 | 
			
		||||
                {"last_room_id": room_ids[-1]},
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            return False, potentially_left_users
 | 
			
		||||
 | 
			
		||||
        finished, potentially_left_users = await self.db.runInteraction(
 | 
			
		||||
            "_background_remove_left_rooms", _background_remove_left_rooms_txn
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        if finished:
 | 
			
		||||
            await self.db.updates._end_background_update(
 | 
			
		||||
                self.DELETE_CURRENT_STATE_UPDATE_NAME
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        # Now go and check if we still share a room with the remote users in
 | 
			
		||||
        # the deleted rooms. If not mark their device lists as stale.
 | 
			
		||||
        joined_users = await self.get_users_server_still_shares_room_with(
 | 
			
		||||
            potentially_left_users
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        for user_id in potentially_left_users - joined_users:
 | 
			
		||||
            await self.mark_remote_user_device_list_as_unsubscribed(user_id)
 | 
			
		||||
 | 
			
		||||
        return batch_size
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class StateStore(StateGroupWorkerStore, MainStateBackgroundUpdateStore):
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue