Add receipts event stream ordering (#13703)
parent
fa2f3d8d0c
commit
cdbb641232
|
@ -0,0 +1 @@
|
||||||
|
Add & populate `event_stream_ordering` column on receipts table for future optimisation of push action processing. Contributed by Nick @ Beeper (@fizzadar).
|
|
@ -67,6 +67,7 @@ from synapse.storage.databases.main.media_repository import (
|
||||||
)
|
)
|
||||||
from synapse.storage.databases.main.presence import PresenceBackgroundUpdateStore
|
from synapse.storage.databases.main.presence import PresenceBackgroundUpdateStore
|
||||||
from synapse.storage.databases.main.pusher import PusherWorkerStore
|
from synapse.storage.databases.main.pusher import PusherWorkerStore
|
||||||
|
from synapse.storage.databases.main.receipts import ReceiptsBackgroundUpdateStore
|
||||||
from synapse.storage.databases.main.registration import (
|
from synapse.storage.databases.main.registration import (
|
||||||
RegistrationBackgroundUpdateStore,
|
RegistrationBackgroundUpdateStore,
|
||||||
find_max_generated_user_id_localpart,
|
find_max_generated_user_id_localpart,
|
||||||
|
@ -203,6 +204,7 @@ class Store(
|
||||||
PushRuleStore,
|
PushRuleStore,
|
||||||
PusherWorkerStore,
|
PusherWorkerStore,
|
||||||
PresenceBackgroundUpdateStore,
|
PresenceBackgroundUpdateStore,
|
||||||
|
ReceiptsBackgroundUpdateStore,
|
||||||
):
|
):
|
||||||
def execute(self, f: Callable[..., R], *args: Any, **kwargs: Any) -> Awaitable[R]:
|
def execute(self, f: Callable[..., R], *args: Any, **kwargs: Any) -> Awaitable[R]:
|
||||||
return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs)
|
return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs)
|
||||||
|
|
|
@ -675,6 +675,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
||||||
values={
|
values={
|
||||||
"stream_id": stream_id,
|
"stream_id": stream_id,
|
||||||
"event_id": event_id,
|
"event_id": event_id,
|
||||||
|
"event_stream_ordering": stream_ordering,
|
||||||
"data": json_encoder.encode(data),
|
"data": json_encoder.encode(data),
|
||||||
},
|
},
|
||||||
# receipts_linearized has a unique constraint on
|
# receipts_linearized has a unique constraint on
|
||||||
|
@ -830,5 +831,76 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class ReceiptsStore(ReceiptsWorkerStore):
|
class ReceiptsBackgroundUpdateStore(SQLBaseStore):
|
||||||
|
POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering"
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
database: DatabasePool,
|
||||||
|
db_conn: LoggingDatabaseConnection,
|
||||||
|
hs: "HomeServer",
|
||||||
|
):
|
||||||
|
super().__init__(database, db_conn, hs)
|
||||||
|
|
||||||
|
self.db_pool.updates.register_background_update_handler(
|
||||||
|
self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING,
|
||||||
|
self._populate_receipt_event_stream_ordering,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _populate_receipt_event_stream_ordering(
|
||||||
|
self, progress: JsonDict, batch_size: int
|
||||||
|
) -> int:
|
||||||
|
def _populate_receipt_event_stream_ordering_txn(
|
||||||
|
txn: LoggingTransaction,
|
||||||
|
) -> bool:
|
||||||
|
|
||||||
|
if "max_stream_id" in progress:
|
||||||
|
max_stream_id = progress["max_stream_id"]
|
||||||
|
else:
|
||||||
|
txn.execute("SELECT max(stream_id) FROM receipts_linearized")
|
||||||
|
res = txn.fetchone()
|
||||||
|
if res is None or res[0] is None:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
max_stream_id = res[0]
|
||||||
|
|
||||||
|
start = progress.get("stream_id", 0)
|
||||||
|
stop = start + batch_size
|
||||||
|
|
||||||
|
sql = """
|
||||||
|
UPDATE receipts_linearized
|
||||||
|
SET event_stream_ordering = (
|
||||||
|
SELECT stream_ordering
|
||||||
|
FROM events
|
||||||
|
WHERE event_id = receipts_linearized.event_id
|
||||||
|
)
|
||||||
|
WHERE stream_id >= ? AND stream_id < ?
|
||||||
|
"""
|
||||||
|
txn.execute(sql, (start, stop))
|
||||||
|
|
||||||
|
self.db_pool.updates._background_update_progress_txn(
|
||||||
|
txn,
|
||||||
|
self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING,
|
||||||
|
{
|
||||||
|
"stream_id": stop,
|
||||||
|
"max_stream_id": max_stream_id,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
return stop > max_stream_id
|
||||||
|
|
||||||
|
finished = await self.db_pool.runInteraction(
|
||||||
|
"_remove_devices_from_device_inbox_txn",
|
||||||
|
_populate_receipt_event_stream_ordering_txn,
|
||||||
|
)
|
||||||
|
|
||||||
|
if finished:
|
||||||
|
await self.db_pool.updates._end_background_update(
|
||||||
|
self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING
|
||||||
|
)
|
||||||
|
|
||||||
|
return batch_size
|
||||||
|
|
||||||
|
|
||||||
|
class ReceiptsStore(ReceiptsWorkerStore, ReceiptsBackgroundUpdateStore):
|
||||||
pass
|
pass
|
||||||
|
|
|
@ -0,0 +1,19 @@
|
||||||
|
/* Copyright 2022 Beeper
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
ALTER TABLE receipts_linearized ADD COLUMN event_stream_ordering BIGINT;
|
||||||
|
|
||||||
|
INSERT INTO background_updates (update_name, progress_json) VALUES
|
||||||
|
('populate_event_stream_ordering', '{}');
|
Loading…
Reference in New Issue