Add a utility class that can be used to generate a twisted deferred aware call graph

function_tracer
Erik Johnston 2015-03-12 16:49:27 +00:00
parent abaf47bbb6
commit d1ae594ae5
5 changed files with 417 additions and 2 deletions

116
scripts/graph_tracer.py Normal file
View File

@ -0,0 +1,116 @@
import fileinput
import pydot
nodes = {}
edges = set()
graph = pydot.Dot(graph_name="call_graph", graph_type="digraph")
names = {}
times = {}
deferreds = {}
deferred_edges = set()
root_id = None
for line in fileinput.input():
try:
if " calls " in line:
start, end = line.split(" calls ")
start, end = start.strip(), end.strip()
edges.add((start, end))
print start, end
if " named " in line:
node_id, name = line.split(" named ")
names[node_id.strip()] = name.strip()
if name.strip() == "Deferred synapse.rest.client.v1.room.RoomSendEventRestServlet.on_PUT":
root_id = node_id
if " in " in line:
node_id, d = line.split(" in ")
deferreds[node_id.strip()] = d.strip()
if " time " in line:
node_id, ms = line.split(" time ")
times[node_id.strip()] = int(ms.strip())
if " waits on " in line:
start, end = line.split(" waits on ")
start, end = start.strip(), end.strip()
deferred_edges.add((start, end))
print start, end
except Exception as e:
print "failed %s to parse '%s'" % (e.message, line)
# deferreds_root = set(deferreds.values())
# for parent, child in deferred_edges:
# deferreds_root.discard(child)
#
# deferred_tree = {
# d: {}
# for d in deferreds_root
# }
#
# def populate(root, tree):
# for leaf in deferred_edges.get(root, []):
# populate(leaf, tree.setdefault(leaf, {}))
#
#
# for d in deferreds_root:
# tree = deferred_tree.setdefault(d, {})
# populate(d, tree)
print deferred_edges
print root_id
def is_in_deferred(d):
while True:
if d == root_id:
return True
for start, end in deferred_edges:
if d == end:
d = start
break
else:
return False
for node_id, name in names.items():
# if times.get(node_id, 100) < 5:
# continue
if node_id in deferreds:
if not is_in_deferred(deferreds[node_id]):
continue
else:
if not is_in_deferred(node_id):
continue
node = pydot.Node(node_id, label=name)
# if node_id in deferreds:
# clusters[deferreds[node_id]].add_node(node)
# elif node_id in clusters:
# clusters[node_id].add_node(node)
# else:
# graph.add_node(node)
graph.add_node(node)
nodes[node_id] = node
# print node_id
for parent, child in edges:
if child not in nodes:
print child, "not a node"
continue
if parent not in nodes:
print parent, "not a node"
continue
edge = pydot.Edge(nodes[parent], nodes[child])
graph.add_edge(edge)
file_prefix = "call_graph_out"
graph.write('%s.dot' % file_prefix, format='raw', prog='dot')
graph.write_svg("%s.svg" % file_prefix, prog='dot')

View File

@ -51,6 +51,8 @@ from synapse.rest.client.v2_alpha import ClientV2AlphaRestResource
from daemonize import Daemonize
import twisted.manhole.telnet
from synapse.util.traceutil import Tracer
import synapse
import logging
@ -61,6 +63,7 @@ import subprocess
import sqlite3
import syweb
logger = logging.getLogger(__name__)
@ -399,8 +402,13 @@ class SynapseService(service.Service):
def run(hs):
def in_thread():
try:
tracer = Tracer()
sys.settrace(tracer.process)
except Exception:
logger.exception("Failed to start tracer")
with LoggingContext("run"):
change_resource_limit(hs.config.soft_file_limit)

View File

@ -73,7 +73,6 @@ class FederationHandler(BaseHandler):
# When joining a room we need to queue any events for that room up
self.room_queues = {}
@log_function
@defer.inlineCallbacks
def handle_new_event(self, event, destinations):
""" Takes in an event from the client to server side, that has already

View File

@ -165,6 +165,12 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
defer.returnValue((200, {}))
def trace(f):
f.should_trace = True
f.root_trace = True
return f
# TODO: Needs unit testing for generic events + feedback
class RoomSendEventRestServlet(ClientV1RestServlet):
@ -175,7 +181,11 @@ class RoomSendEventRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def on_POST(self, request, room_id, event_type, txn_id=None):
import inspect
frame = inspect.currentframe()
logger.info("Frame: %s", id(frame))
user, client = yield self.auth.get_user_by_req(request)
logger.info("Frame: %s", id(inspect.currentframe()))
content = _parse_json(request)
msg_handler = self.handlers.message_handler
@ -189,12 +199,14 @@ class RoomSendEventRestServlet(ClientV1RestServlet):
client=client,
txn_id=txn_id,
)
logger.info("Frame: %s", id(inspect.currentframe()))
defer.returnValue((200, {"event_id": event.event_id}))
def on_GET(self, request, room_id, event_type, txn_id):
return (200, "Not implemented")
@trace
@defer.inlineCallbacks
def on_PUT(self, request, room_id, event_type, txn_id):
try:

280
synapse/util/traceutil.py Normal file
View File

@ -0,0 +1,280 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import inspect
import logging
logger = logging.getLogger("Tracer")
class Tracer(object):
def __init__(self):
self.interested_deferreds = set()
self.next_id = 1
self.deferred_to_current_frames = {}
def process(self, frame, event, arg):
if event == 'call':
return self.process_call(frame)
def handle_inline_callbacks(self, frm):
argvalues = inspect.getargvalues(frm)
generator = argvalues.locals["g"]
deferred = argvalues.locals["deferred"]
if not hasattr(deferred, "syn_trace_defer_id"):
trace_id = self.get_next_id()
deferred.syn_trace_defer_id = trace_id
logger.info(
"%s named Deferred %s",
trace_id,
self.get_name_for_frame(generator.gi_frame)
)
logger.info("%s start %d", trace_id, int(time.time() * 1000))
def do(res):
logger.info("%s end %d", trace_id, int(time.time() * 1000))
return res
deferred.addBoth(do)
back = frm.f_back
while back:
try:
name = self.get_name_for_frame(back)
if name == "twisted.internet.defer._inlineCallbacks":
argvalues = inspect.getargvalues(back)
deferred = argvalues.locals["deferred"]
d_id = getattr(deferred, "syn_trace_defer_id", None)
if d_id:
logger.info("%s in %s", trace_id, d_id)
curr_stack = self.deferred_to_current_frames.setdefault(
d_id, []
)
if curr_stack:
logger.info("%s calls %s", curr_stack[-1], trace_id)
else:
logger.info("%s calls %s", d_id, trace_id)
break
except:
pass
back = back.f_back
def are_interested(self, name):
if not name.startswith("synapse"):
return False
if name.startswith("synapse.util.logcontext"):
return False
if name.startswith("synapse.util.logutils"):
return False
if name.startswith("synapse.util.traceutil"):
return False
if name.startswith("synapse.events.FrozenEvent.get"):
return False
if name.startswith("synapse.events.EventBuilder.get"):
return False
if name.startswith("synapse.types"):
return False
if name.startswith("synapse.util.frozenutils.freeze"):
return False
if name.startswith("synapse.util.frozenutils.<dictcomp>"):
return False
if name.startswith("synapse.util.Clock"):
return False
if name.endswith("__repr__") or name.endswith("__str__"):
return False
if name.endswith("<genexpr>"):
return False
return True
def process_call(self, frame):
should_trace = False
try:
name = self.get_name_for_frame(frame)
if name == "twisted.internet.defer._inlineCallbacks":
self.handle_inline_callbacks(frame)
return
if not self.are_interested(name):
return
back_name = self.get_name_for_frame(frame.f_back)
if name == "synapse.api.auth.Auth.get_user_by_req":
logger.info(
"synapse.api.auth.Auth.get_user_by_req %s",
back_name
)
try:
if back_name == "twisted.internet.defer._inlineCallbacks":
def ret(f, event, result):
if event != "return":
return
argvalues = inspect.getargvalues(frame.f_back)
deferred = argvalues.locals["deferred"]
try:
logger.info(
"%s waits on %s",
deferred.syn_trace_defer_id,
result.syn_trace_defer_id
)
except:
pass
return ret
if back_name == "twisted.internet.defer.unwindGenerator":
return
except:
pass
try:
func = getattr(frame.f_locals["self"], frame.f_code.co_name)
if inspect.isgeneratorfunction(func):
return
except:
pass
try:
func = frame.f_globals[frame.f_code.co_name]
if inspect.isgeneratorfunction(func):
return
except:
pass
except:
return
back = frame
names = []
seen_deferreds = []
bottom_deferred = None
while back:
try:
name = self.get_name_for_frame(back)
if name.startswith("synapse"):
names.append(name)
# if name.startswith("twisted.internet.defer"):
# logger.info("Name: %s", name)
if name == "twisted.internet.defer._inlineCallbacks":
argvalues = inspect.getargvalues(back)
deferred = argvalues.locals["deferred"]
d_id = getattr(deferred, "syn_trace_defer_id", None)
if d_id:
seen_deferreds.append(d_id)
if not bottom_deferred:
bottom_deferred = deferred
if d_id in self.interested_deferreds:
should_trace = True
break
func = getattr(back.f_locals["self"], back.f_code.co_name)
if hasattr(func, "should_trace") or hasattr(func.im_func, "should_trace"):
should_trace = True
break
func.root_trace
should_trace = True
break
except:
pass
back = back.f_back
if not should_trace:
return
frame_id = self.get_next_id()
name = self.get_name_for_frame(frame)
logger.info("%s named %s", frame_id, name)
self.interested_deferreds.update(seen_deferreds)
names.reverse()
if bottom_deferred:
self.deferred_frames.setdefault(
bottom_deferred.syn_trace_defer_id, []
).append(names)
logger.info("%s in %s", frame_id, bottom_deferred.syn_trace_defer_id)
if not hasattr(bottom_deferred, "syn_trace_registered_cb"):
bottom_deferred.syn_trace_registered_cb = True
def do(res):
return res
bottom_deferred.addBoth(do)
curr_stack = self.deferred_to_current_frames.setdefault(
bottom_deferred.syn_trace_defer_id, []
)
if curr_stack:
logger.info("%s calls %s", curr_stack[-1], frame_id)
else:
logger.info("%s calls %s", bottom_deferred.syn_trace_defer_id, frame_id)
curr_stack.append(frame_id)
logger.info("%s start %d", frame_id, int(time.time() * 1000))
def p(frame, event, arg):
if event == "return":
curr_stack.pop()
logger.info("%s end %d", frame_id, int(time.time() * 1000))
return p
def get_name_for_frame(self, frame):
module_name = frame.f_globals["__name__"]
cls_instance = frame.f_locals.get("self", None)
if cls_instance:
cls_name = cls_instance.__class__.__name__
name = "%s.%s.%s" % (
module_name, cls_name, frame.f_code.co_name
)
else:
name = "%s.%s" % (
module_name, frame.f_code.co_name
)
return name
def get_next_id(self):
i = self.next_id
self.next_id += 1
return i