673 lines
22 KiB
Python
673 lines
22 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright 2014-2016 OpenMarket Ltd
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
import itertools
|
|
import logging
|
|
import random
|
|
|
|
from six.moves import range
|
|
from six.moves.queue import Empty, PriorityQueue
|
|
|
|
from unpaddedbase64 import encode_base64
|
|
|
|
from twisted.internet import defer
|
|
|
|
from synapse.api.errors import StoreError
|
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
|
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
|
|
from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
|
|
from synapse.storage.data_stores.main.signatures import SignatureWorkerStore
|
|
from synapse.storage.database import Database
|
|
from synapse.util.caches.descriptors import cached
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBaseStore):
|
|
def get_auth_chain(self, event_ids, include_given=False):
    """Fetch the auth chain of the given events as full events.

    The auth chain IDs are resolved with `get_auth_chain_ids` and the result
    is then fed through `get_events_as_list`. The events *must* be state
    events.

    Args:
        event_ids (list): state events
        include_given (bool): include the given events in result

    Returns:
        list of events
    """
    pending_ids = self.get_auth_chain_ids(event_ids, include_given=include_given)
    return pending_ids.addCallback(self.get_events_as_list)
|
|
|
|
def get_auth_chain_ids(self, event_ids, include_given=False):
    """Fetch the IDs of the auth chain of the given events.

    Runs `_get_auth_chain_ids_txn` inside a database interaction. The events
    *must* be state events.

    Args:
        event_ids (list): state events
        include_given (bool): include the given events in result

    Returns:
        list of event_ids
    """
    txn_args = (event_ids, include_given)
    return self.db.runInteraction(
        "get_auth_chain_ids", self._get_auth_chain_ids_txn, *txn_args
    )
|
|
|
|
def _get_auth_chain_ids_txn(self, txn, event_ids, include_given):
    """Walk the `event_auth` table to collect the auth chain of `event_ids`.

    Starting from the given events, repeatedly looks up the `auth_id`s of the
    current frontier (in batches of 100 IDs per query) until no previously
    unseen auth events turn up.

    Args:
        txn: database transaction/cursor to run the queries on
        event_ids (list): events whose auth chain is wanted
        include_given (bool): whether the result is seeded with `event_ids`

    Returns:
        list of event_ids, in no particular order
    """
    found = set(event_ids) if include_given else set()

    select_prefix = "SELECT auth_id FROM event_auth WHERE "
    batch_size = 100

    frontier = set(event_ids)
    while frontier:
        pending = list(frontier)
        discovered = set()

        for offset in range(0, len(pending), batch_size):
            batch = pending[offset : offset + batch_size]
            clause, args = make_in_list_sql_clause(
                txn.database_engine, "event_id", batch
            )
            txn.execute(select_prefix + clause, list(args))
            for row in txn:
                discovered.add(row[0])

        # Only events we have not recorded yet need expanding next round.
        frontier = discovered - found
        found |= frontier

    return list(found)
|
|
|
|
def get_oldest_events_in_room(self, room_id):
    """Run `_get_oldest_events_in_room_txn` for `room_id` in a database
    interaction and return whatever `runInteraction` hands back.
    """
    desc = "get_oldest_events_in_room"
    return self.db.runInteraction(
        desc, self._get_oldest_events_in_room_txn, room_id
    )
|
|
|
|
def get_oldest_events_with_depth_in_room(self, room_id):
    """Run `get_oldest_events_with_depth_in_room_txn` for `room_id` in a
    database interaction and return whatever `runInteraction` hands back.
    """
    desc = "get_oldest_events_with_depth_in_room"
    txn_func = self.get_oldest_events_with_depth_in_room_txn
    return self.db.runInteraction(desc, txn_func, room_id)
|
|
|
|
def get_oldest_events_with_depth_in_room_txn(self, txn, room_id):
    """Map each backward extremity of a room to a depth.

    For every row of `event_backward_extremities` in the room, takes the
    maximum `depth` over the events that reference it through a non-state
    edge in `event_edges`.

    Args:
        txn: database transaction/cursor to run the query on
        room_id (str): the room to inspect

    Returns:
        dict built from the (event_id, max depth) rows of the query
    """
    sql = " ".join(
        [
            "SELECT b.event_id, MAX(e.depth) FROM events as e",
            "INNER JOIN event_edges as g",
            "ON g.event_id = e.event_id",
            "INNER JOIN event_backward_extremities as b",
            "ON g.prev_event_id = b.event_id",
            "WHERE b.room_id = ? AND g.is_state is ?",
            "GROUP BY b.event_id",
        ]
    )

    txn.execute(sql, (room_id, False))

    return dict(txn)
|
|
|
|
@defer.inlineCallbacks
def get_max_depth_of(self, event_ids):
    """Returns the max depth of a set of event IDs

    Args:
        event_ids (list[str])

    Returns:
        Deferred[int]: the largest `depth` among the matching rows of the
        `events` table, or 0 when no rows come back.
    """
    rows = yield self.db.simple_select_many_batch(
        table="events",
        column="event_id",
        iterable=event_ids,
        retcols=("depth",),
        desc="get_max_depth_of",
    )

    if rows:
        return max(row["depth"] for row in rows)

    # None of the requested events were found.
    return 0
|
|
|
|
def _get_oldest_events_in_room_txn(self, txn, room_id):
|
|
return self.db.simple_select_onecol_txn(
|
|
txn,
|
|
table="event_backward_extremities",
|
|
keyvalues={"room_id": room_id},
|
|
retcol="event_id",
|
|
)
|
|
|
|
@defer.inlineCallbacks
def get_prev_events_and_hashes_for_room(self, room_id):
    """
    Gets a subset of the current forward extremities in the given room,
    along with their depths and hashes.

    Limits the result to 10 extremities, so that we can avoid creating
    events which refer to hundreds of prev_events.

    Args:
        room_id (str): room_id

    Returns:
        Deferred[list[(str, dict[str, str], int)]]
            for each event, a tuple of (event_id, hashes, depth)
            where *hashes* is a map from algorithm to hash.
    """
    extremities = yield self.get_latest_event_ids_and_hashes_in_room(room_id)

    if len(extremities) <= 10:
        return extremities

    # Deepest first, so the head of the list holds the most recent events.
    extremities.sort(key=lambda entry: entry[2], reverse=True)

    # Half of the budget goes to the most recent events; the other half is a
    # random pick from the remainder so older extremities are not ignored.
    newest = extremities[:5]
    older_sample = random.sample(extremities[5:], 5)
    return newest + older_sample
|
|
|
|
def get_latest_event_ids_and_hashes_in_room(self, room_id):
    """
    Gets the current forward extremities in the given room

    Args:
        room_id (str): room_id

    Returns:
        Deferred[list[(str, dict[str, str], int)]]
            for each event, a tuple of (event_id, hashes, depth)
            where *hashes* is a map from algorithm to hash.
    """
    desc = "get_latest_event_ids_and_hashes_in_room"
    txn_func = self._get_latest_event_ids_and_hashes_in_room
    return self.db.runInteraction(desc, txn_func, room_id)
|
|
|
|
def get_rooms_with_many_extremities(self, min_count, limit, room_id_filter):
    """Get the rooms with the most forward extremities.

    Args:
        min_count (int): Only rooms with strictly more than this many
            extremities are returned (the query uses `count(*) > ?`).
        limit (int): The maximum number of rooms to return.
        room_id_filter (iterable[str]|None): room_ids to exclude from the
            results. Any iterable is accepted, including one-shot
            iterators; None or an empty iterable means "exclude nothing".

    Returns:
        Deferred[list]: At most `limit` room IDs that have more than
        `min_count` extremities, sorted by descending extremity count.
    """
    # Snapshot the filter once: it is walked twice below (placeholders, then
    # query arguments), which would silently drop the arguments if the
    # caller passed a generator.
    excluded_rooms = list(room_id_filter) if room_id_filter is not None else []

    def _get_rooms_with_many_extremities_txn(txn):
        where_clause = "1=1"
        if excluded_rooms:
            where_clause = "room_id NOT IN (%s)" % (
                ",".join("?" for _ in excluded_rooms),
            )

        sql = """
            SELECT room_id FROM event_forward_extremities
            WHERE %s
            GROUP BY room_id
            HAVING count(*) > ?
            ORDER BY count(*) DESC
            LIMIT ?
        """ % (
            where_clause,
        )

        query_args = list(itertools.chain(excluded_rooms, [min_count, limit]))
        txn.execute(sql, query_args)
        return [room_id for room_id, in txn]

    return self.db.runInteraction(
        "get_rooms_with_many_extremities", _get_rooms_with_many_extremities_txn
    )
|
|
|
|
@cached(max_entries=5000, iterable=True)
def get_latest_event_ids_in_room(self, room_id):
    """Select the `event_id` column of `event_forward_extremities` for the
    given room. Results go through the `cached` decorator.
    """
    room_filter = {"room_id": room_id}
    return self.db.simple_select_onecol(
        table="event_forward_extremities",
        keyvalues=room_filter,
        retcol="event_id",
        desc="get_latest_event_ids_in_room",
    )
|
|
|
|
def _get_latest_event_ids_and_hashes_in_room(self, txn, room_id):
|
|
sql = (
|
|
"SELECT e.event_id, e.depth FROM events as e "
|
|
"INNER JOIN event_forward_extremities as f "
|
|
"ON e.event_id = f.event_id "
|
|
"AND e.room_id = f.room_id "
|
|
"WHERE f.room_id = ?"
|
|
)
|
|
|
|
txn.execute(sql, (room_id,))
|
|
|
|
results = []
|
|
for event_id, depth in txn.fetchall():
|
|
hashes = self._get_event_reference_hashes_txn(txn, event_id)
|
|
prev_hashes = {
|
|
k: encode_base64(v) for k, v in hashes.items() if k == "sha256"
|
|
}
|
|
results.append((event_id, prev_hashes, depth))
|
|
|
|
return results
|
|
|
|
def get_min_depth(self, room_id):
    """For the given room, get the minimum depth we have seen for it.

    Runs `_get_min_depth_interaction` inside a database interaction.
    """
    desc = "get_min_depth"
    return self.db.runInteraction(desc, self._get_min_depth_interaction, room_id)
|
|
|
|
def _get_min_depth_interaction(self, txn, room_id):
|
|
min_depth = self.db.simple_select_one_onecol_txn(
|
|
txn,
|
|
table="room_depth",
|
|
keyvalues={"room_id": room_id},
|
|
retcol="min_depth",
|
|
allow_none=True,
|
|
)
|
|
|
|
return int(min_depth) if min_depth is not None else None
|
|
|
|
def get_forward_extremeties_for_room(self, room_id, stream_ordering):
    """For a given room_id and stream_ordering, return the forward
    extremeties of the room at that point in "time".

    Throws a StoreError if we have since purged the index for
    stream_orderings from that point.

    Args:
        room_id (str):
        stream_ordering (int):

    Returns:
        deferred, which resolves to a list of event_ids
    """
    # To make the cache on the underlying lookup more effective, clamp the
    # request to the position of the room's last change before it.
    cache_pos = self._events_stream_cache.get_max_pos_of_last_change(room_id)

    # The stream_ordering_to_exterm table is not always complete (e.g. right
    # after the upgrade that introduced it), so never go back further than
    # the stream position we started up with.
    last_change = max(self._stream_order_on_start, cache_pos)

    # Only clamp when the last change is recent enough.
    recent_enough = last_change > self.stream_ordering_month_ago
    effective_ordering = (
        min(last_change, stream_ordering) if recent_enough else stream_ordering
    )

    return self._get_forward_extremeties_for_room(room_id, effective_ordering)
|
|
|
|
@cached(max_entries=5000, num_args=2)
def _get_forward_extremeties_for_room(self, room_id, stream_ordering):
    """For a given room_id and stream_ordering, return the forward
    extremeties of the room at that point in "time".

    Throws a StoreError if we have since purged the index for
    stream_orderings from that point, i.e. when `stream_ordering` is not
    newer than `self.stream_ordering_month_ago`.
    """
    if stream_ordering <= self.stream_ordering_month_ago:
        raise StoreError(400, "stream_ordering too old")

    # Pick the rows recorded at the latest stream_ordering at or before the
    # requested one, then keep the ones belonging to this room.
    sql = """
        SELECT event_id FROM stream_ordering_to_exterm
        INNER JOIN (
            SELECT room_id, MAX(stream_ordering) AS stream_ordering
            FROM stream_ordering_to_exterm
            WHERE stream_ordering <= ? GROUP BY room_id
        ) AS rms USING (room_id, stream_ordering)
        WHERE room_id = ?
    """

    def get_forward_extremeties_for_room_txn(txn):
        txn.execute(sql, (stream_ordering, room_id))
        return [extremity_id for (extremity_id,) in txn]

    return self.db.runInteraction(
        "get_forward_extremeties_for_room", get_forward_extremeties_for_room_txn
    )
|
|
|
|
def get_backfill_events(self, room_id, event_list, limit):
    """Get a list of Events for a given room that occurred before (and
    including) the events in event_list. Return a list of max size `limit`,
    sorted from the highest depth to the lowest.

    Args:
        room_id (str)
        event_list (list)
        limit (int)
    """

    def _deepest_first(events):
        return sorted(events, key=lambda ev: ev.depth, reverse=True)

    pending = self.db.runInteraction(
        "get_backfill_events",
        self._get_backfill_events,
        room_id,
        event_list,
        limit,
    )
    pending = pending.addCallback(self.get_events_as_list)
    pending = pending.addCallback(_deepest_first)
    return pending
|
|
|
|
def _get_backfill_events(self, txn, room_id, event_list, limit):
    """Collect up to `limit` event IDs by walking backwards from `event_list`.

    Events are taken from a priority queue keyed on negative depth, so the
    deepest pending event is always expanded next; each expansion enqueues
    the event's non-state prev events found in `event_edges`.

    Args:
        txn: database transaction/cursor to run the queries on
        room_id (str): room the seed events must belong to
        event_list (list): event IDs to start from
        limit (int): maximum number of event IDs to collect

    Returns:
        set of event IDs (seed events included)
    """
    logger.debug("_get_backfill_events: %s, %r, %s", room_id, event_list, limit)

    edge_query = (
        "SELECT depth, prev_event_id FROM event_edges"
        " INNER JOIN events"
        " ON prev_event_id = events.event_id"
        " WHERE event_edges.event_id = ?"
        " AND event_edges.is_state = ?"
        " LIMIT ?"
    )

    # Seed the queue with the requested events whose depth lookup in this
    # room returns a truthy value.
    pending = PriorityQueue()
    for seed_id in event_list:
        seed_depth = self.db.simple_select_one_onecol_txn(
            txn,
            table="events",
            keyvalues={"event_id": seed_id, "room_id": room_id},
            retcol="depth",
            allow_none=True,
        )
        if seed_depth:
            pending.put((-seed_depth, seed_id))

    collected = set()
    while len(collected) < limit:
        try:
            _, current_id = pending.get_nowait()
        except Empty:
            # Nothing left to expand.
            break

        if current_id in collected:
            continue
        collected.add(current_id)

        remaining = limit - len(collected)
        txn.execute(edge_query, (current_id, False, remaining))

        for prev_depth, prev_id in txn:
            if prev_id not in collected:
                pending.put((-prev_depth, prev_id))

    return collected
|
|
|
|
@defer.inlineCallbacks
def get_missing_events(self, room_id, earliest_events, latest_events, limit):
    """Look up event IDs with `_get_missing_events` in a database
    interaction, then load them via `get_events_as_list`.

    Args:
        room_id (str)
        earliest_events (list): event IDs at which the backwards walk stops
        latest_events (list): event IDs the backwards walk starts from
        limit (int): maximum number of events to return
    """
    missing_ids = yield self.db.runInteraction(
        "get_missing_events",
        self._get_missing_events,
        room_id,
        earliest_events,
        latest_events,
        limit,
    )
    return (yield self.get_events_as_list(missing_ids))
|
|
|
|
def _get_missing_events(self, txn, room_id, earliest_events, latest_events, limit):
|
|
|
|
seen_events = set(earliest_events)
|
|
front = set(latest_events) - seen_events
|
|
event_results = []
|
|
|
|
query = (
|
|
"SELECT prev_event_id FROM event_edges "
|
|
"WHERE room_id = ? AND event_id = ? AND is_state = ? "
|
|
"LIMIT ?"
|
|
)
|
|
|
|
while front and len(event_results) < limit:
|
|
new_front = set()
|
|
for event_id in front:
|
|
txn.execute(
|
|
query, (room_id, event_id, False, limit - len(event_results))
|
|
)
|
|
|
|
new_results = set(t[0] for t in txn) - seen_events
|
|
|
|
new_front |= new_results
|
|
seen_events |= new_results
|
|
event_results.extend(new_results)
|
|
|
|
front = new_front
|
|
|
|
# we built the list working backwards from latest_events; we now need to
|
|
# reverse it so that the events are approximately chronological.
|
|
event_results.reverse()
|
|
return event_results
|
|
|
|
@defer.inlineCallbacks
def get_successor_events(self, event_ids):
    """Fetch all events that have the given events as a prev event

    Args:
        event_ids (iterable[str])

    Returns:
        Deferred[list[str]]: the `event_id` of every `event_edges` row whose
        `prev_event_id` is one of `event_ids`.
    """
    edge_rows = yield self.db.simple_select_many_batch(
        table="event_edges",
        column="prev_event_id",
        iterable=event_ids,
        retcols=("event_id",),
        desc="get_successor_events",
    )

    successors = []
    for edge in edge_rows:
        successors.append(edge["event_id"])
    return successors
|
|
|
|
|
|
class EventFederationStore(EventFederationWorkerStore):
|
|
""" Responsible for storing and serving up the various graphs associated
|
|
with an event. Including the main event graph and the auth chains for an
|
|
event.
|
|
|
|
Also has methods for getting the front (latest) and back (oldest) edges
|
|
of the event graphs. These are used to generate the parents for new events
|
|
and backfilling from another server respectively.
|
|
"""
|
|
|
|
EVENT_AUTH_STATE_ONLY = "event_auth_state_only"
|
|
|
|
def __init__(self, database: Database, db_conn, hs):
    """Initialise the base stores, register the `EVENT_AUTH_STATE_ONLY`
    background update handler and schedule the periodic pruning of the
    forward extremity cache.
    """
    super(EventFederationStore, self).__init__(database, db_conn, hs)

    background_updates = self.db.updates
    background_updates.register_background_update_handler(
        self.EVENT_AUTH_STATE_ONLY, self._background_delete_non_state_event_auth
    )

    # NOTE(review): presumably one hour expressed in milliseconds; confirm
    # against the unit expected by the clock's looping_call.
    prune_interval = 60 * 60 * 1000
    clock = hs.get_clock()
    clock.looping_call(self._delete_old_forward_extrem_cache, prune_interval)
|
|
|
|
def _update_min_depth_for_room_txn(self, txn, room_id, depth):
|
|
min_depth = self._get_min_depth_interaction(txn, room_id)
|
|
|
|
if min_depth and depth >= min_depth:
|
|
return
|
|
|
|
self.db.simple_upsert_txn(
|
|
txn,
|
|
table="room_depth",
|
|
keyvalues={"room_id": room_id},
|
|
values={"min_depth": depth},
|
|
)
|
|
|
|
def _handle_mult_prev_events(self, txn, events):
|
|
"""
|
|
For the given event, update the event edges table and forward and
|
|
backward extremities tables.
|
|
"""
|
|
self.db.simple_insert_many_txn(
|
|
txn,
|
|
table="event_edges",
|
|
values=[
|
|
{
|
|
"event_id": ev.event_id,
|
|
"prev_event_id": e_id,
|
|
"room_id": ev.room_id,
|
|
"is_state": False,
|
|
}
|
|
for ev in events
|
|
for e_id in ev.prev_event_ids()
|
|
],
|
|
)
|
|
|
|
self._update_backward_extremeties(txn, events)
|
|
|
|
def _update_backward_extremeties(self, txn, events):
|
|
"""Updates the event_backward_extremities tables based on the new/updated
|
|
events being persisted.
|
|
|
|
This is called for new events *and* for events that were outliers, but
|
|
are now being persisted as non-outliers.
|
|
|
|
Forward extremities are handled when we first start persisting the events.
|
|
"""
|
|
events_by_room = {}
|
|
for ev in events:
|
|
events_by_room.setdefault(ev.room_id, []).append(ev)
|
|
|
|
query = (
|
|
"INSERT INTO event_backward_extremities (event_id, room_id)"
|
|
" SELECT ?, ? WHERE NOT EXISTS ("
|
|
" SELECT 1 FROM event_backward_extremities"
|
|
" WHERE event_id = ? AND room_id = ?"
|
|
" )"
|
|
" AND NOT EXISTS ("
|
|
" SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
|
|
" AND outlier = ?"
|
|
" )"
|
|
)
|
|
|
|
txn.executemany(
|
|
query,
|
|
[
|
|
(e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
|
|
for ev in events
|
|
for e_id in ev.prev_event_ids()
|
|
if not ev.internal_metadata.is_outlier()
|
|
],
|
|
)
|
|
|
|
query = (
|
|
"DELETE FROM event_backward_extremities"
|
|
" WHERE event_id = ? AND room_id = ?"
|
|
)
|
|
txn.executemany(
|
|
query,
|
|
[
|
|
(ev.event_id, ev.room_id)
|
|
for ev in events
|
|
if not ev.internal_metadata.is_outlier()
|
|
],
|
|
)
|
|
|
|
def _delete_old_forward_extrem_cache(self):
    """Prune old rows from `stream_ordering_to_exterm`.

    The deletion runs as a database interaction wrapped in
    `run_as_background_process`, whose result is returned.
    """
    # Delete entries older than the month-ago cutoff, but only for rooms
    # that still have a newer entry, so that we never delete the only
    # entries for a room.
    sql = """
        DELETE FROM stream_ordering_to_exterm
        WHERE
        room_id IN (
            SELECT room_id
            FROM stream_ordering_to_exterm
            WHERE stream_ordering > ?
        ) AND stream_ordering < ?
    """

    def _delete_old_forward_extrem_cache_txn(txn):
        cutoff = self.stream_ordering_month_ago
        txn.execute(sql, (cutoff, cutoff))

    return run_as_background_process(
        "delete_old_forward_extrem_cache",
        self.db.runInteraction,
        "_delete_old_forward_extrem_cache",
        _delete_old_forward_extrem_cache_txn,
    )
|
|
|
|
def clean_room_for_join(self, room_id):
    """Run `_clean_room_for_join_txn` for `room_id` in a database interaction
    and return whatever `runInteraction` hands back.
    """
    desc = "clean_room_for_join"
    return self.db.runInteraction(desc, self._clean_room_for_join_txn, room_id)
|
|
|
|
def _clean_room_for_join_txn(self, txn, room_id):
|
|
query = "DELETE FROM event_forward_extremities WHERE room_id = ?"
|
|
|
|
txn.execute(query, (room_id,))
|
|
txn.call_after(self.get_latest_event_ids_in_room.invalidate, (room_id,))
|
|
|
|
@defer.inlineCallbacks
def _background_delete_non_state_event_auth(self, progress, batch_size):
    """Background update handler: delete `event_auth` rows of non-state events.

    Works backwards through `events.stream_ordering`, one window of
    `batch_size` stream positions per call, deleting `event_auth` rows for
    events in the window that have no `state_key`. The bounds are kept in
    the background update's progress dict between calls.

    Args:
        progress (dict): stored progress; may contain
            "target_min_stream_id_inclusive" and "max_stream_id_exclusive".
        batch_size (int): width of the stream_ordering window to process.

    Returns:
        Deferred[int]: `batch_size`.
    """

    def delete_event_auth(txn):
        target_min_stream_id = progress.get("target_min_stream_id_inclusive")
        max_stream_id = progress.get("max_stream_id_exclusive")

        # Only a *missing* bound means "first run". Zero is a legitimate
        # stored value; treating it as missing would reset the bounds on
        # every batch and could keep the update from ever finishing.
        if target_min_stream_id is None or max_stream_id is None:
            txn.execute("SELECT COALESCE(MIN(stream_ordering), 0) FROM events")
            rows = txn.fetchall()
            target_min_stream_id = rows[0][0]

            txn.execute("SELECT COALESCE(MAX(stream_ordering), 0) FROM events")
            rows = txn.fetchall()
            # The upper bound is exclusive, so step one past the newest
            # event to make sure it is included in the first window.
            max_stream_id = rows[0][0] + 1

        min_stream_id = max_stream_id - batch_size

        sql = """
            DELETE FROM event_auth
            WHERE event_id IN (
                SELECT event_id FROM events
                LEFT JOIN state_events USING (room_id, event_id)
                WHERE ? <= stream_ordering AND stream_ordering < ?
                    AND state_key IS null
            )
        """

        txn.execute(sql, (min_stream_id, max_stream_id))

        # The next call continues directly below the window just processed.
        new_progress = {
            "target_min_stream_id_inclusive": target_min_stream_id,
            "max_stream_id_exclusive": min_stream_id,
        }

        self.db.updates._background_update_progress_txn(
            txn, self.EVENT_AUTH_STATE_ONLY, new_progress
        )

        # True while there is still an unprocessed range below this window.
        return min_stream_id >= target_min_stream_id

    result = yield self.db.runInteraction(
        self.EVENT_AUTH_STATE_ONLY, delete_event_auth
    )

    if not result:
        yield self.db.updates._end_background_update(self.EVENT_AUTH_STATE_ONLY)

    return batch_size
|