148 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			148 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Python
		
	
	
| # -*- coding: utf-8 -*-
 | |
| # Copyright 2017 Vector Creations Ltd
 | |
| # Copyright 2019 New Vector Ltd
 | |
| #
 | |
| # Licensed under the Apache License, Version 2.0 (the "License");
 | |
| # you may not use this file except in compliance with the License.
 | |
| # You may obtain a copy of the License at
 | |
| #
 | |
| #     http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| # Unless required by applicable law or agreed to in writing, software
 | |
| # distributed under the License is distributed on an "AS IS" BASIS,
 | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| # See the License for the specific language governing permissions and
 | |
| # limitations under the License.
 | |
| 
 | |
| import heapq
 | |
| from typing import Tuple, Type
 | |
| 
 | |
| import attr
 | |
| 
 | |
| from ._base import Stream, db_query_to_update_function
 | |
| 
 | |
| 
 | |
| """Handling of the 'events' replication stream
 | |
| 
 | |
| This stream contains rows of various types. Each row therefore contains a 'type'
 | |
| identifier before the real data. For example::
 | |
| 
 | |
|     RDATA events batch ["state", ["!room:id", "m.type", "", "$event:id"]]
 | |
|     RDATA events 12345 ["ev", ["$event:id", "!room:id", "m.type", null, null, null]]
 | |
| 
 | |
| An "ev" row is sent for each new event. The fields in the data part are:
 | |
| 
 | |
|  * The new event id
 | |
|  * The room id for the event
 | |
|  * The type of the new event
 | |
|  * The state key of the event, for state events
 | |
|  * The event id of an event which is redacted by this event.
 | |
|  * The `relates_to` value of the event, if any.
 | |
| 
 | |
| A "state" row is sent whenever the "current state" in a room changes. The fields in the
 | |
| data part are:
 | |
| 
 | |
|  * The room id for the state change
 | |
|  * The event type of the state which has changed
 | |
|  * The state_key of the state which has changed
 | |
|  * The event id of the new state
 | |
| 
 | |
| """
 | |
| 
 | |
| 
 | |
@attr.s(frozen=True, slots=True)
class EventsStreamRow:
    """A single parsed row of the events replication stream.

    Attributes:
        type (str): the TypeId of one of the *EventsStreamRow classes,
            i.e. which kind of row `data` holds.
        data (BaseEventsStreamRow): the parsed row itself.
    """

    type = attr.ib()
    data = attr.ib()
 | |
| 
 | |
| 
 | |
class BaseEventsStreamRow:
    """Common base for the row types carried by the events stream.

    Each subclass identifies itself through `TypeId` and can be rebuilt from
    the data part of a replication row via `from_data`.
    """

    # Unique string identifying the row type. Must be overridden in subclasses.
    TypeId = None  # type: str

    @classmethod
    def from_data(cls, data):
        """Build a row of this type from replication stream data.

        The default implementation unpacks `data` and hands its items to the
        constructor as positional arguments.

        Args:
            data: The value of the data object from the replication stream.
                It is unpacked with `*`, so it must be iterable.

        Returns:
            A new instance of `cls`.
        """
        parsed_row = cls(*data)
        return parsed_row
 | |
| 
 | |
| 
 | |
@attr.s(frozen=True, slots=True)
class EventsStreamEventRow(BaseEventsStreamRow):
    """Row of type "ev" in the events stream.

    Attributes, in constructor (positional) order:
        event_id (str)
        room_id (str)
        type (str)
        state_key (str, optional)
        redacts (str, optional)
        relates_to (str, optional)
    """

    TypeId = "ev"

    # NOTE: the declaration order below is the positional order used by
    # `from_data`, so it must not be rearranged.
    event_id = attr.ib()
    room_id = attr.ib()
    type = attr.ib()
    state_key = attr.ib()
    redacts = attr.ib()
    relates_to = attr.ib()
 | |
| 
 | |
| 
 | |
@attr.s(frozen=True, slots=True)
class EventsStreamCurrentStateRow(BaseEventsStreamRow):
    """Row of type "state" in the events stream.

    Attributes, in constructor (positional) order:
        room_id (str)
        type (str)
        state_key (str)
        event_id (str, optional)
    """

    TypeId = "state"

    # NOTE: the declaration order below is the positional order used by
    # `from_data`, so it must not be rearranged.
    room_id = attr.ib()
    type = attr.ib()
    state_key = attr.ib()
    event_id = attr.ib()
 | |
| 
 | |
| 
 | |
# Every concrete row class that can appear in the events stream.
_EventRows = (
    EventsStreamEventRow,
    EventsStreamCurrentStateRow,
)  # type: Tuple[Type[BaseEventsStreamRow], ...]

# Lookup from a row's TypeId string to the class used to parse it.
TypeToRow = {row_class.TypeId: row_class for row_class in _EventRows}
 | |
| 
 | |
| 
 | |
class EventsStream(Stream):
    """Stream for new events, and for events that stop being outliers.

    Rows come in two kinds, told apart by a type id: "ev" rows
    (EventsStreamEventRow) and "state" rows (EventsStreamCurrentStateRow).
    """

    NAME = "events"

    def __init__(self, hs):
        """Wire the stream up to the datastore obtained from `hs`.

        Args:
            hs: object providing `get_datastore()`; it is also passed on to
                the `Stream` constructor.
        """
        store = hs.get_datastore()
        self._store = store
        self.current_token = store.get_current_events_token  # type: ignore
        self.update_function = db_query_to_update_function(self._update_function)  # type: ignore

        super().__init__(hs)

    async def _update_function(self, from_token, current_token, limit=None):
        """Fetch event rows and current-state deltas and merge them.

        Args:
            from_token: passed straight through to both store queries.
            current_token: passed straight through to both store queries.
            limit: passed straight through to both store queries.

        Returns:
            A lazy iterator of `(row[0], type_id, row[1:])` tuples, merged
            from both queries in ascending tuple order.
        """

        def tag_rows(rows, type_id):
            # Insert the row type id right after the first column.
            return ((row[0], type_id, row[1:]) for row in rows)

        new_events = await self._store.get_all_new_forward_event_rows(
            from_token, current_token, limit
        )
        tagged_events = tag_rows(new_events, EventsStreamEventRow.TypeId)

        state_deltas = await self._store.get_all_updated_current_state_deltas(
            from_token, current_token, limit
        )
        tagged_state = tag_rows(state_deltas, EventsStreamCurrentStateRow.TypeId)

        # heapq.merge only interleaves correctly if each input is already
        # sorted; presumably both store queries return rows ordered by their
        # first column (the stream token) - TODO confirm.
        # NOTE(review): `limit` is handed to each query separately, so the
        # merged result may hold more than `limit` rows - confirm the
        # consumer of this function tolerates that.
        return heapq.merge(tagged_events, tagged_state)

    @classmethod
    def parse_row(cls, row):
        """Turn a `(type id, data)` pair into an EventsStreamRow.

        Args:
            row: a two-item sequence of the row's type id and its raw data.

        Returns:
            EventsStreamRow whose `data` is the row class registered for the
            type id in `TypeToRow`, built via its `from_data`.

        Raises:
            KeyError: if the type id is not a key of `TypeToRow`.
        """
        row_type, raw_data = row
        row_class = TypeToRow[row_type]
        return EventsStreamRow(row_type, row_class.from_data(raw_data))
 |