188 lines
		
	
	
		
			7.1 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			188 lines
		
	
	
		
			7.1 KiB
		
	
	
	
		
			Python
		
	
	
| # Copyright 2022 The Matrix.org Foundation C.I.C.
 | |
| #
 | |
| # Licensed under the Apache License, Version 2.0 (the "License");
 | |
| # you may not use this file except in compliance with the License.
 | |
| # You may obtain a copy of the License at
 | |
| #
 | |
| #     http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| # Unless required by applicable law or agreed to in writing, software
 | |
| # distributed under the License is distributed on an "AS IS" BASIS,
 | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| # See the License for the specific language governing permissions and
 | |
| # limitations under the License.
 | |
| 
 | |
| import logging
 | |
| from collections import defaultdict
 | |
| from typing import Collection, Dict, Set
 | |
| 
 | |
| from twisted.internet import defer
 | |
| from twisted.internet.defer import Deferred
 | |
| 
 | |
| from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
 | |
| from synapse.logging.opentracing import trace_with_opname
 | |
| from synapse.storage.databases.main.events_worker import EventsWorkerStore
 | |
| from synapse.storage.databases.main.room import RoomWorkerStore
 | |
| from synapse.util import unwrapFirstError
 | |
| from synapse.util.cancellation import cancellable
 | |
| 
 | |
| logger = logging.getLogger(__name__)
 | |
| 
 | |
| 
 | |
| class PartialStateEventsTracker:
 | |
|     """Keeps track of which events have partial state, after a partial-state join"""
 | |
| 
 | |
|     def __init__(self, store: EventsWorkerStore):
 | |
|         self._store = store
 | |
|         # a map from event id to a set of Deferreds which are waiting for that event to be
 | |
|         # un-partial-stated.
 | |
|         self._observers: Dict[str, Set[Deferred[None]]] = defaultdict(set)
 | |
| 
 | |
|     def notify_un_partial_stated(self, event_id: str) -> None:
 | |
|         """Notify that we now have full state for a given event
 | |
| 
 | |
|         Called by the state-resynchronization loop whenever we resynchronize the state
 | |
|         for a particular event. Unblocks any callers to await_full_state() for that
 | |
|         event.
 | |
| 
 | |
|         Args:
 | |
|             event_id: the event that now has full state.
 | |
|         """
 | |
|         observers = self._observers.pop(event_id, None)
 | |
|         if not observers:
 | |
|             return
 | |
|         logger.info(
 | |
|             "Notifying %i things waiting for un-partial-stating of event %s",
 | |
|             len(observers),
 | |
|             event_id,
 | |
|         )
 | |
|         with PreserveLoggingContext():
 | |
|             for o in observers:
 | |
|                 o.callback(None)
 | |
| 
 | |
|     @trace_with_opname("PartialStateEventsTracker.await_full_state")
 | |
|     @cancellable
 | |
|     async def await_full_state(self, event_ids: Collection[str]) -> None:
 | |
|         """Wait for all the given events to have full state.
 | |
| 
 | |
|         Args:
 | |
|             event_ids: the list of event ids that we want full state for
 | |
|         """
 | |
|         # first try the happy path: if there are no partial-state events, we can return
 | |
|         # quickly
 | |
|         partial_state_event_ids = [
 | |
|             ev
 | |
|             for ev, p in (await self._store.get_partial_state_events(event_ids)).items()
 | |
|             if p
 | |
|         ]
 | |
| 
 | |
|         if not partial_state_event_ids:
 | |
|             return
 | |
| 
 | |
|         logger.info(
 | |
|             "Awaiting un-partial-stating of events %s",
 | |
|             partial_state_event_ids,
 | |
|             stack_info=True,
 | |
|         )
 | |
| 
 | |
|         # create an observer for each lazy-joined event
 | |
|         observers: Dict[str, Deferred[None]] = {
 | |
|             event_id: Deferred() for event_id in partial_state_event_ids
 | |
|         }
 | |
|         for event_id, observer in observers.items():
 | |
|             self._observers[event_id].add(observer)
 | |
| 
 | |
|         try:
 | |
|             # some of them may have been un-lazy-joined between us checking the db and
 | |
|             # registering the observer, in which case we'd wait forever for the
 | |
|             # notification. Call back the observers now.
 | |
|             for event_id, partial in (
 | |
|                 await self._store.get_partial_state_events(observers.keys())
 | |
|             ).items():
 | |
|                 # there may have been a call to notify_un_partial_stated during the
 | |
|                 # db query, so the observers may already have been called.
 | |
|                 if not partial and not observers[event_id].called:
 | |
|                     observers[event_id].callback(None)
 | |
| 
 | |
|             await make_deferred_yieldable(
 | |
|                 defer.gatherResults(
 | |
|                     observers.values(),
 | |
|                     consumeErrors=True,
 | |
|                 )
 | |
|             ).addErrback(unwrapFirstError)
 | |
|             logger.info("Events %s all un-partial-stated", observers.keys())
 | |
|         finally:
 | |
|             # remove any observers we created. This should happen when the notification
 | |
|             # is received, but that might not happen for two reasons:
 | |
|             #   (a) we're bailing out early on an exception (including us being
 | |
|             #       cancelled during the await)
 | |
|             #   (b) the event got de-lazy-joined before we set up the observer.
 | |
|             for event_id, observer in observers.items():
 | |
|                 observer_set = self._observers.get(event_id)
 | |
|                 if observer_set:
 | |
|                     observer_set.discard(observer)
 | |
|                     if not observer_set:
 | |
|                         del self._observers[event_id]
 | |
| 
 | |
| 
 | |
| class PartialCurrentStateTracker:
 | |
|     """Keeps track of which rooms have partial state, after partial-state joins"""
 | |
| 
 | |
|     def __init__(self, store: RoomWorkerStore):
 | |
|         self._store = store
 | |
| 
 | |
|         # a map from room id to a set of Deferreds which are waiting for that room to be
 | |
|         # un-partial-stated.
 | |
|         self._observers: Dict[str, Set[Deferred[None]]] = defaultdict(set)
 | |
| 
 | |
|     def notify_un_partial_stated(self, room_id: str) -> None:
 | |
|         """Notify that we now have full current state for a given room
 | |
| 
 | |
|         Unblocks any callers to await_full_state() for that room.
 | |
| 
 | |
|         Args:
 | |
|             room_id: the room that now has full current state.
 | |
|         """
 | |
|         observers = self._observers.pop(room_id, None)
 | |
|         if not observers:
 | |
|             return
 | |
|         logger.info(
 | |
|             "Notifying %i things waiting for un-partial-stating of room %s",
 | |
|             len(observers),
 | |
|             room_id,
 | |
|         )
 | |
|         with PreserveLoggingContext():
 | |
|             for o in observers:
 | |
|                 o.callback(None)
 | |
| 
 | |
|     @trace_with_opname("PartialCurrentStateTracker.await_full_state")
 | |
|     @cancellable
 | |
|     async def await_full_state(self, room_id: str) -> None:
 | |
|         # We add the deferred immediately so that the DB call to check for
 | |
|         # partial state doesn't race when we unpartial the room.
 | |
|         d: Deferred[None] = Deferred()
 | |
|         self._observers.setdefault(room_id, set()).add(d)
 | |
| 
 | |
|         try:
 | |
|             # Check if the room has partial current state or not.
 | |
|             has_partial_state = await self._store.is_partial_state_room(room_id)
 | |
|             if not has_partial_state:
 | |
|                 return
 | |
| 
 | |
|             logger.info(
 | |
|                 "Awaiting un-partial-stating of room %s",
 | |
|                 room_id,
 | |
|                 stack_info=True,
 | |
|             )
 | |
| 
 | |
|             await make_deferred_yieldable(d)
 | |
| 
 | |
|             logger.info("Room has un-partial-stated")
 | |
|         finally:
 | |
|             # Remove the added observer, and remove the room entry if its empty.
 | |
|             ds = self._observers.get(room_id)
 | |
|             if ds is not None:
 | |
|                 ds.discard(d)
 | |
|                 if not ds:
 | |
|                     self._observers.pop(room_id, None)
 |