Fetch edits for multiple events in a single query. (#11660)
This should reduce database usage when fetching bundled aggregations as the number of individual queries (and round trips to the database) are reduced.pull/11939/head
parent
380c3d40f4
commit
8b309adb43
|
@ -0,0 +1 @@
|
||||||
|
Improve performance when fetching bundled aggregations for multiple events.
|
|
@ -1801,9 +1801,7 @@ class PersistEventsStore:
|
||||||
)
|
)
|
||||||
|
|
||||||
if rel_type == RelationTypes.REPLACE:
|
if rel_type == RelationTypes.REPLACE:
|
||||||
txn.call_after(
|
txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,))
|
||||||
self.store.get_applicable_edit.invalidate, (parent_id, event.room_id)
|
|
||||||
)
|
|
||||||
|
|
||||||
if rel_type == RelationTypes.THREAD:
|
if rel_type == RelationTypes.THREAD:
|
||||||
txn.call_after(
|
txn.call_after(
|
||||||
|
|
|
@ -13,12 +13,22 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Union, cast
|
from typing import (
|
||||||
|
TYPE_CHECKING,
|
||||||
|
Collection,
|
||||||
|
Dict,
|
||||||
|
Iterable,
|
||||||
|
List,
|
||||||
|
Optional,
|
||||||
|
Tuple,
|
||||||
|
Union,
|
||||||
|
cast,
|
||||||
|
)
|
||||||
|
|
||||||
import attr
|
import attr
|
||||||
from frozendict import frozendict
|
from frozendict import frozendict
|
||||||
|
|
||||||
from synapse.api.constants import EventTypes, RelationTypes
|
from synapse.api.constants import RelationTypes
|
||||||
from synapse.events import EventBase
|
from synapse.events import EventBase
|
||||||
from synapse.storage._base import SQLBaseStore
|
from synapse.storage._base import SQLBaseStore
|
||||||
from synapse.storage.database import (
|
from synapse.storage.database import (
|
||||||
|
@ -28,13 +38,14 @@ from synapse.storage.database import (
|
||||||
make_in_list_sql_clause,
|
make_in_list_sql_clause,
|
||||||
)
|
)
|
||||||
from synapse.storage.databases.main.stream import generate_pagination_where_clause
|
from synapse.storage.databases.main.stream import generate_pagination_where_clause
|
||||||
|
from synapse.storage.engines import PostgresEngine
|
||||||
from synapse.storage.relations import (
|
from synapse.storage.relations import (
|
||||||
AggregationPaginationToken,
|
AggregationPaginationToken,
|
||||||
PaginationChunk,
|
PaginationChunk,
|
||||||
RelationPaginationToken,
|
RelationPaginationToken,
|
||||||
)
|
)
|
||||||
from synapse.types import JsonDict
|
from synapse.types import JsonDict
|
||||||
from synapse.util.caches.descriptors import cached
|
from synapse.util.caches.descriptors import cached, cachedList
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from synapse.server import HomeServer
|
from synapse.server import HomeServer
|
||||||
|
@ -340,20 +351,24 @@ class RelationsWorkerStore(SQLBaseStore):
|
||||||
)
|
)
|
||||||
|
|
||||||
@cached()
|
@cached()
|
||||||
async def get_applicable_edit(
|
def get_applicable_edit(self, event_id: str) -> Optional[EventBase]:
|
||||||
self, event_id: str, room_id: str
|
raise NotImplementedError()
|
||||||
) -> Optional[EventBase]:
|
|
||||||
|
@cachedList(cached_method_name="get_applicable_edit", list_name="event_ids")
|
||||||
|
async def _get_applicable_edits(
|
||||||
|
self, event_ids: Collection[str]
|
||||||
|
) -> Dict[str, Optional[EventBase]]:
|
||||||
"""Get the most recent edit (if any) that has happened for the given
|
"""Get the most recent edit (if any) that has happened for the given
|
||||||
event.
|
events.
|
||||||
|
|
||||||
Correctly handles checking whether edits were allowed to happen.
|
Correctly handles checking whether edits were allowed to happen.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
event_id: The original event ID
|
event_ids: The original event IDs
|
||||||
room_id: The original event's room ID
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
The most recent edit, if any.
|
A map of the most recent edit for each event. If there are no edits,
|
||||||
|
the event will map to None.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# We only allow edits for `m.room.message` events that have the same sender
|
# We only allow edits for `m.room.message` events that have the same sender
|
||||||
|
@ -362,37 +377,67 @@ class RelationsWorkerStore(SQLBaseStore):
|
||||||
|
|
||||||
# Fetches latest edit that has the same type and sender as the
|
# Fetches latest edit that has the same type and sender as the
|
||||||
# original, and is an `m.room.message`.
|
# original, and is an `m.room.message`.
|
||||||
sql = """
|
if isinstance(self.database_engine, PostgresEngine):
|
||||||
SELECT edit.event_id FROM events AS edit
|
# The `DISTINCT ON` clause will pick the *first* row it encounters,
|
||||||
INNER JOIN event_relations USING (event_id)
|
# so ordering by origin server ts + event ID desc will ensure we get
|
||||||
INNER JOIN events AS original ON
|
# the latest edit.
|
||||||
original.event_id = relates_to_id
|
sql = """
|
||||||
AND edit.type = original.type
|
SELECT DISTINCT ON (original.event_id) original.event_id, edit.event_id FROM events AS edit
|
||||||
AND edit.sender = original.sender
|
INNER JOIN event_relations USING (event_id)
|
||||||
WHERE
|
INNER JOIN events AS original ON
|
||||||
relates_to_id = ?
|
original.event_id = relates_to_id
|
||||||
AND relation_type = ?
|
AND edit.type = original.type
|
||||||
AND edit.room_id = ?
|
AND edit.sender = original.sender
|
||||||
AND edit.type = 'm.room.message'
|
AND edit.room_id = original.room_id
|
||||||
ORDER by edit.origin_server_ts DESC, edit.event_id DESC
|
WHERE
|
||||||
LIMIT 1
|
%s
|
||||||
"""
|
AND relation_type = ?
|
||||||
|
AND edit.type = 'm.room.message'
|
||||||
|
ORDER by original.event_id DESC, edit.origin_server_ts DESC, edit.event_id DESC
|
||||||
|
"""
|
||||||
|
else:
|
||||||
|
# SQLite uses a simplified query which returns all edits for an
|
||||||
|
# original event. The results are then de-duplicated when turned into
|
||||||
|
# a dict. Due to the chosen ordering, the latest edit stomps on
|
||||||
|
# earlier edits.
|
||||||
|
sql = """
|
||||||
|
SELECT original.event_id, edit.event_id FROM events AS edit
|
||||||
|
INNER JOIN event_relations USING (event_id)
|
||||||
|
INNER JOIN events AS original ON
|
||||||
|
original.event_id = relates_to_id
|
||||||
|
AND edit.type = original.type
|
||||||
|
AND edit.sender = original.sender
|
||||||
|
AND edit.room_id = original.room_id
|
||||||
|
WHERE
|
||||||
|
%s
|
||||||
|
AND relation_type = ?
|
||||||
|
AND edit.type = 'm.room.message'
|
||||||
|
ORDER by edit.origin_server_ts, edit.event_id
|
||||||
|
"""
|
||||||
|
|
||||||
def _get_applicable_edit_txn(txn: LoggingTransaction) -> Optional[str]:
|
def _get_applicable_edits_txn(txn: LoggingTransaction) -> Dict[str, str]:
|
||||||
txn.execute(sql, (event_id, RelationTypes.REPLACE, room_id))
|
clause, args = make_in_list_sql_clause(
|
||||||
row = txn.fetchone()
|
txn.database_engine, "relates_to_id", event_ids
|
||||||
if row:
|
)
|
||||||
return row[0]
|
args.append(RelationTypes.REPLACE)
|
||||||
return None
|
|
||||||
|
|
||||||
edit_id = await self.db_pool.runInteraction(
|
txn.execute(sql % (clause,), args)
|
||||||
"get_applicable_edit", _get_applicable_edit_txn
|
return dict(cast(Iterable[Tuple[str, str]], txn.fetchall()))
|
||||||
|
|
||||||
|
edit_ids = await self.db_pool.runInteraction(
|
||||||
|
"get_applicable_edits", _get_applicable_edits_txn
|
||||||
)
|
)
|
||||||
|
|
||||||
if not edit_id:
|
edits = await self.get_events(edit_ids.values()) # type: ignore[attr-defined]
|
||||||
return None
|
|
||||||
|
|
||||||
return await self.get_event(edit_id, allow_none=True) # type: ignore[attr-defined]
|
# Map to the original event IDs to the edit events.
|
||||||
|
#
|
||||||
|
# There might not be an edit event due to there being no edits or
|
||||||
|
# due to the event not being known, either case is treated the same.
|
||||||
|
return {
|
||||||
|
original_event_id: edits.get(edit_ids.get(original_event_id))
|
||||||
|
for original_event_id in event_ids
|
||||||
|
}
|
||||||
|
|
||||||
@cached()
|
@cached()
|
||||||
async def get_thread_summary(
|
async def get_thread_summary(
|
||||||
|
@ -612,9 +657,6 @@ class RelationsWorkerStore(SQLBaseStore):
|
||||||
The bundled aggregations for an event, if bundled aggregations are
|
The bundled aggregations for an event, if bundled aggregations are
|
||||||
enabled and the event can have bundled aggregations.
|
enabled and the event can have bundled aggregations.
|
||||||
"""
|
"""
|
||||||
# State events and redacted events do not get bundled aggregations.
|
|
||||||
if event.is_state() or event.internal_metadata.is_redacted():
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Do not bundle aggregations for an event which represents an edit or an
|
# Do not bundle aggregations for an event which represents an edit or an
|
||||||
# annotation. It does not make sense for them to have related events.
|
# annotation. It does not make sense for them to have related events.
|
||||||
|
@ -642,13 +684,6 @@ class RelationsWorkerStore(SQLBaseStore):
|
||||||
if references.chunk:
|
if references.chunk:
|
||||||
aggregations.references = references.to_dict()
|
aggregations.references = references.to_dict()
|
||||||
|
|
||||||
edit = None
|
|
||||||
if event.type == EventTypes.Message:
|
|
||||||
edit = await self.get_applicable_edit(event_id, room_id)
|
|
||||||
|
|
||||||
if edit:
|
|
||||||
aggregations.replace = edit
|
|
||||||
|
|
||||||
# If this event is the start of a thread, include a summary of the replies.
|
# If this event is the start of a thread, include a summary of the replies.
|
||||||
if self._msc3440_enabled:
|
if self._msc3440_enabled:
|
||||||
thread_count, latest_thread_event = await self.get_thread_summary(
|
thread_count, latest_thread_event = await self.get_thread_summary(
|
||||||
|
@ -668,9 +703,7 @@ class RelationsWorkerStore(SQLBaseStore):
|
||||||
return aggregations
|
return aggregations
|
||||||
|
|
||||||
async def get_bundled_aggregations(
|
async def get_bundled_aggregations(
|
||||||
self,
|
self, events: Iterable[EventBase], user_id: str
|
||||||
events: Iterable[EventBase],
|
|
||||||
user_id: str,
|
|
||||||
) -> Dict[str, BundledAggregations]:
|
) -> Dict[str, BundledAggregations]:
|
||||||
"""Generate bundled aggregations for events.
|
"""Generate bundled aggregations for events.
|
||||||
|
|
||||||
|
@ -683,13 +716,28 @@ class RelationsWorkerStore(SQLBaseStore):
|
||||||
events may have bundled aggregations in the results.
|
events may have bundled aggregations in the results.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# TODO Parallelize.
|
# State events and redacted events do not get bundled aggregations.
|
||||||
results = {}
|
events = [
|
||||||
|
event
|
||||||
|
for event in events
|
||||||
|
if not event.is_state() and not event.internal_metadata.is_redacted()
|
||||||
|
]
|
||||||
|
|
||||||
|
# event ID -> bundled aggregation in non-serialized form.
|
||||||
|
results: Dict[str, BundledAggregations] = {}
|
||||||
|
|
||||||
|
# Fetch other relations per event.
|
||||||
for event in events:
|
for event in events:
|
||||||
event_result = await self._get_bundled_aggregation_for_event(event, user_id)
|
event_result = await self._get_bundled_aggregation_for_event(event, user_id)
|
||||||
if event_result:
|
if event_result:
|
||||||
results[event.event_id] = event_result
|
results[event.event_id] = event_result
|
||||||
|
|
||||||
|
# Fetch any edits.
|
||||||
|
event_ids = [event.event_id for event in events]
|
||||||
|
edits = await self._get_applicable_edits(event_ids)
|
||||||
|
for event_id, edit in edits.items():
|
||||||
|
results.setdefault(event_id, BundledAggregations()).replace = edit
|
||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue