Fix MultiWriteIdGenerator's handling of restarts. (#8374)
On startup `MultiWriteIdGenerator` fetches the maximum stream ID for each instance from the table and uses that as its initial "current position" for each writer. This is problematic as a) it involves either a scan of events table or an index (neither of which is ideal), and b) if rows are being persisted out of order elsewhere while the process restarts then using the maximum stream ID is not correct. This could theoretically lead to race conditions where e.g. events that are persisted out of order are not sent down sync streams. We fix this by creating a new table that tracks the current positions of each writer to the stream, and update it each time we finish persisting a new entry. This is a relatively small overhead when persisting events. However for the cache invalidation stream this is a much bigger relative overhead, so instead we note that for invalidation we don't actually care about reliability over restarts (as there's no caches to invalidate) and simply don't bother reading and writing to the new table in that particular case.pull/8401/head
parent
11c9e17738
commit
f112cfe5bb
|
@ -0,0 +1 @@
|
||||||
|
Fix theoretical race condition where events are not sent down `/sync` if the synchrotron worker is restarted without restarting other workers.
|
|
@ -31,11 +31,13 @@ class BaseSlavedStore(CacheInvalidationWorkerStore):
|
||||||
self._cache_id_gen = MultiWriterIdGenerator(
|
self._cache_id_gen = MultiWriterIdGenerator(
|
||||||
db_conn,
|
db_conn,
|
||||||
database,
|
database,
|
||||||
|
stream_name="caches",
|
||||||
instance_name=hs.get_instance_name(),
|
instance_name=hs.get_instance_name(),
|
||||||
table="cache_invalidation_stream_by_instance",
|
table="cache_invalidation_stream_by_instance",
|
||||||
instance_column="instance_name",
|
instance_column="instance_name",
|
||||||
id_column="stream_id",
|
id_column="stream_id",
|
||||||
sequence_name="cache_invalidation_stream_seq",
|
sequence_name="cache_invalidation_stream_seq",
|
||||||
|
writers=[],
|
||||||
) # type: Optional[MultiWriterIdGenerator]
|
) # type: Optional[MultiWriterIdGenerator]
|
||||||
else:
|
else:
|
||||||
self._cache_id_gen = None
|
self._cache_id_gen = None
|
||||||
|
|
|
@ -160,14 +160,20 @@ class DataStore(
|
||||||
)
|
)
|
||||||
|
|
||||||
if isinstance(self.database_engine, PostgresEngine):
|
if isinstance(self.database_engine, PostgresEngine):
|
||||||
|
# We set the `writers` to an empty list here as we don't care about
|
||||||
|
# missing updates over restarts, as we'll not have anything in our
|
||||||
|
# caches to invalidate. (This reduces the amount of writes to the DB
|
||||||
|
# that happen).
|
||||||
self._cache_id_gen = MultiWriterIdGenerator(
|
self._cache_id_gen = MultiWriterIdGenerator(
|
||||||
db_conn,
|
db_conn,
|
||||||
database,
|
database,
|
||||||
instance_name="master",
|
stream_name="caches",
|
||||||
|
instance_name=hs.get_instance_name(),
|
||||||
table="cache_invalidation_stream_by_instance",
|
table="cache_invalidation_stream_by_instance",
|
||||||
instance_column="instance_name",
|
instance_column="instance_name",
|
||||||
id_column="stream_id",
|
id_column="stream_id",
|
||||||
sequence_name="cache_invalidation_stream_seq",
|
sequence_name="cache_invalidation_stream_seq",
|
||||||
|
writers=[],
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
self._cache_id_gen = None
|
self._cache_id_gen = None
|
||||||
|
|
|
@ -83,21 +83,25 @@ class EventsWorkerStore(SQLBaseStore):
|
||||||
self._stream_id_gen = MultiWriterIdGenerator(
|
self._stream_id_gen = MultiWriterIdGenerator(
|
||||||
db_conn=db_conn,
|
db_conn=db_conn,
|
||||||
db=database,
|
db=database,
|
||||||
|
stream_name="events",
|
||||||
instance_name=hs.get_instance_name(),
|
instance_name=hs.get_instance_name(),
|
||||||
table="events",
|
table="events",
|
||||||
instance_column="instance_name",
|
instance_column="instance_name",
|
||||||
id_column="stream_ordering",
|
id_column="stream_ordering",
|
||||||
sequence_name="events_stream_seq",
|
sequence_name="events_stream_seq",
|
||||||
|
writers=hs.config.worker.writers.events,
|
||||||
)
|
)
|
||||||
self._backfill_id_gen = MultiWriterIdGenerator(
|
self._backfill_id_gen = MultiWriterIdGenerator(
|
||||||
db_conn=db_conn,
|
db_conn=db_conn,
|
||||||
db=database,
|
db=database,
|
||||||
|
stream_name="backfill",
|
||||||
instance_name=hs.get_instance_name(),
|
instance_name=hs.get_instance_name(),
|
||||||
table="events",
|
table="events",
|
||||||
instance_column="instance_name",
|
instance_column="instance_name",
|
||||||
id_column="stream_ordering",
|
id_column="stream_ordering",
|
||||||
sequence_name="events_backfill_stream_seq",
|
sequence_name="events_backfill_stream_seq",
|
||||||
positive=False,
|
positive=False,
|
||||||
|
writers=hs.config.worker.writers.events,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
# We shouldn't be running in worker mode with SQLite, but its useful
|
# We shouldn't be running in worker mode with SQLite, but its useful
|
||||||
|
|
|
@ -0,0 +1,22 @@
|
||||||
|
/* Copyright 2020 The Matrix.org Foundation C.I.C
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
CREATE TABLE stream_positions (
|
||||||
|
stream_name TEXT NOT NULL,
|
||||||
|
instance_name TEXT NOT NULL,
|
||||||
|
stream_id BIGINT NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE UNIQUE INDEX stream_positions_idx ON stream_positions(stream_name, instance_name);
|
|
@ -22,6 +22,7 @@ from typing import Dict, List, Optional, Set, Union
|
||||||
import attr
|
import attr
|
||||||
from typing_extensions import Deque
|
from typing_extensions import Deque
|
||||||
|
|
||||||
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.storage.database import DatabasePool, LoggingTransaction
|
from synapse.storage.database import DatabasePool, LoggingTransaction
|
||||||
from synapse.storage.util.sequence import PostgresSequenceGenerator
|
from synapse.storage.util.sequence import PostgresSequenceGenerator
|
||||||
|
|
||||||
|
@ -184,12 +185,16 @@ class MultiWriterIdGenerator:
|
||||||
Args:
|
Args:
|
||||||
db_conn
|
db_conn
|
||||||
db
|
db
|
||||||
|
stream_name: A name for the stream.
|
||||||
instance_name: The name of this instance.
|
instance_name: The name of this instance.
|
||||||
table: Database table associated with stream.
|
table: Database table associated with stream.
|
||||||
instance_column: Column that stores the row's writer's instance name
|
instance_column: Column that stores the row's writer's instance name
|
||||||
id_column: Column that stores the stream ID.
|
id_column: Column that stores the stream ID.
|
||||||
sequence_name: The name of the postgres sequence used to generate new
|
sequence_name: The name of the postgres sequence used to generate new
|
||||||
IDs.
|
IDs.
|
||||||
|
writers: A list of known writers to use to populate current positions
|
||||||
|
on startup. Can be empty if nothing uses `get_current_token` or
|
||||||
|
`get_positions` (e.g. caches stream).
|
||||||
positive: Whether the IDs are positive (true) or negative (false).
|
positive: Whether the IDs are positive (true) or negative (false).
|
||||||
When using negative IDs we go backwards from -1 to -2, -3, etc.
|
When using negative IDs we go backwards from -1 to -2, -3, etc.
|
||||||
"""
|
"""
|
||||||
|
@ -198,16 +203,20 @@ class MultiWriterIdGenerator:
|
||||||
self,
|
self,
|
||||||
db_conn,
|
db_conn,
|
||||||
db: DatabasePool,
|
db: DatabasePool,
|
||||||
|
stream_name: str,
|
||||||
instance_name: str,
|
instance_name: str,
|
||||||
table: str,
|
table: str,
|
||||||
instance_column: str,
|
instance_column: str,
|
||||||
id_column: str,
|
id_column: str,
|
||||||
sequence_name: str,
|
sequence_name: str,
|
||||||
|
writers: List[str],
|
||||||
positive: bool = True,
|
positive: bool = True,
|
||||||
):
|
):
|
||||||
self._db = db
|
self._db = db
|
||||||
|
self._stream_name = stream_name
|
||||||
self._instance_name = instance_name
|
self._instance_name = instance_name
|
||||||
self._positive = positive
|
self._positive = positive
|
||||||
|
self._writers = writers
|
||||||
self._return_factor = 1 if positive else -1
|
self._return_factor = 1 if positive else -1
|
||||||
|
|
||||||
# We lock as some functions may be called from DB threads.
|
# We lock as some functions may be called from DB threads.
|
||||||
|
@ -216,9 +225,7 @@ class MultiWriterIdGenerator:
|
||||||
# Note: If we are a negative stream then we still store all the IDs as
|
# Note: If we are a negative stream then we still store all the IDs as
|
||||||
# positive to make life easier for us, and simply negate the IDs when we
|
# positive to make life easier for us, and simply negate the IDs when we
|
||||||
# return them.
|
# return them.
|
||||||
self._current_positions = self._load_current_ids(
|
self._current_positions = {} # type: Dict[str, int]
|
||||||
db_conn, table, instance_column, id_column
|
|
||||||
)
|
|
||||||
|
|
||||||
# Set of local IDs that we're still processing. The current position
|
# Set of local IDs that we're still processing. The current position
|
||||||
# should be less than the minimum of this set (if not empty).
|
# should be less than the minimum of this set (if not empty).
|
||||||
|
@ -251,31 +258,81 @@ class MultiWriterIdGenerator:
|
||||||
|
|
||||||
self._sequence_gen = PostgresSequenceGenerator(sequence_name)
|
self._sequence_gen = PostgresSequenceGenerator(sequence_name)
|
||||||
|
|
||||||
|
# This goes and fills out the above state from the database.
|
||||||
|
self._load_current_ids(db_conn, table, instance_column, id_column)
|
||||||
|
|
||||||
def _load_current_ids(
|
def _load_current_ids(
|
||||||
self, db_conn, table: str, instance_column: str, id_column: str
|
self, db_conn, table: str, instance_column: str, id_column: str
|
||||||
) -> Dict[str, int]:
|
):
|
||||||
# If positive stream aggregate via MAX. For negative stream use MIN
|
|
||||||
# *and* negate the result to get a positive number.
|
|
||||||
sql = """
|
|
||||||
SELECT %(instance)s, %(agg)s(%(id)s) FROM %(table)s
|
|
||||||
GROUP BY %(instance)s
|
|
||||||
""" % {
|
|
||||||
"instance": instance_column,
|
|
||||||
"id": id_column,
|
|
||||||
"table": table,
|
|
||||||
"agg": "MAX" if self._positive else "-MIN",
|
|
||||||
}
|
|
||||||
|
|
||||||
cur = db_conn.cursor()
|
cur = db_conn.cursor()
|
||||||
cur.execute(sql)
|
|
||||||
|
|
||||||
# `cur` is an iterable over returned rows, which are 2-tuples.
|
# Load the current positions of all writers for the stream.
|
||||||
current_positions = dict(cur)
|
if self._writers:
|
||||||
|
sql = """
|
||||||
|
SELECT instance_name, stream_id FROM stream_positions
|
||||||
|
WHERE stream_name = ?
|
||||||
|
"""
|
||||||
|
sql = self._db.engine.convert_param_style(sql)
|
||||||
|
|
||||||
|
cur.execute(sql, (self._stream_name,))
|
||||||
|
|
||||||
|
self._current_positions = {
|
||||||
|
instance: stream_id * self._return_factor
|
||||||
|
for instance, stream_id in cur
|
||||||
|
if instance in self._writers
|
||||||
|
}
|
||||||
|
|
||||||
|
# We set the `_persisted_upto_position` to be the minimum of all current
|
||||||
|
# positions. If empty we use the max stream ID from the DB table.
|
||||||
|
min_stream_id = min(self._current_positions.values(), default=None)
|
||||||
|
|
||||||
|
if min_stream_id is None:
|
||||||
|
sql = """
|
||||||
|
SELECT COALESCE(%(agg)s(%(id)s), 1) FROM %(table)s
|
||||||
|
""" % {
|
||||||
|
"id": id_column,
|
||||||
|
"table": table,
|
||||||
|
"agg": "MAX" if self._positive else "-MIN",
|
||||||
|
}
|
||||||
|
cur.execute(sql)
|
||||||
|
(stream_id,) = cur.fetchone()
|
||||||
|
self._persisted_upto_position = stream_id
|
||||||
|
else:
|
||||||
|
# If we have a min_stream_id then we pull out everything greater
|
||||||
|
# than it from the DB so that we can prefill
|
||||||
|
# `_known_persisted_positions` and get a more accurate
|
||||||
|
# `_persisted_upto_position`.
|
||||||
|
#
|
||||||
|
# We also check if any of the later rows are from this instance, in
|
||||||
|
# which case we use that for this instance's current position. This
|
||||||
|
# is to handle the case where we didn't finish persisting to the
|
||||||
|
# stream positions table before restart (or the stream position
|
||||||
|
# table otherwise got out of date).
|
||||||
|
|
||||||
|
sql = """
|
||||||
|
SELECT %(instance)s, %(id)s FROM %(table)s
|
||||||
|
WHERE ? %(cmp)s %(id)s
|
||||||
|
""" % {
|
||||||
|
"id": id_column,
|
||||||
|
"table": table,
|
||||||
|
"instance": instance_column,
|
||||||
|
"cmp": "<=" if self._positive else ">=",
|
||||||
|
}
|
||||||
|
sql = self._db.engine.convert_param_style(sql)
|
||||||
|
cur.execute(sql, (min_stream_id,))
|
||||||
|
|
||||||
|
self._persisted_upto_position = min_stream_id
|
||||||
|
|
||||||
|
with self._lock:
|
||||||
|
for (instance, stream_id,) in cur:
|
||||||
|
stream_id = self._return_factor * stream_id
|
||||||
|
self._add_persisted_position(stream_id)
|
||||||
|
|
||||||
|
if instance == self._instance_name:
|
||||||
|
self._current_positions[instance] = stream_id
|
||||||
|
|
||||||
cur.close()
|
cur.close()
|
||||||
|
|
||||||
return current_positions
|
|
||||||
|
|
||||||
def _load_next_id_txn(self, txn) -> int:
|
def _load_next_id_txn(self, txn) -> int:
|
||||||
return self._sequence_gen.get_next_id_txn(txn)
|
return self._sequence_gen.get_next_id_txn(txn)
|
||||||
|
|
||||||
|
@ -316,6 +373,21 @@ class MultiWriterIdGenerator:
|
||||||
txn.call_after(self._mark_id_as_finished, next_id)
|
txn.call_after(self._mark_id_as_finished, next_id)
|
||||||
txn.call_on_exception(self._mark_id_as_finished, next_id)
|
txn.call_on_exception(self._mark_id_as_finished, next_id)
|
||||||
|
|
||||||
|
# Update the `stream_positions` table with newly updated stream
|
||||||
|
# ID (unless self._writers is not set in which case we don't
|
||||||
|
# bother, as nothing will read it).
|
||||||
|
#
|
||||||
|
# We only do this on the success path so that the persisted current
|
||||||
|
# position points to a persited row with the correct instance name.
|
||||||
|
if self._writers:
|
||||||
|
txn.call_after(
|
||||||
|
run_as_background_process,
|
||||||
|
"MultiWriterIdGenerator._update_table",
|
||||||
|
self._db.runInteraction,
|
||||||
|
"MultiWriterIdGenerator._update_table",
|
||||||
|
self._update_stream_positions_table_txn,
|
||||||
|
)
|
||||||
|
|
||||||
return self._return_factor * next_id
|
return self._return_factor * next_id
|
||||||
|
|
||||||
def _mark_id_as_finished(self, next_id: int):
|
def _mark_id_as_finished(self, next_id: int):
|
||||||
|
@ -447,6 +519,28 @@ class MultiWriterIdGenerator:
|
||||||
# do.
|
# do.
|
||||||
break
|
break
|
||||||
|
|
||||||
|
def _update_stream_positions_table_txn(self, txn):
|
||||||
|
"""Update the `stream_positions` table with newly persisted position.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not self._writers:
|
||||||
|
return
|
||||||
|
|
||||||
|
# We upsert the value, ensuring on conflict that we always increase the
|
||||||
|
# value (or decrease if stream goes backwards).
|
||||||
|
sql = """
|
||||||
|
INSERT INTO stream_positions (stream_name, instance_name, stream_id)
|
||||||
|
VALUES (?, ?, ?)
|
||||||
|
ON CONFLICT (stream_name, instance_name)
|
||||||
|
DO UPDATE SET
|
||||||
|
stream_id = %(agg)s(stream_positions.stream_id, EXCLUDED.stream_id)
|
||||||
|
""" % {
|
||||||
|
"agg": "GREATEST" if self._positive else "LEAST",
|
||||||
|
}
|
||||||
|
|
||||||
|
pos = (self.get_current_token_for_writer(self._instance_name),)
|
||||||
|
txn.execute(sql, (self._stream_name, self._instance_name, pos))
|
||||||
|
|
||||||
|
|
||||||
@attr.s(slots=True)
|
@attr.s(slots=True)
|
||||||
class _AsyncCtxManagerWrapper:
|
class _AsyncCtxManagerWrapper:
|
||||||
|
@ -503,4 +597,16 @@ class _MultiWriterCtxManager:
|
||||||
if exc_type is not None:
|
if exc_type is not None:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
# Update the `stream_positions` table with newly updated stream
|
||||||
|
# ID (unless self._writers is not set in which case we don't
|
||||||
|
# bother, as nothing will read it).
|
||||||
|
#
|
||||||
|
# We only do this on the success path so that the persisted current
|
||||||
|
# position points to a persisted row with the correct instance name.
|
||||||
|
if self.id_gen._writers:
|
||||||
|
await self.id_gen._db.runInteraction(
|
||||||
|
"MultiWriterIdGenerator._update_table",
|
||||||
|
self.id_gen._update_stream_positions_table_txn,
|
||||||
|
)
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
|
@ -43,16 +43,20 @@ class MultiWriterIdGeneratorTestCase(HomeserverTestCase):
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
|
|
||||||
def _create_id_generator(self, instance_name="master") -> MultiWriterIdGenerator:
|
def _create_id_generator(
|
||||||
|
self, instance_name="master", writers=["master"]
|
||||||
|
) -> MultiWriterIdGenerator:
|
||||||
def _create(conn):
|
def _create(conn):
|
||||||
return MultiWriterIdGenerator(
|
return MultiWriterIdGenerator(
|
||||||
conn,
|
conn,
|
||||||
self.db_pool,
|
self.db_pool,
|
||||||
|
stream_name="test_stream",
|
||||||
instance_name=instance_name,
|
instance_name=instance_name,
|
||||||
table="foobar",
|
table="foobar",
|
||||||
instance_column="instance_name",
|
instance_column="instance_name",
|
||||||
id_column="stream_id",
|
id_column="stream_id",
|
||||||
sequence_name="foobar_seq",
|
sequence_name="foobar_seq",
|
||||||
|
writers=writers,
|
||||||
)
|
)
|
||||||
|
|
||||||
return self.get_success(self.db_pool.runWithConnection(_create))
|
return self.get_success(self.db_pool.runWithConnection(_create))
|
||||||
|
@ -68,6 +72,13 @@ class MultiWriterIdGeneratorTestCase(HomeserverTestCase):
|
||||||
"INSERT INTO foobar VALUES (nextval('foobar_seq'), ?)",
|
"INSERT INTO foobar VALUES (nextval('foobar_seq'), ?)",
|
||||||
(instance_name,),
|
(instance_name,),
|
||||||
)
|
)
|
||||||
|
txn.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO stream_positions VALUES ('test_stream', ?, lastval())
|
||||||
|
ON CONFLICT (stream_name, instance_name) DO UPDATE SET stream_id = lastval()
|
||||||
|
""",
|
||||||
|
(instance_name,),
|
||||||
|
)
|
||||||
|
|
||||||
self.get_success(self.db_pool.runInteraction("_insert_rows", _insert))
|
self.get_success(self.db_pool.runInteraction("_insert_rows", _insert))
|
||||||
|
|
||||||
|
@ -81,6 +92,13 @@ class MultiWriterIdGeneratorTestCase(HomeserverTestCase):
|
||||||
"INSERT INTO foobar VALUES (?, ?)", (stream_id, instance_name,),
|
"INSERT INTO foobar VALUES (?, ?)", (stream_id, instance_name,),
|
||||||
)
|
)
|
||||||
txn.execute("SELECT setval('foobar_seq', ?)", (stream_id,))
|
txn.execute("SELECT setval('foobar_seq', ?)", (stream_id,))
|
||||||
|
txn.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO stream_positions VALUES ('test_stream', ?, ?)
|
||||||
|
ON CONFLICT (stream_name, instance_name) DO UPDATE SET stream_id = ?
|
||||||
|
""",
|
||||||
|
(instance_name, stream_id, stream_id),
|
||||||
|
)
|
||||||
|
|
||||||
self.get_success(self.db_pool.runInteraction("_insert_row_with_id", _insert))
|
self.get_success(self.db_pool.runInteraction("_insert_row_with_id", _insert))
|
||||||
|
|
||||||
|
@ -179,8 +197,8 @@ class MultiWriterIdGeneratorTestCase(HomeserverTestCase):
|
||||||
self._insert_rows("first", 3)
|
self._insert_rows("first", 3)
|
||||||
self._insert_rows("second", 4)
|
self._insert_rows("second", 4)
|
||||||
|
|
||||||
first_id_gen = self._create_id_generator("first")
|
first_id_gen = self._create_id_generator("first", writers=["first", "second"])
|
||||||
second_id_gen = self._create_id_generator("second")
|
second_id_gen = self._create_id_generator("second", writers=["first", "second"])
|
||||||
|
|
||||||
self.assertEqual(first_id_gen.get_positions(), {"first": 3, "second": 7})
|
self.assertEqual(first_id_gen.get_positions(), {"first": 3, "second": 7})
|
||||||
self.assertEqual(first_id_gen.get_current_token_for_writer("first"), 3)
|
self.assertEqual(first_id_gen.get_current_token_for_writer("first"), 3)
|
||||||
|
@ -262,7 +280,7 @@ class MultiWriterIdGeneratorTestCase(HomeserverTestCase):
|
||||||
self._insert_row_with_id("first", 3)
|
self._insert_row_with_id("first", 3)
|
||||||
self._insert_row_with_id("second", 5)
|
self._insert_row_with_id("second", 5)
|
||||||
|
|
||||||
id_gen = self._create_id_generator("first")
|
id_gen = self._create_id_generator("first", writers=["first", "second"])
|
||||||
|
|
||||||
self.assertEqual(id_gen.get_positions(), {"first": 3, "second": 5})
|
self.assertEqual(id_gen.get_positions(), {"first": 3, "second": 5})
|
||||||
|
|
||||||
|
@ -300,7 +318,7 @@ class MultiWriterIdGeneratorTestCase(HomeserverTestCase):
|
||||||
self._insert_row_with_id("first", 3)
|
self._insert_row_with_id("first", 3)
|
||||||
self._insert_row_with_id("second", 5)
|
self._insert_row_with_id("second", 5)
|
||||||
|
|
||||||
id_gen = self._create_id_generator("first")
|
id_gen = self._create_id_generator("first", writers=["first", "second"])
|
||||||
|
|
||||||
self.assertEqual(id_gen.get_positions(), {"first": 3, "second": 5})
|
self.assertEqual(id_gen.get_positions(), {"first": 3, "second": 5})
|
||||||
|
|
||||||
|
@ -319,6 +337,80 @@ class MultiWriterIdGeneratorTestCase(HomeserverTestCase):
|
||||||
# `persisted_upto_position` in this case, then it will be correct in the
|
# `persisted_upto_position` in this case, then it will be correct in the
|
||||||
# other cases that are tested above (since they'll hit the same code).
|
# other cases that are tested above (since they'll hit the same code).
|
||||||
|
|
||||||
|
def test_restart_during_out_of_order_persistence(self):
|
||||||
|
"""Test that restarting a process while another process is writing out
|
||||||
|
of order updates are handled correctly.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Prefill table with 7 rows written by 'master'
|
||||||
|
self._insert_rows("master", 7)
|
||||||
|
|
||||||
|
id_gen = self._create_id_generator()
|
||||||
|
|
||||||
|
self.assertEqual(id_gen.get_positions(), {"master": 7})
|
||||||
|
self.assertEqual(id_gen.get_current_token_for_writer("master"), 7)
|
||||||
|
|
||||||
|
# Persist two rows at once
|
||||||
|
ctx1 = self.get_success(id_gen.get_next())
|
||||||
|
ctx2 = self.get_success(id_gen.get_next())
|
||||||
|
|
||||||
|
s1 = self.get_success(ctx1.__aenter__())
|
||||||
|
s2 = self.get_success(ctx2.__aenter__())
|
||||||
|
|
||||||
|
self.assertEqual(s1, 8)
|
||||||
|
self.assertEqual(s2, 9)
|
||||||
|
|
||||||
|
self.assertEqual(id_gen.get_positions(), {"master": 7})
|
||||||
|
self.assertEqual(id_gen.get_current_token_for_writer("master"), 7)
|
||||||
|
|
||||||
|
# We finish persisting the second row before restart
|
||||||
|
self.get_success(ctx2.__aexit__(None, None, None))
|
||||||
|
|
||||||
|
# We simulate a restart of another worker by just creating a new ID gen.
|
||||||
|
id_gen_worker = self._create_id_generator("worker")
|
||||||
|
|
||||||
|
# Restarted worker should not see the second persisted row
|
||||||
|
self.assertEqual(id_gen_worker.get_positions(), {"master": 7})
|
||||||
|
self.assertEqual(id_gen_worker.get_current_token_for_writer("master"), 7)
|
||||||
|
|
||||||
|
# Now if we persist the first row then both instances should jump ahead
|
||||||
|
# correctly.
|
||||||
|
self.get_success(ctx1.__aexit__(None, None, None))
|
||||||
|
|
||||||
|
self.assertEqual(id_gen.get_positions(), {"master": 9})
|
||||||
|
id_gen_worker.advance("master", 9)
|
||||||
|
self.assertEqual(id_gen_worker.get_positions(), {"master": 9})
|
||||||
|
|
||||||
|
def test_writer_config_change(self):
|
||||||
|
"""Test that changing the writer config correctly works.
|
||||||
|
"""
|
||||||
|
|
||||||
|
self._insert_row_with_id("first", 3)
|
||||||
|
self._insert_row_with_id("second", 5)
|
||||||
|
|
||||||
|
# Initial config has two writers
|
||||||
|
id_gen = self._create_id_generator("first", writers=["first", "second"])
|
||||||
|
self.assertEqual(id_gen.get_persisted_upto_position(), 3)
|
||||||
|
|
||||||
|
# New config removes one of the configs. Note that if the writer is
|
||||||
|
# removed from config we assume that it has been shut down and has
|
||||||
|
# finished persisting, hence why the persisted upto position is 5.
|
||||||
|
id_gen_2 = self._create_id_generator("second", writers=["second"])
|
||||||
|
self.assertEqual(id_gen_2.get_persisted_upto_position(), 5)
|
||||||
|
|
||||||
|
# This config points to a single, previously unused writer.
|
||||||
|
id_gen_3 = self._create_id_generator("third", writers=["third"])
|
||||||
|
self.assertEqual(id_gen_3.get_persisted_upto_position(), 5)
|
||||||
|
|
||||||
|
# Check that we get a sane next stream ID with this new config.
|
||||||
|
|
||||||
|
async def _get_next_async():
|
||||||
|
async with id_gen_3.get_next() as stream_id:
|
||||||
|
self.assertEqual(stream_id, 6)
|
||||||
|
|
||||||
|
self.get_success(_get_next_async())
|
||||||
|
self.assertEqual(id_gen_3.get_persisted_upto_position(), 6)
|
||||||
|
|
||||||
|
|
||||||
class BackwardsMultiWriterIdGeneratorTestCase(HomeserverTestCase):
|
class BackwardsMultiWriterIdGeneratorTestCase(HomeserverTestCase):
|
||||||
"""Tests MultiWriterIdGenerator that produce *negative* stream IDs.
|
"""Tests MultiWriterIdGenerator that produce *negative* stream IDs.
|
||||||
|
@ -345,16 +437,20 @@ class BackwardsMultiWriterIdGeneratorTestCase(HomeserverTestCase):
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
|
|
||||||
def _create_id_generator(self, instance_name="master") -> MultiWriterIdGenerator:
|
def _create_id_generator(
|
||||||
|
self, instance_name="master", writers=["master"]
|
||||||
|
) -> MultiWriterIdGenerator:
|
||||||
def _create(conn):
|
def _create(conn):
|
||||||
return MultiWriterIdGenerator(
|
return MultiWriterIdGenerator(
|
||||||
conn,
|
conn,
|
||||||
self.db_pool,
|
self.db_pool,
|
||||||
|
stream_name="test_stream",
|
||||||
instance_name=instance_name,
|
instance_name=instance_name,
|
||||||
table="foobar",
|
table="foobar",
|
||||||
instance_column="instance_name",
|
instance_column="instance_name",
|
||||||
id_column="stream_id",
|
id_column="stream_id",
|
||||||
sequence_name="foobar_seq",
|
sequence_name="foobar_seq",
|
||||||
|
writers=writers,
|
||||||
positive=False,
|
positive=False,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -368,6 +464,13 @@ class BackwardsMultiWriterIdGeneratorTestCase(HomeserverTestCase):
|
||||||
txn.execute(
|
txn.execute(
|
||||||
"INSERT INTO foobar VALUES (?, ?)", (stream_id, instance_name,),
|
"INSERT INTO foobar VALUES (?, ?)", (stream_id, instance_name,),
|
||||||
)
|
)
|
||||||
|
txn.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO stream_positions VALUES ('test_stream', ?, ?)
|
||||||
|
ON CONFLICT (stream_name, instance_name) DO UPDATE SET stream_id = ?
|
||||||
|
""",
|
||||||
|
(instance_name, -stream_id, -stream_id),
|
||||||
|
)
|
||||||
|
|
||||||
self.get_success(self.db_pool.runInteraction("_insert_row", _insert))
|
self.get_success(self.db_pool.runInteraction("_insert_row", _insert))
|
||||||
|
|
||||||
|
@ -409,8 +512,8 @@ class BackwardsMultiWriterIdGeneratorTestCase(HomeserverTestCase):
|
||||||
"""Tests that having multiple instances that get advanced over
|
"""Tests that having multiple instances that get advanced over
|
||||||
federation works corretly.
|
federation works corretly.
|
||||||
"""
|
"""
|
||||||
id_gen_1 = self._create_id_generator("first")
|
id_gen_1 = self._create_id_generator("first", writers=["first", "second"])
|
||||||
id_gen_2 = self._create_id_generator("second")
|
id_gen_2 = self._create_id_generator("second", writers=["first", "second"])
|
||||||
|
|
||||||
async def _get_next_async():
|
async def _get_next_async():
|
||||||
async with id_gen_1.get_next() as stream_id:
|
async with id_gen_1.get_next() as stream_id:
|
||||||
|
|
Loading…
Reference in New Issue