Only send RDATA for instance local events. (#8496)

When pulling events out of the DB to send over replication we were not
filtering by instance name, and so we were sending events for other
instances.
pull/8514/head
Erik Johnston 2020-10-09 13:10:33 +01:00 committed by GitHub
parent fe0f4a3591
commit 5009ffcaa4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 54 additions and 25 deletions

1
changelog.d/8496.misc Normal file
View File

@ -0,0 +1 @@
Allow events to be sent to clients sooner when using sharded event persisters.

View File

@ -240,13 +240,18 @@ class BackfillStream(Stream):
ROW_TYPE = BackfillStreamRow
def __init__(self, hs):
store = hs.get_datastore()
self.store = hs.get_datastore()
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_current_backfill_token),
store.get_all_new_backfill_event_rows,
self._current_token,
self.store.get_all_new_backfill_event_rows,
)
def _current_token(self, instance_name: str) -> int:
# The backfill stream over replication operates on *positive* numbers,
# which means we need to negate it.
return -self.store._backfill_id_gen.get_current_token_for_writer(instance_name)
class PresenceStream(Stream):
PresenceStreamRow = namedtuple(

View File

@ -155,7 +155,7 @@ class EventsStream(Stream):
# now we fetch up to that many rows from the events table
event_rows = await self._store.get_all_new_forward_event_rows(
from_token, current_token, target_row_count
instance_name, from_token, current_token, target_row_count
) # type: List[Tuple]
# we rely on get_all_new_forward_event_rows strictly honouring the limit, so
@ -180,7 +180,7 @@ class EventsStream(Stream):
upper_limit,
state_rows_limited,
) = await self._store.get_all_updated_current_state_deltas(
from_token, upper_limit, target_row_count
instance_name, from_token, upper_limit, target_row_count
)
limited = limited or state_rows_limited
@ -189,7 +189,7 @@ class EventsStream(Stream):
# not to bother with the limit.
ex_outliers_rows = await self._store.get_ex_outlier_stream_rows(
from_token, upper_limit
instance_name, from_token, upper_limit
) # type: List[Tuple]
# we now need to turn the raw database rows returned into tuples suitable

View File

@ -426,12 +426,12 @@ class PersistEventsStore:
# so that async background tasks get told what happened.
sql = """
INSERT INTO current_state_delta_stream
(stream_id, room_id, type, state_key, event_id, prev_event_id)
SELECT ?, room_id, type, state_key, null, event_id
(stream_id, instance_name, room_id, type, state_key, event_id, prev_event_id)
SELECT ?, ?, room_id, type, state_key, null, event_id
FROM current_state_events
WHERE room_id = ?
"""
txn.execute(sql, (stream_id, room_id))
txn.execute(sql, (stream_id, self._instance_name, room_id))
self.db_pool.simple_delete_txn(
txn, table="current_state_events", keyvalues={"room_id": room_id},
@ -452,8 +452,8 @@ class PersistEventsStore:
#
sql = """
INSERT INTO current_state_delta_stream
(stream_id, room_id, type, state_key, event_id, prev_event_id)
SELECT ?, ?, ?, ?, ?, (
(stream_id, instance_name, room_id, type, state_key, event_id, prev_event_id)
SELECT ?, ?, ?, ?, ?, ?, (
SELECT event_id FROM current_state_events
WHERE room_id = ? AND type = ? AND state_key = ?
)
@ -463,6 +463,7 @@ class PersistEventsStore:
(
(
stream_id,
self._instance_name,
room_id,
etype,
state_key,
@ -755,6 +756,7 @@ class PersistEventsStore:
"event_stream_ordering": stream_order,
"event_id": event.event_id,
"state_group": state_group_id,
"instance_name": self._instance_name,
},
)

View File

@ -1034,16 +1034,12 @@ class EventsWorkerStore(SQLBaseStore):
return {"v1": complexity_v1}
def get_current_backfill_token(self):
"""The current minimum token that backfilled events have reached"""
return -self._backfill_id_gen.get_current_token()
def get_current_events_token(self):
"""The current maximum token that events have reached"""
return self._stream_id_gen.get_current_token()
async def get_all_new_forward_event_rows(
self, last_id: int, current_id: int, limit: int
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> List[Tuple]:
"""Returns new events, for the Events replication stream
@ -1067,10 +1063,11 @@ class EventsWorkerStore(SQLBaseStore):
" LEFT JOIN state_events USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
" WHERE ? < stream_ordering AND stream_ordering <= ?"
" AND instance_name = ?"
" ORDER BY stream_ordering ASC"
" LIMIT ?"
)
txn.execute(sql, (last_id, current_id, limit))
txn.execute(sql, (last_id, current_id, instance_name, limit))
return txn.fetchall()
return await self.db_pool.runInteraction(
@ -1078,7 +1075,7 @@ class EventsWorkerStore(SQLBaseStore):
)
async def get_ex_outlier_stream_rows(
self, last_id: int, current_id: int
self, instance_name: str, last_id: int, current_id: int
) -> List[Tuple]:
"""Returns de-outliered events, for the Events replication stream
@ -1097,16 +1094,17 @@ class EventsWorkerStore(SQLBaseStore):
"SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
" state_key, redacts, relates_to_id"
" FROM events AS e"
" INNER JOIN ex_outlier_stream USING (event_id)"
" INNER JOIN ex_outlier_stream AS out USING (event_id)"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
" WHERE ? < event_stream_ordering"
" AND event_stream_ordering <= ?"
" AND out.instance_name = ?"
" ORDER BY event_stream_ordering ASC"
)
txn.execute(sql, (last_id, current_id))
txn.execute(sql, (last_id, current_id, instance_name))
return txn.fetchall()
return await self.db_pool.runInteraction(
@ -1119,6 +1117,9 @@ class EventsWorkerStore(SQLBaseStore):
"""Get updates for backfill replication stream, including all new
backfilled events and events that have gone from being outliers to not.
NOTE: The IDs given here are from replication, and so should be
*positive*.
Args:
instance_name: The writer we want to fetch updates from. Unused
here since there is only ever one writer.
@ -1149,10 +1150,11 @@ class EventsWorkerStore(SQLBaseStore):
" LEFT JOIN state_events USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
" WHERE ? > stream_ordering AND stream_ordering >= ?"
" AND instance_name = ?"
" ORDER BY stream_ordering ASC"
" LIMIT ?"
)
txn.execute(sql, (-last_id, -current_id, limit))
txn.execute(sql, (-last_id, -current_id, instance_name, limit))
new_event_updates = [(row[0], row[1:]) for row in txn]
limited = False
@ -1166,15 +1168,16 @@ class EventsWorkerStore(SQLBaseStore):
"SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
" state_key, redacts, relates_to_id"
" FROM events AS e"
" INNER JOIN ex_outlier_stream USING (event_id)"
" INNER JOIN ex_outlier_stream AS out USING (event_id)"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
" WHERE ? > event_stream_ordering"
" AND event_stream_ordering >= ?"
" AND out.instance_name = ?"
" ORDER BY event_stream_ordering DESC"
)
txn.execute(sql, (-last_id, -upper_bound))
txn.execute(sql, (-last_id, -upper_bound, instance_name))
new_event_updates.extend((row[0], row[1:]) for row in txn)
if len(new_event_updates) >= limit:
@ -1188,7 +1191,7 @@ class EventsWorkerStore(SQLBaseStore):
)
async def get_all_updated_current_state_deltas(
self, from_token: int, to_token: int, target_row_count: int
self, instance_name: str, from_token: int, to_token: int, target_row_count: int
) -> Tuple[List[Tuple], int, bool]:
"""Fetch updates from current_state_delta_stream
@ -1214,9 +1217,10 @@ class EventsWorkerStore(SQLBaseStore):
SELECT stream_id, room_id, type, state_key, event_id
FROM current_state_delta_stream
WHERE ? < stream_id AND stream_id <= ?
AND instance_name = ?
ORDER BY stream_id ASC LIMIT ?
"""
txn.execute(sql, (from_token, to_token, target_row_count))
txn.execute(sql, (from_token, to_token, instance_name, target_row_count))
return txn.fetchall()
def get_deltas_for_stream_id_txn(txn, stream_id):

View File

@ -0,0 +1,17 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ALTER TABLE current_state_delta_stream ADD COLUMN instance_name TEXT;
ALTER TABLE ex_outlier_stream ADD COLUMN instance_name TEXT;