Move get_state methods into FederationHandler (#6503)
This is a non-functional refactor as a precursor to some other work.pull/6560/head
parent
4c7b1bb6cc
commit
be294d6fde
|
@ -0,0 +1 @@
|
||||||
|
Move get_state methods into FederationHandler.
|
|
@ -37,9 +37,9 @@ from synapse.api.room_versions import (
|
||||||
)
|
)
|
||||||
from synapse.events import builder, room_version_to_event_format
|
from synapse.events import builder, room_version_to_event_format
|
||||||
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
|
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
|
||||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
from synapse.logging.context import make_deferred_yieldable
|
||||||
from synapse.logging.utils import log_function
|
from synapse.logging.utils import log_function
|
||||||
from synapse.util import batch_iter, unwrapFirstError
|
from synapse.util import unwrapFirstError
|
||||||
from synapse.util.caches.expiringcache import ExpiringCache
|
from synapse.util.caches.expiringcache import ExpiringCache
|
||||||
from synapse.util.retryutils import NotRetryingDestination
|
from synapse.util.retryutils import NotRetryingDestination
|
||||||
|
|
||||||
|
@ -308,19 +308,12 @@ class FederationClient(FederationBase):
|
||||||
return signed_pdu
|
return signed_pdu
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
def get_room_state_ids(self, destination: str, room_id: str, event_id: str):
|
||||||
def get_state_for_room(self, destination, room_id, event_id):
|
"""Calls the /state_ids endpoint to fetch the state at a particular point
|
||||||
"""Requests all of the room state at a given event from a remote homeserver.
|
in the room, and the auth events for the given event
|
||||||
|
|
||||||
Args:
|
|
||||||
destination (str): The remote homeserver to query for the state.
|
|
||||||
room_id (str): The id of the room we're interested in.
|
|
||||||
event_id (str): The id of the event we want the state at.
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Deferred[Tuple[List[EventBase], List[EventBase]]]:
|
Tuple[List[str], List[str]]: a tuple of (state event_ids, auth event_ids)
|
||||||
A list of events in the state, and a list of events in the auth chain
|
|
||||||
for the given event.
|
|
||||||
"""
|
"""
|
||||||
result = yield self.transport_layer.get_room_state_ids(
|
result = yield self.transport_layer.get_room_state_ids(
|
||||||
destination, room_id, event_id=event_id
|
destination, room_id, event_id=event_id
|
||||||
|
@ -329,74 +322,12 @@ class FederationClient(FederationBase):
|
||||||
state_event_ids = result["pdu_ids"]
|
state_event_ids = result["pdu_ids"]
|
||||||
auth_event_ids = result.get("auth_chain_ids", [])
|
auth_event_ids = result.get("auth_chain_ids", [])
|
||||||
|
|
||||||
desired_events = set(state_event_ids + auth_event_ids)
|
if not isinstance(state_event_ids, list) or not isinstance(
|
||||||
event_map = yield self.get_events_from_store_or_dest(
|
auth_event_ids, list
|
||||||
destination, room_id, desired_events
|
):
|
||||||
)
|
raise Exception("invalid response from /state_ids")
|
||||||
|
|
||||||
failed_to_fetch = desired_events - event_map.keys()
|
return state_event_ids, auth_event_ids
|
||||||
if failed_to_fetch:
|
|
||||||
logger.warning(
|
|
||||||
"Failed to fetch missing state/auth events for %s: %s",
|
|
||||||
room_id,
|
|
||||||
failed_to_fetch,
|
|
||||||
)
|
|
||||||
|
|
||||||
pdus = [event_map[e_id] for e_id in state_event_ids if e_id in event_map]
|
|
||||||
auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map]
|
|
||||||
|
|
||||||
auth_chain.sort(key=lambda e: e.depth)
|
|
||||||
|
|
||||||
return pdus, auth_chain
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def get_events_from_store_or_dest(self, destination, room_id, event_ids):
|
|
||||||
"""Fetch events from a remote destination, checking if we already have them.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
destination (str)
|
|
||||||
room_id (str)
|
|
||||||
event_ids (Iterable[str])
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Deferred[dict[str, EventBase]]: A deferred resolving to a map
|
|
||||||
from event_id to event
|
|
||||||
"""
|
|
||||||
fetched_events = yield self.store.get_events(event_ids, allow_rejected=True)
|
|
||||||
|
|
||||||
missing_events = set(event_ids) - fetched_events.keys()
|
|
||||||
|
|
||||||
if not missing_events:
|
|
||||||
return fetched_events
|
|
||||||
|
|
||||||
logger.debug(
|
|
||||||
"Fetching unknown state/auth events %s for room %s",
|
|
||||||
missing_events,
|
|
||||||
event_ids,
|
|
||||||
)
|
|
||||||
|
|
||||||
room_version = yield self.store.get_room_version(room_id)
|
|
||||||
|
|
||||||
# XXX 20 requests at once? really?
|
|
||||||
for batch in batch_iter(missing_events, 20):
|
|
||||||
deferreds = [
|
|
||||||
run_in_background(
|
|
||||||
self.get_pdu,
|
|
||||||
destinations=[destination],
|
|
||||||
event_id=e_id,
|
|
||||||
room_version=room_version,
|
|
||||||
)
|
|
||||||
for e_id in batch
|
|
||||||
]
|
|
||||||
|
|
||||||
res = yield make_deferred_yieldable(
|
|
||||||
defer.DeferredList(deferreds, consumeErrors=True)
|
|
||||||
)
|
|
||||||
for success, result in res:
|
|
||||||
if success and result:
|
|
||||||
fetched_events[result.event_id] = result
|
|
||||||
|
|
||||||
return fetched_events
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
|
|
|
@ -64,7 +64,7 @@ from synapse.replication.http.federation import (
|
||||||
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
|
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
|
||||||
from synapse.state import StateResolutionStore, resolve_events_with_store
|
from synapse.state import StateResolutionStore, resolve_events_with_store
|
||||||
from synapse.types import UserID, get_domain_from_id
|
from synapse.types import UserID, get_domain_from_id
|
||||||
from synapse.util import unwrapFirstError
|
from synapse.util import batch_iter, unwrapFirstError
|
||||||
from synapse.util.async_helpers import Linearizer
|
from synapse.util.async_helpers import Linearizer
|
||||||
from synapse.util.distributor import user_joined_room
|
from synapse.util.distributor import user_joined_room
|
||||||
from synapse.util.retryutils import NotRetryingDestination
|
from synapse.util.retryutils import NotRetryingDestination
|
||||||
|
@ -379,11 +379,9 @@ class FederationHandler(BaseHandler):
|
||||||
(
|
(
|
||||||
remote_state,
|
remote_state,
|
||||||
got_auth_chain,
|
got_auth_chain,
|
||||||
) = yield self.federation_client.get_state_for_room(
|
) = yield self._get_state_for_room(origin, room_id, p)
|
||||||
origin, room_id, p
|
|
||||||
)
|
|
||||||
|
|
||||||
# we want the state *after* p; get_state_for_room returns the
|
# we want the state *after* p; _get_state_for_room returns the
|
||||||
# state *before* p.
|
# state *before* p.
|
||||||
remote_event = yield self.federation_client.get_pdu(
|
remote_event = yield self.federation_client.get_pdu(
|
||||||
[origin], p, room_version, outlier=True
|
[origin], p, room_version, outlier=True
|
||||||
|
@ -583,6 +581,97 @@ class FederationHandler(BaseHandler):
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
@log_function
|
||||||
|
def _get_state_for_room(self, destination, room_id, event_id):
|
||||||
|
"""Requests all of the room state at a given event from a remote homeserver.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
destination (str): The remote homeserver to query for the state.
|
||||||
|
room_id (str): The id of the room we're interested in.
|
||||||
|
event_id (str): The id of the event we want the state at.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred[Tuple[List[EventBase], List[EventBase]]]:
|
||||||
|
A list of events in the state, and a list of events in the auth chain
|
||||||
|
for the given event.
|
||||||
|
"""
|
||||||
|
(
|
||||||
|
state_event_ids,
|
||||||
|
auth_event_ids,
|
||||||
|
) = yield self.federation_client.get_room_state_ids(
|
||||||
|
destination, room_id, event_id=event_id
|
||||||
|
)
|
||||||
|
|
||||||
|
desired_events = set(state_event_ids + auth_event_ids)
|
||||||
|
event_map = yield self._get_events_from_store_or_dest(
|
||||||
|
destination, room_id, desired_events
|
||||||
|
)
|
||||||
|
|
||||||
|
failed_to_fetch = desired_events - event_map.keys()
|
||||||
|
if failed_to_fetch:
|
||||||
|
logger.warning(
|
||||||
|
"Failed to fetch missing state/auth events for %s: %s",
|
||||||
|
room_id,
|
||||||
|
failed_to_fetch,
|
||||||
|
)
|
||||||
|
|
||||||
|
pdus = [event_map[e_id] for e_id in state_event_ids if e_id in event_map]
|
||||||
|
auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map]
|
||||||
|
|
||||||
|
auth_chain.sort(key=lambda e: e.depth)
|
||||||
|
|
||||||
|
return pdus, auth_chain
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _get_events_from_store_or_dest(self, destination, room_id, event_ids):
|
||||||
|
"""Fetch events from a remote destination, checking if we already have them.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
destination (str)
|
||||||
|
room_id (str)
|
||||||
|
event_ids (Iterable[str])
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred[dict[str, EventBase]]: A deferred resolving to a map
|
||||||
|
from event_id to event
|
||||||
|
"""
|
||||||
|
fetched_events = yield self.store.get_events(event_ids, allow_rejected=True)
|
||||||
|
|
||||||
|
missing_events = set(event_ids) - fetched_events.keys()
|
||||||
|
|
||||||
|
if not missing_events:
|
||||||
|
return fetched_events
|
||||||
|
|
||||||
|
logger.debug(
|
||||||
|
"Fetching unknown state/auth events %s for room %s",
|
||||||
|
missing_events,
|
||||||
|
event_ids,
|
||||||
|
)
|
||||||
|
|
||||||
|
room_version = yield self.store.get_room_version(room_id)
|
||||||
|
|
||||||
|
# XXX 20 requests at once? really?
|
||||||
|
for batch in batch_iter(missing_events, 20):
|
||||||
|
deferreds = [
|
||||||
|
run_in_background(
|
||||||
|
self.federation_client.get_pdu,
|
||||||
|
destinations=[destination],
|
||||||
|
event_id=e_id,
|
||||||
|
room_version=room_version,
|
||||||
|
)
|
||||||
|
for e_id in batch
|
||||||
|
]
|
||||||
|
|
||||||
|
res = yield make_deferred_yieldable(
|
||||||
|
defer.DeferredList(deferreds, consumeErrors=True)
|
||||||
|
)
|
||||||
|
for success, result in res:
|
||||||
|
if success and result:
|
||||||
|
fetched_events[result.event_id] = result
|
||||||
|
|
||||||
|
return fetched_events
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _process_received_pdu(self, origin, event, state, auth_chain):
|
def _process_received_pdu(self, origin, event, state, auth_chain):
|
||||||
""" Called when we have a new pdu. We need to do auth checks and put it
|
""" Called when we have a new pdu. We need to do auth checks and put it
|
||||||
|
@ -723,7 +812,7 @@ class FederationHandler(BaseHandler):
|
||||||
state_events = {}
|
state_events = {}
|
||||||
events_to_state = {}
|
events_to_state = {}
|
||||||
for e_id in edges:
|
for e_id in edges:
|
||||||
state, auth = yield self.federation_client.get_state_for_room(
|
state, auth = yield self._get_state_for_room(
|
||||||
destination=dest, room_id=room_id, event_id=e_id
|
destination=dest, room_id=room_id, event_id=e_id
|
||||||
)
|
)
|
||||||
auth_events.update({a.event_id: a for a in auth})
|
auth_events.update({a.event_id: a for a in auth})
|
||||||
|
|
Loading…
Reference in New Issue