Fix incorrect truncation in get_missing_events
It's quite important that get_missing_events returns the *latest* events in the room; however we were pulling event ids out of the database until we got *at least* 10, and then taking the *earliest* of the results. We also shouldn't really be relying on depth, and should be checking the room_id.pull/4045/head
parent
b8a5b0097c
commit
fc0f13dd03
|
@ -0,0 +1 @@
|
||||||
|
Fix bug which made get_missing_events return too few events
|
|
@ -507,19 +507,19 @@ class FederationServer(FederationBase):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
def on_get_missing_events(self, origin, room_id, earliest_events,
|
def on_get_missing_events(self, origin, room_id, earliest_events,
|
||||||
latest_events, limit, min_depth):
|
latest_events, limit):
|
||||||
with (yield self._server_linearizer.queue((origin, room_id))):
|
with (yield self._server_linearizer.queue((origin, room_id))):
|
||||||
origin_host, _ = parse_server_name(origin)
|
origin_host, _ = parse_server_name(origin)
|
||||||
yield self.check_server_matches_acl(origin_host, room_id)
|
yield self.check_server_matches_acl(origin_host, room_id)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"on_get_missing_events: earliest_events: %r, latest_events: %r,"
|
"on_get_missing_events: earliest_events: %r, latest_events: %r,"
|
||||||
" limit: %d, min_depth: %d",
|
" limit: %d",
|
||||||
earliest_events, latest_events, limit, min_depth
|
earliest_events, latest_events, limit,
|
||||||
)
|
)
|
||||||
|
|
||||||
missing_events = yield self.handler.on_get_missing_events(
|
missing_events = yield self.handler.on_get_missing_events(
|
||||||
origin, room_id, earliest_events, latest_events, limit, min_depth
|
origin, room_id, earliest_events, latest_events, limit,
|
||||||
)
|
)
|
||||||
|
|
||||||
if len(missing_events) < 5:
|
if len(missing_events) < 5:
|
||||||
|
|
|
@ -560,7 +560,6 @@ class FederationGetMissingEventsServlet(BaseFederationServlet):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_POST(self, origin, content, query, room_id):
|
def on_POST(self, origin, content, query, room_id):
|
||||||
limit = int(content.get("limit", 10))
|
limit = int(content.get("limit", 10))
|
||||||
min_depth = int(content.get("min_depth", 0))
|
|
||||||
earliest_events = content.get("earliest_events", [])
|
earliest_events = content.get("earliest_events", [])
|
||||||
latest_events = content.get("latest_events", [])
|
latest_events = content.get("latest_events", [])
|
||||||
|
|
||||||
|
@ -569,7 +568,6 @@ class FederationGetMissingEventsServlet(BaseFederationServlet):
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
earliest_events=earliest_events,
|
earliest_events=earliest_events,
|
||||||
latest_events=latest_events,
|
latest_events=latest_events,
|
||||||
min_depth=min_depth,
|
|
||||||
limit=limit,
|
limit=limit,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -309,8 +309,8 @@ class FederationHandler(BaseHandler):
|
||||||
|
|
||||||
if sent_to_us_directly:
|
if sent_to_us_directly:
|
||||||
logger.warn(
|
logger.warn(
|
||||||
"[%s %s] Failed to fetch %d prev events: rejecting",
|
"[%s %s] Rejecting: failed to fetch %d prev events: %s",
|
||||||
room_id, event_id, len(prevs - seen),
|
room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
|
||||||
)
|
)
|
||||||
raise FederationError(
|
raise FederationError(
|
||||||
"ERROR",
|
"ERROR",
|
||||||
|
@ -452,8 +452,8 @@ class FederationHandler(BaseHandler):
|
||||||
latest |= seen
|
latest |= seen
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"[%s %s]: Requesting %d prev_events: %s",
|
"[%s %s]: Requesting missing events between %s and %s",
|
||||||
room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
|
room_id, event_id, shortstr(latest), event_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
# XXX: we set timeout to 10s to help workaround
|
# XXX: we set timeout to 10s to help workaround
|
||||||
|
@ -1852,7 +1852,7 @@ class FederationHandler(BaseHandler):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_get_missing_events(self, origin, room_id, earliest_events,
|
def on_get_missing_events(self, origin, room_id, earliest_events,
|
||||||
latest_events, limit, min_depth):
|
latest_events, limit):
|
||||||
in_room = yield self.auth.check_host_in_room(
|
in_room = yield self.auth.check_host_in_room(
|
||||||
room_id,
|
room_id,
|
||||||
origin
|
origin
|
||||||
|
@ -1861,14 +1861,12 @@ class FederationHandler(BaseHandler):
|
||||||
raise AuthError(403, "Host not in room.")
|
raise AuthError(403, "Host not in room.")
|
||||||
|
|
||||||
limit = min(limit, 20)
|
limit = min(limit, 20)
|
||||||
min_depth = max(min_depth, 0)
|
|
||||||
|
|
||||||
missing_events = yield self.store.get_missing_events(
|
missing_events = yield self.store.get_missing_events(
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
earliest_events=earliest_events,
|
earliest_events=earliest_events,
|
||||||
latest_events=latest_events,
|
latest_events=latest_events,
|
||||||
limit=limit,
|
limit=limit,
|
||||||
min_depth=min_depth,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
missing_events = yield filter_events_for_server(
|
missing_events = yield filter_events_for_server(
|
||||||
|
|
|
@ -376,33 +376,25 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_missing_events(self, room_id, earliest_events, latest_events,
|
def get_missing_events(self, room_id, earliest_events, latest_events,
|
||||||
limit, min_depth):
|
limit):
|
||||||
ids = yield self.runInteraction(
|
ids = yield self.runInteraction(
|
||||||
"get_missing_events",
|
"get_missing_events",
|
||||||
self._get_missing_events,
|
self._get_missing_events,
|
||||||
room_id, earliest_events, latest_events, limit, min_depth
|
room_id, earliest_events, latest_events, limit,
|
||||||
)
|
)
|
||||||
|
|
||||||
events = yield self._get_events(ids)
|
events = yield self._get_events(ids)
|
||||||
|
defer.returnValue(events)
|
||||||
events = sorted(
|
|
||||||
[ev for ev in events if ev.depth >= min_depth],
|
|
||||||
key=lambda e: e.depth,
|
|
||||||
)
|
|
||||||
|
|
||||||
defer.returnValue(events[:limit])
|
|
||||||
|
|
||||||
def _get_missing_events(self, txn, room_id, earliest_events, latest_events,
|
def _get_missing_events(self, txn, room_id, earliest_events, latest_events,
|
||||||
limit, min_depth):
|
limit):
|
||||||
|
|
||||||
earliest_events = set(earliest_events)
|
seen_events = set(earliest_events)
|
||||||
front = set(latest_events) - earliest_events
|
front = set(latest_events) - seen_events
|
||||||
|
event_results = []
|
||||||
event_results = set()
|
|
||||||
|
|
||||||
query = (
|
query = (
|
||||||
"SELECT prev_event_id FROM event_edges "
|
"SELECT prev_event_id FROM event_edges "
|
||||||
"WHERE event_id = ? AND is_state = ? "
|
"WHERE room_id = ? AND event_id = ? AND is_state = ? "
|
||||||
"LIMIT ?"
|
"LIMIT ?"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -411,18 +403,20 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
|
||||||
for event_id in front:
|
for event_id in front:
|
||||||
txn.execute(
|
txn.execute(
|
||||||
query,
|
query,
|
||||||
(event_id, False, limit - len(event_results))
|
(room_id, event_id, False, limit - len(event_results))
|
||||||
)
|
)
|
||||||
|
|
||||||
for e_id, in txn:
|
new_results = set(t[0] for t in txn) - seen_events
|
||||||
new_front.add(e_id)
|
|
||||||
|
|
||||||
new_front -= earliest_events
|
new_front |= new_results
|
||||||
new_front -= event_results
|
seen_events |= new_results
|
||||||
|
event_results.extend(new_results)
|
||||||
|
|
||||||
front = new_front
|
front = new_front
|
||||||
event_results |= new_front
|
|
||||||
|
|
||||||
|
# we built the list working backwards from latest_events; we now need to
|
||||||
|
# reverse it so that the events are approximately chronological.
|
||||||
|
event_results.reverse()
|
||||||
return event_results
|
return event_results
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue