Remove some `run_in_background` calls in replication code (#7203)
By running this stuff with `run_in_background`, it won't be correctly reported against the relevant CPU usage stats. Fixes #7202pull/7193/head
parent
6dd6a3557c
commit
bae32740da
|
@ -0,0 +1 @@
|
||||||
|
Fix some worker-mode replication handling not being correctly recorded in CPU usage stats.
|
|
@ -42,7 +42,7 @@ from synapse.handlers.presence import PresenceHandler, get_interested_parties
|
||||||
from synapse.http.server import JsonResource
|
from synapse.http.server import JsonResource
|
||||||
from synapse.http.servlet import RestServlet, parse_json_object_from_request
|
from synapse.http.servlet import RestServlet, parse_json_object_from_request
|
||||||
from synapse.http.site import SynapseSite
|
from synapse.http.site import SynapseSite
|
||||||
from synapse.logging.context import LoggingContext, run_in_background
|
from synapse.logging.context import LoggingContext
|
||||||
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
|
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.replication.slave.storage._base import BaseSlavedStore, __func__
|
from synapse.replication.slave.storage._base import BaseSlavedStore, __func__
|
||||||
|
@ -635,7 +635,7 @@ class GenericWorkerReplicationHandler(ReplicationClientHandler):
|
||||||
await super(GenericWorkerReplicationHandler, self).on_rdata(
|
await super(GenericWorkerReplicationHandler, self).on_rdata(
|
||||||
stream_name, token, rows
|
stream_name, token, rows
|
||||||
)
|
)
|
||||||
run_in_background(self.process_and_notify, stream_name, token, rows)
|
await self.process_and_notify(stream_name, token, rows)
|
||||||
|
|
||||||
def get_streams_to_replicate(self):
|
def get_streams_to_replicate(self):
|
||||||
args = super(GenericWorkerReplicationHandler, self).get_streams_to_replicate()
|
args = super(GenericWorkerReplicationHandler, self).get_streams_to_replicate()
|
||||||
|
@ -650,7 +650,9 @@ class GenericWorkerReplicationHandler(ReplicationClientHandler):
|
||||||
async def process_and_notify(self, stream_name, token, rows):
|
async def process_and_notify(self, stream_name, token, rows):
|
||||||
try:
|
try:
|
||||||
if self.send_handler:
|
if self.send_handler:
|
||||||
self.send_handler.process_replication_rows(stream_name, token, rows)
|
await self.send_handler.process_replication_rows(
|
||||||
|
stream_name, token, rows
|
||||||
|
)
|
||||||
|
|
||||||
if stream_name == EventsStream.NAME:
|
if stream_name == EventsStream.NAME:
|
||||||
# We shouldn't get multiple rows per token for events stream, so
|
# We shouldn't get multiple rows per token for events stream, so
|
||||||
|
@ -782,12 +784,12 @@ class FederationSenderHandler(object):
|
||||||
def stream_positions(self):
|
def stream_positions(self):
|
||||||
return {"federation": self.federation_position}
|
return {"federation": self.federation_position}
|
||||||
|
|
||||||
def process_replication_rows(self, stream_name, token, rows):
|
async def process_replication_rows(self, stream_name, token, rows):
|
||||||
# The federation stream contains things that we want to send out, e.g.
|
# The federation stream contains things that we want to send out, e.g.
|
||||||
# presence, typing, etc.
|
# presence, typing, etc.
|
||||||
if stream_name == "federation":
|
if stream_name == "federation":
|
||||||
send_queue.process_rows_for_federation(self.federation_sender, rows)
|
send_queue.process_rows_for_federation(self.federation_sender, rows)
|
||||||
run_in_background(self.update_token, token)
|
await self.update_token(token)
|
||||||
|
|
||||||
# We also need to poke the federation sender when new events happen
|
# We also need to poke the federation sender when new events happen
|
||||||
elif stream_name == "events":
|
elif stream_name == "events":
|
||||||
|
@ -795,9 +797,7 @@ class FederationSenderHandler(object):
|
||||||
|
|
||||||
# ... and when new receipts happen
|
# ... and when new receipts happen
|
||||||
elif stream_name == ReceiptsStream.NAME:
|
elif stream_name == ReceiptsStream.NAME:
|
||||||
run_as_background_process(
|
await self._on_new_receipts(rows)
|
||||||
"process_receipts_for_federation", self._on_new_receipts, rows
|
|
||||||
)
|
|
||||||
|
|
||||||
# ... as well as device updates and messages
|
# ... as well as device updates and messages
|
||||||
elif stream_name == DeviceListsStream.NAME:
|
elif stream_name == DeviceListsStream.NAME:
|
||||||
|
|
Loading…
Reference in New Issue