Use execute_batch instead of executemany in places (#9181)
`execute_batch` does fewer round trips in postgres than `executemany`, but does not give a correct `txn.rowcount` result after.pull/9188/head
parent
1fa15b74e0
commit
eee6fcf5fa
|
@ -0,0 +1 @@
|
||||||
|
Speed up batch insertion when using PostgreSQL.
|
|
@ -267,8 +267,7 @@ class LoggingTransaction:
|
||||||
|
|
||||||
self._do_execute(lambda *x: execute_batch(self.txn, *x), sql, args)
|
self._do_execute(lambda *x: execute_batch(self.txn, *x), sql, args)
|
||||||
else:
|
else:
|
||||||
for val in args:
|
self.executemany(sql, args)
|
||||||
self.execute(sql, val)
|
|
||||||
|
|
||||||
def execute_values(self, sql: str, *args: Any) -> List[Tuple]:
|
def execute_values(self, sql: str, *args: Any) -> List[Tuple]:
|
||||||
"""Corresponds to psycopg2.extras.execute_values. Only available when
|
"""Corresponds to psycopg2.extras.execute_values. Only available when
|
||||||
|
@ -888,7 +887,7 @@ class DatabasePool:
|
||||||
", ".join("?" for _ in keys[0]),
|
", ".join("?" for _ in keys[0]),
|
||||||
)
|
)
|
||||||
|
|
||||||
txn.executemany(sql, vals)
|
txn.execute_batch(sql, vals)
|
||||||
|
|
||||||
async def simple_upsert(
|
async def simple_upsert(
|
||||||
self,
|
self,
|
||||||
|
|
|
@ -876,7 +876,7 @@ class PersistEventsStore:
|
||||||
WHERE room_id = ? AND type = ? AND state_key = ?
|
WHERE room_id = ? AND type = ? AND state_key = ?
|
||||||
)
|
)
|
||||||
"""
|
"""
|
||||||
txn.executemany(
|
txn.execute_batch(
|
||||||
sql,
|
sql,
|
||||||
(
|
(
|
||||||
(
|
(
|
||||||
|
@ -895,7 +895,7 @@ class PersistEventsStore:
|
||||||
)
|
)
|
||||||
# Now we actually update the current_state_events table
|
# Now we actually update the current_state_events table
|
||||||
|
|
||||||
txn.executemany(
|
txn.execute_batch(
|
||||||
"DELETE FROM current_state_events"
|
"DELETE FROM current_state_events"
|
||||||
" WHERE room_id = ? AND type = ? AND state_key = ?",
|
" WHERE room_id = ? AND type = ? AND state_key = ?",
|
||||||
(
|
(
|
||||||
|
@ -907,7 +907,7 @@ class PersistEventsStore:
|
||||||
# We include the membership in the current state table, hence we do
|
# We include the membership in the current state table, hence we do
|
||||||
# a lookup when we insert. This assumes that all events have already
|
# a lookup when we insert. This assumes that all events have already
|
||||||
# been inserted into room_memberships.
|
# been inserted into room_memberships.
|
||||||
txn.executemany(
|
txn.execute_batch(
|
||||||
"""INSERT INTO current_state_events
|
"""INSERT INTO current_state_events
|
||||||
(room_id, type, state_key, event_id, membership)
|
(room_id, type, state_key, event_id, membership)
|
||||||
VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
|
VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
|
||||||
|
@ -927,7 +927,7 @@ class PersistEventsStore:
|
||||||
# we have no record of the fact the user *was* a member of the
|
# we have no record of the fact the user *was* a member of the
|
||||||
# room but got, say, state reset out of it.
|
# room but got, say, state reset out of it.
|
||||||
if to_delete or to_insert:
|
if to_delete or to_insert:
|
||||||
txn.executemany(
|
txn.execute_batch(
|
||||||
"DELETE FROM local_current_membership"
|
"DELETE FROM local_current_membership"
|
||||||
" WHERE room_id = ? AND user_id = ?",
|
" WHERE room_id = ? AND user_id = ?",
|
||||||
(
|
(
|
||||||
|
@ -938,7 +938,7 @@ class PersistEventsStore:
|
||||||
)
|
)
|
||||||
|
|
||||||
if to_insert:
|
if to_insert:
|
||||||
txn.executemany(
|
txn.execute_batch(
|
||||||
"""INSERT INTO local_current_membership
|
"""INSERT INTO local_current_membership
|
||||||
(room_id, user_id, event_id, membership)
|
(room_id, user_id, event_id, membership)
|
||||||
VALUES (?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
|
VALUES (?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
|
||||||
|
@ -1738,7 +1738,7 @@ class PersistEventsStore:
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if events_and_contexts:
|
if events_and_contexts:
|
||||||
txn.executemany(
|
txn.execute_batch(
|
||||||
sql,
|
sql,
|
||||||
(
|
(
|
||||||
(
|
(
|
||||||
|
@ -1767,7 +1767,7 @@ class PersistEventsStore:
|
||||||
|
|
||||||
# Now we delete the staging area for *all* events that were being
|
# Now we delete the staging area for *all* events that were being
|
||||||
# persisted.
|
# persisted.
|
||||||
txn.executemany(
|
txn.execute_batch(
|
||||||
"DELETE FROM event_push_actions_staging WHERE event_id = ?",
|
"DELETE FROM event_push_actions_staging WHERE event_id = ?",
|
||||||
((event.event_id,) for event, _ in all_events_and_contexts),
|
((event.event_id,) for event, _ in all_events_and_contexts),
|
||||||
)
|
)
|
||||||
|
@ -1886,7 +1886,7 @@ class PersistEventsStore:
|
||||||
" )"
|
" )"
|
||||||
)
|
)
|
||||||
|
|
||||||
txn.executemany(
|
txn.execute_batch(
|
||||||
query,
|
query,
|
||||||
[
|
[
|
||||||
(e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
|
(e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
|
||||||
|
@ -1900,7 +1900,7 @@ class PersistEventsStore:
|
||||||
"DELETE FROM event_backward_extremities"
|
"DELETE FROM event_backward_extremities"
|
||||||
" WHERE event_id = ? AND room_id = ?"
|
" WHERE event_id = ? AND room_id = ?"
|
||||||
)
|
)
|
||||||
txn.executemany(
|
txn.execute_batch(
|
||||||
query,
|
query,
|
||||||
[
|
[
|
||||||
(ev.event_id, ev.room_id)
|
(ev.event_id, ev.room_id)
|
||||||
|
|
Loading…
Reference in New Issue