Speed up current state background update.
Turns out that storing huge JSON arrays in the progress JSON isn't something that postgres particularly likes.pull/5738/head
parent
0d0f6d12bc
commit
b2a629ef49
|
@ -852,22 +852,25 @@ class RoomMemberStore(RoomMemberWorkerStore):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _background_current_state_membership(self, progress, batch_size):
|
def _background_current_state_membership(self, progress, batch_size):
|
||||||
"""Update the new membership column on current_state_events.
|
"""Update the new membership column on current_state_events.
|
||||||
|
|
||||||
|
This works by iterating over all rooms in alphebetical order.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if "rooms" not in progress:
|
def _background_current_state_membership_txn(txn, last_processed_room):
|
||||||
rooms = yield self._simple_select_onecol(
|
|
||||||
table="current_state_events",
|
|
||||||
keyvalues={},
|
|
||||||
retcol="DISTINCT room_id",
|
|
||||||
desc="_background_current_state_membership_get_rooms",
|
|
||||||
)
|
|
||||||
progress["rooms"] = rooms
|
|
||||||
|
|
||||||
rooms = progress["rooms"]
|
|
||||||
|
|
||||||
def _background_current_state_membership_txn(txn):
|
|
||||||
processed = 0
|
processed = 0
|
||||||
while rooms and processed < batch_size:
|
while processed < batch_size:
|
||||||
|
txn.execute(
|
||||||
|
"""
|
||||||
|
SELECT MIN(room_id) FROM rooms WHERE room_id > ?
|
||||||
|
""",
|
||||||
|
(last_processed_room,),
|
||||||
|
)
|
||||||
|
row = txn.fetchone()
|
||||||
|
if not row or not row[0]:
|
||||||
|
return processed, True
|
||||||
|
|
||||||
|
next_room, = row
|
||||||
|
|
||||||
sql = """
|
sql = """
|
||||||
UPDATE current_state_events AS c
|
UPDATE current_state_events AS c
|
||||||
SET membership = (
|
SET membership = (
|
||||||
|
@ -876,24 +879,33 @@ class RoomMemberStore(RoomMemberWorkerStore):
|
||||||
)
|
)
|
||||||
WHERE room_id = ?
|
WHERE room_id = ?
|
||||||
"""
|
"""
|
||||||
txn.execute(sql, (rooms.pop(),))
|
txn.execute(sql, (next_room,))
|
||||||
processed += txn.rowcount
|
processed += txn.rowcount
|
||||||
|
|
||||||
|
last_processed_room = next_room
|
||||||
|
|
||||||
self._background_update_progress_txn(
|
self._background_update_progress_txn(
|
||||||
txn, _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME, progress
|
txn,
|
||||||
|
_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME,
|
||||||
|
{"last_processed_room": last_processed_room},
|
||||||
)
|
)
|
||||||
|
|
||||||
return processed
|
return processed, False
|
||||||
|
|
||||||
result = yield self.runInteraction(
|
# If we haven't got a last processed room then just use the empty
|
||||||
|
# string, which will compare before all room IDs correctly.
|
||||||
|
last_processed_room = progress.get("last_processed_room", "")
|
||||||
|
|
||||||
|
row_count, finished = yield self.runInteraction(
|
||||||
"_background_current_state_membership_update",
|
"_background_current_state_membership_update",
|
||||||
_background_current_state_membership_txn,
|
_background_current_state_membership_txn,
|
||||||
|
last_processed_room,
|
||||||
)
|
)
|
||||||
|
|
||||||
if not rooms:
|
if finished:
|
||||||
yield self._end_background_update(_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME)
|
yield self._end_background_update(_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME)
|
||||||
|
|
||||||
defer.returnValue(result)
|
defer.returnValue(row_count)
|
||||||
|
|
||||||
|
|
||||||
class _JoinedHostsCache(object):
|
class _JoinedHostsCache(object):
|
||||||
|
|
Loading…
Reference in New Issue