Only run one background update at a time

pull/7190/head
Richard van der Hoff 2020-03-31 17:43:19 +01:00
parent b4c2234232
commit 7b608cf468
3 changed files with 68 additions and 27 deletions

View File

@ -90,8 +90,10 @@ class BackgroundUpdater(object):
self._clock = hs.get_clock() self._clock = hs.get_clock()
self.db = database self.db = database
# if a background update is currently running, its name.
self._current_background_update = None # type: Optional[str]
self._background_update_performance = {} self._background_update_performance = {}
self._background_update_queue = []
self._background_update_handlers = {} self._background_update_handlers = {}
self._all_done = False self._all_done = False
@ -131,7 +133,7 @@ class BackgroundUpdater(object):
return True return True
# obviously, if we have things in our queue, we're not done. # obviously, if we have things in our queue, we're not done.
if self._background_update_queue: if self._current_background_update:
return False return False
# otherwise, check if there are updates to be run. This is important, # otherwise, check if there are updates to be run. This is important,
@ -152,11 +154,10 @@ class BackgroundUpdater(object):
async def has_completed_background_update(self, update_name) -> bool: async def has_completed_background_update(self, update_name) -> bool:
"""Check if the given background update has finished running. """Check if the given background update has finished running.
""" """
if self._all_done: if self._all_done:
return True return True
if update_name in self._background_update_queue: if update_name == self._current_background_update:
return False return False
update_exists = await self.db.simple_select_one_onecol( update_exists = await self.db.simple_select_one_onecol(
@ -180,31 +181,49 @@ class BackgroundUpdater(object):
Returns: Returns:
True if there is no more work to do, otherwise False True if there is no more work to do, otherwise False
""" """
if not self._background_update_queue:
updates = await self.db.simple_select_list( def get_background_updates_txn(txn):
"background_updates", txn.execute(
keyvalues=None, """
retcols=("update_name", "depends_on"), SELECT update_name, depends_on FROM background_updates
ORDER BY ordering, update_name
"""
) )
in_flight = {update["update_name"] for update in updates} return self.db.cursor_to_dict(txn)
for update in updates:
if update["depends_on"] not in in_flight:
self._background_update_queue.append(update["update_name"])
if not self._background_update_queue: if not self._current_background_update:
# no work left to do all_pending_updates = await self.db.runInteraction(
return True "background_updates", get_background_updates_txn,
)
if not all_pending_updates:
# no work left to do
return True
# pop from the front, and add back to the back # find the first update which isn't dependent on another one in the queue.
update_name = self._background_update_queue.pop(0) pending = {update["update_name"] for update in all_pending_updates}
self._background_update_queue.append(update_name) for upd in all_pending_updates:
depends_on = upd["depends_on"]
if not depends_on or depends_on not in pending:
break
logger.info(
"Not starting on bg update %s until %s is done",
upd["update_name"],
depends_on,
)
else:
# if we get to the end of that for loop, there is a problem
raise Exception(
"Unable to find a background update which doesn't depend on "
"another: dependency cycle?"
)
res = await self._do_background_update(update_name, desired_duration_ms) self._current_background_update = upd["update_name"]
await self._do_background_update(desired_duration_ms)
return False return False
async def _do_background_update( async def _do_background_update(self, desired_duration_ms: float) -> int:
self, update_name: str, desired_duration_ms: float update_name = self._current_background_update
) -> int:
logger.info("Starting update batch on background update '%s'", update_name) logger.info("Starting update batch on background update '%s'", update_name)
update_handler = self._background_update_handlers[update_name] update_handler = self._background_update_handlers[update_name]
@ -405,9 +424,12 @@ class BackgroundUpdater(object):
Returns: Returns:
A deferred that completes once the task is removed. A deferred that completes once the task is removed.
""" """
self._background_update_queue = [ if update_name != self._current_background_update:
name for name in self._background_update_queue if name != update_name raise Exception(
] "Cannot end background update %s which isn't currently running"
% update_name
)
self._current_background_update = None
return self.db.simple_delete_one( return self.db.simple_delete_one(
"background_updates", keyvalues={"update_name": update_name} "background_updates", keyvalues={"update_name": update_name}
) )

View File

@ -29,7 +29,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database # Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts. # schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 57 SCHEMA_VERSION = 58
dir_path = os.path.abspath(os.path.dirname(__file__)) dir_path = os.path.abspath(os.path.dirname(__file__))

View File

@ -0,0 +1,19 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* add an "ordering" column to background_updates, which can be used to sort them
to achieve some level of consistency. */
ALTER TABLE background_updates ADD COLUMN ordering INT NOT NULL DEFAULT 0;