Merge pull request #7190 from matrix-org/rav/one_bg_update_at_a_time

Only run one background update at a time
pull/7193/head
Richard van der Hoff 2020-04-03 13:17:30 +01:00 committed by GitHub
commit fd4c975b5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 99 additions and 72 deletions

1
changelog.d/7190.misc Normal file
View File

@ -0,0 +1 @@
Only run one background database update at a time.

View File

@ -90,8 +90,10 @@ class BackgroundUpdater(object):
self._clock = hs.get_clock()
self.db = database
# if a background update is currently running, its name.
self._current_background_update = None # type: Optional[str]
self._background_update_performance = {}
self._background_update_queue = []
self._background_update_handlers = {}
self._all_done = False
@ -111,7 +113,7 @@ class BackgroundUpdater(object):
except Exception:
logger.exception("Error doing update")
else:
if result is None:
if result:
logger.info(
"No more background updates to do."
" Unscheduling background update task."
@ -119,26 +121,25 @@ class BackgroundUpdater(object):
self._all_done = True
return None
@defer.inlineCallbacks
def has_completed_background_updates(self):
async def has_completed_background_updates(self) -> bool:
"""Check if all the background updates have completed
Returns:
Deferred[bool]: True if all background updates have completed
True if all background updates have completed
"""
# if we've previously determined that there is nothing left to do, that
# is easy
if self._all_done:
return True
# obviously, if we have things in our queue, we're not done.
if self._background_update_queue:
# obviously, if we are currently processing an update, we're not done.
if self._current_background_update:
return False
# otherwise, check if there are updates to be run. This is important,
# as we may be running on a worker which doesn't perform the bg updates
# itself, but still wants to wait for them to happen.
updates = yield self.db.simple_select_onecol(
updates = await self.db.simple_select_onecol(
"background_updates",
keyvalues=None,
retcol="1",
@ -153,11 +154,10 @@ class BackgroundUpdater(object):
async def has_completed_background_update(self, update_name) -> bool:
"""Check if the given background update has finished running.
"""
if self._all_done:
return True
if update_name in self._background_update_queue:
if update_name == self._current_background_update:
return False
update_exists = await self.db.simple_select_one_onecol(
@ -170,9 +170,7 @@ class BackgroundUpdater(object):
return not update_exists
async def do_next_background_update(
self, desired_duration_ms: float
) -> Optional[int]:
async def do_next_background_update(self, desired_duration_ms: float) -> bool:
"""Does some amount of work on the next queued background update
Returns once some amount of work is done.
@ -181,33 +179,51 @@ class BackgroundUpdater(object):
desired_duration_ms(float): How long we want to spend
updating.
Returns:
None if there is no more work to do, otherwise an int
True if we have finished running all the background updates, otherwise False
"""
if not self._background_update_queue:
updates = await self.db.simple_select_list(
"background_updates",
keyvalues=None,
retcols=("update_name", "depends_on"),
def get_background_updates_txn(txn):
txn.execute(
"""
SELECT update_name, depends_on FROM background_updates
ORDER BY ordering, update_name
"""
)
in_flight = {update["update_name"] for update in updates}
for update in updates:
if update["depends_on"] not in in_flight:
self._background_update_queue.append(update["update_name"])
return self.db.cursor_to_dict(txn)
if not self._background_update_queue:
# no work left to do
return None
if not self._current_background_update:
all_pending_updates = await self.db.runInteraction(
"background_updates", get_background_updates_txn,
)
if not all_pending_updates:
# no work left to do
return True
# pop from the front, and add back to the back
update_name = self._background_update_queue.pop(0)
self._background_update_queue.append(update_name)
# find the first update which isn't dependent on another one in the queue.
pending = {update["update_name"] for update in all_pending_updates}
for upd in all_pending_updates:
depends_on = upd["depends_on"]
if not depends_on or depends_on not in pending:
break
logger.info(
"Not starting on bg update %s until %s is done",
upd["update_name"],
depends_on,
)
else:
# if we get to the end of that for loop, there is a problem
raise Exception(
"Unable to find a background update which doesn't depend on "
"another: dependency cycle?"
)
res = await self._do_background_update(update_name, desired_duration_ms)
return res
self._current_background_update = upd["update_name"]
async def _do_background_update(
self, update_name: str, desired_duration_ms: float
) -> int:
await self._do_background_update(desired_duration_ms)
return False
async def _do_background_update(self, desired_duration_ms: float) -> int:
update_name = self._current_background_update
logger.info("Starting update batch on background update '%s'", update_name)
update_handler = self._background_update_handlers[update_name]
@ -400,27 +416,6 @@ class BackgroundUpdater(object):
self.register_background_update_handler(update_name, updater)
def start_background_update(self, update_name, progress):
"""Starts a background update running.
Args:
update_name: The update to set running.
progress: The initial state of the progress of the update.
Returns:
A deferred that completes once the task has been added to the
queue.
"""
# Clear the background update queue so that we will pick up the new
# task on the next iteration of do_background_update.
self._background_update_queue = []
progress_json = json.dumps(progress)
return self.db.simple_insert(
"background_updates",
{"update_name": update_name, "progress_json": progress_json},
)
def _end_background_update(self, update_name):
"""Removes a completed background update task from the queue.
@ -429,9 +424,12 @@ class BackgroundUpdater(object):
Returns:
A deferred that completes once the task is removed.
"""
self._background_update_queue = [
name for name in self._background_update_queue if name != update_name
]
if update_name != self._current_background_update:
raise Exception(
"Cannot end background update %s which isn't currently running"
% update_name
)
self._current_background_update = None
return self.db.simple_delete_one(
"background_updates", keyvalues={"update_name": update_name}
)

View File

@ -29,7 +29,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 57
SCHEMA_VERSION = 58
dir_path = os.path.abspath(os.path.dirname(__file__))

View File

@ -0,0 +1,19 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* add an "ordering" column to background_updates, which can be used to sort them
to achieve some level of consistency. */
ALTER TABLE background_updates ADD COLUMN ordering INT NOT NULL DEFAULT 0;

View File

@ -11,7 +11,9 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
def prepare(self, reactor, clock, homeserver):
self.updates = self.hs.get_datastore().db.updates # type: BackgroundUpdater
# the base test class should have run the real bg updates for us
self.assertTrue(self.updates.has_completed_background_updates())
self.assertTrue(
self.get_success(self.updates.has_completed_background_updates())
)
self.update_handler = Mock()
self.updates.register_background_update_handler(
@ -25,12 +27,20 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
# the target runtime for each bg update
target_background_update_duration_ms = 50000
store = self.hs.get_datastore()
self.get_success(
store.db.simple_insert(
"background_updates",
values={"update_name": "test_update", "progress_json": '{"my_key": 1}'},
)
)
# first step: make a bit of progress
@defer.inlineCallbacks
def update(progress, count):
yield self.clock.sleep((count * duration_ms) / 1000)
progress = {"my_key": progress["my_key"] + 1}
yield self.hs.get_datastore().db.runInteraction(
yield store.db.runInteraction(
"update_progress",
self.updates._background_update_progress_txn,
"test_update",
@ -39,10 +49,6 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
return count
self.update_handler.side_effect = update
self.get_success(
self.updates.start_background_update("test_update", {"my_key": 1})
)
self.update_handler.reset_mock()
res = self.get_success(
self.updates.do_next_background_update(
@ -50,7 +56,7 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
),
by=0.1,
)
self.assertIsNotNone(res)
self.assertFalse(res)
# on the first call, we should get run with the default background update size
self.update_handler.assert_called_once_with(
@ -73,7 +79,7 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
result = self.get_success(
self.updates.do_next_background_update(target_background_update_duration_ms)
)
self.assertIsNotNone(result)
self.assertFalse(result)
self.update_handler.assert_called_once()
# third step: we don't expect to be called any more
@ -81,5 +87,5 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
result = self.get_success(
self.updates.do_next_background_update(target_background_update_duration_ms)
)
self.assertIsNone(result)
self.assertTrue(result)
self.assertFalse(self.update_handler.called)

View File

@ -40,6 +40,7 @@ from synapse.http.server import JsonResource
from synapse.http.site import SynapseRequest, SynapseSite
from synapse.logging.context import (
SENTINEL_CONTEXT,
LoggingContext,
current_context,
set_current_context,
)
@ -419,15 +420,17 @@ class HomeserverTestCase(TestCase):
config_obj.parse_config_dict(config, "", "")
kwargs["config"] = config_obj
async def run_bg_updates():
with LoggingContext("run_bg_updates", request="run_bg_updates-1"):
while not await stor.db.updates.has_completed_background_updates():
await stor.db.updates.do_next_background_update(1)
hs = setup_test_homeserver(self.addCleanup, *args, **kwargs)
stor = hs.get_datastore()
# Run the database background updates, when running against "master".
if hs.__class__.__name__ == "TestHomeServer":
while not self.get_success(
stor.db.updates.has_completed_background_updates()
):
self.get_success(stor.db.updates.do_next_background_update(1))
self.get_success(run_bg_updates())
return hs