Migrate stream_ordering to a bigint (#10264)

* Move background update names out to a separate class

`EventsBackgroundUpdatesStore` gets inherited and we don't really want to
further pollute the namespace.

* Migrate stream_ordering to a bigint

* changelog
pull/10392/head
Richard van der Hoff 2021-06-29 11:25:34 +01:00 committed by GitHub
parent a0ed0f363e
commit 60efc51a2b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 163 additions and 16 deletions

1
changelog.d/10264.bugfix Normal file
View File

@ -0,0 +1 @@
Fix a long-standing bug where Synapse would return errors after 2<sup>31</sup> events were handled by the server.

View File

@ -29,6 +29,25 @@ from synapse.types import JsonDict
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
_REPLACE_STREAM_ORDRING_SQL_COMMANDS = (
# there should be no leftover rows without a stream_ordering2, but just in case...
"UPDATE events SET stream_ordering2 = stream_ordering WHERE stream_ordering2 IS NULL",
# finally, we can drop the rule and switch the columns
"DROP RULE populate_stream_ordering2 ON events",
"ALTER TABLE events DROP COLUMN stream_ordering",
"ALTER TABLE events RENAME COLUMN stream_ordering2 TO stream_ordering",
)
class _BackgroundUpdates:
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"
POPULATE_STREAM_ORDERING2 = "populate_stream_ordering2"
INDEX_STREAM_ORDERING2 = "index_stream_ordering2"
REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column"
@attr.s(slots=True, frozen=True) @attr.s(slots=True, frozen=True)
class _CalculateChainCover: class _CalculateChainCover:
"""Return value for _calculate_chain_cover_txn.""" """Return value for _calculate_chain_cover_txn."""
@ -48,19 +67,15 @@ class _CalculateChainCover:
class EventsBackgroundUpdatesStore(SQLBaseStore): class EventsBackgroundUpdatesStore(SQLBaseStore):
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"
def __init__(self, database: DatabasePool, db_conn, hs): def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs) super().__init__(database, db_conn, hs)
self.db_pool.updates.register_background_update_handler( self.db_pool.updates.register_background_update_handler(
self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts _BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME,
self._background_reindex_origin_server_ts,
) )
self.db_pool.updates.register_background_update_handler( self.db_pool.updates.register_background_update_handler(
self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, _BackgroundUpdates.EVENT_FIELDS_SENDER_URL_UPDATE_NAME,
self._background_reindex_fields_sender, self._background_reindex_fields_sender,
) )
@ -85,7 +100,8 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
) )
self.db_pool.updates.register_background_update_handler( self.db_pool.updates.register_background_update_handler(
self.DELETE_SOFT_FAILED_EXTREMITIES, self._cleanup_extremities_bg_update _BackgroundUpdates.DELETE_SOFT_FAILED_EXTREMITIES,
self._cleanup_extremities_bg_update,
) )
self.db_pool.updates.register_background_update_handler( self.db_pool.updates.register_background_update_handler(
@ -139,6 +155,24 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
self._purged_chain_cover_index, self._purged_chain_cover_index,
) )
# bg updates for replacing stream_ordering with a BIGINT
# (these only run on postgres.)
self.db_pool.updates.register_background_update_handler(
_BackgroundUpdates.POPULATE_STREAM_ORDERING2,
self._background_populate_stream_ordering2,
)
self.db_pool.updates.register_background_index_update(
_BackgroundUpdates.INDEX_STREAM_ORDERING2,
index_name="events_stream_ordering",
table="events",
columns=["stream_ordering2"],
unique=True,
)
self.db_pool.updates.register_background_update_handler(
_BackgroundUpdates.REPLACE_STREAM_ORDERING_COLUMN,
self._background_replace_stream_ordering_column,
)
async def _background_reindex_fields_sender(self, progress, batch_size): async def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"] target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"] max_stream_id = progress["max_stream_id_exclusive"]
@ -190,18 +224,18 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
} }
self.db_pool.updates._background_update_progress_txn( self.db_pool.updates._background_update_progress_txn(
txn, self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress txn, _BackgroundUpdates.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress
) )
return len(rows) return len(rows)
result = await self.db_pool.runInteraction( result = await self.db_pool.runInteraction(
self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn _BackgroundUpdates.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
) )
if not result: if not result:
await self.db_pool.updates._end_background_update( await self.db_pool.updates._end_background_update(
self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME _BackgroundUpdates.EVENT_FIELDS_SENDER_URL_UPDATE_NAME
) )
return result return result
@ -264,18 +298,18 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
} }
self.db_pool.updates._background_update_progress_txn( self.db_pool.updates._background_update_progress_txn(
txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress txn, _BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME, progress
) )
return len(rows_to_update) return len(rows_to_update)
result = await self.db_pool.runInteraction( result = await self.db_pool.runInteraction(
self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn _BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
) )
if not result: if not result:
await self.db_pool.updates._end_background_update( await self.db_pool.updates._end_background_update(
self.EVENT_ORIGIN_SERVER_TS_NAME _BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME
) )
return result return result
@ -454,7 +488,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
if not num_handled: if not num_handled:
await self.db_pool.updates._end_background_update( await self.db_pool.updates._end_background_update(
self.DELETE_SOFT_FAILED_EXTREMITIES _BackgroundUpdates.DELETE_SOFT_FAILED_EXTREMITIES
) )
def _drop_table_txn(txn): def _drop_table_txn(txn):
@ -1009,3 +1043,75 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
await self.db_pool.updates._end_background_update("purged_chain_cover") await self.db_pool.updates._end_background_update("purged_chain_cover")
return result return result
async def _background_populate_stream_ordering2(
self, progress: JsonDict, batch_size: int
) -> int:
"""Populate events.stream_ordering2, then replace stream_ordering
This is to deal with the fact that stream_ordering was initially created as a
32-bit integer field.
"""
batch_size = max(batch_size, 1)
def process(txn: Cursor) -> int:
# if this is the first pass, find the minimum stream ordering
last_stream = progress.get("last_stream")
if last_stream is None:
txn.execute(
"""
SELECT stream_ordering FROM events ORDER BY stream_ordering LIMIT 1
"""
)
rows = txn.fetchall()
if not rows:
return 0
last_stream = rows[0][0] - 1
txn.execute(
"""
UPDATE events SET stream_ordering2=stream_ordering
WHERE stream_ordering > ? AND stream_ordering <= ?
""",
(last_stream, last_stream + batch_size),
)
row_count = txn.rowcount
self.db_pool.updates._background_update_progress_txn(
txn,
_BackgroundUpdates.POPULATE_STREAM_ORDERING2,
{"last_stream": last_stream + batch_size},
)
return row_count
result = await self.db_pool.runInteraction(
"_background_populate_stream_ordering2", process
)
if result != 0:
return result
await self.db_pool.updates._end_background_update(
_BackgroundUpdates.POPULATE_STREAM_ORDERING2
)
return 0
async def _background_replace_stream_ordering_column(
self, progress: JsonDict, batch_size: int
) -> int:
"""Drop the old 'stream_ordering' column and rename 'stream_ordering2' into its place."""
def process(txn: Cursor) -> None:
for sql in _REPLACE_STREAM_ORDRING_SQL_COMMANDS:
logger.info("completing stream_ordering migration: %s", sql)
txn.execute(sql)
await self.db_pool.runInteraction(
"_background_replace_stream_ordering_column", process
)
await self.db_pool.updates._end_background_update(
_BackgroundUpdates.REPLACE_STREAM_ORDERING_COLUMN
)
return 0

View File

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
SCHEMA_VERSION = 59 SCHEMA_VERSION = 60
"""Represents the expectations made by the codebase about the database schema """Represents the expectations made by the codebase about the database schema
This should be incremented whenever the codebase changes its requirements on the This should be incremented whenever the codebase changes its requirements on the

View File

@ -0,0 +1,40 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- This migration handles the process of changing the type of `stream_ordering` to
-- a BIGINT.
--
-- Note that this is only a problem on postgres as sqlite only has one "integer" type
-- which can cope with values up to 2^63.
-- First add a new column to contain the bigger stream_ordering
ALTER TABLE events ADD COLUMN stream_ordering2 BIGINT;
-- Create a rule which will populate it for new rows.
CREATE OR REPLACE RULE "populate_stream_ordering2" AS
ON INSERT TO events
DO UPDATE events SET stream_ordering2=NEW.stream_ordering WHERE stream_ordering=NEW.stream_ordering;
-- Start a bg process to populate it for old events
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(6001, 'populate_stream_ordering2', '{}');
-- ... and another to build an index on it
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
(6001, 'index_stream_ordering2', '{}', 'populate_stream_ordering2');
-- ... and another to do the switcheroo
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
(6001, 'replace_stream_ordering_column', '{}', 'index_stream_ordering2');