509 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			509 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Python
		
	
	
| # Copyright 2014-2016 OpenMarket Ltd
 | |
| # Copyright 2021 The Matrix.org Foundation C.I.C.
 | |
| #
 | |
| # Licensed under the Apache License, Version 2.0 (the "License");
 | |
| # you may not use this file except in compliance with the License.
 | |
| # You may obtain a copy of the License at
 | |
| #
 | |
| #     http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| # Unless required by applicable law or agreed to in writing, software
 | |
| # distributed under the License is distributed on an "AS IS" BASIS,
 | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| # See the License for the specific language governing permissions and
 | |
| # limitations under the License.
 | |
| 
 | |
| """A federation sender that forwards things to be sent across replication to
 | |
| a worker process.
 | |
| 
 | |
| It may be consumed by one or more federation sender worker processes.
 | |
| 
 | |
| Each row in the replication stream consists of a type and some json, where the
 | |
| types indicate whether they are presence, or edus, etc.
 | |
| 
 | |
| Ephemeral or non-event data are queued up in-memory. When the worker requests
 | |
| updates since a particular point, all in-memory data from before that point is
 | |
| dropped. We also expire things in the queue after 5 minutes, to ensure that a
 | |
| dead worker doesn't cause the queues to grow limitlessly.
 | |
| 
 | |
| Events are replicated via a separate events stream.
 | |
| """
 | |
| 
 | |
| import logging
 | |
| from typing import (
 | |
|     TYPE_CHECKING,
 | |
|     Dict,
 | |
|     Hashable,
 | |
|     Iterable,
 | |
|     List,
 | |
|     Optional,
 | |
|     Sized,
 | |
|     Tuple,
 | |
|     Type,
 | |
| )
 | |
| 
 | |
| import attr
 | |
| from sortedcontainers import SortedDict
 | |
| 
 | |
| from synapse.api.presence import UserPresenceState
 | |
| from synapse.federation.sender import AbstractFederationSender, FederationSender
 | |
| from synapse.metrics import LaterGauge
 | |
| from synapse.replication.tcp.streams.federation import FederationStream
 | |
| from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
 | |
| from synapse.util.metrics import Measure
 | |
| 
 | |
| from .units import Edu
 | |
| 
 | |
| if TYPE_CHECKING:
 | |
|     from synapse.server import HomeServer
 | |
| 
 | |
| logger = logging.getLogger(__name__)
 | |
| 
 | |
| 
 | |
class FederationRemoteSendQueue(AbstractFederationSender):
    """A drop in replacement for FederationSender.

    Instead of sending anything itself, it queues EDUs and presence updates in
    memory and exposes them as rows of the federation replication stream (see
    `get_replication_rows`), so that federation sender workers can send them.
    """

    def __init__(self, hs: "HomeServer"):
        self.server_name = hs.hostname
        self.clock = hs.get_clock()
        self.notifier = hs.get_notifier()
        self.is_mine_id = hs.is_mine_id

        # We may have multiple federation sender instances, so we need to track
        # their positions separately.
        self._sender_instances = hs.config.worker.federation_shard_config.instances
        self._sender_positions: Dict[str, int] = {}

        # Pending presence map user_id -> UserPresenceState
        self.presence_map: Dict[str, UserPresenceState] = {}

        # Stores the destinations we need to explicitly send presence to about a
        # given user.
        # Stream position -> (user_id, destinations)
        self.presence_destinations: SortedDict[
            int, Tuple[str, Iterable[str]]
        ] = SortedDict()

        # (destination, key) -> EDU
        self.keyed_edu: Dict[Tuple[str, tuple], Edu] = {}

        # stream position -> (destination, key)
        self.keyed_edu_changed: SortedDict[int, Tuple[str, tuple]] = SortedDict()

        # stream position -> EDU, for EDUs sent without a key
        self.edus: SortedDict[int, Edu] = SortedDict()

        # Stream ID for the next entry into presence_destinations,
        # keyed_edu_changed or edus (all three share this one counter).
        self.pos = 1

        # Map from the time (in ms) at which a stream position was allocated to
        # that position, so that we can clear out entries after a while. If
        # several positions are allocated in the same millisecond, the latest
        # one overwrites the earlier ones.
        self.pos_time: SortedDict[int, int] = SortedDict()

        # Register a size gauge per queue. This lives in a helper function so
        # that each lambda closes over its own `queue` argument rather than
        # over the loop variable below, which is rebound on every iteration.
        def register(name: str, queue: Sized) -> None:
            LaterGauge(
                "synapse_federation_send_queue_%s_size" % (name,),
                "",
                [],
                lambda: len(queue),
            )

        for queue_name in [
            "presence_map",
            "keyed_edu",
            "keyed_edu_changed",
            "edus",
            "pos_time",
            "presence_destinations",
        ]:
            register(queue_name, getattr(self, queue_name))

        self.clock.looping_call(self._clear_queue, 30 * 1000)

    def _next_pos(self) -> int:
        """Allocate the next stream position and record when it was allocated."""
        pos = self.pos
        self.pos += 1
        self.pos_time[self.clock.time_msec()] = pos
        return pos

    def _clear_queue(self) -> None:
        """Clear the queues of anything allocated more than five minutes ago."""

        FIVE_MINUTES_MS = 5 * 60 * 1000
        now = self.clock.time_msec()

        # `pos_time` is keyed by allocation time, so every key before this
        # index was allocated more than five minutes ago.
        i = self.pos_time.bisect_left(now - FIVE_MINUTES_MS)
        expired_times = list(self.pos_time.keys()[:i])
        if not expired_times:
            return

        # The *values* of `pos_time` are the stream positions; the keys are
        # timestamps and must not be used as positions.
        position_to_delete = max(self.pos_time[t] for t in expired_times)
        for t in expired_times:
            del self.pos_time[t]

        # `_clear_queue_before_pos` is exclusive, so add one to also drop the
        # newest expired position.
        self._clear_queue_before_pos(position_to_delete + 1)

    def _clear_queue_before_pos(self, position_to_delete: int) -> None:
        """Clear all the queues of entries with a position strictly below
        `position_to_delete`."""
        with Measure(self.clock, "send_queue._clear"):
            # Delete things out of presence maps
            i = self.presence_destinations.bisect_left(position_to_delete)
            for key in list(self.presence_destinations.keys()[:i]):
                del self.presence_destinations[key]

            # Only keep presence states still referenced by a queued row.
            user_ids = {user_id for user_id, _ in self.presence_destinations.values()}
            to_del = [
                user_id for user_id in self.presence_map if user_id not in user_ids
            ]
            for user_id in to_del:
                del self.presence_map[user_id]

            # Delete things out of keyed edus
            i = self.keyed_edu_changed.bisect_left(position_to_delete)
            for key in list(self.keyed_edu_changed.keys()[:i]):
                del self.keyed_edu_changed[key]

            # Only keep keyed EDUs still referenced by a queued row.
            live_keys = set(self.keyed_edu_changed.values())
            keys_to_del = [
                edu_key for edu_key in self.keyed_edu if edu_key not in live_keys
            ]
            for edu_key in keys_to_del:
                del self.keyed_edu[edu_key]

            # Delete things out of edu map
            i = self.edus.bisect_left(position_to_delete)
            for key in list(self.edus.keys()[:i]):
                del self.edus[key]

    def notify_new_events(self, max_token: RoomStreamToken) -> None:
        """As per FederationSender"""
        # This should never get called.
        raise NotImplementedError()

    def build_and_send_edu(
        self,
        destination: str,
        edu_type: str,
        content: JsonDict,
        key: Optional[Hashable] = None,
    ) -> None:
        """As per FederationSender

        Queues the EDU for replication. EDUs with a (truthy, tuple) `key`
        replace any earlier EDU queued for the same (destination, key).
        """
        if destination == self.server_name:
            logger.info("Not sending EDU to ourselves")
            return

        pos = self._next_pos()

        edu = Edu(
            origin=self.server_name,
            destination=destination,
            edu_type=edu_type,
            content=content,
        )

        if key:
            assert isinstance(key, tuple)
            self.keyed_edu[(destination, key)] = edu
            self.keyed_edu_changed[pos] = (destination, key)
        else:
            self.edus[pos] = edu

        self.notifier.on_new_replication_data()

    async def send_read_receipt(self, receipt: ReadReceipt) -> None:
        """As per FederationSender

        Args:
            receipt: the receipt to send; ignored here.
        """
        # nothing to do here: the replication listener will handle it.

    def send_presence_to_destinations(
        self, states: Iterable[UserPresenceState], destinations: Iterable[str]
    ) -> None:
        """As per FederationSender

        Queues one replication row per state, each targeting `destinations`.

        Args:
            states: the presence states to send. May be a one-shot iterable.
            destinations: the servers to send every state to.
        """
        # Materialise once: the same destinations are shared by every queued
        # row and are read again later in get_replication_rows, so a one-shot
        # iterator must not be stored as-is.
        destinations = list(destinations)

        for state in states:
            pos = self._next_pos()
            self.presence_map[state.user_id] = state
            self.presence_destinations[pos] = (state.user_id, destinations)

        self.notifier.on_new_replication_data()

    def send_device_messages(self, destination: str, immediate: bool = True) -> None:
        """As per FederationSender"""
        # We don't need to replicate this as it gets sent down a different
        # stream.

    def wake_destination(self, server: str) -> None:
        """As per FederationSender; a no-op for this replication queue."""
        pass

    def get_current_token(self) -> int:
        """Return the most recently allocated stream position."""
        return self.pos - 1

    def federation_ack(self, instance_name: str, token: int) -> None:
        """Record that `instance_name` has processed the stream up to `token`,
        and clear queue entries that are no longer needed."""
        if self._sender_instances:
            # If we have configured multiple federation sender instances we need
            # to track their positions separately, and only clear the queue up
            # to the token all instances have acked.
            self._sender_positions[instance_name] = token
            if any(
                instance not in self._sender_positions
                for instance in self._sender_instances
            ):
                # A configured sender has not acked anything yet, so we cannot
                # tell which rows it still needs. Leave trimming to the
                # periodic time-based expiry in _clear_queue.
                return
            token = min(self._sender_positions.values())

        self._clear_queue_before_pos(token)

    async def get_replication_rows(
        self, instance_name: str, from_token: int, to_token: int, target_row_count: int
    ) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
        """Get rows to be sent over federation between the two tokens

        Args:
            instance_name: the name of the current process
            from_token: the previous stream token: the starting point for fetching the
                updates
            to_token: the new stream token: the point to get updates up to
            target_row_count: a target for the number of rows to be returned.

        Returns: a triplet `(updates, new_last_token, limited)`, where:
           * `updates` is a list of `(token, row)` entries.
           * `new_last_token` is the new position in stream.
           * `limited` is whether there are more updates to fetch.
        """
        # TODO: Handle target_row_count.

        # To handle restarts where we wrap around
        if from_token > self.pos:
            from_token = -1

        # list of tuple(int, BaseFederationRow), where the first is the position
        # of the federation stream.
        rows: List[Tuple[int, BaseFederationRow]] = []

        # Each queue is keyed by stream position and we select positions in
        # (from_token, to_token]. bisect_right(to_token) already points just
        # past the last position <= to_token, so no further adjustment is
        # needed; going one further would leak a row beyond to_token that is
        # then sent again in the next batch.

        # Fetch presence to send to destinations
        i = self.presence_destinations.bisect_right(from_token)
        j = self.presence_destinations.bisect_right(to_token)

        for pos, (user_id, dests) in self.presence_destinations.items()[i:j]:
            rows.append(
                (
                    pos,
                    PresenceDestinationsRow(
                        state=self.presence_map[user_id], destinations=list(dests)
                    ),
                )
            )

        # Fetch changes keyed edus
        i = self.keyed_edu_changed.bisect_right(from_token)
        j = self.keyed_edu_changed.bisect_right(to_token)
        # We purposefully clobber based on the key here, python dict comprehensions
        # always use the last value, so this will correctly point to the last
        # stream position.
        keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]}

        for (destination, edu_key), pos in keyed_edus.items():
            rows.append(
                (
                    pos,
                    KeyedEduRow(
                        key=edu_key, edu=self.keyed_edu[(destination, edu_key)]
                    ),
                )
            )

        # Fetch changed edus
        i = self.edus.bisect_right(from_token)
        j = self.edus.bisect_right(to_token)
        edus = self.edus.items()[i:j]

        for pos, edu in edus:
            rows.append((pos, EduRow(edu)))

        # Sort rows by position. Positions come from one shared counter, so
        # they are unique and the row objects never need comparing.
        rows.sort(key=lambda r: r[0])

        return (
            [(pos, (row.TypeId, row.to_data())) for pos, row in rows],
            to_token,
            False,
        )
 | |
| 
 | |
| 
 | |
class BaseFederationRow:
    """Base class for rows to be sent in the federation stream.

    Specifies how to identify, serialize and deserialize the different types.
    Subclasses must override `TypeId`, `from_data`, `to_data` and
    `add_to_buffer`; the implementations here only raise NotImplementedError.
    """

    TypeId = ""  # Unique string that ids the type. Must be overridden in sub classes.

    @staticmethod
    def from_data(data: JsonDict) -> "BaseFederationRow":
        """Parse the data from the federation stream into a row.

        Args:
            data: The value of ``data`` from FederationStreamRow.data, type
                depends on the type of stream

        Returns:
            The parsed row.

        Raises:
            NotImplementedError: always; subclasses must override this.
        """
        raise NotImplementedError()

    def to_data(self) -> JsonDict:
        """Serialize this row to be sent over the federation stream.

        Returns:
            The value to be sent in FederationStreamRow.data. The type depends
            on the type of stream.

        Raises:
            NotImplementedError: always; subclasses must override this.
        """
        raise NotImplementedError()

    def add_to_buffer(self, buff: "ParsedFederationStreamData") -> None:
        """Add this row to the appropriate field in the buffer ready for this
        to be sent over federation.

        We use a buffer so that we can batch up events that have come in at
        the same time and send them all at once.

        Args:
            buff: the buffer of parsed rows that this row is added to.

        Raises:
            NotImplementedError: always; subclasses must override this.
        """
        raise NotImplementedError()
 | |
| 
 | |
| 
 | |
@attr.s(slots=True, frozen=True, auto_attribs=True)
class PresenceDestinationsRow(BaseFederationRow):
    """Streams a presence state together with the destinations that it must be
    explicitly sent to."""

    state: UserPresenceState
    destinations: List[str]

    TypeId = "pd"

    @staticmethod
    def from_data(data: JsonDict) -> "PresenceDestinationsRow":
        # Wire format: {"state": <presence state dict>, "dests": [...]}
        parsed_state = UserPresenceState.from_dict(data["state"])
        return PresenceDestinationsRow(state=parsed_state, destinations=data["dests"])

    def to_data(self) -> JsonDict:
        serialized_state = self.state.as_dict()
        return {"state": serialized_state, "dests": self.destinations}

    def add_to_buffer(self, buff: "ParsedFederationStreamData") -> None:
        entry = (self.state, self.destinations)
        buff.presence_destinations.append(entry)
 | |
| 
 | |
| 
 | |
@attr.s(slots=True, frozen=True, auto_attribs=True)
class KeyedEduRow(BaseFederationRow):
    """Streams EDUs that have an associated key that is used to clobber. For
    example, typing EDUs clobber based on room_id.

    When buffered, a later EDU with the same (destination, key) replaces an
    earlier one, so only the most recent is sent.
    """

    key: Tuple[str, ...]  # the edu key passed to send_edu
    edu: Edu

    TypeId = "k"

    @staticmethod
    def from_data(data: JsonDict) -> "KeyedEduRow":
        # The key may arrive as a list after serialization; normalise it back
        # to a tuple so it is hashable.
        return KeyedEduRow(key=tuple(data["key"]), edu=Edu(**data["edu"]))

    def to_data(self) -> JsonDict:
        return {"key": self.key, "edu": self.edu.get_internal_dict()}

    def add_to_buffer(self, buff: "ParsedFederationStreamData") -> None:
        buff.keyed_edus.setdefault(self.edu.destination, {})[self.key] = self.edu
 | |
| 
 | |
| 
 | |
@attr.s(slots=True, frozen=True, auto_attribs=True)
class EduRow(BaseFederationRow):
    """Streams EDUs that don't have keys. See KeyedEduRow

    Every such EDU is sent; none replaces another.
    """

    edu: Edu

    TypeId = "e"

    @staticmethod
    def from_data(data: JsonDict) -> "EduRow":
        # The wire data is the EDU's internal dict itself.
        parsed_edu = Edu(**data)
        return EduRow(parsed_edu)

    def to_data(self) -> JsonDict:
        internal = self.edu.get_internal_dict()
        return internal

    def add_to_buffer(self, buff: "ParsedFederationStreamData") -> None:
        destination = self.edu.destination
        if destination not in buff.edus:
            buff.edus[destination] = []
        buff.edus[destination].append(self.edu)
 | |
| 
 | |
| 
 | |
# Every concrete row type that may appear on the federation stream.
_rowtypes: Tuple[Type[BaseFederationRow], ...] = (
    PresenceDestinationsRow,
    KeyedEduRow,
    EduRow,
)

# Lookup from a row's wire `TypeId` to the class that parses it.
TypeToRow: Dict[str, Type[BaseFederationRow]] = {
    row_type.TypeId: row_type for row_type in _rowtypes
}
 | |
| 
 | |
| 
 | |
@attr.s(slots=True, frozen=True, auto_attribs=True)
class ParsedFederationStreamData:
    """Buffer that federation stream rows are parsed into, grouped by kind,
    before being handed to the transaction queue."""

    # (UserPresenceState, destinations) pairs, in stream order
    presence_destinations: List[Tuple[UserPresenceState, List[str]]] = attr.ib()
    # destination -> {edu key -> Edu}
    keyed_edus: Dict[str, Dict[Tuple[str, ...], Edu]] = attr.ib()
    # destination -> [Edu], in stream order
    edus: Dict[str, List[Edu]] = attr.ib()
 | |
| 
 | |
| 
 | |
def process_rows_for_federation(
    transaction_queue: FederationSender,
    rows: List[FederationStream.FederationStreamRow],
) -> None:
    """Parse rows from the federation stream and hand them to the transaction
    queue, ready for sending to the relevant homeservers.

    Rows with an unrecognised type are logged and skipped.

    Args:
        transaction_queue: the sender that will actually send the data.
        rows: raw rows read from the federation replication stream.
    """
    # Different row types need different handling: first collect every parsed
    # row into one buffer, grouped by kind, then dispatch each group.
    buff = ParsedFederationStreamData(
        presence_destinations=[],
        keyed_edus={},
        edus={},
    )

    for row in rows:
        row_cls = TypeToRow.get(row.type)
        if row_cls is None:
            logger.error("Unrecognized federation row type %r", row.type)
            continue
        row_cls.from_data(row.data).add_to_buffer(buff)

    for state, destinations in buff.presence_destinations:
        transaction_queue.send_presence_to_destinations(
            states=[state], destinations=destinations
        )

    for per_destination in buff.keyed_edus.values():
        for edu_key, keyed_edu in per_destination.items():
            transaction_queue.send_edu(keyed_edu, edu_key)

    for unkeyed in buff.edus.values():
        for plain_edu in unkeyed:
            transaction_queue.send_edu(plain_edu, None)
 |