From d18f37e026a02b4e899bc96e600850007a613189 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 5 May 2015 17:32:21 +0100 Subject: [PATCH] Collect the invalidate callbacks on the transaction object rather than passing around a separate list --- synapse/storage/_base.py | 18 ++++++++--- synapse/storage/event_federation.py | 10 +++--- synapse/storage/events.py | 48 +++++++++++++---------------- synapse/storage/room.py | 4 +-- synapse/storage/roommember.py | 8 ++--- synapse/storage/signatures.py | 12 ++++---- synapse/storage/state.py | 2 +- 7 files changed, 51 insertions(+), 51 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 579ed56377..ccf9697fa3 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -185,12 +185,16 @@ class LoggingTransaction(object): """An object that almost-transparently proxies for the 'txn' object passed to the constructor. Adds logging and metrics to the .execute() method.""" - __slots__ = ["txn", "name", "database_engine"] + __slots__ = ["txn", "name", "database_engine", "after_callbacks"] - def __init__(self, txn, name, database_engine): + def __init__(self, txn, name, database_engine, after_callbacks): object.__setattr__(self, "txn", txn) object.__setattr__(self, "name", name) object.__setattr__(self, "database_engine", database_engine) + object.__setattr__(self, "after_callbacks", after_callbacks) + + def call_after(self, callback, *args): + self.after_callbacks.append((callback, args)) def __getattr__(self, name): return getattr(self.txn, name) @@ -336,6 +340,8 @@ class SQLBaseStore(object): start_time = time.time() * 1000 + after_callbacks = [] + def inner_func(conn, *args, **kwargs): with LoggingContext("runInteraction") as context: if self.database_engine.is_connection_closed(conn): @@ -360,10 +366,10 @@ class SQLBaseStore(object): while True: try: txn = conn.cursor() - return func( - LoggingTransaction(txn, name, self.database_engine), - *args, **kwargs + txn = LoggingTransaction( + txn, name, self.database_engine, after_callbacks ) + return func(txn, *args, **kwargs) except self.database_engine.module.OperationalError as e: # This can happen if the database disappears mid # transaction. @@ -412,6 +418,8 @@ class SQLBaseStore(object): result = yield self._db_pool.runWithConnection( inner_func, *args, **kwargs ) + for after_callback, after_args in after_callbacks: + after_callback(*after_args) defer.returnValue(result) def cursor_to_dict(self, cursor): diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 3cd3fbdc9b..893344eff3 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -241,7 +241,7 @@ class EventFederationStore(SQLBaseStore): return int(min_depth) if min_depth is not None else None - def _update_min_depth_for_room_txn(self, txn, invalidates, room_id, depth): + def _update_min_depth_for_room_txn(self, txn, room_id, depth): min_depth = self._get_min_depth_interaction(txn, room_id) do_insert = depth < min_depth if min_depth else True @@ -256,8 +256,8 @@ class EventFederationStore(SQLBaseStore): }, ) - def _handle_prev_events(self, txn, invalidates, outlier, event_id, - prev_events, room_id): + def _handle_prev_events(self, txn, outlier, event_id, prev_events, + room_id): """ For the given event, update the event edges table and forward and backward extremities tables. @@ -330,9 +330,9 @@ class EventFederationStore(SQLBaseStore): ) txn.execute(query) - invalidates.append(( + txn.call_after( self.get_latest_event_ids_in_room.invalidate, room_id - )) + ) def get_backfill_events(self, room_id, event_list, limit): """Get a list of Events for a given topic that occurred before (and diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 7dc49ceed6..17f9d27289 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -42,7 +42,7 @@ class EventsStore(SQLBaseStore): stream_ordering = self.min_token try: - invalidates = yield self.runInteraction( + yield self.runInteraction( "persist_event", self._persist_event_txn, event=event, @@ -52,11 +52,6 @@ class EventsStore(SQLBaseStore): is_new_state=is_new_state, current_state=current_state, ) - for invalidated in invalidates: - invalidated_callback = invalidated[0] - invalidated_args = invalidated[1:] - invalidated_callback(*invalidated_args) - except _RollbackButIsFineException: pass @@ -96,10 +91,9 @@ class EventsStore(SQLBaseStore): def _persist_event_txn(self, txn, event, context, backfilled, stream_ordering=None, is_new_state=True, current_state=None): - invalidates = [] # Remove the any existing cache entries for the event_id - invalidates.append((self._invalidate_get_event_cache, event.event_id)) + txn.call_after(self._invalidate_get_event_cache, event.event_id) if stream_ordering is None: with self._stream_id_gen.get_next_txn(txn) as stream_ordering: @@ -121,10 +115,12 @@ class EventsStore(SQLBaseStore): for s in current_state: if s.type == EventTypes.Member: - invalidates.extend([ - (self.get_rooms_for_user.invalidate, s.state_key), - (self.get_joined_hosts_for_room.invalidate, s.room_id), - ]) + txn.call_after( + self.get_rooms_for_user.invalidate, s.state_key + ) + txn.call_after( + self.get_joined_hosts_for_room.invalidate, s.room_id + ) self._simple_insert_txn( txn, "current_state_events", @@ -161,11 +157,10 @@ class EventsStore(SQLBaseStore): outlier = event.internal_metadata.is_outlier() if not outlier: - self._store_state_groups_txn(txn, invalidates, event, context) + self._store_state_groups_txn(txn, event, context) self._update_min_depth_for_room_txn( txn, - invalidates, event.room_id, event.depth ) @@ -207,11 +202,10 @@ class EventsStore(SQLBaseStore): sql, (False, event.event_id,) ) - return invalidates + return self._handle_prev_events( txn, - invalidates, outlier=outlier, event_id=event.event_id, prev_events=event.prev_events, @@ -219,13 +213,13 @@ class EventsStore(SQLBaseStore): ) if event.type == EventTypes.Member: - self._store_room_member_txn(txn, invalidates, event) + self._store_room_member_txn(txn, event) elif event.type == EventTypes.Name: - self._store_room_name_txn(txn, invalidates, event) + self._store_room_name_txn(txn, event) elif event.type == EventTypes.Topic: - self._store_room_topic_txn(txn, invalidates, event) + self._store_room_topic_txn(txn, event) elif event.type == EventTypes.Redaction: - self._store_redaction(txn, invalidates, event) + self._store_redaction(txn, event) event_dict = { k: v @@ -295,20 +289,20 @@ class EventsStore(SQLBaseStore): if context.rejected: self._store_rejections_txn( - txn, invalidates, event.event_id, context.rejected + txn, event.event_id, context.rejected ) for hash_alg, hash_base64 in event.hashes.items(): hash_bytes = decode_base64(hash_base64) self._store_event_content_hash_txn( - txn, invalidates, event.event_id, hash_alg, hash_bytes, + txn, event.event_id, hash_alg, hash_bytes, ) for prev_event_id, prev_hashes in event.prev_events: for alg, hash_base64 in prev_hashes.items(): hash_bytes = decode_base64(hash_base64) self._store_prev_event_hash_txn( - txn, invalidates, event.event_id, prev_event_id, alg, + txn, event.event_id, prev_event_id, alg, hash_bytes ) @@ -325,7 +319,7 @@ class EventsStore(SQLBaseStore): (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event) self._store_event_reference_hash_txn( - txn, invalidates, event.event_id, ref_alg, ref_hash_bytes + txn, event.event_id, ref_alg, ref_hash_bytes ) if event.is_state(): @@ -372,11 +366,11 @@ class EventsStore(SQLBaseStore): } ) - return invalidates + return - def _store_redaction(self, txn, invalidates, event): + def _store_redaction(self, txn, event): # invalidate the cache for the redacted event - invalidates.append((self._invalidate_get_event_cache, event.redacts)) + txn.call_after(self._invalidate_get_event_cache, event.redacts) txn.execute( "INSERT INTO redactions (event_id, redacts) VALUES (?,?)", (event.event_id, event.redacts) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index d42d7ff0e3..f956377632 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -162,7 +162,7 @@ class RoomStore(SQLBaseStore): defer.returnValue(ret) - def _store_room_topic_txn(self, txn, invalidates, event): + def _store_room_topic_txn(self, txn, event): if hasattr(event, "content") and "topic" in event.content: self._simple_insert_txn( txn, @@ -174,7 +174,7 @@ class RoomStore(SQLBaseStore): }, ) - def _store_room_name_txn(self, txn, invalidates, event): + def _store_room_name_txn(self, txn, event): if hasattr(event, "content") and "name" in event.content: self._simple_insert_txn( txn, diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 117da817ba..839c74f63a 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -35,7 +35,7 @@ RoomsForUser = namedtuple( class RoomMemberStore(SQLBaseStore): - def _store_room_member_txn(self, txn, invalidates, event): + def _store_room_member_txn(self, txn, event): """Store a room member in the database. """ try: @@ -64,10 +64,8 @@ class RoomMemberStore(SQLBaseStore): } ) - invalidates.extend([ - (self.get_rooms_for_user.invalidate, target_user_id), - (self.get_joined_hosts_for_room.invalidate, event.room_id), - ]) + txn.call_after(self.get_rooms_for_user.invalidate, target_user_id) + txn.call_after(self.get_joined_hosts_for_room.invalidate, event.room_id) def get_room_member(self, user_id, room_id): """Retrieve the current state of a room member. diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index e3979846e7..f051828630 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -39,8 +39,8 @@ class SignatureStore(SQLBaseStore): txn.execute(query, (event_id, )) return dict(txn.fetchall()) - def _store_event_content_hash_txn(self, txn, invalidates, event_id, - algorithm, hash_bytes): + def _store_event_content_hash_txn(self, txn, event_id, algorithm, + hash_bytes): """Store a hash for a Event Args: txn (cursor): @@ -101,8 +101,8 @@ class SignatureStore(SQLBaseStore): txn.execute(query, (event_id, )) return {k: v for k, v in txn.fetchall()} - def _store_event_reference_hash_txn(self, txn, invalidates, event_id, - algorithm, hash_bytes): + def _store_event_reference_hash_txn(self, txn, event_id, algorithm, + hash_bytes): """Store a hash for a PDU Args: txn (cursor): @@ -184,8 +184,8 @@ class SignatureStore(SQLBaseStore): hashes[algorithm] = hash_bytes return results - def _store_prev_event_hash_txn(self, txn, invalidates, event_id, - prev_event_id, algorithm, hash_bytes): + def _store_prev_event_hash_txn(self, txn, event_id, prev_event_id, + algorithm, hash_bytes): self._simple_insert_txn( txn, "event_edge_hashes", diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 35d11c27cc..7e55e8bed6 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -82,7 +82,7 @@ class StateStore(SQLBaseStore): f, ) - def _store_state_groups_txn(self, txn, invalidates, event, context): + def _store_state_groups_txn(self, txn, event, context): if context.current_state is None: return