Handle if get_missing_pdu returns 400 or not all events.
parent 027fd1242c
commit ae702d161a
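In short: the destination server may not implement the new get_missing_events federation endpoint at all (it answers 400), or it may return fewer events than requested. The client now treats a 400 as "endpoint not supported" rather than a failure, and whenever it still holds fewer than limit events it walks prev_events and backfills the remainder one event at a time, via get_pdu, from other servers in the room. The server side now passes the whole PDU (the client needs its event_id, prev_events and depth) and sorts the result by depth before handing each event to _handle_new_pdu.

A rough sketch of the 400-tolerance part, using a stand-in exception class and a hypothetical call_endpoint callable rather than Synapse's real transport layer:

    class HttpResponseException(Exception):
        """Stand-in for synapse.api.errors.HttpResponseException."""
        def __init__(self, code, msg="HTTP error"):
            Exception.__init__(self, "%d: %s" % (code, msg))
            self.code = code

    def get_missing_events_or_fallback(call_endpoint, destination, **kwargs):
        # Try the bulk call first.  A 400 means the remote server does not
        # support get_missing_events yet, so return nothing and let the
        # caller backfill events individually instead of failing the request.
        try:
            events = call_endpoint(destination, **kwargs)
            got_all_from_destination = True
        except HttpResponseException as e:
            if e.code != 400:
                raise
            events = []
            got_all_from_destination = False
        return events, got_all_from_destination

The have_gotten_all_from_destination flag in the real code exists for the fallback step: if the bulk call succeeded, the destination has already handed over everything it will, so it is dropped from the per-event server list; if it answered 400, it stays in that list, since the older per-event get_pdu API may still work against it.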
@@ -19,14 +19,18 @@ from twisted.internet import defer
 from .federation_base import FederationBase
 from .units import Edu
 
-from synapse.api.errors import CodeMessageException, SynapseError
+from synapse.api.errors import (
+    CodeMessageException, HttpResponseException, SynapseError,
+)
 from synapse.util.expiringcache import ExpiringCache
 from synapse.util.logutils import log_function
 from synapse.events import FrozenEvent
 
 from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
 
+import itertools
 import logging
+import random
 
 
 logger = logging.getLogger(__name__)
@@ -440,21 +444,100 @@ class FederationClient(FederationBase):
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
-    def get_missing_events(self, destination, room_id, earliest_events,
+    def get_missing_events(self, destination, room_id, earliest_events_ids,
                            latest_events, limit, min_depth):
-        content = yield self.transport_layer.get_missing_events(
-            destination, room_id, earliest_events, latest_events, limit,
-            min_depth,
-        )
+        try:
+            content = yield self.transport_layer.get_missing_events(
+                destination=destination,
+                room_id=room_id,
+                earliest_events=earliest_events_ids,
+                latest_events=[e.event_id for e in latest_events],
+                limit=limit,
+                min_depth=min_depth,
+            )
 
-        events = [
-            self.event_from_pdu_json(e)
-            for e in content.get("events", [])
-        ]
+            events = [
+                self.event_from_pdu_json(e)
+                for e in content.get("events", [])
+            ]
 
-        signed_events = yield self._check_sigs_and_hash_and_fetch(
-            destination, events, outlier=True
-        )
+            signed_events = yield self._check_sigs_and_hash_and_fetch(
+                destination, events, outlier=True
+            )
+
+            have_gotten_all_from_destination = True
+        except HttpResponseException as e:
+            if not e.code == 400:
+                raise
+
+            signed_events = []
+            have_gotten_all_from_destination = False
+
+        if len(signed_events) >= limit:
+            defer.returnValue(signed_events)
+
+        servers = yield self.store.get_joined_hosts_for_room(room_id)
+
+        servers = set(servers)
+        servers.discard(self.server_name)
+
+        failed_to_fetch = set()
+
+        while len(signed_events) < limit:
+            # Are we missing any?
+
+            seen_events = set(earliest_events_ids)
+            seen_events.update(e.event_id for e in signed_events)
+
+            missing_events = {}
+            for e in itertools.chain(latest_events, signed_events):
+                missing_events.update({
+                    e_id: e.depth for e_id, _ in e.prev_events
+                    if e_id not in seen_events and e_id not in failed_to_fetch
+                })
+
+            if not missing_events:
+                break
+
+            have_seen = yield self.store.have_events(missing_events)
+
+            for k in have_seen:
+                missing_events.pop(k, None)
+
+            if not missing_events:
+                break
+
+            # Okay, we haven't gotten everything yet. Lets get them.
+            ordered_missing = sorted(missing_events.items(), key=lambda x: x[0])
+
+            if have_gotten_all_from_destination:
+                servers.discard(destination)
+
+            def random_server_list():
+                srvs = list(servers)
+                random.shuffle(srvs)
+                return srvs
+
+            deferreds = [
+                self.get_pdu(
+                    destinations=random_server_list(),
+                    event_id=e_id,
+                )
+                for e_id, depth in ordered_missing[:limit - len(signed_events)]
+            ]
+
+            got_a_new_event = False
+
+            res = yield defer.DeferredList(deferreds, consumeErrors=True)
+            for (result, val), (e_id, _) in zip(res, ordered_missing):
+                if result:
+                    signed_events.append(val)
+                    got_a_new_event = True
+                else:
+                    failed_to_fetch.add(e_id)
+
+            if not got_a_new_event:
+                break
+
         defer.returnValue(signed_events)
 
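The while loop above repeatedly computes the frontier of missing ancestors: every prev_event referenced by latest_events or by an event fetched so far, minus anything already seen or already failed, keyed by the depth of the event that references it. A standalone sketch of just that frontier computation, using plain dicts with event_id / depth / prev_events keys as a hypothetical stand-in for Synapse's event objects (the real prev_events entries are (event_id, hashes) pairs, hence the "for e_id, _ in e.prev_events" in the diff):

    import itertools

    def missing_ancestor_depths(latest_events, fetched_events,
                                earliest_event_ids, failed_to_fetch):
        # Events we started from or have already fetched are never "missing".
        seen = set(earliest_event_ids)
        seen.update(e["event_id"] for e in fetched_events)

        # Map each still-missing prev_event id to the depth of an event that
        # references it, mirroring the missing_events dict built in the loop.
        missing = {}
        for e in itertools.chain(latest_events, fetched_events):
            missing.update({
                prev_id: e["depth"]
                for prev_id in e["prev_events"]
                if prev_id not in seen and prev_id not in failed_to_fetch
            })
        return missing

    # One latest event referencing two ancestors, one of which is already fetched.
    latest = [{"event_id": "$C", "depth": 3, "prev_events": ["$A", "$B"]}]
    fetched = [{"event_id": "$B", "depth": 2, "prev_events": ["$A"]}]
    print(missing_ancestor_depths(latest, fetched, ["$start"], set()))
    # {'$A': 2} -- $B is in hand, $A still has to be pulled via get_pdu

Each pass then fetches at most limit - len(signed_events) of these ids, each from a freshly shuffled copy of the room's server list, and the loop stops once a pass produces no new event or nothing is missing any more.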
@@ -413,12 +413,14 @@ class FederationServer(FederationBase):
                 missing_events = yield self.get_missing_events(
                     origin,
                     pdu.room_id,
-                    earliest_events=list(latest),
-                    latest_events=[pdu.event_id],
+                    earliest_events_ids=list(latest),
+                    latest_events=[pdu],
                     limit=10,
                     min_depth=min_depth,
                 )
 
+                missing_events.sort(key=lambda x: x.depth)
+
                 for e in missing_events:
                     yield self._handle_new_pdu(
                         origin,
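On the receiving side the caller now hands over the full pdu object rather than just pdu.event_id, because the client-side fallback reads event_id, prev_events and depth off each entry in latest_events, and it sorts what comes back by depth so that ancestors are processed before the events that point at them. A tiny illustration of that ordering step, with the same hypothetical dict-shaped events as above:

    missing_events = [
        {"event_id": "$C", "depth": 3},
        {"event_id": "$A", "depth": 1},
        {"event_id": "$B", "depth": 2},
    ]
    missing_events.sort(key=lambda x: x["depth"])
    print([e["event_id"] for e in missing_events])  # ['$A', '$B', '$C']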