Consistently use wrap_as_background_task in more places (#8599)

pull/8572/head
Patrick Cloke 2020-10-20 11:29:38 -04:00 committed by GitHub
parent 84c0e46cce
commit 9e0f22874f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 20 additions and 30 deletions

1
changelog.d/8599.feature Normal file
View File

@@ -0,0 +1 @@
Allow running background tasks in a separate worker process.

View File

@@ -22,7 +22,7 @@ from typing import List
from synapse.api.errors import StoreError from synapse.api.errors import StoreError
from synapse.logging.context import make_deferred_yieldable from synapse.logging.context import make_deferred_yieldable
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.types import UserID from synapse.types import UserID
from synapse.util import stringutils from synapse.util import stringutils
@@ -63,16 +63,10 @@ class AccountValidityHandler:
self._raw_from = email.utils.parseaddr(self._from_string)[1] self._raw_from = email.utils.parseaddr(self._from_string)[1]
# Check the renewal emails to send and send them every 30min. # Check the renewal emails to send and send them every 30min.
def send_emails():
# run as a background process to make sure that the database transactions
# have a logcontext to report to
return run_as_background_process(
"send_renewals", self._send_renewal_emails
)
if hs.config.run_background_tasks: if hs.config.run_background_tasks:
self.clock.looping_call(send_emails, 30 * 60 * 1000) self.clock.looping_call(self._send_renewal_emails, 30 * 60 * 1000)
@wrap_as_background_process("send_renewals")
async def _send_renewal_emails(self): async def _send_renewal_emails(self):
"""Gets the list of users whose account is expiring in the amount of time """Gets the list of users whose account is expiring in the amount of time
configured in the ``renew_at`` parameter from the ``account_validity`` configured in the ``renew_at`` parameter from the ``account_validity``

View File

@@ -24,7 +24,7 @@ from synapse.api.errors import (
StoreError, StoreError,
SynapseError, SynapseError,
) )
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.types import UserID, create_requester, get_domain_from_id from synapse.types import UserID, create_requester, get_domain_from_id
from ._base import BaseHandler from ._base import BaseHandler
@@ -57,7 +57,7 @@ class ProfileHandler(BaseHandler):
if hs.config.run_background_tasks: if hs.config.run_background_tasks:
self.clock.looping_call( self.clock.looping_call(
self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS self._update_remote_profile_cache, self.PROFILE_UPDATE_MS
) )
async def get_profile(self, user_id): async def get_profile(self, user_id):
@@ -370,11 +370,7 @@ class ProfileHandler(BaseHandler):
raise SynapseError(403, "Profile isn't available", Codes.FORBIDDEN) raise SynapseError(403, "Profile isn't available", Codes.FORBIDDEN)
raise raise
def _start_update_remote_profile_cache(self): @wrap_as_background_process("Update remote profile")
return run_as_background_process(
"Update remote profile", self._update_remote_profile_cache
)
async def _update_remote_profile_cache(self): async def _update_remote_profile_cache(self):
"""Called periodically to check profiles of remote users we haven't """Called periodically to check profiles of remote users we haven't
checked in a while. checked in a while.

View File

@@ -33,7 +33,10 @@ from synapse.api.room_versions import (
from synapse.events import EventBase, make_event_from_dict from synapse.events import EventBase, make_event_from_dict
from synapse.events.utils import prune_event from synapse.events.utils import prune_event
from synapse.logging.context import PreserveLoggingContext, current_context from synapse.logging.context import PreserveLoggingContext, current_context
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import BackfillStream from synapse.replication.tcp.streams import BackfillStream
from synapse.replication.tcp.streams.events import EventsStream from synapse.replication.tcp.streams.events import EventsStream
@@ -140,10 +143,7 @@ class EventsWorkerStore(SQLBaseStore):
if hs.config.run_background_tasks: if hs.config.run_background_tasks:
# We periodically clean out old transaction ID mappings # We periodically clean out old transaction ID mappings
self._clock.looping_call( self._clock.looping_call(
run_as_background_process, self._cleanup_old_transaction_ids, 5 * 60 * 1000,
5 * 60 * 1000,
"_cleanup_old_transaction_ids",
self._cleanup_old_transaction_ids,
) )
self._get_event_cache = LruCache( self._get_event_cache = LruCache(
@@ -1374,6 +1374,7 @@ class EventsWorkerStore(SQLBaseStore):
return mapping return mapping
@wrap_as_background_process("_cleanup_old_transaction_ids")
async def _cleanup_old_transaction_ids(self): async def _cleanup_old_transaction_ids(self):
"""Cleans out transaction id mappings older than 24hrs. """Cleans out transaction id mappings older than 24hrs.
""" """

View File

@@ -20,7 +20,10 @@ from synapse.api.constants import EventTypes, Membership
from synapse.events import EventBase from synapse.events import EventBase
from synapse.events.snapshot import EventContext from synapse.events.snapshot import EventContext
from synapse.metrics import LaterGauge from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.events_worker import EventsWorkerStore
@@ -67,16 +70,10 @@ class RoomMemberWorkerStore(EventsWorkerStore):
): ):
self._known_servers_count = 1 self._known_servers_count = 1
self.hs.get_clock().looping_call( self.hs.get_clock().looping_call(
run_as_background_process, self._count_known_servers, 60 * 1000,
60 * 1000,
"_count_known_servers",
self._count_known_servers,
) )
self.hs.get_clock().call_later( self.hs.get_clock().call_later(
1000, 1000, self._count_known_servers,
run_as_background_process,
"_count_known_servers",
self._count_known_servers,
) )
LaterGauge( LaterGauge(
"synapse_federation_known_servers", "synapse_federation_known_servers",
@@ -85,6 +82,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
lambda: self._known_servers_count, lambda: self._known_servers_count,
) )
@wrap_as_background_process("_count_known_servers")
async def _count_known_servers(self): async def _count_known_servers(self):
""" """
Count the servers that this server knows about. Count the servers that this server knows about.