This reverts commit e6af49fbea
.
pull/15348/head
parent
78cdb72cd6
commit
5350b5d04d
|
@ -1 +0,0 @@
|
||||||
Add denormalised event stream ordering column to membership state tables for future use. Contributed by Nick @ Beeper (@fizzadar).
|
|
|
@ -1127,15 +1127,11 @@ class PersistEventsStore:
|
||||||
# been inserted into room_memberships.
|
# been inserted into room_memberships.
|
||||||
txn.execute_batch(
|
txn.execute_batch(
|
||||||
"""INSERT INTO current_state_events
|
"""INSERT INTO current_state_events
|
||||||
(room_id, type, state_key, event_id, membership, event_stream_ordering)
|
(room_id, type, state_key, event_id, membership)
|
||||||
VALUES (
|
VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
|
||||||
?, ?, ?, ?,
|
|
||||||
(SELECT membership FROM room_memberships WHERE event_id = ?),
|
|
||||||
(SELECT stream_ordering FROM events WHERE event_id = ?)
|
|
||||||
)
|
|
||||||
""",
|
""",
|
||||||
[
|
[
|
||||||
(room_id, key[0], key[1], ev_id, ev_id, ev_id)
|
(room_id, key[0], key[1], ev_id, ev_id)
|
||||||
for key, ev_id in to_insert.items()
|
for key, ev_id in to_insert.items()
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -1162,15 +1158,11 @@ class PersistEventsStore:
|
||||||
if to_insert:
|
if to_insert:
|
||||||
txn.execute_batch(
|
txn.execute_batch(
|
||||||
"""INSERT INTO local_current_membership
|
"""INSERT INTO local_current_membership
|
||||||
(room_id, user_id, event_id, membership, event_stream_ordering)
|
(room_id, user_id, event_id, membership)
|
||||||
VALUES (
|
VALUES (?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
|
||||||
?, ?, ?,
|
|
||||||
(SELECT membership FROM room_memberships WHERE event_id = ?),
|
|
||||||
(SELECT stream_ordering FROM events WHERE event_id = ?)
|
|
||||||
)
|
|
||||||
""",
|
""",
|
||||||
[
|
[
|
||||||
(room_id, key[1], ev_id, ev_id, ev_id)
|
(room_id, key[1], ev_id, ev_id)
|
||||||
for key, ev_id in to_insert.items()
|
for key, ev_id in to_insert.items()
|
||||||
if key[0] == EventTypes.Member and self.is_mine_id(key[1])
|
if key[0] == EventTypes.Member and self.is_mine_id(key[1])
|
||||||
],
|
],
|
||||||
|
@ -1776,7 +1768,6 @@ class PersistEventsStore:
|
||||||
table="room_memberships",
|
table="room_memberships",
|
||||||
keys=(
|
keys=(
|
||||||
"event_id",
|
"event_id",
|
||||||
"event_stream_ordering",
|
|
||||||
"user_id",
|
"user_id",
|
||||||
"sender",
|
"sender",
|
||||||
"room_id",
|
"room_id",
|
||||||
|
@ -1787,7 +1778,6 @@ class PersistEventsStore:
|
||||||
values=[
|
values=[
|
||||||
(
|
(
|
||||||
event.event_id,
|
event.event_id,
|
||||||
event.internal_metadata.stream_ordering,
|
|
||||||
event.state_key,
|
event.state_key,
|
||||||
event.user_id,
|
event.user_id,
|
||||||
event.room_id,
|
event.room_id,
|
||||||
|
@ -1820,7 +1810,6 @@ class PersistEventsStore:
|
||||||
keyvalues={"room_id": event.room_id, "user_id": event.state_key},
|
keyvalues={"room_id": event.room_id, "user_id": event.state_key},
|
||||||
values={
|
values={
|
||||||
"event_id": event.event_id,
|
"event_id": event.event_id,
|
||||||
"event_stream_ordering": event.internal_metadata.stream_ordering,
|
|
||||||
"membership": event.membership,
|
"membership": event.membership,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
|
@ -428,16 +428,14 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
||||||
"partial_state_events",
|
"partial_state_events",
|
||||||
"partial_state_rooms_servers",
|
"partial_state_rooms_servers",
|
||||||
"partial_state_rooms",
|
"partial_state_rooms",
|
||||||
# Note: the _membership(s) tables have foreign keys to the `events` table
|
|
||||||
# so must be deleted first.
|
|
||||||
"local_current_membership",
|
|
||||||
"room_memberships",
|
|
||||||
"events",
|
"events",
|
||||||
"federation_inbound_events_staging",
|
"federation_inbound_events_staging",
|
||||||
|
"local_current_membership",
|
||||||
"receipts_graph",
|
"receipts_graph",
|
||||||
"receipts_linearized",
|
"receipts_linearized",
|
||||||
"room_aliases",
|
"room_aliases",
|
||||||
"room_depth",
|
"room_depth",
|
||||||
|
"room_memberships",
|
||||||
"room_stats_state",
|
"room_stats_state",
|
||||||
"room_stats_current",
|
"room_stats_current",
|
||||||
"room_stats_earliest_token",
|
"room_stats_earliest_token",
|
||||||
|
|
|
@ -12,7 +12,7 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
SCHEMA_VERSION = 75 # remember to update the list below when updating
|
SCHEMA_VERSION = 74 # remember to update the list below when updating
|
||||||
"""Represents the expectations made by the codebase about the database schema
|
"""Represents the expectations made by the codebase about the database schema
|
||||||
|
|
||||||
This should be incremented whenever the codebase changes its requirements on the
|
This should be incremented whenever the codebase changes its requirements on the
|
||||||
|
@ -91,19 +91,13 @@ Changes in SCHEMA_VERSION = 74:
|
||||||
- A query on `event_stream_ordering` column has now been disambiguated (i.e. the
|
- A query on `event_stream_ordering` column has now been disambiguated (i.e. the
|
||||||
codebase can handle the `current_state_events`, `local_current_memberships` and
|
codebase can handle the `current_state_events`, `local_current_memberships` and
|
||||||
`room_memberships` tables having an `event_stream_ordering` column).
|
`room_memberships` tables having an `event_stream_ordering` column).
|
||||||
|
|
||||||
Changes in SCHEMA_VERSION = 75:
|
|
||||||
- The `event_stream_ordering` column in membership tables (`current_state_events`,
|
|
||||||
`local_current_membership` & `room_memberships`) is now being populated for new
|
|
||||||
rows. When the background job to populate historical rows lands this will
|
|
||||||
become the compat schema version.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
SCHEMA_COMPAT_VERSION = (
|
SCHEMA_COMPAT_VERSION = (
|
||||||
# Queries against `event_stream_ordering` columns in membership tables must
|
# The threads_id column must exist for event_push_actions, event_push_summary,
|
||||||
# be disambiguated.
|
# receipts_linearized, and receipts_graph.
|
||||||
74
|
73
|
||||||
)
|
)
|
||||||
"""Limit on how far the synapse codebase can be rolled back without breaking db compat
|
"""Limit on how far the synapse codebase can be rolled back without breaking db compat
|
||||||
|
|
||||||
|
|
|
@ -1,20 +0,0 @@
|
||||||
/* Copyright 2022 Beeper
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
-- Each of these are denormalised copies of `stream_ordering` from the corresponding row in` events` which
|
|
||||||
-- we use to improve database performance by reduring JOINs.
|
|
||||||
ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);
|
|
||||||
ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);
|
|
||||||
ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);
|
|
|
@ -1,79 +0,0 @@
|
||||||
# Copyright 2022 Beeper
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
# you may not use this file except in compliance with the License.
|
|
||||||
# You may obtain a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
|
|
||||||
|
|
||||||
"""
|
|
||||||
This migration adds triggers to the room membership tables to enforce consistency.
|
|
||||||
Triggers cannot be expressed in .sql files, so we have to use a separate file.
|
|
||||||
"""
|
|
||||||
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
|
|
||||||
from synapse.storage.types import Cursor
|
|
||||||
|
|
||||||
|
|
||||||
def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
|
|
||||||
# Complain if the `event_stream_ordering` in membership tables doesn't match
|
|
||||||
# the `stream_ordering` row with the same `event_id` in `events`.
|
|
||||||
if isinstance(database_engine, Sqlite3Engine):
|
|
||||||
for table in (
|
|
||||||
"current_state_events",
|
|
||||||
"local_current_membership",
|
|
||||||
"room_memberships",
|
|
||||||
):
|
|
||||||
cur.execute(
|
|
||||||
f"""
|
|
||||||
CREATE TRIGGER IF NOT EXISTS {table}_bad_event_stream_ordering
|
|
||||||
BEFORE INSERT ON {table}
|
|
||||||
FOR EACH ROW
|
|
||||||
BEGIN
|
|
||||||
SELECT RAISE(ABORT, 'Incorrect event_stream_ordering in {table}')
|
|
||||||
WHERE EXISTS (
|
|
||||||
SELECT 1 FROM events
|
|
||||||
WHERE events.event_id = NEW.event_id
|
|
||||||
AND events.stream_ordering != NEW.event_stream_ordering
|
|
||||||
);
|
|
||||||
END;
|
|
||||||
"""
|
|
||||||
)
|
|
||||||
elif isinstance(database_engine, PostgresEngine):
|
|
||||||
cur.execute(
|
|
||||||
"""
|
|
||||||
CREATE OR REPLACE FUNCTION check_event_stream_ordering() RETURNS trigger AS $BODY$
|
|
||||||
BEGIN
|
|
||||||
IF EXISTS (
|
|
||||||
SELECT 1 FROM events
|
|
||||||
WHERE events.event_id = NEW.event_id
|
|
||||||
AND events.stream_ordering != NEW.event_stream_ordering
|
|
||||||
) THEN
|
|
||||||
RAISE EXCEPTION 'Incorrect event_stream_ordering';
|
|
||||||
END IF;
|
|
||||||
RETURN NEW;
|
|
||||||
END;
|
|
||||||
$BODY$ LANGUAGE plpgsql;
|
|
||||||
"""
|
|
||||||
)
|
|
||||||
|
|
||||||
for table in (
|
|
||||||
"current_state_events",
|
|
||||||
"local_current_membership",
|
|
||||||
"room_memberships",
|
|
||||||
):
|
|
||||||
cur.execute(
|
|
||||||
f"""
|
|
||||||
CREATE TRIGGER check_event_stream_ordering BEFORE INSERT OR UPDATE ON {table}
|
|
||||||
FOR EACH ROW
|
|
||||||
EXECUTE PROCEDURE check_event_stream_ordering()
|
|
||||||
"""
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
raise NotImplementedError("Unknown database engine")
|
|
Loading…
Reference in New Issue