Merge pull request #510 from matrix-org/daniel/nonguestpeeking
Allow non-guests to peek on rooms using /eventspull/513/head
commit
a2ae01cc0f
|
@ -84,7 +84,7 @@ class BaseHandler(object):
|
|||
row["event_id"] for rows in forgotten for row in rows
|
||||
)
|
||||
|
||||
def allowed(event, user_id, is_guest):
|
||||
def allowed(event, user_id, is_peeking):
|
||||
state = event_id_to_state[event.event_id]
|
||||
|
||||
visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None)
|
||||
|
@ -96,7 +96,7 @@ class BaseHandler(object):
|
|||
if visibility == "world_readable":
|
||||
return True
|
||||
|
||||
if is_guest:
|
||||
if is_peeking:
|
||||
return False
|
||||
|
||||
membership_event = state.get((EventTypes.Member, user_id), None)
|
||||
|
@ -112,7 +112,7 @@ class BaseHandler(object):
|
|||
return True
|
||||
|
||||
if event.type == EventTypes.RoomHistoryVisibility:
|
||||
return not is_guest
|
||||
return not is_peeking
|
||||
|
||||
if visibility == "shared":
|
||||
return True
|
||||
|
@ -127,15 +127,15 @@ class BaseHandler(object):
|
|||
user_id: [
|
||||
event
|
||||
for event in events
|
||||
if allowed(event, user_id, is_guest)
|
||||
if allowed(event, user_id, is_peeking)
|
||||
]
|
||||
for user_id, is_guest in user_tuples
|
||||
for user_id, is_peeking in user_tuples
|
||||
})
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _filter_events_for_client(self, user_id, events, is_guest=False):
|
||||
def _filter_events_for_client(self, user_id, events, is_peeking=False):
|
||||
# Assumes that user has at some point joined the room if not is_guest.
|
||||
res = yield self._filter_events_for_clients([(user_id, is_guest)], events)
|
||||
res = yield self._filter_events_for_clients([(user_id, is_peeking)], events)
|
||||
defer.returnValue(res.get(user_id, []))
|
||||
|
||||
def ratelimit(self, user_id):
|
||||
|
|
|
@ -135,7 +135,7 @@ class EventStreamHandler(BaseHandler):
|
|||
events, tokens = yield self.notifier.get_events_for(
|
||||
auth_user, pagin_config, timeout,
|
||||
only_room_events=only_room_events,
|
||||
is_guest=is_guest, guest_room_id=room_id
|
||||
is_guest=is_guest, explicit_room_id=room_id
|
||||
)
|
||||
|
||||
time_now = self.clock.time_msec()
|
||||
|
|
|
@ -78,21 +78,20 @@ class MessageHandler(BaseHandler):
|
|||
defer.returnValue(None)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_messages(self, user_id=None, room_id=None, pagin_config=None,
|
||||
as_client_event=True, is_guest=False):
|
||||
def get_messages(self, requester, room_id=None, pagin_config=None,
|
||||
as_client_event=True):
|
||||
"""Get messages in a room.
|
||||
|
||||
Args:
|
||||
user_id (str): The user requesting messages.
|
||||
requester (Requester): The user requesting messages.
|
||||
room_id (str): The room they want messages from.
|
||||
pagin_config (synapse.api.streams.PaginationConfig): The pagination
|
||||
config rules to apply, if any.
|
||||
as_client_event (bool): True to get events in client-server format.
|
||||
is_guest (bool): Whether the requesting user is a guest (as opposed
|
||||
to a fully registered user).
|
||||
Returns:
|
||||
dict: Pagination API results
|
||||
"""
|
||||
user_id = requester.user.to_string()
|
||||
data_source = self.hs.get_event_sources().sources["room"]
|
||||
|
||||
if pagin_config.from_token:
|
||||
|
@ -115,36 +114,33 @@ class MessageHandler(BaseHandler):
|
|||
|
||||
source_config = pagin_config.get_source_config("room")
|
||||
|
||||
if not is_guest:
|
||||
member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
|
||||
if member_event.membership == Membership.LEAVE:
|
||||
# If they have left the room then clamp the token to be before
|
||||
# they left the room.
|
||||
# If they're a guest, we'll just 403 them if they're asking for
|
||||
# events they can't see.
|
||||
leave_token = yield self.store.get_topological_token_for_event(
|
||||
member_event.event_id
|
||||
)
|
||||
leave_token = RoomStreamToken.parse(leave_token)
|
||||
if leave_token.topological < room_token.topological:
|
||||
source_config.from_key = str(leave_token)
|
||||
membership, member_event_id = yield self._check_in_room_or_world_readable(
|
||||
room_id, user_id
|
||||
)
|
||||
if membership == Membership.LEAVE:
|
||||
# If they have left the room then clamp the token to be before
|
||||
# they left the room.
|
||||
leave_token = yield self.store.get_topological_token_for_event(
|
||||
member_event_id
|
||||
)
|
||||
leave_token = RoomStreamToken.parse(leave_token)
|
||||
if leave_token.topological < room_token.topological:
|
||||
source_config.from_key = str(leave_token)
|
||||
|
||||
if source_config.direction == "f":
|
||||
if source_config.to_key is None:
|
||||
if source_config.direction == "f":
|
||||
if source_config.to_key is None:
|
||||
source_config.to_key = str(leave_token)
|
||||
else:
|
||||
to_token = RoomStreamToken.parse(source_config.to_key)
|
||||
if leave_token.topological < to_token.topological:
|
||||
source_config.to_key = str(leave_token)
|
||||
else:
|
||||
to_token = RoomStreamToken.parse(source_config.to_key)
|
||||
if leave_token.topological < to_token.topological:
|
||||
source_config.to_key = str(leave_token)
|
||||
|
||||
yield self.hs.get_handlers().federation_handler.maybe_backfill(
|
||||
room_id, room_token.topological
|
||||
)
|
||||
|
||||
user = UserID.from_string(user_id)
|
||||
|
||||
events, next_key = yield data_source.get_pagination_rows(
|
||||
user, source_config, room_id
|
||||
requester.user, source_config, room_id
|
||||
)
|
||||
|
||||
next_token = pagin_config.from_token.copy_and_replace(
|
||||
|
@ -158,7 +154,11 @@ class MessageHandler(BaseHandler):
|
|||
"end": next_token.to_string(),
|
||||
})
|
||||
|
||||
events = yield self._filter_events_for_client(user_id, events, is_guest=is_guest)
|
||||
events = yield self._filter_events_for_client(
|
||||
user_id,
|
||||
events,
|
||||
is_peeking=(member_event_id is None),
|
||||
)
|
||||
|
||||
time_now = self.clock.time_msec()
|
||||
|
||||
|
@ -289,7 +289,7 @@ class MessageHandler(BaseHandler):
|
|||
SynapseError if something went wrong.
|
||||
"""
|
||||
membership, membership_event_id = yield self._check_in_room_or_world_readable(
|
||||
room_id, user_id, is_guest
|
||||
room_id, user_id
|
||||
)
|
||||
|
||||
if membership == Membership.JOIN:
|
||||
|
@ -306,7 +306,7 @@ class MessageHandler(BaseHandler):
|
|||
defer.returnValue(data)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _check_in_room_or_world_readable(self, room_id, user_id, is_guest):
|
||||
def _check_in_room_or_world_readable(self, room_id, user_id):
|
||||
try:
|
||||
# check_user_was_in_room will return the most recent membership
|
||||
# event for the user if:
|
||||
|
@ -316,7 +316,7 @@ class MessageHandler(BaseHandler):
|
|||
member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
|
||||
defer.returnValue((member_event.membership, member_event.event_id))
|
||||
return
|
||||
except AuthError, auth_error:
|
||||
except AuthError:
|
||||
visibility = yield self.state_handler.get_current_state(
|
||||
room_id, EventTypes.RoomHistoryVisibility, ""
|
||||
)
|
||||
|
@ -326,8 +326,6 @@ class MessageHandler(BaseHandler):
|
|||
):
|
||||
defer.returnValue((Membership.JOIN, None))
|
||||
return
|
||||
if not is_guest:
|
||||
raise auth_error
|
||||
raise AuthError(
|
||||
403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
|
||||
)
|
||||
|
@ -345,7 +343,7 @@ class MessageHandler(BaseHandler):
|
|||
A list of dicts representing state events. [{}, {}, {}]
|
||||
"""
|
||||
membership, membership_event_id = yield self._check_in_room_or_world_readable(
|
||||
room_id, user_id, is_guest
|
||||
room_id, user_id
|
||||
)
|
||||
|
||||
if membership == Membership.JOIN:
|
||||
|
@ -556,13 +554,13 @@ class MessageHandler(BaseHandler):
|
|||
defer.returnValue(ret)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def room_initial_sync(self, user_id, room_id, pagin_config=None, is_guest=False):
|
||||
def room_initial_sync(self, requester, room_id, pagin_config=None):
|
||||
"""Capture the a snapshot of a room. If user is currently a member of
|
||||
the room this will be what is currently in the room. If the user left
|
||||
the room this will be what was in the room when they left.
|
||||
|
||||
Args:
|
||||
user_id(str): The user to get a snapshot for.
|
||||
requester(Requester): The user to get a snapshot for.
|
||||
room_id(str): The room to get a snapshot of.
|
||||
pagin_config(synapse.streams.config.PaginationConfig):
|
||||
The pagination config used to determine how many messages to
|
||||
|
@ -573,19 +571,20 @@ class MessageHandler(BaseHandler):
|
|||
A JSON serialisable dict with the snapshot of the room.
|
||||
"""
|
||||
|
||||
user_id = requester.user.to_string()
|
||||
|
||||
membership, member_event_id = yield self._check_in_room_or_world_readable(
|
||||
room_id,
|
||||
user_id,
|
||||
is_guest
|
||||
room_id, user_id,
|
||||
)
|
||||
is_peeking = member_event_id is None
|
||||
|
||||
if membership == Membership.JOIN:
|
||||
result = yield self._room_initial_sync_joined(
|
||||
user_id, room_id, pagin_config, membership, is_guest
|
||||
user_id, room_id, pagin_config, membership, is_peeking
|
||||
)
|
||||
elif membership == Membership.LEAVE:
|
||||
result = yield self._room_initial_sync_parted(
|
||||
user_id, room_id, pagin_config, membership, member_event_id, is_guest
|
||||
user_id, room_id, pagin_config, membership, member_event_id, is_peeking
|
||||
)
|
||||
|
||||
account_data_events = []
|
||||
|
@ -609,7 +608,7 @@ class MessageHandler(BaseHandler):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def _room_initial_sync_parted(self, user_id, room_id, pagin_config,
|
||||
membership, member_event_id, is_guest):
|
||||
membership, member_event_id, is_peeking):
|
||||
room_state = yield self.store.get_state_for_events(
|
||||
[member_event_id], None
|
||||
)
|
||||
|
@ -631,7 +630,7 @@ class MessageHandler(BaseHandler):
|
|||
)
|
||||
|
||||
messages = yield self._filter_events_for_client(
|
||||
user_id, messages, is_guest=is_guest
|
||||
user_id, messages, is_peeking=is_peeking
|
||||
)
|
||||
|
||||
start_token = StreamToken(token[0], 0, 0, 0, 0)
|
||||
|
@ -654,7 +653,7 @@ class MessageHandler(BaseHandler):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def _room_initial_sync_joined(self, user_id, room_id, pagin_config,
|
||||
membership, is_guest):
|
||||
membership, is_peeking):
|
||||
current_state = yield self.state.get_current_state(
|
||||
room_id=room_id,
|
||||
)
|
||||
|
@ -718,7 +717,7 @@ class MessageHandler(BaseHandler):
|
|||
).addErrback(unwrapFirstError)
|
||||
|
||||
messages = yield self._filter_events_for_client(
|
||||
user_id, messages, is_guest=is_guest,
|
||||
user_id, messages, is_peeking=is_peeking,
|
||||
)
|
||||
|
||||
start_token = now_token.copy_and_replace("room_key", token[0])
|
||||
|
@ -737,7 +736,7 @@ class MessageHandler(BaseHandler):
|
|||
"presence": presence,
|
||||
"receipts": receipts,
|
||||
}
|
||||
if not is_guest:
|
||||
if not is_peeking:
|
||||
ret["membership"] = membership
|
||||
|
||||
defer.returnValue(ret)
|
||||
|
|
|
@ -936,7 +936,7 @@ class RoomContextHandler(BaseHandler):
|
|||
return self._filter_events_for_client(
|
||||
user.to_string(),
|
||||
events,
|
||||
is_guest=is_guest)
|
||||
is_peeking=is_guest)
|
||||
|
||||
event = yield self.store.get_event(event_id, get_prev_content=True,
|
||||
allow_none=True)
|
||||
|
|
|
@ -150,7 +150,7 @@ class SyncHandler(BaseHandler):
|
|||
return self.current_sync_for_user(sync_config, since_token)
|
||||
|
||||
result = yield self.notifier.wait_for_events(
|
||||
sync_config.user, timeout, current_sync_callback,
|
||||
sync_config.user.to_string(), timeout, current_sync_callback,
|
||||
from_token=since_token
|
||||
)
|
||||
defer.returnValue(result)
|
||||
|
@ -640,7 +640,7 @@ class SyncHandler(BaseHandler):
|
|||
loaded_recents = yield self._filter_events_for_client(
|
||||
sync_config.user.to_string(),
|
||||
loaded_recents,
|
||||
is_guest=sync_config.is_guest,
|
||||
is_peeking=sync_config.is_guest,
|
||||
)
|
||||
loaded_recents.extend(recents)
|
||||
recents = loaded_recents
|
||||
|
|
|
@ -63,9 +63,9 @@ class _NotifierUserStream(object):
|
|||
so that it can remove itself from the indexes in the Notifier class.
|
||||
"""
|
||||
|
||||
def __init__(self, user, rooms, current_token, time_now_ms,
|
||||
def __init__(self, user_id, rooms, current_token, time_now_ms,
|
||||
appservice=None):
|
||||
self.user = str(user)
|
||||
self.user_id = user_id
|
||||
self.appservice = appservice
|
||||
self.rooms = set(rooms)
|
||||
self.current_token = current_token
|
||||
|
@ -98,7 +98,7 @@ class _NotifierUserStream(object):
|
|||
lst = notifier.room_to_user_streams.get(room, set())
|
||||
lst.discard(self)
|
||||
|
||||
notifier.user_to_user_stream.pop(self.user)
|
||||
notifier.user_to_user_stream.pop(self.user_id)
|
||||
|
||||
if self.appservice:
|
||||
notifier.appservice_to_user_streams.get(
|
||||
|
@ -271,21 +271,20 @@ class Notifier(object):
|
|||
logger.exception("Failed to notify listener")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def wait_for_events(self, user, timeout, callback, room_ids=None,
|
||||
def wait_for_events(self, user_id, timeout, callback, room_ids=None,
|
||||
from_token=StreamToken("s0", "0", "0", "0", "0")):
|
||||
"""Wait until the callback returns a non empty response or the
|
||||
timeout fires.
|
||||
"""
|
||||
user = str(user)
|
||||
user_stream = self.user_to_user_stream.get(user)
|
||||
user_stream = self.user_to_user_stream.get(user_id)
|
||||
if user_stream is None:
|
||||
appservice = yield self.store.get_app_service_by_user_id(user)
|
||||
appservice = yield self.store.get_app_service_by_user_id(user_id)
|
||||
current_token = yield self.event_sources.get_current_token()
|
||||
if room_ids is None:
|
||||
rooms = yield self.store.get_rooms_for_user(user)
|
||||
rooms = yield self.store.get_rooms_for_user(user_id)
|
||||
room_ids = [room.room_id for room in rooms]
|
||||
user_stream = _NotifierUserStream(
|
||||
user=user,
|
||||
user_id=user_id,
|
||||
rooms=room_ids,
|
||||
appservice=appservice,
|
||||
current_token=current_token,
|
||||
|
@ -333,12 +332,17 @@ class Notifier(object):
|
|||
@defer.inlineCallbacks
|
||||
def get_events_for(self, user, pagination_config, timeout,
|
||||
only_room_events=False,
|
||||
is_guest=False, guest_room_id=None):
|
||||
is_guest=False, explicit_room_id=None):
|
||||
""" For the given user and rooms, return any new events for them. If
|
||||
there are no new events wait for up to `timeout` milliseconds for any
|
||||
new events to happen before returning.
|
||||
|
||||
If `only_room_events` is `True` only room events will be returned.
|
||||
|
||||
If explicit_room_id is not set, the user's joined rooms will be polled
|
||||
for events.
|
||||
If explicit_room_id is set, that room will be polled for events only if
|
||||
it is world readable or the user has joined the room.
|
||||
"""
|
||||
from_token = pagination_config.from_token
|
||||
if not from_token:
|
||||
|
@ -346,15 +350,8 @@ class Notifier(object):
|
|||
|
||||
limit = pagination_config.limit
|
||||
|
||||
room_ids = []
|
||||
if is_guest:
|
||||
if guest_room_id:
|
||||
if not (yield self._is_world_readable(guest_room_id)):
|
||||
raise AuthError(403, "Guest access not allowed")
|
||||
room_ids = [guest_room_id]
|
||||
else:
|
||||
rooms = yield self.store.get_rooms_for_user(user.to_string())
|
||||
room_ids = [room.room_id for room in rooms]
|
||||
room_ids, is_joined = yield self._get_room_ids(user, explicit_room_id)
|
||||
is_peeking = not is_joined
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def check_for_updates(before_token, after_token):
|
||||
|
@ -376,7 +373,7 @@ class Notifier(object):
|
|||
user=user,
|
||||
from_key=getattr(from_token, keyname),
|
||||
limit=limit,
|
||||
is_guest=is_guest,
|
||||
is_guest=is_peeking,
|
||||
room_ids=room_ids,
|
||||
)
|
||||
|
||||
|
@ -385,7 +382,7 @@ class Notifier(object):
|
|||
new_events = yield room_member_handler._filter_events_for_client(
|
||||
user.to_string(),
|
||||
new_events,
|
||||
is_guest=is_guest,
|
||||
is_peeking=is_peeking,
|
||||
)
|
||||
|
||||
events.extend(new_events)
|
||||
|
@ -396,8 +393,24 @@ class Notifier(object):
|
|||
else:
|
||||
defer.returnValue(None)
|
||||
|
||||
user_id_for_stream = user.to_string()
|
||||
if is_peeking:
|
||||
# Internally, the notifier keeps an event stream per user_id.
|
||||
# This is used by both /sync and /events.
|
||||
# We want /events to be used for peeking independently of /sync,
|
||||
# without polluting its contents. So we invent an illegal user ID
|
||||
# (which thus cannot clash with any real users) for keying peeking
|
||||
# over /events.
|
||||
#
|
||||
# I am sorry for what I have done.
|
||||
user_id_for_stream = "_PEEKING_" + user_id_for_stream
|
||||
|
||||
result = yield self.wait_for_events(
|
||||
user, timeout, check_for_updates, room_ids=room_ids, from_token=from_token
|
||||
user_id_for_stream,
|
||||
timeout,
|
||||
check_for_updates,
|
||||
room_ids=room_ids,
|
||||
from_token=from_token,
|
||||
)
|
||||
|
||||
if result is None:
|
||||
|
@ -405,6 +418,18 @@ class Notifier(object):
|
|||
|
||||
defer.returnValue(result)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_room_ids(self, user, explicit_room_id):
|
||||
joined_rooms = yield self.store.get_rooms_for_user(user.to_string())
|
||||
joined_room_ids = map(lambda r: r.room_id, joined_rooms)
|
||||
if explicit_room_id:
|
||||
if explicit_room_id in joined_room_ids:
|
||||
defer.returnValue(([explicit_room_id], True))
|
||||
if (yield self._is_world_readable(explicit_room_id)):
|
||||
defer.returnValue(([explicit_room_id], False))
|
||||
raise AuthError(403, "Non-joined access not allowed")
|
||||
defer.returnValue((joined_room_ids, True))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _is_world_readable(self, room_id):
|
||||
state = yield self.hs.get_state_handler().get_current_state(
|
||||
|
@ -432,7 +457,7 @@ class Notifier(object):
|
|||
|
||||
@log_function
|
||||
def _register_with_keys(self, user_stream):
|
||||
self.user_to_user_stream[user_stream.user] = user_stream
|
||||
self.user_to_user_stream[user_stream.user_id] = user_stream
|
||||
|
||||
for room in user_stream.rooms:
|
||||
s = self.room_to_user_streams.setdefault(room, set())
|
||||
|
|
|
@ -43,6 +43,7 @@ class EventStreamRestServlet(ClientV1RestServlet):
|
|||
if is_guest:
|
||||
if "room_id" not in request.args:
|
||||
raise SynapseError(400, "Guest users must specify room_id param")
|
||||
if "room_id" in request.args:
|
||||
room_id = request.args["room_id"][0]
|
||||
try:
|
||||
handler = self.handlers.event_stream_handler
|
||||
|
|
|
@ -348,8 +348,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
|
|||
handler = self.handlers.message_handler
|
||||
msgs = yield handler.get_messages(
|
||||
room_id=room_id,
|
||||
user_id=requester.user.to_string(),
|
||||
is_guest=requester.is_guest,
|
||||
requester=requester,
|
||||
pagin_config=pagination_config,
|
||||
as_client_event=as_client_event
|
||||
)
|
||||
|
@ -384,9 +383,8 @@ class RoomInitialSyncRestServlet(ClientV1RestServlet):
|
|||
pagination_config = PaginationConfig.from_request(request)
|
||||
content = yield self.handlers.message_handler.room_initial_sync(
|
||||
room_id=room_id,
|
||||
user_id=requester.user.to_string(),
|
||||
requester=requester,
|
||||
pagin_config=pagination_config,
|
||||
is_guest=requester.is_guest,
|
||||
)
|
||||
defer.returnValue((200, content))
|
||||
|
||||
|
|
Loading…
Reference in New Issue