From 43c2e8deae5f7e2b339ab5c131391231886cad09 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 May 2015 15:13:25 +0100 Subject: [PATCH 1/4] Add support for using executemany --- synapse/storage/_base.py | 54 ++++++++++++++++++++++------- synapse/storage/event_federation.py | 40 +++++++++++---------- synapse/storage/events.py | 46 +++++++++++++----------- synapse/storage/state.py | 16 +++++---- tests/storage/test_base.py | 4 +-- 5 files changed, 99 insertions(+), 61 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e01c61d08d..b7c3cf03c8 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -160,18 +160,23 @@ class LoggingTransaction(object): def __setattr__(self, name, value): setattr(self.txn, name, value) - def execute(self, sql, *args, **kwargs): + def execute(self, sql, *args): + self._do_execute(self.txn.execute, sql, *args) + + def executemany(self, sql, *args): + self._do_execute(self.txn.executemany, sql, *args) + + def _do_execute(self, func, sql, *args): # TODO(paul): Maybe use 'info' and 'debug' for values? sql_logger.debug("[SQL] {%s} %s", self.name, sql) sql = self.database_engine.convert_param_style(sql) - if args and args[0]: + if args: try: sql_logger.debug( - "[SQL values] {%s} " + ", ".join(("<%r>",) * len(args[0])), - self.name, - *args[0] + "[SQL values] {%s} %r", + self.name, args[0] ) except: # Don't let logging failures stop SQL from working @@ -180,8 +185,8 @@ class LoggingTransaction(object): start = time.time() * 1000 try: - return self.txn.execute( - sql, *args, **kwargs + return func( + sql, *args ) except Exception as e: logger.debug("[SQL FAIL] {%s} %s", self.name, e) @@ -434,18 +439,41 @@ class SQLBaseStore(object): @log_function def _simple_insert_txn(self, txn, table, values): + keys, vals = zip(*values.items()) + sql = "INSERT INTO %s (%s) VALUES(%s)" % ( table, - ", ".join(k for k in values), - ", ".join("?" for k in values) + ", ".join(k for k in keys), + ", ".join("?" for _ in keys) ) - logger.debug( - "[SQL] %s Args=%s", - sql, values.values(), + txn.execute(sql, vals) + + def _simple_insert_many_txn(self, txn, table, values): + if not values: + return + + keys, vals = zip(*[ + zip( + *(sorted(i.items(), key=lambda kv: kv[0])) + ) + for i in values + if i + ]) + + for k in keys: + if k != keys[0]: + raise RuntimeError( + "All items must have the same keys" + ) + + sql = "INSERT INTO %s (%s) VALUES(%s)" % ( + table, + ", ".join(k for k in keys[0]), + ", ".join("?" for _ in keys[0]) ) - txn.execute(sql, values.values()) + txn.executemany(sql, vals) def _simple_upsert(self, table, keyvalues, values, insertion_values={}, desc="_simple_upsert", lock=True): diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 68f39bd684..0aca4ba17b 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -262,18 +262,19 @@ class EventFederationStore(SQLBaseStore): For the given event, update the event edges table and forward and backward extremities tables. """ - for e_id, _ in prev_events: - # TODO (erikj): This could be done as a bulk insert - self._simple_insert_txn( - txn, + self._simple_insert_many_txn( + txn, table="event_edges", - values={ - "event_id": event_id, - "prev_event_id": e_id, - "room_id": room_id, - "is_state": False, - }, - ) + values=[ + { + "event_id": event_id, + "prev_event_id": e_id, + "room_id": room_id, + "is_state": False, + } + for e_id, _ in prev_events + ], + ) # Update the extremities table if this is not an outlier. if not outlier: @@ -307,16 +308,17 @@ class EventFederationStore(SQLBaseStore): # Insert all the prev_events as a backwards thing, they'll get # deleted in a second if they're incorrect anyway. - for e_id, _ in prev_events: - # TODO (erikj): This could be done as a bulk insert - self._simple_insert_txn( - txn, - table="event_backward_extremities", - values={ + self._simple_insert_many_txn( + txn, + table="event_backward_extremities", + values=[ + { "event_id": e_id, "room_id": room_id, - }, - ) + } + for e_id, _ in prev_events + ], + ) # Also delete from the backwards extremities table all ones that # reference events that we have already seen diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a3c260ddc4..84e446a99c 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -113,17 +113,19 @@ class EventsStore(SQLBaseStore): keyvalues={"room_id": event.room_id}, ) - for s in current_state: - self._simple_insert_txn( - txn, - "current_state_events", + self._simple_insert_many_txn( + txn, + "current_state_events", + [ { "event_id": s.event_id, "room_id": s.room_id, "type": s.type, "state_key": s.state_key, - }, - ) + } + for s in current_state + ], + ) if event.is_state() and is_new_state: if not backfilled and not context.rejected: @@ -296,16 +298,18 @@ class EventsStore(SQLBaseStore): txn, event.event_id, prev_event_id, alg, hash_bytes ) - for auth_id, _ in event.auth_events: - self._simple_insert_txn( - txn, - table="event_auth", - values={ + self._simple_insert_many_txn( + txn, + table="event_auth", + values=[ + { "event_id": event.event_id, "room_id": event.room_id, "auth_id": auth_id, - }, - ) + } + for auth_id, _ in event.auth_events + ], + ) (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event) self._store_event_reference_hash_txn( @@ -330,17 +334,19 @@ class EventsStore(SQLBaseStore): vals, ) - for e_id, h in event.prev_state: - self._simple_insert_txn( - txn, - table="event_edges", - values={ + self._simple_insert_many_txn( + txn, + table="event_edges", + values=[ + { "event_id": event.event_id, "prev_event_id": e_id, "room_id": event.room_id, "is_state": True, - }, - ) + } + for e_id, h in event.prev_state + ], + ) if is_new_state and not context.rejected: self._simple_upsert_txn( diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 7e55e8bed6..dbc0e49c1f 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -104,18 +104,20 @@ class StateStore(SQLBaseStore): }, ) - for state in state_events.values(): - self._simple_insert_txn( - txn, - table="state_groups_state", - values={ + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { "state_group": state_group, "room_id": state.room_id, "type": state.type, "state_key": state.state_key, "event_id": state.event_id, - }, - ) + } + for state in state_events.values() + ], + ) self._simple_insert_txn( txn, diff --git a/tests/storage/test_base.py b/tests/storage/test_base.py index a64d2b821e..8c348ecc95 100644 --- a/tests/storage/test_base.py +++ b/tests/storage/test_base.py @@ -67,7 +67,7 @@ class SQLBaseStoreTestCase(unittest.TestCase): self.mock_txn.execute.assert_called_with( "INSERT INTO tablename (columname) VALUES(?)", - ["Value"] + ("Value",) ) @defer.inlineCallbacks @@ -82,7 +82,7 @@ class SQLBaseStoreTestCase(unittest.TestCase): self.mock_txn.execute.assert_called_with( "INSERT INTO tablename (colA, colB, colC) VALUES(?, ?, ?)", - [1, 2, 3] + (1, 2, 3,) ) @defer.inlineCallbacks From bdcd7693c8b954c9a7895339d4727c17221d4d9d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 May 2015 15:14:48 +0100 Subject: [PATCH 2/4] Fix indentation --- synapse/storage/event_federation.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 0aca4ba17b..36b1feac60 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -264,16 +264,16 @@ class EventFederationStore(SQLBaseStore): """ self._simple_insert_many_txn( txn, - table="event_edges", - values=[ - { - "event_id": event_id, - "prev_event_id": e_id, - "room_id": room_id, - "is_state": False, - } - for e_id, _ in prev_events - ], + table="event_edges", + values=[ + { + "event_id": event_id, + "prev_event_id": e_id, + "room_id": room_id, + "is_state": False, + } + for e_id, _ in prev_events + ], ) # Update the extremities table if this is not an outlier. From 995154239358af589146ab4697e7cb4f100e2d84 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 May 2015 17:06:55 +0100 Subject: [PATCH 3/4] Add a comment about the zip(*[zip(sorted(...),...)]) --- synapse/storage/_base.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index b7c3cf03c8..94946587f5 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -453,6 +453,14 @@ class SQLBaseStore(object): if not values: return + # This is a *slight* abomination to get a list of tuples of key names + # and a list of tuples of value names. + # + # i.e. [{"a": 1, "b": 2}, {"c": 3, "d": 4}] + # => [("a", "b",), ("c", "d",)] and [(1, 2,), (3, 4,)] + # + # The sort is to ensure that we don't rely on dictionary iteration + # order. keys, vals = zip(*[ zip( *(sorted(i.items(), key=lambda kv: kv[0])) From 0cf7e480b442f9f893b782ab1a437b556c1bbb54 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 May 2015 18:20:01 +0100 Subject: [PATCH 4/4] And use buffer(...) there as well --- synapse/federation/persistence.py | 2 +- synapse/storage/transactions.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py index 865766eb2c..1a7cc02f92 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py @@ -99,5 +99,5 @@ class TransactionActions(object): transaction.transaction_id, transaction.destination, response_code, - encode_canonical_json(response_dict) + response_dict, ) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index b5b21a9b13..624da4a9dc 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -162,7 +162,8 @@ class TransactionStore(SQLBaseStore): return self.runInteraction( "delivered_txn", self._delivered_txn, - transaction_id, destination, code, response_dict + transaction_id, destination, code, + buffer(encode_canonical_json(response_dict)), ) def _delivered_txn(self, txn, transaction_id, destination,