commit
b8d821aa68
|
@ -2093,6 +2093,30 @@ class EventsStore(SQLBaseStore):
|
||||||
# state_groups
|
# state_groups
|
||||||
# state_groups_state
|
# state_groups_state
|
||||||
|
|
||||||
|
# we will build a temporary table listing the events so that we don't
|
||||||
|
# have to keep shovelling the list back and forth across the
|
||||||
|
# connection. Annoyingly the python sqlite driver commits the
|
||||||
|
# transaction on CREATE, so let's do this first.
|
||||||
|
#
|
||||||
|
# furthermore, we might already have the table from a previous (failed)
|
||||||
|
# purge attempt, so let's drop the table first.
|
||||||
|
|
||||||
|
txn.execute("DROP TABLE IF EXISTS events_to_purge")
|
||||||
|
|
||||||
|
txn.execute(
|
||||||
|
"CREATE TEMPORARY TABLE events_to_purge ("
|
||||||
|
" event_id TEXT NOT NULL,"
|
||||||
|
" should_delete BOOLEAN NOT NULL"
|
||||||
|
")"
|
||||||
|
)
|
||||||
|
|
||||||
|
# create an index on should_delete because later we'll be looking for
|
||||||
|
# the should_delete / shouldn't_delete subsets
|
||||||
|
txn.execute(
|
||||||
|
"CREATE INDEX events_to_purge_should_delete"
|
||||||
|
" ON events_to_purge(should_delete)",
|
||||||
|
)
|
||||||
|
|
||||||
# First ensure that we're not about to delete all the forward extremeties
|
# First ensure that we're not about to delete all the forward extremeties
|
||||||
txn.execute(
|
txn.execute(
|
||||||
"SELECT e.event_id, e.depth FROM events as e "
|
"SELECT e.event_id, e.depth FROM events as e "
|
||||||
|
@ -2115,23 +2139,30 @@ class EventsStore(SQLBaseStore):
|
||||||
|
|
||||||
logger.info("[purge] looking for events to delete")
|
logger.info("[purge] looking for events to delete")
|
||||||
|
|
||||||
|
should_delete_expr = "state_key IS NULL"
|
||||||
|
should_delete_params = ()
|
||||||
|
if not delete_local_events:
|
||||||
|
should_delete_expr += " AND event_id NOT LIKE ?"
|
||||||
|
should_delete_params += ("%:" + self.hs.hostname, )
|
||||||
|
|
||||||
|
should_delete_params += (room_id, topological_ordering)
|
||||||
|
|
||||||
txn.execute(
|
txn.execute(
|
||||||
"SELECT event_id, state_key FROM events"
|
"INSERT INTO events_to_purge"
|
||||||
" LEFT JOIN state_events USING (room_id, event_id)"
|
" SELECT event_id, %s"
|
||||||
" WHERE room_id = ? AND topological_ordering < ?",
|
" FROM events AS e LEFT JOIN state_events USING (event_id)"
|
||||||
(room_id, topological_ordering,)
|
" WHERE e.room_id = ? AND topological_ordering < ?" % (
|
||||||
|
should_delete_expr,
|
||||||
|
),
|
||||||
|
should_delete_params,
|
||||||
|
)
|
||||||
|
txn.execute(
|
||||||
|
"SELECT event_id, should_delete FROM events_to_purge"
|
||||||
)
|
)
|
||||||
event_rows = txn.fetchall()
|
event_rows = txn.fetchall()
|
||||||
|
|
||||||
to_delete = [
|
|
||||||
(event_id,) for event_id, state_key in event_rows
|
|
||||||
if state_key is None and (
|
|
||||||
delete_local_events or not self.hs.is_mine_id(event_id)
|
|
||||||
)
|
|
||||||
]
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"[purge] found %i events before cutoff, of which %i can be deleted",
|
"[purge] found %i events before cutoff, of which %i can be deleted",
|
||||||
len(event_rows), len(to_delete),
|
len(event_rows), sum(1 for e in event_rows if e[1]),
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info("[purge] Finding new backward extremities")
|
logger.info("[purge] Finding new backward extremities")
|
||||||
|
@ -2139,12 +2170,11 @@ class EventsStore(SQLBaseStore):
|
||||||
# We calculate the new entries for the backward extremeties by finding
|
# We calculate the new entries for the backward extremeties by finding
|
||||||
# all events that point to events that are to be purged
|
# all events that point to events that are to be purged
|
||||||
txn.execute(
|
txn.execute(
|
||||||
"SELECT DISTINCT e.event_id FROM events as e"
|
"SELECT DISTINCT e.event_id FROM events_to_purge AS e"
|
||||||
" INNER JOIN event_edges as ed ON e.event_id = ed.prev_event_id"
|
" INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id"
|
||||||
" INNER JOIN events as e2 ON e2.event_id = ed.event_id"
|
" INNER JOIN events AS e2 ON e2.event_id = ed.event_id"
|
||||||
" WHERE e.room_id = ? AND e.topological_ordering < ?"
|
" WHERE e2.topological_ordering >= ?",
|
||||||
" AND e2.topological_ordering >= ?",
|
(topological_ordering, )
|
||||||
(room_id, topological_ordering, topological_ordering)
|
|
||||||
)
|
)
|
||||||
new_backwards_extrems = txn.fetchall()
|
new_backwards_extrems = txn.fetchall()
|
||||||
|
|
||||||
|
@ -2172,12 +2202,11 @@ class EventsStore(SQLBaseStore):
|
||||||
"SELECT state_group FROM event_to_state_groups"
|
"SELECT state_group FROM event_to_state_groups"
|
||||||
" INNER JOIN events USING (event_id)"
|
" INNER JOIN events USING (event_id)"
|
||||||
" WHERE state_group IN ("
|
" WHERE state_group IN ("
|
||||||
" SELECT DISTINCT state_group FROM events"
|
" SELECT DISTINCT state_group FROM events_to_purge"
|
||||||
" INNER JOIN event_to_state_groups USING (event_id)"
|
" INNER JOIN event_to_state_groups USING (event_id)"
|
||||||
" WHERE room_id = ? AND topological_ordering < ?"
|
|
||||||
" )"
|
" )"
|
||||||
" GROUP BY state_group HAVING MAX(topological_ordering) < ?",
|
" GROUP BY state_group HAVING MAX(topological_ordering) < ?",
|
||||||
(room_id, topological_ordering, topological_ordering)
|
(topological_ordering, )
|
||||||
)
|
)
|
||||||
|
|
||||||
state_rows = txn.fetchall()
|
state_rows = txn.fetchall()
|
||||||
|
@ -2262,9 +2291,9 @@ class EventsStore(SQLBaseStore):
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info("[purge] removing events from event_to_state_groups")
|
logger.info("[purge] removing events from event_to_state_groups")
|
||||||
txn.executemany(
|
txn.execute(
|
||||||
"DELETE FROM event_to_state_groups WHERE event_id = ?",
|
"DELETE FROM event_to_state_groups "
|
||||||
[(event_id,) for event_id, _ in event_rows]
|
"WHERE event_id IN (SELECT event_id from events_to_purge)"
|
||||||
)
|
)
|
||||||
for event_id, _ in event_rows:
|
for event_id, _ in event_rows:
|
||||||
txn.call_after(self._get_state_group_for_event.invalidate, (
|
txn.call_after(self._get_state_group_for_event.invalidate, (
|
||||||
|
@ -2289,22 +2318,35 @@ class EventsStore(SQLBaseStore):
|
||||||
):
|
):
|
||||||
logger.info("[purge] removing events from %s", table)
|
logger.info("[purge] removing events from %s", table)
|
||||||
|
|
||||||
txn.executemany(
|
txn.execute(
|
||||||
"DELETE FROM %s WHERE event_id = ?" % (table,),
|
"DELETE FROM %s WHERE event_id IN ("
|
||||||
to_delete
|
" SELECT event_id FROM events_to_purge WHERE should_delete"
|
||||||
|
")" % (table,),
|
||||||
|
)
|
||||||
|
|
||||||
|
# event_push_actions lacks an index on event_id, and has one on
|
||||||
|
# (room_id, event_id) instead.
|
||||||
|
for table in (
|
||||||
|
"event_push_actions",
|
||||||
|
):
|
||||||
|
logger.info("[purge] removing events from %s", table)
|
||||||
|
|
||||||
|
txn.execute(
|
||||||
|
"DELETE FROM %s WHERE room_id = ? AND event_id IN ("
|
||||||
|
" SELECT event_id FROM events_to_purge WHERE should_delete"
|
||||||
|
")" % (table,),
|
||||||
|
(room_id, )
|
||||||
)
|
)
|
||||||
|
|
||||||
# Mark all state and own events as outliers
|
# Mark all state and own events as outliers
|
||||||
logger.info("[purge] marking remaining events as outliers")
|
logger.info("[purge] marking remaining events as outliers")
|
||||||
txn.executemany(
|
txn.execute(
|
||||||
"UPDATE events SET outlier = ?"
|
"UPDATE events SET outlier = ?"
|
||||||
" WHERE event_id = ?",
|
" WHERE event_id IN ("
|
||||||
[
|
" SELECT event_id FROM events_to_purge "
|
||||||
(True, event_id,) for event_id, state_key in event_rows
|
" WHERE NOT should_delete"
|
||||||
if state_key is not None or (
|
")",
|
||||||
not delete_local_events and self.hs.is_mine_id(event_id)
|
(True,),
|
||||||
)
|
|
||||||
]
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# synapse tries to take out an exclusive lock on room_depth whenever it
|
# synapse tries to take out an exclusive lock on room_depth whenever it
|
||||||
|
@ -2319,6 +2361,12 @@ class EventsStore(SQLBaseStore):
|
||||||
(topological_ordering, room_id,)
|
(topological_ordering, room_id,)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# finally, drop the temp table. this will commit the txn in sqlite,
|
||||||
|
# so make sure to keep this actually last.
|
||||||
|
txn.execute(
|
||||||
|
"DROP TABLE events_to_purge"
|
||||||
|
)
|
||||||
|
|
||||||
logger.info("[purge] done")
|
logger.info("[purge] done")
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
|
Loading…
Reference in New Issue