Add performance counters for different stages of loading events
parent
8ce100c7b4
commit
d7c7efb691
|
@ -77,6 +77,43 @@ class LoggingTransaction(object):
|
||||||
sql_logger.debug("[SQL time] {%s} %f", self.name, end - start)
|
sql_logger.debug("[SQL time] {%s} %f", self.name, end - start)
|
||||||
|
|
||||||
|
|
||||||
|
class PerformanceCounters(object):
|
||||||
|
def __init__(self):
|
||||||
|
self.current_counters = {}
|
||||||
|
self.previous_counters = {}
|
||||||
|
|
||||||
|
def update(self, key, start_time, end_time=None):
|
||||||
|
if end_time is None:
|
||||||
|
end_time = time.time() * 1000;
|
||||||
|
duration = end_time - start_time
|
||||||
|
count, cum_time = self.current_counters.get(key, (0, 0))
|
||||||
|
count += 1
|
||||||
|
cum_time += duration
|
||||||
|
self.current_counters[key] = (count, cum_time)
|
||||||
|
return end_time
|
||||||
|
|
||||||
|
def interval(self, interval_duration, limit=3):
|
||||||
|
counters = []
|
||||||
|
for name, (count, cum_time) in self.current_counters.items():
|
||||||
|
prev_count, prev_time = self.previous_counters.get(name, (0, 0))
|
||||||
|
counters.append((
|
||||||
|
(cum_time - prev_time) / interval_duration,
|
||||||
|
count - prev_count,
|
||||||
|
name
|
||||||
|
))
|
||||||
|
|
||||||
|
self.previous_counters = dict(self.current_counters)
|
||||||
|
|
||||||
|
counters.sort(reverse=True)
|
||||||
|
|
||||||
|
top_n_counters = ", ".join(
|
||||||
|
"%s(%d): %.3f%%" % (name, count, 100 * ratio)
|
||||||
|
for ratio, count, name in txn_counters[:limit]
|
||||||
|
)
|
||||||
|
|
||||||
|
return top_n_counters
|
||||||
|
|
||||||
|
|
||||||
class SQLBaseStore(object):
|
class SQLBaseStore(object):
|
||||||
_TXN_ID = 0
|
_TXN_ID = 0
|
||||||
|
|
||||||
|
@ -88,8 +125,8 @@ class SQLBaseStore(object):
|
||||||
self._previous_txn_total_time = 0
|
self._previous_txn_total_time = 0
|
||||||
self._current_txn_total_time = 0
|
self._current_txn_total_time = 0
|
||||||
self._previous_loop_ts = 0
|
self._previous_loop_ts = 0
|
||||||
self._txn_perf_counters = {}
|
self._txn_perf_counters = PerformanceCounters()
|
||||||
self._previous_txn_perf_counters = {}
|
self._get_event_counters = PerformanceCounters()
|
||||||
|
|
||||||
def start_profiling(self):
|
def start_profiling(self):
|
||||||
self._previous_loop_ts = self._clock.time_msec()
|
self._previous_loop_ts = self._clock.time_msec()
|
||||||
|
@ -105,23 +142,12 @@ class SQLBaseStore(object):
|
||||||
|
|
||||||
ratio = (curr - prev)/(time_now - time_then)
|
ratio = (curr - prev)/(time_now - time_then)
|
||||||
|
|
||||||
txn_counters = []
|
top_three_counters = self._txn_perf_counters.interval(
|
||||||
for name, (count, cum_time) in self._txn_perf_counters.items():
|
time_now - time_then, limit=3
|
||||||
prev_count, prev_time = self._previous_txn_perf_counters.get(
|
|
||||||
name, (0,0)
|
|
||||||
)
|
)
|
||||||
txn_counters.append((
|
|
||||||
(cum_time - prev_time) / (time_now - time_then),
|
|
||||||
count - prev_count,
|
|
||||||
name
|
|
||||||
))
|
|
||||||
|
|
||||||
self._previous_txn_perf_counters = dict(self._txn_perf_counters)
|
top_3_event_counters = self._get_event_counters.interval(
|
||||||
|
time_now - time_then, limit=3
|
||||||
txn_counters.sort(reverse=True)
|
|
||||||
top_three_counters = ", ".join(
|
|
||||||
"%s(%d): %.3f%%" % (name, count, 100 * ratio)
|
|
||||||
for ratio, count, name in txn_counters[:3]
|
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
|
@ -162,11 +188,7 @@ class SQLBaseStore(object):
|
||||||
)
|
)
|
||||||
|
|
||||||
self._current_txn_total_time += end - start
|
self._current_txn_total_time += end - start
|
||||||
|
self._txn_perf_counters.update(desc, start, end)
|
||||||
count, cum_time = self._txn_perf_counters.get(desc, (0,0))
|
|
||||||
count += 1
|
|
||||||
cum_time += end - start
|
|
||||||
self._txn_perf_counters[desc] = (count, cum_time)
|
|
||||||
|
|
||||||
with PreserveLoggingContext():
|
with PreserveLoggingContext():
|
||||||
result = yield self._db_pool.runInteraction(
|
result = yield self._db_pool.runInteraction(
|
||||||
|
@ -566,6 +588,8 @@ class SQLBaseStore(object):
|
||||||
"LIMIT 1 "
|
"LIMIT 1 "
|
||||||
)
|
)
|
||||||
|
|
||||||
|
start_time = time.time() * 1000;
|
||||||
|
|
||||||
txn.execute(sql, (event_id,))
|
txn.execute(sql, (event_id,))
|
||||||
|
|
||||||
res = txn.fetchone()
|
res = txn.fetchone()
|
||||||
|
@ -575,6 +599,8 @@ class SQLBaseStore(object):
|
||||||
|
|
||||||
internal_metadata, js, redacted, rejected_reason = res
|
internal_metadata, js, redacted, rejected_reason = res
|
||||||
|
|
||||||
|
self._get_event_counters.update("select_event", start_time)
|
||||||
|
|
||||||
if allow_rejected or not rejected_reason:
|
if allow_rejected or not rejected_reason:
|
||||||
return self._get_event_from_row_txn(
|
return self._get_event_from_row_txn(
|
||||||
txn, internal_metadata, js, redacted,
|
txn, internal_metadata, js, redacted,
|
||||||
|
@ -586,10 +612,18 @@ class SQLBaseStore(object):
|
||||||
|
|
||||||
def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
|
def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
|
||||||
check_redacted=True, get_prev_content=False):
|
check_redacted=True, get_prev_content=False):
|
||||||
|
|
||||||
|
start_time = time.time() * 1000;
|
||||||
|
update_counter = self._get_event_counters.update
|
||||||
|
|
||||||
d = json.loads(js)
|
d = json.loads(js)
|
||||||
|
start_time = update_counter("decode_json", start_time)
|
||||||
|
|
||||||
internal_metadata = json.loads(internal_metadata)
|
internal_metadata = json.loads(internal_metadata)
|
||||||
|
start_time = update_counter("decode_internal", start_time)
|
||||||
|
|
||||||
ev = FrozenEvent(d, internal_metadata_dict=internal_metadata)
|
ev = FrozenEvent(d, internal_metadata_dict=internal_metadata)
|
||||||
|
start_time = update_counter("build_frozen_event", start_time)
|
||||||
|
|
||||||
if check_redacted and redacted:
|
if check_redacted and redacted:
|
||||||
ev = prune_event(ev)
|
ev = prune_event(ev)
|
||||||
|
@ -605,6 +639,7 @@ class SQLBaseStore(object):
|
||||||
|
|
||||||
if because:
|
if because:
|
||||||
ev.unsigned["redacted_because"] = because
|
ev.unsigned["redacted_because"] = because
|
||||||
|
start_time = update_counter("redact_event", start_time)
|
||||||
|
|
||||||
if get_prev_content and "replaces_state" in ev.unsigned:
|
if get_prev_content and "replaces_state" in ev.unsigned:
|
||||||
prev = self._get_event_txn(
|
prev = self._get_event_txn(
|
||||||
|
@ -614,6 +649,7 @@ class SQLBaseStore(object):
|
||||||
)
|
)
|
||||||
if prev:
|
if prev:
|
||||||
ev.unsigned["prev_content"] = prev.get_dict()["content"]
|
ev.unsigned["prev_content"] = prev.get_dict()["content"]
|
||||||
|
start_time = update_counter("get_prev_content", start_time)
|
||||||
|
|
||||||
return ev
|
return ev
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue