handle federation not telling us about prev_events
parent
cd6bcdaf87
commit
77078d6c8e
|
@ -549,7 +549,9 @@ class FederationServer(FederationBase):
|
||||||
affected=pdu.event_id,
|
affected=pdu.event_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
yield self.handler.on_receive_pdu(origin, pdu, get_missing=True)
|
yield self.handler.on_receive_pdu(
|
||||||
|
origin, pdu, get_missing=True, sent_to_us_directly=True,
|
||||||
|
)
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return "<ReplicationLayer(%s)>" % self.server_name
|
return "<ReplicationLayer(%s)>" % self.server_name
|
||||||
|
|
|
@ -44,6 +44,7 @@ from synapse.util.frozenutils import unfreeze
|
||||||
from synapse.crypto.event_signing import (
|
from synapse.crypto.event_signing import (
|
||||||
compute_event_signature, add_hashes_and_signatures,
|
compute_event_signature, add_hashes_and_signatures,
|
||||||
)
|
)
|
||||||
|
from synapse.state import resolve_events_with_factory
|
||||||
from synapse.types import UserID, get_domain_from_id
|
from synapse.types import UserID, get_domain_from_id
|
||||||
|
|
||||||
from synapse.events.utils import prune_event
|
from synapse.events.utils import prune_event
|
||||||
|
@ -89,7 +90,9 @@ class FederationHandler(BaseHandler):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
def on_receive_pdu(self, origin, pdu, get_missing=True):
|
def on_receive_pdu(
|
||||||
|
self, origin, pdu, get_missing=True, sent_to_us_directly=False,
|
||||||
|
):
|
||||||
""" Process a PDU received via a federation /send/ transaction, or
|
""" Process a PDU received via a federation /send/ transaction, or
|
||||||
via backfill of missing prev_events
|
via backfill of missing prev_events
|
||||||
|
|
||||||
|
@ -163,7 +166,7 @@ class FederationHandler(BaseHandler):
|
||||||
"Ignoring PDU %s for room %s from %s as we've left the room!",
|
"Ignoring PDU %s for room %s from %s as we've left the room!",
|
||||||
pdu.event_id, pdu.room_id, origin,
|
pdu.event_id, pdu.room_id, origin,
|
||||||
)
|
)
|
||||||
return
|
defer.returnValue(None)
|
||||||
|
|
||||||
state = None
|
state = None
|
||||||
|
|
||||||
|
@ -225,26 +228,54 @@ class FederationHandler(BaseHandler):
|
||||||
list(prevs - seen)[:5],
|
list(prevs - seen)[:5],
|
||||||
)
|
)
|
||||||
|
|
||||||
if prevs - seen:
|
if sent_to_us_directly and prevs - seen:
|
||||||
logger.info(
|
# If they have sent it to us directly, and the server
|
||||||
"Still missing %d events for room %r: %r...",
|
# isn't telling us about the auth events that it's
|
||||||
len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
|
# made a message referencing, we explode
|
||||||
)
|
raise FederationError(
|
||||||
fetch_state = True
|
"ERROR",
|
||||||
|
403,
|
||||||
if fetch_state:
|
("Your server isn't divulging details about prev_events "
|
||||||
# We need to get the state at this event, since we haven't
|
"referenced in this event."),
|
||||||
# processed all the prev events.
|
affected=pdu.event_id,
|
||||||
logger.debug(
|
|
||||||
"_handle_new_pdu getting state for %s",
|
|
||||||
pdu.room_id
|
|
||||||
)
|
)
|
||||||
|
elif prevs - seen:
|
||||||
|
# If we're walking back up the chain to fetch it, then
|
||||||
|
# try and find the states. If we can't get the states,
|
||||||
|
# discard it.
|
||||||
|
state_groups = []
|
||||||
|
auth_chains = set()
|
||||||
try:
|
try:
|
||||||
|
# Get the ones we know about
|
||||||
|
ours = yield self.store.get_state_groups(pdu.room_id, list(seen))
|
||||||
|
state_groups.append(ours)
|
||||||
|
|
||||||
|
for p in prevs - seen:
|
||||||
state, auth_chain = yield self.replication_layer.get_state_for_room(
|
state, auth_chain = yield self.replication_layer.get_state_for_room(
|
||||||
origin, pdu.room_id, pdu.event_id,
|
origin, pdu.room_id, p
|
||||||
)
|
)
|
||||||
|
auth_chains.update(auth_chain)
|
||||||
|
state_group = {
|
||||||
|
(x.type, x.state_key): x.event_id for x in state
|
||||||
|
}
|
||||||
|
state_groups.append(state_group)
|
||||||
|
|
||||||
|
def fetch(ev_ids):
|
||||||
|
return self.store.get_events(
|
||||||
|
ev_ids, get_prev_content=False, check_redacted=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
state = yield resolve_events_with_factory(state_groups, {pdu.event_id: pdu}, fetch)
|
||||||
|
|
||||||
|
state = yield self.store.get_events(state.values())
|
||||||
|
state = state.values()
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Failed to get state for event: %s", pdu.event_id)
|
raise FederationError(
|
||||||
|
"ERROR",
|
||||||
|
403,
|
||||||
|
"We can't get valid state history.",
|
||||||
|
affected=pdu.event_id,
|
||||||
|
)
|
||||||
|
|
||||||
yield self._process_received_pdu(
|
yield self._process_received_pdu(
|
||||||
origin,
|
origin,
|
||||||
|
@ -322,11 +353,17 @@ class FederationHandler(BaseHandler):
|
||||||
|
|
||||||
for e in missing_events:
|
for e in missing_events:
|
||||||
logger.info("Handling found event %s", e.event_id)
|
logger.info("Handling found event %s", e.event_id)
|
||||||
|
try:
|
||||||
yield self.on_receive_pdu(
|
yield self.on_receive_pdu(
|
||||||
origin,
|
origin,
|
||||||
e,
|
e,
|
||||||
get_missing=False
|
get_missing=False
|
||||||
)
|
)
|
||||||
|
except FederationError as e:
|
||||||
|
if e.code == 403:
|
||||||
|
logger.warn("Event %s failed history check.")
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
|
|
@ -0,0 +1,235 @@
|
||||||
|
|
||||||
|
from twisted.internet.defer import Deferred, succeed, maybeDeferred
|
||||||
|
|
||||||
|
from synapse.util import Clock
|
||||||
|
from synapse.events import FrozenEvent
|
||||||
|
from synapse.types import Requester, UserID
|
||||||
|
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||||
|
|
||||||
|
from tests import unittest
|
||||||
|
from tests.server import make_request, setup_test_homeserver, ThreadedMemoryReactorClock
|
||||||
|
|
||||||
|
from mock import Mock
|
||||||
|
|
||||||
|
from synapse.api.errors import CodeMessageException, HttpResponseException
|
||||||
|
|
||||||
|
|
||||||
|
class MessageAcceptTests(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
|
||||||
|
self.http_client = Mock()
|
||||||
|
self.reactor = ThreadedMemoryReactorClock()
|
||||||
|
self.hs_clock = Clock(self.reactor)
|
||||||
|
self.homeserver = setup_test_homeserver(
|
||||||
|
http_client=self.http_client, clock=self.hs_clock, reactor=self.reactor
|
||||||
|
)
|
||||||
|
|
||||||
|
user_id = UserID("us", "test")
|
||||||
|
our_user = Requester(user_id, None, False, None, None)
|
||||||
|
room_creator = self.homeserver.get_room_creation_handler()
|
||||||
|
room = room_creator.create_room(
|
||||||
|
our_user, room_creator.PRESETS_DICT["public_chat"], ratelimit=False
|
||||||
|
)
|
||||||
|
self.reactor.advance(0.1)
|
||||||
|
self.room_id = self.successResultOf(room)["room_id"]
|
||||||
|
|
||||||
|
# Figure out what the most recent event is
|
||||||
|
most_recent = self.successResultOf(
|
||||||
|
self.homeserver.datastore.get_latest_event_ids_in_room(self.room_id)
|
||||||
|
)[0]
|
||||||
|
|
||||||
|
join_event = FrozenEvent(
|
||||||
|
{
|
||||||
|
"room_id": self.room_id,
|
||||||
|
"sender": "@baduser:test.serv",
|
||||||
|
"state_key": "@baduser:test.serv",
|
||||||
|
"event_id": "$join:test.serv",
|
||||||
|
"depth": 1000,
|
||||||
|
"origin_server_ts": 1,
|
||||||
|
"type": "m.room.member",
|
||||||
|
"origin": "test.servx",
|
||||||
|
"content": {"membership": "join"},
|
||||||
|
"auth_events": [],
|
||||||
|
"prev_state": [(most_recent, {})],
|
||||||
|
"prev_events": [(most_recent, {})],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
self.handler = self.homeserver.get_handlers().federation_handler
|
||||||
|
self.handler.do_auth = lambda *a, **b: succeed(True)
|
||||||
|
self.client = self.homeserver.get_federation_client()
|
||||||
|
self.client._check_sigs_and_hash_and_fetch = lambda dest, pdus, **k: succeed(
|
||||||
|
pdus
|
||||||
|
)
|
||||||
|
|
||||||
|
# Send the join, it should return None (which is not an error)
|
||||||
|
d = self.handler.on_receive_pdu(
|
||||||
|
"test.serv", join_event, sent_to_us_directly=True
|
||||||
|
)
|
||||||
|
self.reactor.advance(1)
|
||||||
|
self.assertEqual(self.successResultOf(d), None)
|
||||||
|
|
||||||
|
# Make sure we actually joined the room
|
||||||
|
self.assertEqual(
|
||||||
|
self.successResultOf(
|
||||||
|
self.homeserver.datastore.get_latest_event_ids_in_room(self.room_id)
|
||||||
|
)[0],
|
||||||
|
"$join:test.serv",
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_cant_hide_direct_ancestors(self):
|
||||||
|
"""
|
||||||
|
If you send a message, you must be able to provide the direct
|
||||||
|
prev_events that said event references.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def post_json(destination, path, data, headers=None, timeout=0):
|
||||||
|
# If it asks us for new missing events, give them NOTHING
|
||||||
|
if path.startswith("/_matrix/federation/v1/get_missing_events/"):
|
||||||
|
return {"events": []}
|
||||||
|
|
||||||
|
self.http_client.post_json = post_json
|
||||||
|
|
||||||
|
# Figure out what the most recent event is
|
||||||
|
most_recent = self.successResultOf(
|
||||||
|
self.homeserver.datastore.get_latest_event_ids_in_room(self.room_id)
|
||||||
|
)[0]
|
||||||
|
|
||||||
|
# Now lie about an event
|
||||||
|
lying_event = FrozenEvent(
|
||||||
|
{
|
||||||
|
"room_id": self.room_id,
|
||||||
|
"sender": "@baduser:test.serv",
|
||||||
|
"event_id": "one:test.serv",
|
||||||
|
"depth": 1000,
|
||||||
|
"origin_server_ts": 1,
|
||||||
|
"type": "m.room.message",
|
||||||
|
"origin": "test.serv",
|
||||||
|
"content": "hewwo?",
|
||||||
|
"auth_events": [],
|
||||||
|
"prev_events": [("two:test.serv", {}), (most_recent, {})],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
d = self.handler.on_receive_pdu(
|
||||||
|
"test.serv", lying_event, sent_to_us_directly=True
|
||||||
|
)
|
||||||
|
|
||||||
|
# Step the reactor, so the database fetches come back
|
||||||
|
self.reactor.advance(1)
|
||||||
|
|
||||||
|
# on_receive_pdu should throw an error
|
||||||
|
failure = self.failureResultOf(d)
|
||||||
|
self.assertEqual(
|
||||||
|
failure.value.args[0],
|
||||||
|
(
|
||||||
|
"ERROR 403: Your server isn't divulging details about prev_events "
|
||||||
|
"referenced in this event."
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Make sure the invalid event isn't there
|
||||||
|
extrem = self.homeserver.datastore.get_latest_event_ids_in_room(self.room_id)
|
||||||
|
self.assertEqual(self.successResultOf(extrem)[0], "$join:test.serv")
|
||||||
|
|
||||||
|
@unittest.DEBUG
|
||||||
|
def test_cant_hide_past_history(self):
|
||||||
|
"""
|
||||||
|
If you send a message, you must be able to provide the direct
|
||||||
|
prev_events that said event references.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def post_json(destination, path, data, headers=None, timeout=0):
|
||||||
|
if path.startswith("/_matrix/federation/v1/get_missing_events/"):
|
||||||
|
return {
|
||||||
|
"events": [
|
||||||
|
{
|
||||||
|
"room_id": self.room_id,
|
||||||
|
"sender": "@baduser:test.serv",
|
||||||
|
"event_id": "three:test.serv",
|
||||||
|
"depth": 1000,
|
||||||
|
"origin_server_ts": 1,
|
||||||
|
"type": "m.room.message",
|
||||||
|
"origin": "test.serv",
|
||||||
|
"content": "hewwo?",
|
||||||
|
"auth_events": [],
|
||||||
|
"prev_events": [("four:test.serv", {})],
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
self.http_client.post_json = post_json
|
||||||
|
|
||||||
|
def get_json(destination, path, args, headers=None):
|
||||||
|
print(destination, path, args)
|
||||||
|
if path.startswith("/_matrix/federation/v1/state_ids/"):
|
||||||
|
d = self.successResultOf(
|
||||||
|
self.homeserver.datastore.get_state_ids_for_event("one:test.serv")
|
||||||
|
)
|
||||||
|
|
||||||
|
return succeed(
|
||||||
|
{
|
||||||
|
"pdu_ids": [
|
||||||
|
y
|
||||||
|
for x, y in d.items()
|
||||||
|
if x == ("m.room.member", "@us:test")
|
||||||
|
],
|
||||||
|
"auth_chain_ids": d.values(),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
self.http_client.get_json = get_json
|
||||||
|
|
||||||
|
# Figure out what the most recent event is
|
||||||
|
most_recent = self.successResultOf(
|
||||||
|
self.homeserver.datastore.get_latest_event_ids_in_room(self.room_id)
|
||||||
|
)[0]
|
||||||
|
|
||||||
|
# Make a good event
|
||||||
|
good_event = FrozenEvent(
|
||||||
|
{
|
||||||
|
"room_id": self.room_id,
|
||||||
|
"sender": "@baduser:test.serv",
|
||||||
|
"event_id": "one:test.serv",
|
||||||
|
"depth": 1000,
|
||||||
|
"origin_server_ts": 1,
|
||||||
|
"type": "m.room.message",
|
||||||
|
"origin": "test.serv",
|
||||||
|
"content": "hewwo?",
|
||||||
|
"auth_events": [],
|
||||||
|
"prev_events": [(most_recent, {})],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
d = self.handler.on_receive_pdu(
|
||||||
|
"test.serv", good_event, sent_to_us_directly=True
|
||||||
|
)
|
||||||
|
self.reactor.advance(1)
|
||||||
|
self.assertEqual(self.successResultOf(d), None)
|
||||||
|
|
||||||
|
bad_event = FrozenEvent(
|
||||||
|
{
|
||||||
|
"room_id": self.room_id,
|
||||||
|
"sender": "@baduser:test.serv",
|
||||||
|
"event_id": "two:test.serv",
|
||||||
|
"depth": 1000,
|
||||||
|
"origin_server_ts": 1,
|
||||||
|
"type": "m.room.message",
|
||||||
|
"origin": "test.serv",
|
||||||
|
"content": "hewwo?",
|
||||||
|
"auth_events": [],
|
||||||
|
"prev_events": [("one:test.serv", {}), ("three:test.serv", {})],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
d = self.handler.on_receive_pdu(
|
||||||
|
"test.serv", bad_event, sent_to_us_directly=True
|
||||||
|
)
|
||||||
|
self.reactor.advance(1)
|
||||||
|
|
||||||
|
extrem = self.homeserver.datastore.get_latest_event_ids_in_room(self.room_id)
|
||||||
|
self.assertEqual(self.successResultOf(extrem)[0], "two:test.serv")
|
||||||
|
|
||||||
|
state = self.homeserver.get_state_handler().get_current_state_ids(self.room_id)
|
||||||
|
self.reactor.advance(1)
|
||||||
|
self.assertIn(("m.room.member", "@us:test"), self.successResultOf(state).keys())
|
|
@ -35,7 +35,7 @@ class ToTwistedHandler(logging.Handler):
|
||||||
def emit(self, record):
|
def emit(self, record):
|
||||||
log_entry = self.format(record)
|
log_entry = self.format(record)
|
||||||
log_level = record.levelname.lower().replace('warning', 'warn')
|
log_level = record.levelname.lower().replace('warning', 'warn')
|
||||||
self.tx_log.emit(twisted.logger.LogLevel.levelWithName(log_level), log_entry)
|
self.tx_log.emit(twisted.logger.LogLevel.levelWithName(log_level), log_entry.replace("{", r"(").replace("}", r")"))
|
||||||
|
|
||||||
|
|
||||||
handler = ToTwistedHandler()
|
handler = ToTwistedHandler()
|
||||||
|
|
Loading…
Reference in New Issue