Delete current state when server leaves a room (#6792)
Otherwise its just stale data, which may get deleted later anyway so can't be relied on. It's also a bit of a shotgun if we're trying to get the current state of a room we're not in.pull/6800/head
parent
2cad8baa70
commit
611215a49c
|
@ -0,0 +1 @@
|
||||||
|
Delete current state from the database when server leaves a room.
|
|
@ -32,6 +32,7 @@ from twisted.internet import defer
|
||||||
import synapse.metrics
|
import synapse.metrics
|
||||||
from synapse.api.constants import EventContentFields, EventTypes
|
from synapse.api.constants import EventContentFields, EventTypes
|
||||||
from synapse.api.errors import SynapseError
|
from synapse.api.errors import SynapseError
|
||||||
|
from synapse.api.room_versions import RoomVersions
|
||||||
from synapse.events import EventBase # noqa: F401
|
from synapse.events import EventBase # noqa: F401
|
||||||
from synapse.events.snapshot import EventContext # noqa: F401
|
from synapse.events.snapshot import EventContext # noqa: F401
|
||||||
from synapse.events.utils import prune_event_dict
|
from synapse.events.utils import prune_event_dict
|
||||||
|
@ -468,84 +469,93 @@ class EventsStore(
|
||||||
to_delete = delta_state.to_delete
|
to_delete = delta_state.to_delete
|
||||||
to_insert = delta_state.to_insert
|
to_insert = delta_state.to_insert
|
||||||
|
|
||||||
# First we add entries to the current_state_delta_stream. We
|
if delta_state.no_longer_in_room:
|
||||||
# do this before updating the current_state_events table so
|
# Server is no longer in the room so we delete the room from
|
||||||
# that we can use it to calculate the `prev_event_id`. (This
|
# current_state_events, being careful we've already updated the
|
||||||
# allows us to not have to pull out the existing state
|
# rooms.room_version column (which gets populated in a
|
||||||
# unnecessarily).
|
# background task).
|
||||||
#
|
self._upsert_room_version_txn(txn, room_id)
|
||||||
# The stream_id for the update is chosen to be the minimum of the stream_ids
|
|
||||||
# for the batch of the events that we are persisting; that means we do not
|
# Before deleting we populate the current_state_delta_stream
|
||||||
# end up in a situation where workers see events before the
|
# so that async background tasks get told what happened.
|
||||||
# current_state_delta updates.
|
sql = """
|
||||||
#
|
INSERT INTO current_state_delta_stream
|
||||||
sql = """
|
(stream_id, room_id, type, state_key, event_id, prev_event_id)
|
||||||
INSERT INTO current_state_delta_stream
|
SELECT ?, room_id, type, state_key, null, event_id
|
||||||
(stream_id, room_id, type, state_key, event_id, prev_event_id)
|
FROM current_state_events
|
||||||
SELECT ?, ?, ?, ?, ?, (
|
WHERE room_id = ?
|
||||||
SELECT event_id FROM current_state_events
|
"""
|
||||||
WHERE room_id = ? AND type = ? AND state_key = ?
|
txn.execute(sql, (stream_id, room_id))
|
||||||
|
|
||||||
|
self.db.simple_delete_txn(
|
||||||
|
txn, table="current_state_events", keyvalues={"room_id": room_id},
|
||||||
)
|
)
|
||||||
"""
|
else:
|
||||||
txn.executemany(
|
# We're still in the room, so we update the current state as normal.
|
||||||
sql,
|
|
||||||
(
|
# First we add entries to the current_state_delta_stream. We
|
||||||
(
|
# do this before updating the current_state_events table so
|
||||||
stream_id,
|
# that we can use it to calculate the `prev_event_id`. (This
|
||||||
room_id,
|
# allows us to not have to pull out the existing state
|
||||||
etype,
|
# unnecessarily).
|
||||||
state_key,
|
#
|
||||||
None,
|
# The stream_id for the update is chosen to be the minimum of the stream_ids
|
||||||
room_id,
|
# for the batch of the events that we are persisting; that means we do not
|
||||||
etype,
|
# end up in a situation where workers see events before the
|
||||||
state_key,
|
# current_state_delta updates.
|
||||||
|
#
|
||||||
|
sql = """
|
||||||
|
INSERT INTO current_state_delta_stream
|
||||||
|
(stream_id, room_id, type, state_key, event_id, prev_event_id)
|
||||||
|
SELECT ?, ?, ?, ?, ?, (
|
||||||
|
SELECT event_id FROM current_state_events
|
||||||
|
WHERE room_id = ? AND type = ? AND state_key = ?
|
||||||
)
|
)
|
||||||
for etype, state_key in to_delete
|
"""
|
||||||
# We sanity check that we're deleting rather than updating
|
txn.executemany(
|
||||||
if (etype, state_key) not in to_insert
|
sql,
|
||||||
),
|
|
||||||
)
|
|
||||||
txn.executemany(
|
|
||||||
sql,
|
|
||||||
(
|
|
||||||
(
|
(
|
||||||
stream_id,
|
(
|
||||||
room_id,
|
stream_id,
|
||||||
etype,
|
room_id,
|
||||||
state_key,
|
etype,
|
||||||
ev_id,
|
state_key,
|
||||||
room_id,
|
to_insert.get((etype, state_key)),
|
||||||
etype,
|
room_id,
|
||||||
state_key,
|
etype,
|
||||||
)
|
state_key,
|
||||||
for (etype, state_key), ev_id in iteritems(to_insert)
|
)
|
||||||
),
|
for etype, state_key in itertools.chain(to_delete, to_insert)
|
||||||
)
|
),
|
||||||
|
)
|
||||||
|
# Now we actually update the current_state_events table
|
||||||
|
|
||||||
# Now we actually update the current_state_events table
|
txn.executemany(
|
||||||
|
"DELETE FROM current_state_events"
|
||||||
|
" WHERE room_id = ? AND type = ? AND state_key = ?",
|
||||||
|
(
|
||||||
|
(room_id, etype, state_key)
|
||||||
|
for etype, state_key in itertools.chain(to_delete, to_insert)
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
txn.executemany(
|
# We include the membership in the current state table, hence we do
|
||||||
"DELETE FROM current_state_events"
|
# a lookup when we insert. This assumes that all events have already
|
||||||
" WHERE room_id = ? AND type = ? AND state_key = ?",
|
# been inserted into room_memberships.
|
||||||
(
|
txn.executemany(
|
||||||
(room_id, etype, state_key)
|
"""INSERT INTO current_state_events
|
||||||
for etype, state_key in itertools.chain(to_delete, to_insert)
|
(room_id, type, state_key, event_id, membership)
|
||||||
),
|
VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
|
||||||
)
|
""",
|
||||||
|
[
|
||||||
|
(room_id, key[0], key[1], ev_id, ev_id)
|
||||||
|
for key, ev_id in iteritems(to_insert)
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
# We include the membership in the current state table, hence we do
|
# We now update `local_current_membership`. We do this regardless
|
||||||
# a lookup when we insert. This assumes that all events have already
|
# of whether we're still in the room or not to handle the case where
|
||||||
# been inserted into room_memberships.
|
# e.g. we just got banned (where we need to record that fact here).
|
||||||
txn.executemany(
|
|
||||||
"""INSERT INTO current_state_events
|
|
||||||
(room_id, type, state_key, event_id, membership)
|
|
||||||
VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
|
|
||||||
""",
|
|
||||||
[
|
|
||||||
(room_id, key[0], key[1], ev_id, ev_id)
|
|
||||||
for key, ev_id in iteritems(to_insert)
|
|
||||||
],
|
|
||||||
)
|
|
||||||
|
|
||||||
# Note: Do we really want to delete rows here (that we do not
|
# Note: Do we really want to delete rows here (that we do not
|
||||||
# subsequently reinsert below)? While technically correct it means
|
# subsequently reinsert below)? While technically correct it means
|
||||||
|
@ -601,6 +611,35 @@ class EventsStore(
|
||||||
|
|
||||||
self._invalidate_state_caches_and_stream(txn, room_id, members_changed)
|
self._invalidate_state_caches_and_stream(txn, room_id, members_changed)
|
||||||
|
|
||||||
|
def _upsert_room_version_txn(self, txn: LoggingTransaction, room_id: str):
|
||||||
|
"""Update the room version in the database based off current state
|
||||||
|
events.
|
||||||
|
|
||||||
|
This is used when we're about to delete current state and we want to
|
||||||
|
ensure that the `rooms.room_version` column is up to date.
|
||||||
|
"""
|
||||||
|
|
||||||
|
sql = """
|
||||||
|
SELECT json FROM event_json
|
||||||
|
INNER JOIN current_state_events USING (room_id, event_id)
|
||||||
|
WHERE room_id = ? AND type = ? AND state_key = ?
|
||||||
|
"""
|
||||||
|
txn.execute(sql, (room_id, EventTypes.Create, ""))
|
||||||
|
row = txn.fetchone()
|
||||||
|
if row:
|
||||||
|
event_json = json.loads(row[0])
|
||||||
|
content = event_json.get("content", {})
|
||||||
|
creator = content.get("creator")
|
||||||
|
room_version_id = content.get("room_version", RoomVersions.V1.identifier)
|
||||||
|
|
||||||
|
self.db.simple_upsert_txn(
|
||||||
|
txn,
|
||||||
|
table="rooms",
|
||||||
|
keyvalues={"room_id": room_id},
|
||||||
|
values={"room_version": room_version_id},
|
||||||
|
insertion_values={"is_public": False, "creator": creator},
|
||||||
|
)
|
||||||
|
|
||||||
def _update_forward_extremities_txn(
|
def _update_forward_extremities_txn(
|
||||||
self, txn, new_forward_extremities, max_stream_order
|
self, txn, new_forward_extremities, max_stream_order
|
||||||
):
|
):
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
import itertools
|
||||||
import logging
|
import logging
|
||||||
from collections import deque, namedtuple
|
from collections import deque, namedtuple
|
||||||
from typing import Iterable, List, Optional, Tuple
|
from typing import Iterable, List, Optional, Tuple
|
||||||
|
@ -27,7 +28,7 @@ from prometheus_client import Counter, Histogram
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
from synapse.api.constants import EventTypes
|
from synapse.api.constants import EventTypes, Membership
|
||||||
from synapse.events import FrozenEvent
|
from synapse.events import FrozenEvent
|
||||||
from synapse.events.snapshot import EventContext
|
from synapse.events.snapshot import EventContext
|
||||||
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
|
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
|
||||||
|
@ -72,17 +73,20 @@ stale_forward_extremities_counter = Histogram(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@attr.s(slots=True, frozen=True)
|
@attr.s(slots=True)
|
||||||
class DeltaState:
|
class DeltaState:
|
||||||
"""Deltas to use to update the `current_state_events` table.
|
"""Deltas to use to update the `current_state_events` table.
|
||||||
|
|
||||||
Attributes:
|
Attributes:
|
||||||
to_delete: List of type/state_keys to delete from current state
|
to_delete: List of type/state_keys to delete from current state
|
||||||
to_insert: Map of state to upsert into current state
|
to_insert: Map of state to upsert into current state
|
||||||
|
no_longer_in_room: The server is not longer in the room, so the room
|
||||||
|
should e.g. be removed from `current_state_events` table.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
to_delete = attr.ib(type=List[Tuple[str, str]])
|
to_delete = attr.ib(type=List[Tuple[str, str]])
|
||||||
to_insert = attr.ib(type=StateMap[str])
|
to_insert = attr.ib(type=StateMap[str])
|
||||||
|
no_longer_in_room = attr.ib(type=bool, default=False)
|
||||||
|
|
||||||
|
|
||||||
class _EventPeristenceQueue(object):
|
class _EventPeristenceQueue(object):
|
||||||
|
@ -396,11 +400,12 @@ class EventsPersistenceStorage(object):
|
||||||
# If either are not None then there has been a change,
|
# If either are not None then there has been a change,
|
||||||
# and we need to work out the delta (or use that
|
# and we need to work out the delta (or use that
|
||||||
# given)
|
# given)
|
||||||
|
delta = None
|
||||||
if delta_ids is not None:
|
if delta_ids is not None:
|
||||||
# If there is a delta we know that we've
|
# If there is a delta we know that we've
|
||||||
# only added or replaced state, never
|
# only added or replaced state, never
|
||||||
# removed keys entirely.
|
# removed keys entirely.
|
||||||
state_delta_for_room[room_id] = DeltaState([], delta_ids)
|
delta = DeltaState([], delta_ids)
|
||||||
elif current_state is not None:
|
elif current_state is not None:
|
||||||
with Measure(
|
with Measure(
|
||||||
self._clock, "persist_events.calculate_state_delta"
|
self._clock, "persist_events.calculate_state_delta"
|
||||||
|
@ -408,6 +413,22 @@ class EventsPersistenceStorage(object):
|
||||||
delta = await self._calculate_state_delta(
|
delta = await self._calculate_state_delta(
|
||||||
room_id, current_state
|
room_id, current_state
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if delta:
|
||||||
|
# If we have a change of state then lets check
|
||||||
|
# whether we're actually still a member of the room,
|
||||||
|
# or if our last user left. If we're no longer in
|
||||||
|
# the room then we delete the current state and
|
||||||
|
# extremities.
|
||||||
|
is_still_joined = await self._is_server_still_joined(
|
||||||
|
room_id, ev_ctx_rm, delta, current_state
|
||||||
|
)
|
||||||
|
if not is_still_joined:
|
||||||
|
logger.info("Server no longer in room %s", room_id)
|
||||||
|
latest_event_ids = []
|
||||||
|
current_state = {}
|
||||||
|
delta.no_longer_in_room = True
|
||||||
|
|
||||||
state_delta_for_room[room_id] = delta
|
state_delta_for_room[room_id] = delta
|
||||||
|
|
||||||
# If we have the current_state then lets prefill
|
# If we have the current_state then lets prefill
|
||||||
|
@ -660,3 +681,65 @@ class EventsPersistenceStorage(object):
|
||||||
}
|
}
|
||||||
|
|
||||||
return DeltaState(to_delete=to_delete, to_insert=to_insert)
|
return DeltaState(to_delete=to_delete, to_insert=to_insert)
|
||||||
|
|
||||||
|
async def _is_server_still_joined(
|
||||||
|
self,
|
||||||
|
room_id: str,
|
||||||
|
ev_ctx_rm: List[Tuple[FrozenEvent, EventContext]],
|
||||||
|
delta: DeltaState,
|
||||||
|
current_state: Optional[StateMap[str]],
|
||||||
|
) -> bool:
|
||||||
|
"""Check if the server will still be joined after the given events have
|
||||||
|
been persised.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
room_id
|
||||||
|
ev_ctx_rm
|
||||||
|
delta: The delta of current state between what is in the database
|
||||||
|
and what the new current state will be.
|
||||||
|
current_state: The new current state if it already been calculated,
|
||||||
|
otherwise None.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not any(
|
||||||
|
self.is_mine_id(state_key)
|
||||||
|
for typ, state_key in itertools.chain(delta.to_delete, delta.to_insert)
|
||||||
|
if typ == EventTypes.Member
|
||||||
|
):
|
||||||
|
# There have been no changes to membership of our users, so nothing
|
||||||
|
# has changed and we assume we're still in the room.
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Check if any of the given events are a local join that appear in the
|
||||||
|
# current state
|
||||||
|
for (typ, state_key), event_id in delta.to_insert.items():
|
||||||
|
if typ != EventTypes.Member or not self.is_mine_id(state_key):
|
||||||
|
continue
|
||||||
|
|
||||||
|
for event, _ in ev_ctx_rm:
|
||||||
|
if event_id == event.event_id:
|
||||||
|
if event.membership == Membership.JOIN:
|
||||||
|
return True
|
||||||
|
|
||||||
|
# There's been a change of membership but we don't have a local join
|
||||||
|
# event in the new events, so we need to check the full state.
|
||||||
|
if current_state is None:
|
||||||
|
current_state = await self.main_store.get_current_state_ids(room_id)
|
||||||
|
current_state = dict(current_state)
|
||||||
|
for key in delta.to_delete:
|
||||||
|
current_state.pop(key, None)
|
||||||
|
|
||||||
|
current_state.update(delta.to_insert)
|
||||||
|
|
||||||
|
event_ids = [
|
||||||
|
event_id
|
||||||
|
for (typ, state_key,), event_id in current_state.items()
|
||||||
|
if typ == EventTypes.Member and self.is_mine_id(state_key)
|
||||||
|
]
|
||||||
|
|
||||||
|
rows = await self.main_store.get_membership_from_event_ids(event_ids)
|
||||||
|
is_still_joined = any(row["membership"] == Membership.JOIN for row in rows)
|
||||||
|
if is_still_joined:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
Loading…
Reference in New Issue