Async/await for background updates (#6647)

so that bg update routines can be async
pull/6648/head
Richard van der Hoff 2020-01-07 14:12:42 +00:00 committed by GitHub
parent d20c346544
commit 9824a39d80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 21 additions and 16 deletions

1
changelog.d/6647.misc Normal file
View File

@ -0,0 +1 @@
Port core background update routines to async/await.

View File

@ -14,6 +14,7 @@
# limitations under the License. # limitations under the License.
import logging import logging
from typing import Optional
from canonicaljson import json from canonicaljson import json
@ -97,15 +98,14 @@ class BackgroundUpdater(object):
def start_doing_background_updates(self): def start_doing_background_updates(self):
run_as_background_process("background_updates", self.run_background_updates) run_as_background_process("background_updates", self.run_background_updates)
@defer.inlineCallbacks async def run_background_updates(self, sleep=True):
def run_background_updates(self, sleep=True):
logger.info("Starting background schema updates") logger.info("Starting background schema updates")
while True: while True:
if sleep: if sleep:
yield self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0) await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)
try: try:
result = yield self.do_next_background_update( result = await self.do_next_background_update(
self.BACKGROUND_UPDATE_DURATION_MS self.BACKGROUND_UPDATE_DURATION_MS
) )
except Exception: except Exception:
@ -170,20 +170,21 @@ class BackgroundUpdater(object):
return not update_exists return not update_exists
@defer.inlineCallbacks async def do_next_background_update(
def do_next_background_update(self, desired_duration_ms): self, desired_duration_ms: float
) -> Optional[int]:
"""Does some amount of work on the next queued background update """Does some amount of work on the next queued background update
Returns once some amount of work is done.
Args: Args:
desired_duration_ms(float): How long we want to spend desired_duration_ms(float): How long we want to spend
updating. updating.
Returns: Returns:
A deferred that completes once some amount of work is done. None if there is no more work to do, otherwise an int
The deferred will have a value of None if there is currently
no more work to do.
""" """
if not self._background_update_queue: if not self._background_update_queue:
updates = yield self.db.simple_select_list( updates = await self.db.simple_select_list(
"background_updates", "background_updates",
keyvalues=None, keyvalues=None,
retcols=("update_name", "depends_on"), retcols=("update_name", "depends_on"),
@ -201,11 +202,12 @@ class BackgroundUpdater(object):
update_name = self._background_update_queue.pop(0) update_name = self._background_update_queue.pop(0)
self._background_update_queue.append(update_name) self._background_update_queue.append(update_name)
res = yield self._do_background_update(update_name, desired_duration_ms) res = await self._do_background_update(update_name, desired_duration_ms)
return res return res
@defer.inlineCallbacks async def _do_background_update(
def _do_background_update(self, update_name, desired_duration_ms): self, update_name: str, desired_duration_ms: float
) -> int:
logger.info("Starting update batch on background update '%s'", update_name) logger.info("Starting update batch on background update '%s'", update_name)
update_handler = self._background_update_handlers[update_name] update_handler = self._background_update_handlers[update_name]
@ -225,7 +227,7 @@ class BackgroundUpdater(object):
else: else:
batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE
progress_json = yield self.db.simple_select_one_onecol( progress_json = await self.db.simple_select_one_onecol(
"background_updates", "background_updates",
keyvalues={"update_name": update_name}, keyvalues={"update_name": update_name},
retcol="progress_json", retcol="progress_json",
@ -234,7 +236,7 @@ class BackgroundUpdater(object):
progress = json.loads(progress_json) progress = json.loads(progress_json)
time_start = self._clock.time_msec() time_start = self._clock.time_msec()
items_updated = yield update_handler(progress, batch_size) items_updated = await update_handler(progress, batch_size)
time_stop = self._clock.time_msec() time_stop = self._clock.time_msec()
duration_ms = time_stop - time_start duration_ms = time_stop - time_start
@ -263,7 +265,9 @@ class BackgroundUpdater(object):
* A dict of the current progress * A dict of the current progress
* An integer count of the number of items to update in this batch. * An integer count of the number of items to update in this batch.
The handler should return a deferred integer count of items updated. The handler should return a deferred or coroutine which returns an integer count
of items updated.
The handler is responsible for updating the progress of the update. The handler is responsible for updating the progress of the update.
Args: Args: