Attempt to include db threads in cpu usage stats (#3496)
Let's try to include time spent in the DB threads in the per-request/block cpu usage metrics.pull/3514/head
parent
55370331da
commit
c3c29aa196
|
@ -0,0 +1 @@
|
||||||
|
Include CPU time from database threads in request/block metrics.
|
|
@ -220,7 +220,7 @@ class SQLBaseStore(object):
|
||||||
self._clock.looping_call(loop, 10000)
|
self._clock.looping_call(loop, 10000)
|
||||||
|
|
||||||
def _new_transaction(self, conn, desc, after_callbacks, exception_callbacks,
|
def _new_transaction(self, conn, desc, after_callbacks, exception_callbacks,
|
||||||
logging_context, func, *args, **kwargs):
|
func, *args, **kwargs):
|
||||||
start = time.time()
|
start = time.time()
|
||||||
txn_id = self._TXN_ID
|
txn_id = self._TXN_ID
|
||||||
|
|
||||||
|
@ -284,8 +284,7 @@ class SQLBaseStore(object):
|
||||||
end = time.time()
|
end = time.time()
|
||||||
duration = end - start
|
duration = end - start
|
||||||
|
|
||||||
if logging_context is not None:
|
LoggingContext.current_context().add_database_transaction(duration)
|
||||||
logging_context.add_database_transaction(duration)
|
|
||||||
|
|
||||||
transaction_logger.debug("[TXN END] {%s} %f sec", name, duration)
|
transaction_logger.debug("[TXN END] {%s} %f sec", name, duration)
|
||||||
|
|
||||||
|
@ -309,19 +308,15 @@ class SQLBaseStore(object):
|
||||||
Returns:
|
Returns:
|
||||||
Deferred: The result of func
|
Deferred: The result of func
|
||||||
"""
|
"""
|
||||||
current_context = LoggingContext.current_context()
|
|
||||||
|
|
||||||
after_callbacks = []
|
after_callbacks = []
|
||||||
exception_callbacks = []
|
exception_callbacks = []
|
||||||
|
|
||||||
def inner_func(conn, *args, **kwargs):
|
|
||||||
return self._new_transaction(
|
|
||||||
conn, desc, after_callbacks, exception_callbacks, current_context,
|
|
||||||
func, *args, **kwargs
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
result = yield self.runWithConnection(inner_func, *args, **kwargs)
|
result = yield self.runWithConnection(
|
||||||
|
self._new_transaction,
|
||||||
|
desc, after_callbacks, exception_callbacks, func,
|
||||||
|
*args, **kwargs
|
||||||
|
)
|
||||||
|
|
||||||
for after_callback, after_args, after_kwargs in after_callbacks:
|
for after_callback, after_args, after_kwargs in after_callbacks:
|
||||||
after_callback(*after_args, **after_kwargs)
|
after_callback(*after_args, **after_kwargs)
|
||||||
|
@ -346,22 +341,25 @@ class SQLBaseStore(object):
|
||||||
Returns:
|
Returns:
|
||||||
Deferred: The result of func
|
Deferred: The result of func
|
||||||
"""
|
"""
|
||||||
current_context = LoggingContext.current_context()
|
parent_context = LoggingContext.current_context()
|
||||||
|
if parent_context == LoggingContext.sentinel:
|
||||||
|
logger.warn(
|
||||||
|
"Running db txn from sentinel context: metrics will be lost",
|
||||||
|
)
|
||||||
|
parent_context = None
|
||||||
|
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
|
|
||||||
def inner_func(conn, *args, **kwargs):
|
def inner_func(conn, *args, **kwargs):
|
||||||
with LoggingContext("runWithConnection") as context:
|
with LoggingContext("runWithConnection", parent_context) as context:
|
||||||
sched_duration_sec = time.time() - start_time
|
sched_duration_sec = time.time() - start_time
|
||||||
sql_scheduling_timer.observe(sched_duration_sec)
|
sql_scheduling_timer.observe(sched_duration_sec)
|
||||||
current_context.add_database_scheduled(sched_duration_sec)
|
context.add_database_scheduled(sched_duration_sec)
|
||||||
|
|
||||||
if self.database_engine.is_connection_closed(conn):
|
if self.database_engine.is_connection_closed(conn):
|
||||||
logger.debug("Reconnecting closed database connection")
|
logger.debug("Reconnecting closed database connection")
|
||||||
conn.reconnect()
|
conn.reconnect()
|
||||||
|
|
||||||
current_context.copy_to(context)
|
|
||||||
|
|
||||||
return func(conn, *args, **kwargs)
|
return func(conn, *args, **kwargs)
|
||||||
|
|
||||||
with PreserveLoggingContext():
|
with PreserveLoggingContext():
|
||||||
|
|
|
@ -261,7 +261,8 @@ class EventsWorkerStore(SQLBaseStore):
|
||||||
]
|
]
|
||||||
|
|
||||||
rows = self._new_transaction(
|
rows = self._new_transaction(
|
||||||
conn, "do_fetch", [], [], None, self._fetch_event_rows, event_ids
|
conn, "do_fetch", [], [],
|
||||||
|
self._fetch_event_rows, event_ids,
|
||||||
)
|
)
|
||||||
|
|
||||||
row_dict = {
|
row_dict = {
|
||||||
|
|
|
@ -137,12 +137,18 @@ class LoggingContext(object):
|
||||||
"""Additional context for log formatting. Contexts are scoped within a
|
"""Additional context for log formatting. Contexts are scoped within a
|
||||||
"with" block.
|
"with" block.
|
||||||
|
|
||||||
|
If a parent is given when creating a new context, then:
|
||||||
|
- logging fields are copied from the parent to the new context on entry
|
||||||
|
- when the new context exits, the cpu usage stats are copied from the
|
||||||
|
child to the parent
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
name (str): Name for the context for debugging.
|
name (str): Name for the context for debugging.
|
||||||
|
parent_context (LoggingContext|None): The parent of the new context
|
||||||
"""
|
"""
|
||||||
|
|
||||||
__slots__ = [
|
__slots__ = [
|
||||||
"previous_context", "name",
|
"previous_context", "name", "parent_context",
|
||||||
"_resource_usage",
|
"_resource_usage",
|
||||||
"usage_start",
|
"usage_start",
|
||||||
"main_thread", "alive",
|
"main_thread", "alive",
|
||||||
|
@ -183,7 +189,7 @@ class LoggingContext(object):
|
||||||
|
|
||||||
sentinel = Sentinel()
|
sentinel = Sentinel()
|
||||||
|
|
||||||
def __init__(self, name=None):
|
def __init__(self, name=None, parent_context=None):
|
||||||
self.previous_context = LoggingContext.current_context()
|
self.previous_context = LoggingContext.current_context()
|
||||||
self.name = name
|
self.name = name
|
||||||
|
|
||||||
|
@ -199,6 +205,8 @@ class LoggingContext(object):
|
||||||
self.tag = ""
|
self.tag = ""
|
||||||
self.alive = True
|
self.alive = True
|
||||||
|
|
||||||
|
self.parent_context = parent_context
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return "%s@%x" % (self.name, id(self))
|
return "%s@%x" % (self.name, id(self))
|
||||||
|
|
||||||
|
@ -236,6 +244,10 @@ class LoggingContext(object):
|
||||||
self.previous_context, old_context
|
self.previous_context, old_context
|
||||||
)
|
)
|
||||||
self.alive = True
|
self.alive = True
|
||||||
|
|
||||||
|
if self.parent_context is not None:
|
||||||
|
self.parent_context.copy_to(self)
|
||||||
|
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def __exit__(self, type, value, traceback):
|
def __exit__(self, type, value, traceback):
|
||||||
|
@ -257,6 +269,13 @@ class LoggingContext(object):
|
||||||
self.previous_context = None
|
self.previous_context = None
|
||||||
self.alive = False
|
self.alive = False
|
||||||
|
|
||||||
|
# if we have a parent, pass our CPU usage stats on
|
||||||
|
if self.parent_context is not None:
|
||||||
|
self.parent_context._resource_usage += self._resource_usage
|
||||||
|
|
||||||
|
# reset them in case we get entered again
|
||||||
|
self._resource_usage.reset()
|
||||||
|
|
||||||
def copy_to(self, record):
|
def copy_to(self, record):
|
||||||
"""Copy logging fields from this context to a log record or
|
"""Copy logging fields from this context to a log record or
|
||||||
another LoggingContext
|
another LoggingContext
|
||||||
|
|
Loading…
Reference in New Issue