Merge pull request #1868 from matrix-org/erikj/replication_cache
Only invalidate membership caches based on the cache streampull/1871/head
						commit
						62f6b86ba7
					
				| 
						 | 
				
			
			@ -299,9 +299,6 @@ class ReplicationResource(Resource):
 | 
			
		|||
                "backward_ex_outliers", res.backward_ex_outliers,
 | 
			
		||||
                ("position", "event_id", "state_group"),
 | 
			
		||||
            )
 | 
			
		||||
            writer.write_header_and_rows(
 | 
			
		||||
                "state_resets", res.state_resets, ("position",),
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
    @defer.inlineCallbacks
 | 
			
		||||
    def presence(self, writer, current_token, request_streams):
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -192,10 +192,6 @@ class SlavedEventStore(BaseSlavedStore):
 | 
			
		|||
        return result
 | 
			
		||||
 | 
			
		||||
    def process_replication(self, result):
 | 
			
		||||
        state_resets = set(
 | 
			
		||||
            r[0] for r in result.get("state_resets", {"rows": []})["rows"]
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        stream = result.get("events")
 | 
			
		||||
        if stream:
 | 
			
		||||
            self._stream_id_gen.advance(int(stream["position"]))
 | 
			
		||||
| 
						 | 
				
			
			@ -205,7 +201,7 @@ class SlavedEventStore(BaseSlavedStore):
 | 
			
		|||
 | 
			
		||||
            for row in stream["rows"]:
 | 
			
		||||
                self._process_replication_row(
 | 
			
		||||
                    row, backfilled=False, state_resets=state_resets
 | 
			
		||||
                    row, backfilled=False,
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
        stream = result.get("backfill")
 | 
			
		||||
| 
						 | 
				
			
			@ -213,7 +209,7 @@ class SlavedEventStore(BaseSlavedStore):
 | 
			
		|||
            self._backfill_id_gen.advance(-int(stream["position"]))
 | 
			
		||||
            for row in stream["rows"]:
 | 
			
		||||
                self._process_replication_row(
 | 
			
		||||
                    row, backfilled=True, state_resets=state_resets
 | 
			
		||||
                    row, backfilled=True,
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
        stream = result.get("forward_ex_outliers")
 | 
			
		||||
| 
						 | 
				
			
			@ -232,20 +228,15 @@ class SlavedEventStore(BaseSlavedStore):
 | 
			
		|||
 | 
			
		||||
        return super(SlavedEventStore, self).process_replication(result)
 | 
			
		||||
 | 
			
		||||
    def _process_replication_row(self, row, backfilled, state_resets):
 | 
			
		||||
        position = row[0]
 | 
			
		||||
    def _process_replication_row(self, row, backfilled):
 | 
			
		||||
        internal = json.loads(row[1])
 | 
			
		||||
        event_json = json.loads(row[2])
 | 
			
		||||
        event = FrozenEvent(event_json, internal_metadata_dict=internal)
 | 
			
		||||
        self.invalidate_caches_for_event(
 | 
			
		||||
            event, backfilled, reset_state=position in state_resets
 | 
			
		||||
            event, backfilled,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    def invalidate_caches_for_event(self, event, backfilled, reset_state):
 | 
			
		||||
        if reset_state:
 | 
			
		||||
            self.get_rooms_for_user.invalidate_all()
 | 
			
		||||
            self.get_users_in_room.invalidate((event.room_id,))
 | 
			
		||||
 | 
			
		||||
    def invalidate_caches_for_event(self, event, backfilled):
 | 
			
		||||
        self._invalidate_get_event_cache(event.event_id)
 | 
			
		||||
 | 
			
		||||
        self.get_latest_event_ids_in_room.invalidate((event.room_id,))
 | 
			
		||||
| 
						 | 
				
			
			@ -267,8 +258,6 @@ class SlavedEventStore(BaseSlavedStore):
 | 
			
		|||
            self._invalidate_get_event_cache(event.redacts)
 | 
			
		||||
 | 
			
		||||
        if event.type == EventTypes.Member:
 | 
			
		||||
            self.get_rooms_for_user.invalidate((event.state_key,))
 | 
			
		||||
            self.get_users_in_room.invalidate((event.room_id,))
 | 
			
		||||
            self._membership_stream_cache.entity_has_changed(
 | 
			
		||||
                event.state_key, event.internal_metadata.stream_ordering
 | 
			
		||||
            )
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -572,14 +572,6 @@ class EventsStore(SQLBaseStore):
 | 
			
		|||
                    txn, self.get_users_in_room, (room_id,)
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
                # Add an entry to the current_state_resets table to record the point
 | 
			
		||||
                # where we clobbered the current state
 | 
			
		||||
                self._simple_insert_txn(
 | 
			
		||||
                    txn,
 | 
			
		||||
                    table="current_state_resets",
 | 
			
		||||
                    values={"event_stream_ordering": max_stream_order}
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
        for room_id, new_extrem in new_forward_extremeties.items():
 | 
			
		||||
            self._simple_delete_txn(
 | 
			
		||||
                txn,
 | 
			
		||||
| 
						 | 
				
			
			@ -1610,15 +1602,6 @@ class EventsStore(SQLBaseStore):
 | 
			
		|||
                else:
 | 
			
		||||
                    upper_bound = current_forward_id
 | 
			
		||||
 | 
			
		||||
                sql = (
 | 
			
		||||
                    "SELECT event_stream_ordering FROM current_state_resets"
 | 
			
		||||
                    " WHERE ? < event_stream_ordering"
 | 
			
		||||
                    " AND event_stream_ordering <= ?"
 | 
			
		||||
                    " ORDER BY event_stream_ordering ASC"
 | 
			
		||||
                )
 | 
			
		||||
                txn.execute(sql, (last_forward_id, upper_bound))
 | 
			
		||||
                state_resets = txn.fetchall()
 | 
			
		||||
 | 
			
		||||
                sql = (
 | 
			
		||||
                    "SELECT event_stream_ordering, event_id, state_group"
 | 
			
		||||
                    " FROM ex_outlier_stream"
 | 
			
		||||
| 
						 | 
				
			
			@ -1630,7 +1613,6 @@ class EventsStore(SQLBaseStore):
 | 
			
		|||
                forward_ex_outliers = txn.fetchall()
 | 
			
		||||
            else:
 | 
			
		||||
                new_forward_events = []
 | 
			
		||||
                state_resets = []
 | 
			
		||||
                forward_ex_outliers = []
 | 
			
		||||
 | 
			
		||||
            sql = (
 | 
			
		||||
| 
						 | 
				
			
			@ -1670,7 +1652,6 @@ class EventsStore(SQLBaseStore):
 | 
			
		|||
            return AllNewEventsResult(
 | 
			
		||||
                new_forward_events, new_backfill_events,
 | 
			
		||||
                forward_ex_outliers, backward_ex_outliers,
 | 
			
		||||
                state_resets,
 | 
			
		||||
            )
 | 
			
		||||
        return self.runInteraction("get_all_new_events", get_all_new_events_txn)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -1896,5 +1877,4 @@ class EventsStore(SQLBaseStore):
 | 
			
		|||
AllNewEventsResult = namedtuple("AllNewEventsResult", [
 | 
			
		||||
    "new_forward_events", "new_backfill_events",
 | 
			
		||||
    "forward_ex_outliers", "backward_ex_outliers",
 | 
			
		||||
    "state_resets"
 | 
			
		||||
])
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -66,8 +66,6 @@ class RoomMemberStore(SQLBaseStore):
 | 
			
		|||
        )
 | 
			
		||||
 | 
			
		||||
        for event in events:
 | 
			
		||||
            txn.call_after(self.get_rooms_for_user.invalidate, (event.state_key,))
 | 
			
		||||
            txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
 | 
			
		||||
            txn.call_after(
 | 
			
		||||
                self._membership_stream_cache.entity_has_changed,
 | 
			
		||||
                event.state_key, event.internal_metadata.stream_ordering
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -58,49 +58,6 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
 | 
			
		|||
    def tearDown(self):
 | 
			
		||||
        [unpatch() for unpatch in self.unpatches]
 | 
			
		||||
 | 
			
		||||
    @defer.inlineCallbacks
 | 
			
		||||
    def test_room_members(self):
 | 
			
		||||
        yield self.persist(type="m.room.create", key="", creator=USER_ID)
 | 
			
		||||
        yield self.replicate()
 | 
			
		||||
        yield self.check("get_rooms_for_user", (USER_ID,), [])
 | 
			
		||||
        yield self.check("get_users_in_room", (ROOM_ID,), [])
 | 
			
		||||
 | 
			
		||||
        # Join the room.
 | 
			
		||||
        join = yield self.persist(type="m.room.member", key=USER_ID, membership="join")
 | 
			
		||||
        yield self.replicate()
 | 
			
		||||
        yield self.check("get_rooms_for_user", (USER_ID,), [RoomsForUser(
 | 
			
		||||
            room_id=ROOM_ID,
 | 
			
		||||
            sender=USER_ID,
 | 
			
		||||
            membership="join",
 | 
			
		||||
            event_id=join.event_id,
 | 
			
		||||
            stream_ordering=join.internal_metadata.stream_ordering,
 | 
			
		||||
        )])
 | 
			
		||||
        yield self.check("get_users_in_room", (ROOM_ID,), [USER_ID])
 | 
			
		||||
 | 
			
		||||
        # Leave the room.
 | 
			
		||||
        yield self.persist(type="m.room.member", key=USER_ID, membership="leave")
 | 
			
		||||
        yield self.replicate()
 | 
			
		||||
        yield self.check("get_rooms_for_user", (USER_ID,), [])
 | 
			
		||||
        yield self.check("get_users_in_room", (ROOM_ID,), [])
 | 
			
		||||
 | 
			
		||||
        # Add some other user to the room.
 | 
			
		||||
        join = yield self.persist(type="m.room.member", key=USER_ID_2, membership="join")
 | 
			
		||||
        yield self.replicate()
 | 
			
		||||
        yield self.check("get_rooms_for_user", (USER_ID_2,), [RoomsForUser(
 | 
			
		||||
            room_id=ROOM_ID,
 | 
			
		||||
            sender=USER_ID,
 | 
			
		||||
            membership="join",
 | 
			
		||||
            event_id=join.event_id,
 | 
			
		||||
            stream_ordering=join.internal_metadata.stream_ordering,
 | 
			
		||||
        )])
 | 
			
		||||
        yield self.check("get_users_in_room", (ROOM_ID,), [USER_ID_2])
 | 
			
		||||
 | 
			
		||||
        yield self.persist(
 | 
			
		||||
            type="m.room.member", key=USER_ID, membership="join",
 | 
			
		||||
        )
 | 
			
		||||
        yield self.replicate()
 | 
			
		||||
        yield self.check("get_users_in_room", (ROOM_ID,), [USER_ID_2, USER_ID])
 | 
			
		||||
 | 
			
		||||
    @defer.inlineCallbacks
 | 
			
		||||
    def test_get_latest_event_ids_in_room(self):
 | 
			
		||||
        create = yield self.persist(type="m.room.create", key="", creator=USER_ID)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue