Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes

pull/8675/head
Erik Johnston 2020-10-14 13:32:07 +01:00
commit 9250ee8650
15 changed files with 63 additions and 22 deletions

1
changelog.d/8517.bugfix Normal file
View File

@ -0,0 +1 @@
Fix error code for `/profile/{userId}/displayname` to be `M_BAD_JSON`.

1
changelog.d/8527.bugfix Normal file
View File

@ -0,0 +1 @@
Fix a bug introduced in v1.7.0 that could cause Synapse to insert values from non-state `m.room.retention` events into the `room_retention` database table.

1
changelog.d/8536.bugfix Normal file
View File

@ -0,0 +1 @@
Fix not sending events over federation when using sharded event writers.

View File

@ -790,10 +790,6 @@ class FederationSenderHandler:
send_queue.process_rows_for_federation(self.federation_sender, rows) send_queue.process_rows_for_federation(self.federation_sender, rows)
await self.update_token(token) await self.update_token(token)
# We also need to poke the federation sender when new events happen
elif stream_name == "events":
self.federation_sender.notify_new_events(token)
# ... and when new receipts happen # ... and when new receipts happen
elif stream_name == ReceiptsStream.NAME: elif stream_name == ReceiptsStream.NAME:
await self._on_new_receipts(rows) await self._on_new_receipts(rows)

View File

@ -83,6 +83,9 @@ class EventValidator:
Args: Args:
event (FrozenEvent): The event to validate. event (FrozenEvent): The event to validate.
""" """
if not event.is_state():
raise SynapseError(code=400, msg="must be a state event")
min_lifetime = event.content.get("min_lifetime") min_lifetime = event.content.get("min_lifetime")
max_lifetime = event.content.get("max_lifetime") max_lifetime = event.content.get("max_lifetime")

View File

@ -188,7 +188,7 @@ class FederationRemoteSendQueue:
for key in keys[:i]: for key in keys[:i]:
del self.edus[key] del self.edus[key]
def notify_new_events(self, current_id): def notify_new_events(self, max_token):
"""As per FederationSender""" """As per FederationSender"""
# We don't need to replicate this as it gets sent down a different # We don't need to replicate this as it gets sent down a different
# stream. # stream.

View File

@ -40,7 +40,7 @@ from synapse.metrics import (
events_processed_counter, events_processed_counter,
) )
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import ReadReceipt from synapse.types import ReadReceipt, RoomStreamToken
from synapse.util.metrics import Measure, measure_func from synapse.util.metrics import Measure, measure_func
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -154,10 +154,15 @@ class FederationSender:
self._per_destination_queues[destination] = queue self._per_destination_queues[destination] = queue
return queue return queue
def notify_new_events(self, current_id: int) -> None: def notify_new_events(self, max_token: RoomStreamToken) -> None:
"""This gets called when we have some new events we might want to """This gets called when we have some new events we might want to
send out to other servers. send out to other servers.
""" """
# We just use the minimum stream ordering and ignore the vector clock
# component. This is safe to do as long as we *always* ignore the vector
# clock components.
current_id = max_token.stream
self._last_poked_id = max(current_id, self._last_poked_id) self._last_poked_id = max(current_id, self._last_poked_id)
if self._is_processing: if self._is_processing:

View File

@ -27,6 +27,7 @@ from synapse.metrics import (
event_processing_loop_room_count, event_processing_loop_room_count,
) )
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import RoomStreamToken
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -47,15 +48,17 @@ class ApplicationServicesHandler:
self.current_max = 0 self.current_max = 0
self.is_processing = False self.is_processing = False
async def notify_interested_services(self, current_id): async def notify_interested_services(self, max_token: RoomStreamToken):
"""Notifies (pushes) all application services interested in this event. """Notifies (pushes) all application services interested in this event.
Pushing is done asynchronously, so this method won't block for any Pushing is done asynchronously, so this method won't block for any
prolonged length of time. prolonged length of time.
Args:
current_id(int): The current maximum ID.
""" """
# We just use the minimum stream ordering and ignore the vector clock
# component. This is safe to do as long as we *always* ignore the vector
# clock components.
current_id = max_token.stream
services = self.store.get_app_services() services = self.store.get_app_services()
if not services or not self.notify_appservices: if not services or not self.notify_appservices:
return return

View File

@ -319,19 +319,19 @@ class Notifier:
) )
if self.federation_sender: if self.federation_sender:
self.federation_sender.notify_new_events(max_room_stream_token.stream) self.federation_sender.notify_new_events(max_room_stream_token)
async def _notify_app_services(self, max_room_stream_token: RoomStreamToken): async def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
try: try:
await self.appservice_handler.notify_interested_services( await self.appservice_handler.notify_interested_services(
max_room_stream_token.stream max_room_stream_token
) )
except Exception: except Exception:
logger.exception("Error notifying application services of event") logger.exception("Error notifying application services of event")
async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken): async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
try: try:
await self._pusher_pool.on_new_notifications(max_room_stream_token.stream) await self._pusher_pool.on_new_notifications(max_room_stream_token)
except Exception: except Exception:
logger.exception("Error pusher pool of event") logger.exception("Error pusher pool of event")

View File

@ -18,6 +18,7 @@ import logging
from twisted.internet.error import AlreadyCalled, AlreadyCancelled from twisted.internet.error import AlreadyCalled, AlreadyCancelled
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import RoomStreamToken
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -91,7 +92,12 @@ class EmailPusher:
pass pass
self.timed_call = None self.timed_call = None
def on_new_notifications(self, max_stream_ordering): def on_new_notifications(self, max_token: RoomStreamToken):
# We just use the minimum stream ordering and ignore the vector clock
# component. This is safe to do as long as we *always* ignore the vector
# clock components.
max_stream_ordering = max_token.stream
if self.max_stream_ordering: if self.max_stream_ordering:
self.max_stream_ordering = max( self.max_stream_ordering = max(
max_stream_ordering, self.max_stream_ordering max_stream_ordering, self.max_stream_ordering

View File

@ -23,6 +23,7 @@ from synapse.api.constants import EventTypes
from synapse.logging import opentracing from synapse.logging import opentracing
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import PusherConfigException from synapse.push import PusherConfigException
from synapse.types import RoomStreamToken
from . import push_rule_evaluator, push_tools from . import push_rule_evaluator, push_tools
@ -118,7 +119,12 @@ class HttpPusher:
if should_check_for_notifs: if should_check_for_notifs:
self._start_processing() self._start_processing()
def on_new_notifications(self, max_stream_ordering): def on_new_notifications(self, max_token: RoomStreamToken):
# We just use the minimum stream ordering and ignore the vector clock
# component. This is safe to do as long as we *always* ignore the vector
# clock components.
max_stream_ordering = max_token.stream
self.max_stream_ordering = max( self.max_stream_ordering = max(
max_stream_ordering, self.max_stream_ordering or 0 max_stream_ordering, self.max_stream_ordering or 0
) )

View File

@ -24,6 +24,7 @@ from synapse.push import PusherConfigException
from synapse.push.emailpusher import EmailPusher from synapse.push.emailpusher import EmailPusher
from synapse.push.httppusher import HttpPusher from synapse.push.httppusher import HttpPusher
from synapse.push.pusher import PusherFactory from synapse.push.pusher import PusherFactory
from synapse.types import RoomStreamToken
from synapse.util.async_helpers import concurrently_execute from synapse.util.async_helpers import concurrently_execute
if TYPE_CHECKING: if TYPE_CHECKING:
@ -186,11 +187,16 @@ class PusherPool:
) )
await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"]) await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
async def on_new_notifications(self, max_stream_id: int): async def on_new_notifications(self, max_token: RoomStreamToken):
if not self.pushers: if not self.pushers:
# nothing to do here. # nothing to do here.
return return
# We just use the minimum stream ordering and ignore the vector clock
# component. This is safe to do as long as we *always* ignore the vector
# clock components.
max_stream_id = max_token.stream
if max_stream_id < self._last_room_stream_id_seen: if max_stream_id < self._last_room_stream_id_seen:
# Nothing to do # Nothing to do
return return
@ -214,7 +220,7 @@ class PusherPool:
if u in self.pushers: if u in self.pushers:
for p in self.pushers[u].values(): for p in self.pushers[u].values():
p.on_new_notifications(max_stream_id) p.on_new_notifications(max_token)
except Exception: except Exception:
logger.exception("Exception in pusher on_new_notifications") logger.exception("Exception in pusher on_new_notifications")

View File

@ -59,7 +59,9 @@ class ProfileDisplaynameRestServlet(RestServlet):
try: try:
new_name = content["displayname"] new_name = content["displayname"]
except Exception: except Exception:
return 400, "Unable to parse name" raise SynapseError(
code=400, msg="Unable to parse name", errcode=Codes.BAD_JSON,
)
await self.profile_handler.set_displayname(user, requester, new_name, is_admin) await self.profile_handler.set_displayname(user, requester, new_name, is_admin)

View File

@ -1270,6 +1270,10 @@ class PersistEventsStore:
) )
def _store_retention_policy_for_room_txn(self, txn, event): def _store_retention_policy_for_room_txn(self, txn, event):
if not event.is_state():
logger.debug("Ignoring non-state m.room.retention event")
return
if hasattr(event, "content") and ( if hasattr(event, "content") and (
"min_lifetime" in event.content or "max_lifetime" in event.content "min_lifetime" in event.content or "max_lifetime" in event.content
): ):

View File

@ -18,6 +18,7 @@ from mock import Mock
from twisted.internet import defer from twisted.internet import defer
from synapse.handlers.appservice import ApplicationServicesHandler from synapse.handlers.appservice import ApplicationServicesHandler
from synapse.types import RoomStreamToken
from tests.test_utils import make_awaitable from tests.test_utils import make_awaitable
from tests.utils import MockClock from tests.utils import MockClock
@ -61,7 +62,9 @@ class AppServiceHandlerTestCase(unittest.TestCase):
defer.succeed((0, [event])), defer.succeed((0, [event])),
defer.succeed((0, [])), defer.succeed((0, [])),
] ]
yield defer.ensureDeferred(self.handler.notify_interested_services(0)) yield defer.ensureDeferred(
self.handler.notify_interested_services(RoomStreamToken(None, 0))
)
self.mock_scheduler.submit_event_for_as.assert_called_once_with( self.mock_scheduler.submit_event_for_as.assert_called_once_with(
interested_service, event interested_service, event
) )
@ -80,7 +83,9 @@ class AppServiceHandlerTestCase(unittest.TestCase):
defer.succeed((0, [event])), defer.succeed((0, [event])),
defer.succeed((0, [])), defer.succeed((0, [])),
] ]
yield defer.ensureDeferred(self.handler.notify_interested_services(0)) yield defer.ensureDeferred(
self.handler.notify_interested_services(RoomStreamToken(None, 0))
)
self.mock_as_api.query_user.assert_called_once_with(services[0], user_id) self.mock_as_api.query_user.assert_called_once_with(services[0], user_id)
@defer.inlineCallbacks @defer.inlineCallbacks
@ -97,7 +102,9 @@ class AppServiceHandlerTestCase(unittest.TestCase):
defer.succeed((0, [event])), defer.succeed((0, [event])),
defer.succeed((0, [])), defer.succeed((0, [])),
] ]
yield defer.ensureDeferred(self.handler.notify_interested_services(0)) yield defer.ensureDeferred(
self.handler.notify_interested_services(RoomStreamToken(None, 0))
)
self.assertFalse( self.assertFalse(
self.mock_as_api.query_user.called, self.mock_as_api.query_user.called,
"query_user called when it shouldn't have been.", "query_user called when it shouldn't have been.",