Reintroduce membership tables event stream ordering (#15128)
* Add `event_stream_ordering` column to membership state tables Specifically this adds the column to `current_state_events`, `local_current_membership` and `room_memberships`. Each of these tables is regularly joined with the `events` table to get the stream ordering and denormalising this into each table will yield significant query performance improvements once used. * Make denormalised `event_stream_ordering` columns foreign keys * Add comment in schema file explaining new denormalised columns * Add triggers to enforce consistency of `event_stream_ordering` columns * Re-order purge room tables to account for foreign keys * Bump schema version to 75 Co-authored-by: David Robertson <david.m.robertson1@gmail.com> Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>pull/15319/head
parent
98fd558382
commit
e6af49fbea
|
@ -0,0 +1 @@
|
|||
Add denormalised event stream ordering column to membership state tables for future use. Contributed by Nick @ Beeper (@fizzadar).
|
|
@ -1126,11 +1126,15 @@ class PersistEventsStore:
|
|||
# been inserted into room_memberships.
|
||||
txn.execute_batch(
|
||||
"""INSERT INTO current_state_events
|
||||
(room_id, type, state_key, event_id, membership)
|
||||
VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
|
||||
(room_id, type, state_key, event_id, membership, event_stream_ordering)
|
||||
VALUES (
|
||||
?, ?, ?, ?,
|
||||
(SELECT membership FROM room_memberships WHERE event_id = ?),
|
||||
(SELECT stream_ordering FROM events WHERE event_id = ?)
|
||||
)
|
||||
""",
|
||||
[
|
||||
(room_id, key[0], key[1], ev_id, ev_id)
|
||||
(room_id, key[0], key[1], ev_id, ev_id, ev_id)
|
||||
for key, ev_id in to_insert.items()
|
||||
],
|
||||
)
|
||||
|
@ -1157,11 +1161,15 @@ class PersistEventsStore:
|
|||
if to_insert:
|
||||
txn.execute_batch(
|
||||
"""INSERT INTO local_current_membership
|
||||
(room_id, user_id, event_id, membership)
|
||||
VALUES (?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
|
||||
(room_id, user_id, event_id, membership, event_stream_ordering)
|
||||
VALUES (
|
||||
?, ?, ?,
|
||||
(SELECT membership FROM room_memberships WHERE event_id = ?),
|
||||
(SELECT stream_ordering FROM events WHERE event_id = ?)
|
||||
)
|
||||
""",
|
||||
[
|
||||
(room_id, key[1], ev_id, ev_id)
|
||||
(room_id, key[1], ev_id, ev_id, ev_id)
|
||||
for key, ev_id in to_insert.items()
|
||||
if key[0] == EventTypes.Member and self.is_mine_id(key[1])
|
||||
],
|
||||
|
@ -1769,6 +1777,7 @@ class PersistEventsStore:
|
|||
table="room_memberships",
|
||||
keys=(
|
||||
"event_id",
|
||||
"event_stream_ordering",
|
||||
"user_id",
|
||||
"sender",
|
||||
"room_id",
|
||||
|
@ -1779,6 +1788,7 @@ class PersistEventsStore:
|
|||
values=[
|
||||
(
|
||||
event.event_id,
|
||||
event.internal_metadata.stream_ordering,
|
||||
event.state_key,
|
||||
event.user_id,
|
||||
event.room_id,
|
||||
|
@ -1811,6 +1821,7 @@ class PersistEventsStore:
|
|||
keyvalues={"room_id": event.room_id, "user_id": event.state_key},
|
||||
values={
|
||||
"event_id": event.event_id,
|
||||
"event_stream_ordering": event.internal_metadata.stream_ordering,
|
||||
"membership": event.membership,
|
||||
},
|
||||
)
|
||||
|
|
|
@ -428,14 +428,16 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
|||
"partial_state_events",
|
||||
"partial_state_rooms_servers",
|
||||
"partial_state_rooms",
|
||||
# Note: the _membership(s) tables have foreign keys to the `events` table
|
||||
# so must be deleted first.
|
||||
"local_current_membership",
|
||||
"room_memberships",
|
||||
"events",
|
||||
"federation_inbound_events_staging",
|
||||
"local_current_membership",
|
||||
"receipts_graph",
|
||||
"receipts_linearized",
|
||||
"room_aliases",
|
||||
"room_depth",
|
||||
"room_memberships",
|
||||
"room_stats_state",
|
||||
"room_stats_current",
|
||||
"room_stats_earliest_token",
|
||||
|
|
|
@ -12,7 +12,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
SCHEMA_VERSION = 74 # remember to update the list below when updating
|
||||
SCHEMA_VERSION = 75 # remember to update the list below when updating
|
||||
"""Represents the expectations made by the codebase about the database schema
|
||||
|
||||
This should be incremented whenever the codebase changes its requirements on the
|
||||
|
@ -91,13 +91,19 @@ Changes in SCHEMA_VERSION = 74:
|
|||
- A query on `event_stream_ordering` column has now been disambiguated (i.e. the
|
||||
codebase can handle the `current_state_events`, `local_current_memberships` and
|
||||
`room_memberships` tables having an `event_stream_ordering` column).
|
||||
|
||||
Changes in SCHEMA_VERSION = 75:
|
||||
- The `event_stream_ordering` column in membership tables (`current_state_events`,
|
||||
`local_current_membership` & `room_memberships`) is now being populated for new
|
||||
rows. When the background job to populate historical rows lands this will
|
||||
become the compat schema version.
|
||||
"""
|
||||
|
||||
|
||||
SCHEMA_COMPAT_VERSION = (
|
||||
# The threads_id column must exist for event_push_actions, event_push_summary,
|
||||
# receipts_linearized, and receipts_graph.
|
||||
73
|
||||
# Queries against `event_stream_ordering` columns in membership tables must
|
||||
# be disambiguated.
|
||||
74
|
||||
)
|
||||
"""Limit on how far the synapse codebase can be rolled back without breaking db compat
|
||||
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
/* Copyright 2022 Beeper
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- Each of these are denormalised copies of `stream_ordering` from the corresponding row in` events` which
|
||||
-- we use to improve database performance by reduring JOINs.
|
||||
ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);
|
||||
ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);
|
||||
ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);
|
|
@ -0,0 +1,79 @@
|
|||
# Copyright 2022 Beeper
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
"""
|
||||
This migration adds triggers to the room membership tables to enforce consistency.
|
||||
Triggers cannot be expressed in .sql files, so we have to use a separate file.
|
||||
"""
|
||||
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
|
||||
from synapse.storage.types import Cursor
|
||||
|
||||
|
||||
def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
|
||||
# Complain if the `event_stream_ordering` in membership tables doesn't match
|
||||
# the `stream_ordering` row with the same `event_id` in `events`.
|
||||
if isinstance(database_engine, Sqlite3Engine):
|
||||
for table in (
|
||||
"current_state_events",
|
||||
"local_current_membership",
|
||||
"room_memberships",
|
||||
):
|
||||
cur.execute(
|
||||
f"""
|
||||
CREATE TRIGGER IF NOT EXISTS {table}_bad_event_stream_ordering
|
||||
BEFORE INSERT ON {table}
|
||||
FOR EACH ROW
|
||||
BEGIN
|
||||
SELECT RAISE(ABORT, 'Incorrect event_stream_ordering in {table}')
|
||||
WHERE EXISTS (
|
||||
SELECT 1 FROM events
|
||||
WHERE events.event_id = NEW.event_id
|
||||
AND events.stream_ordering != NEW.event_stream_ordering
|
||||
);
|
||||
END;
|
||||
"""
|
||||
)
|
||||
elif isinstance(database_engine, PostgresEngine):
|
||||
cur.execute(
|
||||
"""
|
||||
CREATE OR REPLACE FUNCTION check_event_stream_ordering() RETURNS trigger AS $BODY$
|
||||
BEGIN
|
||||
IF EXISTS (
|
||||
SELECT 1 FROM events
|
||||
WHERE events.event_id = NEW.event_id
|
||||
AND events.stream_ordering != NEW.event_stream_ordering
|
||||
) THEN
|
||||
RAISE EXCEPTION 'Incorrect event_stream_ordering';
|
||||
END IF;
|
||||
RETURN NEW;
|
||||
END;
|
||||
$BODY$ LANGUAGE plpgsql;
|
||||
"""
|
||||
)
|
||||
|
||||
for table in (
|
||||
"current_state_events",
|
||||
"local_current_membership",
|
||||
"room_memberships",
|
||||
):
|
||||
cur.execute(
|
||||
f"""
|
||||
CREATE TRIGGER check_event_stream_ordering BEFORE INSERT OR UPDATE ON {table}
|
||||
FOR EACH ROW
|
||||
EXECUTE PROCEDURE check_event_stream_ordering()
|
||||
"""
|
||||
)
|
||||
else:
|
||||
raise NotImplementedError("Unknown database engine")
|
Loading…
Reference in New Issue