From b5d1c68beb24c314006398052070448d67bb4983 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 15 Mar 2017 02:21:07 +0000 Subject: [PATCH 01/11] Implement reset_context_after_deferred to correctly reset the context when we fire off a deferred we aren't going to wait for. --- synapse/util/logcontext.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 6c83eb213d..d73670f9f2 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -308,6 +308,31 @@ def preserve_context_over_deferred(deferred, context=None): return d +def reset_context_after_deferred(deferred): + """If the deferred is incomplete, add a callback which will reset the + context. + + This is useful when you want to fire off a deferred, but don't want to + wait for it to complete. (The deferred will restore the current log context + when it completes, so if you don't do anything, it will leak log context.) + + (If this feels asymmetric, consider it this way: we are effectively forking + a new thread of execution. We are probably currently within a + ``with LoggingContext()`` block, which is supposed to have a single entry + and exit point. But by spawning off another deferred, we are effectively + adding a new exit point.) + + Args: + deferred (defer.Deferred): deferred + """ + def reset_context(result): + LoggingContext.set_current_context(LoggingContext.sentinel) + return result + + if not deferred.called: + deferred.addBoth(reset_context) + + def preserve_fn(f): """Ensures that function is called with correct context and that context is restored after return. Useful for wrapping functions that return a deferred From 9ce53a3861881e1da54d87d2db875f53eafef8ac Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 14 Mar 2017 11:26:57 +0000 Subject: [PATCH 02/11] Queue up federation PDUs while a room join is in progress This just takes the existing `room_queues` logic and moves it out to `on_receive_pdu` instead of `_process_received_pdu`, which ensures that we don't start trying to fetch prev_events and whathaveyou until the join has completed. --- synapse/handlers/federation.py | 68 ++++++++++++++++++++++------------ 1 file changed, 44 insertions(+), 24 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index d0c2b4d6ed..0cd5501b05 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -14,6 +14,7 @@ # limitations under the License. """Contains handlers for federation events.""" +import synapse.util.logcontext from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json from unpaddedbase64 import decode_base64 @@ -114,6 +115,14 @@ class FederationHandler(BaseHandler): logger.debug("Already seen pdu %s", pdu.event_id) return + # If we are currently in the process of joining this room, then we + # queue up events for later processing. + if pdu.room_id in self.room_queues: + logger.info("Ignoring PDU %s for room %s from %s for now; join " + "in progress", pdu.event_id, pdu.room_id, origin) + self.room_queues[pdu.room_id].append((pdu, origin)) + return + state = None auth_chain = [] @@ -274,26 +283,13 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def _process_received_pdu(self, origin, pdu, state=None, auth_chain=None): + def _process_received_pdu(self, origin, pdu, state, auth_chain): """ Called when we have a new pdu. We need to do auth checks and put it through the StateHandler. - - auth_chain and state are None if we already have the necessary state - and prev_events in the db """ event = pdu - logger.debug("Got event: %s", event.event_id) - - # If we are currently in the process of joining this room, then we - # queue up events for later processing. - if event.room_id in self.room_queues: - self.room_queues[event.room_id].append((pdu, origin)) - return - - logger.debug("Processing event: %s", event.event_id) - - logger.debug("Event: %s", event) + logger.debug("Processing event: %s", event) # FIXME (erikj): Awful hack to make the case where we are not currently # in the room work @@ -862,8 +858,6 @@ class FederationHandler(BaseHandler): """ logger.debug("Joining %s to %s", joinee, room_id) - yield self.store.clean_room_for_join(room_id) - origin, event = yield self._make_and_verify_event( target_hosts, room_id, @@ -872,7 +866,15 @@ class FederationHandler(BaseHandler): content, ) + # This shouldn't happen, because the RoomMemberHandler has a + # linearizer lock which only allows one operation per user per room + # at a time - so this is just paranoia. + assert (room_id not in self.room_queues) + self.room_queues[room_id] = [] + + yield self.store.clean_room_for_join(room_id) + handled_events = set() try: @@ -925,17 +927,35 @@ class FederationHandler(BaseHandler): room_queue = self.room_queues[room_id] del self.room_queues[room_id] - for p, origin in room_queue: - if p.event_id in handled_events: - continue + # we don't need to wait for the queued events to be processed - + # it's just a best-effort thing at this point. We do want to do + # them roughly in order, though, otherwise we'll end up making + # lots of requests for missing prev_events which we do actually + # have. Hence we fire off the deferred, but don't wait for it. - try: - self._process_received_pdu(origin, p) - except: - logger.exception("Couldn't handle pdu") + synapse.util.logcontext.reset_context_after_deferred( + self._handle_queued_pdus(room_queue)) defer.returnValue(True) + @defer.inlineCallbacks + def _handle_queued_pdus(self, room_queue): + """Process PDUs which got queued up while we were busy send_joining. + + Args: + room_queue (list[FrozenEvent, str]): list of PDUs to be processed + and the servers that sent them + """ + for p, origin in room_queue: + try: + logger.info("Processing queued PDU %s which was received " + "while we were joining %s", p.event_id, p.room_id) + yield self.on_receive_pdu(origin, p) + except Exception as e: + logger.warn( + "Error handling queued PDU %s from %s: %s", + p.event_id, origin, e) + @defer.inlineCallbacks @log_function def on_make_join_request(self, room_id, user_id): From 5068fb16a520d7251461decb289a960ec636d4fe Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 17 Mar 2017 11:51:13 +0000 Subject: [PATCH 03/11] Refactoring and cleanups A few non-functional changes: * A bunch of docstrings to document types * Split `EventsStore._persist_events_txn` up a bit. Hopefully it's a bit more readable. * Rephrase `EventFederationStore._update_min_depth_for_room_txn` to avoid mind-bending conditional. * Rephrase rejected/outlier conditional in `_update_outliers_txn` to avoid mind-bending conditional. --- synapse/events/snapshot.py | 26 +++ synapse/handlers/federation.py | 10 + synapse/state.py | 11 +- synapse/storage/event_federation.py | 24 +-- synapse/storage/events.py | 275 ++++++++++++++++++++++------ 5 files changed, 265 insertions(+), 81 deletions(-) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 11605b34a3..6be18880b9 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -15,6 +15,32 @@ class EventContext(object): + """ + Attributes: + current_state_ids (dict[(str, str), str]): + The current state map including the current event. + (type, state_key) -> event_id + + prev_state_ids (dict[(str, str), str]): + The current state map excluding the current event. + (type, state_key) -> event_id + + state_group (int): state group id + rejected (bool|str): A rejection reason if the event was rejected, else + False + + push_actions (list[(str, list[object])]): list of (user_id, actions) + tuples + + prev_group (int): Previously persisted state group. ``None`` for an + outlier. + delta_ids (dict[(str, str), str]): Delta from ``prev_group``. + (type, state_key) -> event_id. ``None`` for an outlier. + + prev_state_events (?): XXX: is this ever set to anything other than + the empty list? + """ + __slots__ = [ "current_state_ids", "prev_state_ids", diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0cd5501b05..10b2325b27 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1537,7 +1537,17 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _prep_event(self, origin, event, state=None, auth_events=None): + """ + Args: + origin: + event: + state: + auth_events: + + Returns: + Deferred, which resolves to synapse.events.snapshot.EventContext + """ context = yield self.state_handler.compute_event_context( event, old_state=state, ) diff --git a/synapse/state.py b/synapse/state.py index 383d32b163..9a523a1b89 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -177,17 +177,12 @@ class StateHandler(object): @defer.inlineCallbacks def compute_event_context(self, event, old_state=None): - """ Fills out the context with the `current state` of the graph. The - `current state` here is defined to be the state of the event graph - just before the event - i.e. it never includes `event` - - If `event` has `auth_events` then this will also fill out the - `auth_events` field on `context` from the `current_state`. + """Build an EventContext structure for the event. Args: - event (EventBase) + event (synapse.events.EventBase): Returns: - an EventContext + synapse.events.snapshot.EventContext: """ context = EventContext() diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 256e50dc20..0d97de2fe7 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -201,19 +201,19 @@ class EventFederationStore(SQLBaseStore): def _update_min_depth_for_room_txn(self, txn, room_id, depth): min_depth = self._get_min_depth_interaction(txn, room_id) - do_insert = depth < min_depth if min_depth else True + if min_depth and depth >= min_depth: + return - if do_insert: - self._simple_upsert_txn( - txn, - table="room_depth", - keyvalues={ - "room_id": room_id, - }, - values={ - "min_depth": depth, - }, - ) + self._simple_upsert_txn( + txn, + table="room_depth", + keyvalues={ + "room_id": room_id, + }, + values={ + "min_depth": depth, + }, + ) def _handle_mult_prev_events(self, txn, events): """ diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 72319c35ae..42e433da85 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -34,14 +34,16 @@ from canonicaljson import encode_canonical_json from collections import deque, namedtuple, OrderedDict from functools import wraps -import synapse import synapse.metrics - import logging import math import ujson as json +# these are only included to make the type annotations work +from synapse.events import EventBase # noqa: F401 +from synapse.events.snapshot import EventContext # noqa: F401 + logger = logging.getLogger(__name__) @@ -82,6 +84,11 @@ class _EventPeristenceQueue(object): def add_to_queue(self, room_id, events_and_contexts, backfilled): """Add events to the queue, with the given persist_event options. + + Args: + room_id (str): + events_and_contexts (list[(EventBase, EventContext)]): + backfilled (bool): """ queue = self._event_persist_queues.setdefault(room_id, deque()) if queue: @@ -227,6 +234,17 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks @log_function def persist_event(self, event, context, backfilled=False): + """ + + Args: + event (EventBase): + context (EventContext): + backfilled (bool): + + Returns: + Deferred: resolves to (int, int): the stream ordering of ``event``, + and the stream ordering of the latest persisted event + """ deferred = self._event_persist_queue.add_to_queue( event.room_id, [(event, context)], backfilled=backfilled, @@ -253,6 +271,16 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks def _persist_events(self, events_and_contexts, backfilled=False, delete_existing=False): + """Persist events to db + + Args: + events_and_contexts (list[(EventBase, EventContext)]): + backfilled (bool): + delete_existing (bool): + + Returns: + Deferred: resolves when the events have been persisted + """ if not events_and_contexts: return @@ -554,11 +582,87 @@ class EventsStore(SQLBaseStore): and the rejections table. Things reading from those table will need to check whether the event was rejected. - If delete_existing is True then existing events will be purged from the - database before insertion. This is useful when retrying due to IntegrityError. + Args: + txn (twisted.enterprise.adbapi.Connection): db connection + events_and_contexts (list[(EventBase, EventContext)]): + events to persist + backfilled (bool): True if the events were backfilled + delete_existing (bool): True to purge existing table rows for the + events from the database. This is useful when retrying due to + IntegrityError. + current_state_for_room (dict[str, (list[str], list[str])]): + The current-state delta for each room. For each room, a tuple + (to_delete, to_insert), being a list of event ids to be removed + from the current state, and a list of event ids to be added to + the current state. + new_forward_extremeties (dict[str, list[str]]): + The new forward extremities for each room. For each room, a + list of the event ids which are the forward extremities. + """ + self._update_current_state_txn(txn, current_state_for_room) + max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering - for room_id, current_state_tuple in current_state_for_room.iteritems(): + self._update_forward_extremities_txn( + txn, + new_forward_extremities=new_forward_extremeties, + max_stream_order=max_stream_order, + ) + + # Ensure that we don't have the same event twice. + events_and_contexts = self._filter_events_and_contexts_for_duplicates( + events_and_contexts, + ) + + self._update_room_depths_txn( + txn, + events_and_contexts=events_and_contexts, + backfilled=backfilled, + ) + + # _update_outliers_txn filters out any events which have already been + # persisted, and returns the filtered list. + events_and_contexts = self._update_outliers_txn( + txn, + events_and_contexts=events_and_contexts, + ) + + # From this point onwards the events are only events that we haven't + # seen before. + + if delete_existing: + # For paranoia reasons, we go and delete all the existing entries + # for these events so we can reinsert them. + # This gets around any problems with some tables already having + # entries. + self._delete_existing_rows_txn( + txn, + events_and_contexts=events_and_contexts, + ) + + self._store_event_txn( + txn, + events_and_contexts=events_and_contexts, + ) + + # _store_rejected_events_txn filters out any events which were + # rejected, and returns the filtered list. + events_and_contexts = self._store_rejected_events_txn( + txn, + events_and_contexts=events_and_contexts, + ) + + # From this point onwards the events are only ones that weren't + # rejected. + + self._update_metadata_tables_txn( + txn, + events_and_contexts=events_and_contexts, + backfilled=backfilled, + ) + + def _update_current_state_txn(self, txn, state_delta_by_room): + for room_id, current_state_tuple in state_delta_by_room.iteritems(): to_delete, to_insert = current_state_tuple txn.executemany( "DELETE FROM current_state_events WHERE event_id = ?", @@ -608,7 +712,9 @@ class EventsStore(SQLBaseStore): txn, self.get_current_state_ids, (room_id,) ) - for room_id, new_extrem in new_forward_extremeties.items(): + def _update_forward_extremities_txn(self, txn, new_forward_extremities, + max_stream_order): + for room_id, new_extrem in new_forward_extremities.items(): self._simple_delete_txn( txn, table="event_forward_extremities", @@ -626,7 +732,7 @@ class EventsStore(SQLBaseStore): "event_id": ev_id, "room_id": room_id, } - for room_id, new_extrem in new_forward_extremeties.items() + for room_id, new_extrem in new_forward_extremities.items() for ev_id in new_extrem ], ) @@ -643,13 +749,22 @@ class EventsStore(SQLBaseStore): "event_id": event_id, "stream_ordering": max_stream_order, } - for room_id, new_extrem in new_forward_extremeties.items() + for room_id, new_extrem in new_forward_extremities.items() for event_id in new_extrem ] ) - # Ensure that we don't have the same event twice. - # Pick the earliest non-outlier if there is one, else the earliest one. + @classmethod + def _filter_events_and_contexts_for_duplicates(cls, events_and_contexts): + """Ensure that we don't have the same event twice. + + Pick the earliest non-outlier if there is one, else the earliest one. + + Args: + events_and_contexts (list[(EventBase, EventContext)]): + Returns: + list[(EventBase, EventContext)]: filtered list + """ new_events_and_contexts = OrderedDict() for event, context in events_and_contexts: prev_event_context = new_events_and_contexts.get(event.event_id) @@ -662,9 +777,17 @@ class EventsStore(SQLBaseStore): new_events_and_contexts[event.event_id] = (event, context) else: new_events_and_contexts[event.event_id] = (event, context) + return new_events_and_contexts.values() - events_and_contexts = new_events_and_contexts.values() + def _update_room_depths_txn(self, txn, events_and_contexts, backfilled): + """Update min_depth for each room + Args: + txn (twisted.enterprise.adbapi.Connection): db connection + events_and_contexts (list[(EventBase, EventContext)]): events + we are persisting + backfilled (bool): True if the events were backfilled + """ depth_updates = {} for event, context in events_and_contexts: # Remove the any existing cache entries for the event_ids @@ -683,6 +806,21 @@ class EventsStore(SQLBaseStore): for room_id, depth in depth_updates.items(): self._update_min_depth_for_room_txn(txn, room_id, depth) + def _update_outliers_txn(self, txn, events_and_contexts): + """Update any outliers with new event info. + + This turns outliers into ex-outliers (unless the new event was + rejected). + + Args: + txn (twisted.enterprise.adbapi.Connection): db connection + events_and_contexts (list[(EventBase, EventContext)]): events + we are persisting + + Returns: + list[(EventBase, EventContext)] new list, without events which + are already in the events table. + """ txn.execute( "SELECT event_id, outlier FROM events WHERE event_id in (%s)" % ( ",".join(["?"] * len(events_and_contexts)), @@ -697,19 +835,16 @@ class EventsStore(SQLBaseStore): to_remove = set() for event, context in events_and_contexts: - if context.rejected: - # If the event is rejected then we don't care if the event - # was an outlier or not. - if event.event_id in have_persisted: - # If we have already seen the event then ignore it. - to_remove.add(event) - continue - if event.event_id not in have_persisted: continue to_remove.add(event) + if context.rejected: + # If the event is rejected then we don't care if the event + # was an outlier or not. + continue + outlier_persisted = have_persisted[event.event_id] if not event.internal_metadata.is_outlier() and outlier_persisted: # We received a copy of an event that we had already stored as @@ -764,37 +899,19 @@ class EventsStore(SQLBaseStore): # event isn't an outlier any more. self._update_backward_extremeties(txn, [event]) - events_and_contexts = [ + return [ ec for ec in events_and_contexts if ec[0] not in to_remove ] + @classmethod + def _delete_existing_rows_txn(cls, txn, events_and_contexts): if not events_and_contexts: - # Make sure we don't pass an empty list to functions that expect to - # be storing at least one element. + # nothing to do here return - # From this point onwards the events are only events that we haven't - # seen before. + logger.info("Deleting existing") - def event_dict(event): - return { - k: v - for k, v in event.get_dict().items() - if k not in [ - "redacted", - "redacted_because", - ] - } - - if delete_existing: - # For paranoia reasons, we go and delete all the existing entries - # for these events so we can reinsert them. - # This gets around any problems with some tables already having - # entries. - - logger.info("Deleting existing") - - for table in ( + for table in ( "events", "event_auth", "event_json", @@ -817,11 +934,34 @@ class EventsStore(SQLBaseStore): "redactions", "room_memberships", "topics" - ): - txn.executemany( - "DELETE FROM %s WHERE event_id = ?" % (table,), - [(ev.event_id,) for ev, _ in events_and_contexts] - ) + ): + txn.executemany( + "DELETE FROM %s WHERE event_id = ?" % (table,), + [(ev.event_id,) for ev, _ in events_and_contexts] + ) + + def _store_event_txn(self, txn, events_and_contexts): + """Insert new events into the event and event_json tables + + Args: + txn (twisted.enterprise.adbapi.Connection): db connection + events_and_contexts (list[(EventBase, EventContext)]): events + we are persisting + """ + + if not events_and_contexts: + # nothing to do here + return + + def event_dict(event): + return { + k: v + for k, v in event.get_dict().items() + if k not in [ + "redacted", + "redacted_because", + ] + } self._simple_insert_many_txn( txn, @@ -865,6 +1005,19 @@ class EventsStore(SQLBaseStore): ], ) + def _store_rejected_events_txn(self, txn, events_and_contexts): + """Add rows to the 'rejections' table for received events which were + rejected + + Args: + txn (twisted.enterprise.adbapi.Connection): db connection + events_and_contexts (list[(EventBase, EventContext)]): events + we are persisting + + Returns: + list[(EventBase, EventContext)] new list, without the rejected + events. + """ # Remove the rejected events from the list now that we've added them # to the events table and the events_json table. to_remove = set() @@ -876,16 +1029,23 @@ class EventsStore(SQLBaseStore): ) to_remove.add(event) - events_and_contexts = [ + return [ ec for ec in events_and_contexts if ec[0] not in to_remove ] - if not events_and_contexts: - # Make sure we don't pass an empty list to functions that expect to - # be storing at least one element. - return + def _update_metadata_tables_txn(self, txn, events_and_contexts, backfilled): + """Update all the miscellaneous tables for new events - # From this point onwards the events are only ones that weren't rejected. + Args: + txn (twisted.enterprise.adbapi.Connection): db connection + events_and_contexts (list[(EventBase, EventContext)]): events + we are persisting + backfilled (bool): True if the events were backfilled + """ + + if not events_and_contexts: + # nothing to do here + return for event, context in events_and_contexts: # Insert all the push actions into the event_push_actions table. @@ -1005,13 +1165,6 @@ class EventsStore(SQLBaseStore): # Prefill the event cache self._add_to_cache(txn, events_and_contexts) - if backfilled: - # Backfilled events come before the current state so we don't need - # to update the current state table - return - - return - def _add_to_cache(self, txn, events_and_contexts): to_prefill = [] From 0c01f829ae873bbc6451d5933ee77351ba20b93f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 17 Mar 2017 14:30:16 +0000 Subject: [PATCH 04/11] Avoid resetting state on rejected events When we get a rejected event, give it the same state_group as its prev_event, rather than no state_group at all. This should fix https://github.com/matrix-org/synapse/issues/1935. --- synapse/storage/events.py | 8 ++++---- synapse/storage/state.py | 10 ++++++++++ 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 42e433da85..4d3cfc336f 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -645,6 +645,10 @@ class EventsStore(SQLBaseStore): events_and_contexts=events_and_contexts, ) + # Insert into the state_groups, state_groups_state, and + # event_to_state_groups tables. + self._store_mult_state_groups_txn(txn, events_and_contexts) + # _store_rejected_events_txn filters out any events which were # rejected, and returns the filtered list. events_and_contexts = self._store_rejected_events_txn( @@ -1075,10 +1079,6 @@ class EventsStore(SQLBaseStore): ], ) - # Insert into the state_groups, state_groups_state, and - # event_to_state_groups tables. - self._store_mult_state_groups_txn(txn, events_and_contexts) - # Update the event_forward_extremities, event_backward_extremities and # event_edges tables. self._handle_mult_prev_events( diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 27f1ec89ec..1b42bea07a 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -136,6 +136,16 @@ class StateStore(SQLBaseStore): continue if context.current_state_ids is None: + # AFAIK, this can never happen + logger.error( + "Non-outlier event %s had current_state_ids==None", + event.event_id) + continue + + # if the event was rejected, just give it the same state as its + # predecessor. + if context.rejected: + state_groups[event.event_id] = context.prev_group continue state_groups[event.event_id] = context.state_group From d2d146a3143e94c5f68caadc3dbeff4452f86853 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 17 Mar 2017 15:11:26 +0000 Subject: [PATCH 05/11] Logcontext docs --- docs/log_contexts.rst | 447 ++++++++++++++++++++++++++++++++++++- synapse/util/logcontext.py | 10 + 2 files changed, 449 insertions(+), 8 deletions(-) diff --git a/docs/log_contexts.rst b/docs/log_contexts.rst index 0046e171be..a5f59745e6 100644 --- a/docs/log_contexts.rst +++ b/docs/log_contexts.rst @@ -1,10 +1,441 @@ -What do I do about "Unexpected logging context" debug log-lines everywhere? +Log contexts +============ - The logging context lives in thread local storage - Sometimes it gets out of sync with what it should actually be, usually because something scheduled something to run on the reactor without preserving the logging context. - what is the impact of it getting out of sync? and how and when should we preserve log context? - The impact is that some of the CPU and database metrics will be under-reported, and some log lines will be mis-attributed. - It should happen auto-magically in all the APIs that do IO or otherwise defer to the reactor. - Mjark: the other place is if we branch, e.g. using defer.gatherResults +.. contents:: -Unanswered: how and when should we preserve log context? \ No newline at end of file +To help track the processing of individual requests, synapse uses a +'log context' to track which request it is handling at any given moment. This +is done via a thread-local variable; a ``logging.Filter`` is then used to fish +the information back out of the thread-local variable and add it to each log +record. + +Logcontexts are also used for CPU and database accounting, so that we can track +which requests were responsible for high CPU use or database activity. + +The ``synapse.util.logcontext`` module provides a facilities for managing the +current log context (as well as providing the ``LoggingContextFilter`` class). + +Deferreds make the whole thing complicated, so this document describes how it +all works, and how to write code which follows the rules. + +Logcontexts without Deferreds +----------------------------- + +In the absence of any Deferred voodoo, things are simple enough. As with any +code of this nature, the rule is that our function should leave things as it +found them: + +.. code:: python + + from synapse.util import logcontext # omitted from future snippets + + def handle_request(request_id): + request_context = logcontext.LoggingContext() + + calling_context = logcontext.LoggingContext.current_context() + logcontext.LoggingContext.set_current_context(request_context) + try: + request_context.request = request_id + do_request_handling() + logger.debug("finished") + finally: + logcontext.LoggingContext.set_current_context(calling_context) + + def do_request_handling(): + logger.debug("phew") # this will be logged against request_id + + +LoggingContext implements the context management methods, so the above can be +written much more succinctly as: + +.. code:: python + + def handle_request(request_id): + with logcontext.LoggingContext() as request_context: + request_context.request = request_id + do_request_handling() + logger.debug("finished") + + def do_request_handling(): + logger.debug("phew") + + +Using logcontexts with Deferreds +-------------------------------- + +Deferreds — and in particular, ``defer.inlineCallbacks`` — break +the linear flow of code so that there is no longer a single entry point where +we should set the logcontext and a single exit point where we should remove it. + +Consider the example above, where ``do_request_handling`` needs to do some +blocking operation, and returns a deferred: + +.. code:: python + + @defer.inlineCallbacks + def handle_request(request_id): + with logcontext.LoggingContext() as request_context: + request_context.request = request_id + yield do_request_handling() + logger.debug("finished") + + +In the above flow: + +* The logcontext is set +* ``do_request_handling`` is called, and returns a deferred +* ``handle_request`` yields the deferred +* The ``inlineCallbacks`` wrapper of ``handle_request`` returns a deferred + +So we have stopped processing the request (and will probably go on to start +processing the next), without clearing the logcontext. + +To circumvent this problem, synapse code assumes that, wherever you have a +deferred, you will want to yield on it. To that end, whereever functions return +a deferred, we adopt the following conventions: + +.. note:: Rules for functions returning deferreds: + + * If the deferred is already complete, the function returns with the same + logcontext it started with. + * If the deferred is incomplete, the function clears the logcontext before + returning; when the deferred completes, it restores the logcontext before + running any callbacks. + +That sounds complicated, but actually it means a lot of code (including the +example above) "just works". There are two cases: + +* If ``do_request_handling`` returns a completed deferred, then the logcontext + will still be in place. In this case, execution will continue immediately + after the ``yield``; the "finished" line will be logged against the right + context, and the ``with`` block restores the original context before we + return to the caller. + +* If the returned deferred is incomplete, ``do_request_handling`` clears the + logcontext before returning. The logcontext is therefore clear when + ``handle_request`` yields the deferred. At that point, the ``inlineCallbacks`` + wrapper adds a callback to the deferred, and returns another (incomplete) + deferred to the caller, and it is safe to begin processing the next request. + + Once ``do_request_handling``'s deferred completes, it will reinstate the + logcontext, before running the callback added by the ``inlineCallbacks`` + wrapper. That callback runs the second half of ``handle_request``, so again + the "finished" line will be logged against the right + context, and the ``with`` block restores the original context. + +As an aside, it's worth noting that ``handle_request`` follows our rules - +though that only matters if the caller has its own logcontext which it cares +about. + +The following sections describe pitfalls and helpful patterns when implementing +these rules. + +Always yield your deferreds +--------------------------- + +Whenever you get a deferred back from a function, you should ``yield`` on it +as soon as possible. (Returning it directly to your caller is ok too, if you're +not doing ``inlineCallbacks``.) Do not pass go; do not do any logging; do not +call any other functions. + +.. code:: python + + @defer.inlineCallbacks + def fun(): + logger.debug("starting") + yield do_some_stuff() # just like this + + d = more_stuff() + result = yield d # also fine, of course + + defer.returnValue(result) + + def nonInlineCallbacksFun(): + logger.debug("just a wrapper really") + return do_some_stuff() # this is ok too - the caller will yield on + # it anyway. + +Provided this pattern is followed all the way back up to the callchain to where +the logcontext was set, this will make things work out ok: provided +``do_some_stuff`` and ``more_stuff`` follow the rules above, then so will +``fun`` (as wrapped by ``inlineCallbacks``) and ``nonInlineCallbacksFun``. + +It's all too easy to forget to ``yield``: for instance if we forgot that +``do_some_stuff`` returned a deferred, we might plough on regardless. This +leads to a mess; it will probably work itself out eventually, but not before +a load of stuff has been logged against the wrong content. (Normally, other +things will break, more obviously, if you forget to ``yield``, so this tends +not to be a major problem in practice.) + +Of course sometimes you need to do something a bit fancier with your Deferreds +- not all code follows the linear A-then-B-then-C pattern. Notes on +implementing more complex patterns are in later sections. + +Where you create a new Deferred, make it follow the rules +--------------------------------------------------------- + +Most of the time, a Deferred comes from another synapse function. Sometimes, +though, we need to make up a new Deferred, or we get a Deferred back from +external code. We need to make it follow our rules. + +The easy way to do it is with a combination of ``defer.inlineCallbacks``, and +``logcontext.PreserveLoggingContext``. Suppose we want to implement ``sleep``, +which returns a deferred which will run its callbacks after a given number of +seconds. That might look like: + +.. code:: python + + # not a logcontext-rules-compliant function + def get_sleep_deferred(seconds): + d = defer.Deferred() + reactor.callLater(seconds, d.callback, None) + return d + +That doesn't follow the rules, but we can fix it by wrapping it with +``PreserveLoggingContext`` and ``yield`` ing on it: + +.. code:: python + + @defer.inlineCallbacks + def sleep(seconds): + with PreserveLoggingContext(): + yield get_sleep_deferred(seconds) + +This technique works equally for external functions which return deferreds, +or deferreds we have made ourselves. + +XXX: think this is what ``preserve_context_over_deferred`` is supposed to do, +though it is broken, in that it only restores the logcontext for the duration +of the callbacks, which doesn't comply with the logcontext rules. + +Fire-and-forget +--------------- + +Sometimes you want to fire off a chain of execution, but not wait for its +result. That might look a bit like this: + +.. code:: python + + @defer.inlineCallbacks + def do_request_handling(): + yield foreground_operation() + + # *don't* do this + background_operation() + + logger.debug("Request handling complete") + + @defer.inlineCallbacks + def background_operation(): + yield first_background_step() + logger.debug("Completed first step") + yield second_background_step() + logger.debug("Completed second step") + +The above code does a couple of steps in the background after +``do_request_handling`` has finished. The log lines are still logged against +the ``request_context`` logcontext, which may or may not be desirable. There +are two big problems with the above, however. The first problem is that, if +``background_operation`` returns an incomplete Deferred, it will expect its +caller to ``yield`` immediately, so will have cleared the logcontext. In this +example, that means that 'Request handling complete' will be logged without any +context. + +The second problem, which is potentially even worse, is that when the Deferred +returned by ``background_operation`` completes, it will restore the original +logcontext. There is nothing waiting on that Deferred, so the logcontext will +leak into the reactor and possibly get attached to some arbitrary future +operation. + +There are two potential solutions to this. + +One option is to surround the call to ``background_operation`` with a +``PreserveLoggingContext`` call. That will reset the logcontext before +starting ``background_operation`` (so the context restored when the deferred +completes will be the empty logcontext), and will restore the current +logcontext before continuing the foreground process: + +.. code:: python + + @defer.inlineCallbacks + def do_request_handling(): + yield foreground_operation() + + # start background_operation off in the empty logcontext, to + # avoid leaking the current context into the reactor. + with PreserveLoggingContext(): + background_operation() + + # this will now be logged against the request context + logger.debug("Request handling complete") + +Obviously that option means that the operations done in +``background_operation`` would be not be logged against a logcontext (though +that might be fixed by setting a different logcontext via a ``with +LoggingContext(...)`` in ``background_operation``). + +The second option is to use ``logcontext.preserve_fn``, which wraps a function +so that it doesn't reset the logcontext even when it returns an incomplete +deferred, and adds a callback to the returned deferred to reset the +logcontext. In other words, it turns a function that follows the Synapse rules +about logcontexts and Deferreds into one which behaves more like an external +function — the opposite operation to that described in the previous section. +It can be used like this: + +.. code:: python + + @defer.inlineCallbacks + def do_request_handling(): + yield foreground_operation() + + logcontext.preserve_fn(background_operation)() + + # this will now be logged against the request context + logger.debug("Request handling complete") + +XXX: I think ``preserve_context_over_fn`` is supposed to do the first option, +but the fact that it does ``preserve_context_over_deferred`` on its results +means that its use is fraught with difficulty. + +Passing synapse deferreds into third-party functions +---------------------------------------------------- + +A typical example of this is where we want to collect together two or more +deferred via ``defer.gatherResults``: + +.. code:: python + + d1 = operation1() + d2 = operation2() + d3 = defer.gatherResults([d1, d2]) + +This is really a variation of the fire-and-forget problem above, in that we are +firing off ``d1`` and ``d2`` without yielding on them. The difference +is that we now have third-party code attached to their callbacks. Anyway either +technique given in the `Fire-and-forget`_ section will work. + +Of course, the new Deferred returned by ``gatherResults`` needs to be wrapped +in order to make it follow the logcontext rules before we can yield it, as +described in `Where you create a new Deferred, make it follow the rules`_. + +So, option one: reset the logcontext before starting the operations to be +gathered: + +.. code:: python + + @defer.inlineCallbacks + def do_request_handling(): + with PreserveLoggingContext(): + d1 = operation1() + d2 = operation2() + result = yield defer.gatherResults([d1, d2]) + +In this case particularly, though, option two, of using +``logcontext.preserve.fn`` almost certainly makes more sense, so that +``operation1`` and ``operation2`` are both logged against the original +logcontext. This looks like: + +.. code:: python + + @defer.inlineCallbacks + def do_request_handling(): + d1 = logcontext.preserve_fn(operation1)() + d2 = logcontext.preserve_fn(operation2)() + + with PreserveLoggingContext(): + result = yield defer.gatherResults([d1, d2]) + + +Was all this really necessary? +------------------------------ + +The conventions used work fine for a linear flow where everything happens in +series via ``defer.inlineCallbacks`` and ``yield``, but are certainly tricky to +follow for any more exotic flows. It's hard not to wonder if we could have done +something else. + +We're not going to rewrite Synapse now, so the following is entirely of +acadamic interest, but I'd like to record some thoughts on an alternative +approach. + +I briefly prototyped some code following an alternative set of rules. I think +it would work, but I certainly didn't get as far as thinking how it would +interact with concepts as complicated as the cache descriptors. + +My alternative rules were: + +* functions always preserve the logcontext of their caller, whether or not they + are returning a Deferred. + +* Deferreds returned by synapse functions run their callbacks in the same + context as the function was orignally called in. + +The main point of this scheme is that everywhere that sets the logcontext is +responsible for clearing it before returning control to the reactor. + +So, for example, if you were the function which started a ``with +LoggingContext`` block, you wouldn't ``yield`` within it — instead you'd start +off the background process, and then leave the ``with`` block to wait for it: + +.. code:: python + + def handle_request(request_id): + with logcontext.LoggingContext() as request_context: + request_context.request = request_id + d = do_request_handling() + + def cb(r): + logger.debug("finished") + + d.addCallback(cb) + return d + +(in general, mixing ``with LoggingContext`` blocks and +``defer.inlineCallbacks`` in the same function leads to slighly +counter-intuitive code, under this scheme). + +Because we leave the original ``with`` block as soon as the Deferred is +returned (as opposed to waiting for it to be resolved, as we do today), the +logcontext is cleared before control passes back to the reactor; so if there is +some code within ``do_request_handling`` which needs to wait for a Deferred to +complete, there is no need for it to worry about clearing the logcontext before +doing so: + +.. code:: python + + def handle_request(): + r = do_some_stuff() + r.addCallback(do_some_more_stuff) + return r + +— and provided ``do_some_stuff`` follows the rules of returning a Deferred which +runs its callbacks in the original logcontext, all is happy. + +The business of a Deferred which runs its callbacks in the original logcontext +isn't hard to achieve — we have it today, in the shape of +``logcontext._PreservingContextDeferred``: + +.. code:: python + + def do_some_stuff(): + deferred = do_some_io() + pcd = _PreservingContextDeferred(LoggingContext.current_context()) + deferred.chainDeferred(pcd) + return pcd + +It turns out that, thanks to the way that Deferreds chain together, we +automatically get the property of a context-preserving deferred with +``defer.inlineCallbacks``, provided the final Defered the function ``yields`` +on has that property. So we can just write: + +.. code:: python + + @defer.inlineCallbacks + def handle_request(): + yield do_some_stuff() + yield do_some_more_stuff() + +To conclude: I think this scheme would have worked equally well, with less +danger of messing it up, and probably made some more esoteric code easier to +write. But again — changing the conventions of the entire Synapse codebase is +not a sensible option for the marginal improvement offered. diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index d73670f9f2..1135a683af 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -12,6 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +""" Thread-local-alike tracking of log contexts within synapse + +This module provides objects and utilities for tracking contexts through +synapse code, so that log lines can include a request identifier, and so that +CPU and database activity can be accounted for against the request that caused +them. + +See doc/log_contexts.rst for details on how this works. +""" + from twisted.internet import defer import threading From 994d7ae7c5e96ae411d7d7e6660c5a2172b69146 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 17 Mar 2017 15:28:35 +0000 Subject: [PATCH 06/11] Remove broken use of clock.call_later background_updates was using `call_later` in a way that leaked the logcontext into the reactor. We could have rewritten it to do it properly, but given that we weren't using the fancier facilities provided by `call_later`, we might as well just use `async.sleep`, which does the logcontext stuff properly. --- synapse/storage/background_updates.py | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 94b2bcc54a..813ad59e56 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import synapse.util.async from ._base import SQLBaseStore from . import engines @@ -84,24 +85,14 @@ class BackgroundUpdateStore(SQLBaseStore): self._background_update_performance = {} self._background_update_queue = [] self._background_update_handlers = {} - self._background_update_timer = None @defer.inlineCallbacks def start_doing_background_updates(self): - assert self._background_update_timer is None, \ - "background updates already running" - logger.info("Starting background schema updates") while True: - sleep = defer.Deferred() - self._background_update_timer = self._clock.call_later( - self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None - ) - try: - yield sleep - finally: - self._background_update_timer = None + yield synapse.util.async.sleep( + self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.) try: result = yield self.do_next_background_update( From 067b00d49d25d6994e851fc362328a164ace85b2 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 17 Mar 2017 15:33:04 +0000 Subject: [PATCH 07/11] Run the reactor with the sentinel logcontext This fixes a class of 'Unexpected logcontext' messages, which were happening because the logcontext was somewhat arbitrarily swapping between the sentinel and the `run` logcontext. --- synapse/app/appservice.py | 8 ++++++-- synapse/app/client_reader.py | 8 ++++++-- synapse/app/federation_reader.py | 8 ++++++-- synapse/app/federation_sender.py | 8 ++++++-- synapse/app/homeserver.py | 9 +++++++-- synapse/app/media_repository.py | 8 ++++++-- synapse/app/pusher.py | 9 +++++++-- synapse/app/synchrotron.py | 9 +++++++-- 8 files changed, 51 insertions(+), 16 deletions(-) diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 83ee3e3ce3..a6f1e7594e 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -29,7 +29,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto from synapse.storage.engines import create_engine from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -187,7 +187,11 @@ def start(config_options): ps.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 7ed0de4117..a821a6ce62 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -35,7 +35,7 @@ from synapse.storage.client_ips import ClientIpStore from synapse.storage.engines import create_engine from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -193,7 +193,11 @@ def start(config_options): ss.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index ca742de6b2..e52b0f240d 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -31,7 +31,7 @@ from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -184,7 +184,11 @@ def start(config_options): ss.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 0cf5b196e6..76c4cc54d1 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -35,7 +35,7 @@ from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -193,7 +193,11 @@ def start(config_options): ps.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 0b9d78c13c..2cdd2d39ff 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -52,7 +52,7 @@ from synapse.api.urls import ( ) from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.metrics import register_memory_metrics, get_metrics_for from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX @@ -456,7 +456,12 @@ def run(hs): def in_thread(): # Uncomment to enable tracing of log context changes. # sys.settrace(logcontext_tracer) - with LoggingContext("run"): + + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): change_resource_limit(hs.config.soft_file_limit) if hs.config.gc_thresholds: gc.set_threshold(*hs.config.gc_thresholds) diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index c5579d9e38..3c7984a237 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -32,7 +32,7 @@ from synapse.storage.engines import create_engine from synapse.storage.media_repository import MediaRepositoryStore from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -190,7 +190,11 @@ def start(config_options): ss.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index b025db54d4..ab682e52ec 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -31,7 +31,8 @@ from synapse.storage.engines import create_engine from synapse.storage import DataStore from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, preserve_fn, \ + PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -275,7 +276,11 @@ def start(config_options): ps.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 449fac771b..a68bb873fe 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -48,7 +48,8 @@ from synapse.storage.presence import PresenceStore, UserPresenceState from synapse.storage.roommember import RoomMemberStore from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, preserve_fn, \ + PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.stringutils import random_string @@ -496,7 +497,11 @@ def start(config_options): ss.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: From f40c2db05ae5e76428c97f1e194fe7d843913054 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 17 Mar 2017 20:56:54 +0000 Subject: [PATCH 08/11] Stop preserve_fn leaking context into the reactor Fix a bug in ``logcontext.preserve_fn`` which made it leak context into the reactor, and add a test for it. Also, get rid of ``logcontext.reset_context_after_deferred``, which tried to do the same thing but had its own, different, set of bugs. --- synapse/handlers/federation.py | 5 +-- synapse/util/logcontext.py | 61 ++++++++++++++++------------------ tests/util/test_log_context.py | 61 ++++++++++++++++++++++++++++++++++ 3 files changed, 93 insertions(+), 34 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0cd5501b05..6157204924 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -933,8 +933,9 @@ class FederationHandler(BaseHandler): # lots of requests for missing prev_events which we do actually # have. Hence we fire off the deferred, but don't wait for it. - synapse.util.logcontext.reset_context_after_deferred( - self._handle_queued_pdus(room_queue)) + synapse.util.logcontext.preserve_fn(self._handle_queued_pdus)( + room_queue + ) defer.returnValue(True) diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index d73670f9f2..7cbe390b15 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -308,47 +308,44 @@ def preserve_context_over_deferred(deferred, context=None): return d -def reset_context_after_deferred(deferred): - """If the deferred is incomplete, add a callback which will reset the - context. +def preserve_fn(f): + """Wraps a function, to ensure that the current context is restored after + return from the function, and that the sentinel context is set once the + deferred returned by the funtion completes. - This is useful when you want to fire off a deferred, but don't want to - wait for it to complete. (The deferred will restore the current log context - when it completes, so if you don't do anything, it will leak log context.) - - (If this feels asymmetric, consider it this way: we are effectively forking - a new thread of execution. We are probably currently within a - ``with LoggingContext()`` block, which is supposed to have a single entry - and exit point. But by spawning off another deferred, we are effectively - adding a new exit point.) - - Args: - deferred (defer.Deferred): deferred + Useful for wrapping functions that return a deferred which you don't yield + on. """ def reset_context(result): LoggingContext.set_current_context(LoggingContext.sentinel) return result - if not deferred.called: - deferred.addBoth(reset_context) - - -def preserve_fn(f): - """Ensures that function is called with correct context and that context is - restored after return. Useful for wrapping functions that return a deferred - which you don't yield on. - """ + # XXX: why is this here rather than inside g? surely we want to preserve + # the context from the time the function was called, not when it was + # wrapped? current = LoggingContext.current_context() def g(*args, **kwargs): - with PreserveLoggingContext(current): - res = f(*args, **kwargs) - if isinstance(res, defer.Deferred): - return preserve_context_over_deferred( - res, context=LoggingContext.sentinel - ) - else: - return res + res = f(*args, **kwargs) + if isinstance(res, defer.Deferred) and not res.called: + # The function will have reset the context before returning, so + # we need to restore it now. + LoggingContext.set_current_context(current) + + # The original context will be restored when the deferred + # completes, but there is nothing waiting for it, so it will + # get leaked into the reactor or some other function which + # wasn't expecting it. We therefore need to reset the context + # here. + # + # (If this feels asymmetric, consider it this way: we are + # effectively forking a new thread of execution. We are + # probably currently within a ``with LoggingContext()`` block, + # which is supposed to have a single entry and exit point. But + # by spawning off another deferred, we are effectively + # adding a new exit point.) + res.addBoth(reset_context) + return res return g diff --git a/tests/util/test_log_context.py b/tests/util/test_log_context.py index 65a330a0e9..9ffe209c4d 100644 --- a/tests/util/test_log_context.py +++ b/tests/util/test_log_context.py @@ -1,8 +1,10 @@ +import twisted.python.failure from twisted.internet import defer from twisted.internet import reactor from .. import unittest from synapse.util.async import sleep +from synapse.util import logcontext from synapse.util.logcontext import LoggingContext @@ -33,3 +35,62 @@ class LoggingContextTestCase(unittest.TestCase): context_one.test_key = "one" yield sleep(0) self._check_test_key("one") + + def _test_preserve_fn(self, function): + sentinel_context = LoggingContext.current_context() + + callback_completed = [False] + + @defer.inlineCallbacks + def cb(): + context_one.test_key = "one" + yield function() + self._check_test_key("one") + + callback_completed[0] = True + + with LoggingContext() as context_one: + context_one.test_key = "one" + + # fire off function, but don't wait on it. + logcontext.preserve_fn(cb)() + + self._check_test_key("one") + + # now wait for the function under test to have run, and check that + # the logcontext is left in a sane state. + d2 = defer.Deferred() + + def check_logcontext(): + if not callback_completed[0]: + reactor.callLater(0.01, check_logcontext) + return + + # make sure that the context was reset before it got thrown back + # into the reactor + try: + self.assertIs(LoggingContext.current_context(), + sentinel_context) + d2.callback(None) + except BaseException: + d2.errback(twisted.python.failure.Failure()) + + reactor.callLater(0.01, check_logcontext) + + # test is done once d2 finishes + return d2 + + def test_preserve_fn_with_blocking_fn(self): + @defer.inlineCallbacks + def blocking_function(): + yield sleep(0) + + return self._test_preserve_fn(blocking_function) + + def test_preserve_fn_with_non_blocking_fn(self): + @defer.inlineCallbacks + def nonblocking_function(): + with logcontext.PreserveLoggingContext(): + yield defer.succeed(None) + + return self._test_preserve_fn(nonblocking_function) From d78d08981a6b4f5fe2e2395050e411d0ceedfbb0 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Sat, 18 Mar 2017 22:47:37 +0000 Subject: [PATCH 09/11] log_contexts.rst: fix typos --- docs/log_contexts.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/log_contexts.rst b/docs/log_contexts.rst index a5f59745e6..ffe438c614 100644 --- a/docs/log_contexts.rst +++ b/docs/log_contexts.rst @@ -331,7 +331,7 @@ gathered: result = yield defer.gatherResults([d1, d2]) In this case particularly, though, option two, of using -``logcontext.preserve.fn`` almost certainly makes more sense, so that +``logcontext.preserve_fn`` almost certainly makes more sense, so that ``operation1`` and ``operation2`` are both logged against the original logcontext. This looks like: @@ -355,7 +355,7 @@ follow for any more exotic flows. It's hard not to wonder if we could have done something else. We're not going to rewrite Synapse now, so the following is entirely of -acadamic interest, but I'd like to record some thoughts on an alternative +academic interest, but I'd like to record some thoughts on an alternative approach. I briefly prototyped some code following an alternative set of rules. I think From a4cb21659b0c00c3d04a8a956f413b8cd7a4a238 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 20 Mar 2017 11:59:49 +0000 Subject: [PATCH 10/11] log_contexts.rst: fix formatting of Note block Apparently the github RST renderer doesn't like Note blocks. --- docs/log_contexts.rst | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/log_contexts.rst b/docs/log_contexts.rst index ffe438c614..8d04a973de 100644 --- a/docs/log_contexts.rst +++ b/docs/log_contexts.rst @@ -94,13 +94,13 @@ To circumvent this problem, synapse code assumes that, wherever you have a deferred, you will want to yield on it. To that end, whereever functions return a deferred, we adopt the following conventions: -.. note:: Rules for functions returning deferreds: +**Rules for functions returning deferreds:** - * If the deferred is already complete, the function returns with the same - logcontext it started with. - * If the deferred is incomplete, the function clears the logcontext before - returning; when the deferred completes, it restores the logcontext before - running any callbacks. + * If the deferred is already complete, the function returns with the same + logcontext it started with. + * If the deferred is incomplete, the function clears the logcontext before + returning; when the deferred completes, it restores the logcontext before + running any callbacks. That sounds complicated, but actually it means a lot of code (including the example above) "just works". There are two cases: From c36d15d2de12a44b08e0b14889de997b6623df5b Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 20 Mar 2017 15:36:14 +0000 Subject: [PATCH 11/11] Add some debug to help diagnose weird federation issue --- synapse/crypto/keyring.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index d7211ee9b3..80f27f8c53 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -96,10 +96,11 @@ class Keyring(object): verify_requests = [] for server_name, json_object in server_and_json: - logger.debug("Verifying for %s", server_name) key_ids = signature_ids(json_object, server_name) if not key_ids: + logger.warn("Request from %s: no supported signature keys", + server_name) deferred = defer.fail(SynapseError( 400, "Not signed with a supported algorithm", @@ -108,6 +109,9 @@ class Keyring(object): else: deferred = defer.Deferred() + logger.debug("Verifying for %s with key_ids %s", + server_name, key_ids) + verify_request = VerifyKeyRequest( server_name, key_ids, json_object, deferred ) @@ -142,6 +146,9 @@ class Keyring(object): json_object = verify_request.json_object + logger.debug("Got key %s %s:%s for server %s, verifying" % ( + key_id, verify_key.alg, verify_key.version, server_name, + )) try: verify_signed_json(json_object, server_name, verify_key) except: