Merge pull request #2443 from matrix-org/erikj/rejoin_device_lists
Send down device list change notif when member leaves/rejoins roompull/2451/head
commit
993d3f710b
|
@ -270,6 +270,8 @@ class DeviceHandler(BaseHandler):
|
||||||
user_id (str)
|
user_id (str)
|
||||||
from_token (StreamToken)
|
from_token (StreamToken)
|
||||||
"""
|
"""
|
||||||
|
now_token = yield self.hs.get_event_sources().get_current_token()
|
||||||
|
|
||||||
room_ids = yield self.store.get_rooms_for_user(user_id)
|
room_ids = yield self.store.get_rooms_for_user(user_id)
|
||||||
|
|
||||||
# First we check if any devices have changed
|
# First we check if any devices have changed
|
||||||
|
@ -280,11 +282,24 @@ class DeviceHandler(BaseHandler):
|
||||||
# Then work out if any users have since joined
|
# Then work out if any users have since joined
|
||||||
rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
|
rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
|
||||||
|
|
||||||
|
member_events = yield self.store.get_membership_changes_for_user(
|
||||||
|
user_id, from_token.room_key, now_token.room_key
|
||||||
|
)
|
||||||
|
rooms_changed.update(event.room_id for event in member_events)
|
||||||
|
|
||||||
stream_ordering = RoomStreamToken.parse_stream_token(
|
stream_ordering = RoomStreamToken.parse_stream_token(
|
||||||
from_token.room_key).stream
|
from_token.room_key
|
||||||
|
).stream
|
||||||
|
|
||||||
possibly_changed = set(changed)
|
possibly_changed = set(changed)
|
||||||
|
possibly_left_rooms = set()
|
||||||
for room_id in rooms_changed:
|
for room_id in rooms_changed:
|
||||||
|
# The user may have left the room
|
||||||
|
# TODO: Check if they actually did or if we were just invited.
|
||||||
|
if room_id not in room_ids:
|
||||||
|
possibly_left_rooms.add(room_id)
|
||||||
|
continue
|
||||||
|
|
||||||
# Fetch the current state at the time.
|
# Fetch the current state at the time.
|
||||||
try:
|
try:
|
||||||
event_ids = yield self.store.get_forward_extremeties_for_room(
|
event_ids = yield self.store.get_forward_extremeties_for_room(
|
||||||
|
@ -307,9 +322,25 @@ class DeviceHandler(BaseHandler):
|
||||||
possibly_changed.add(state_key)
|
possibly_changed.add(state_key)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
current_member_id = current_state_ids.get((EventTypes.Member, user_id))
|
||||||
|
if not current_member_id:
|
||||||
|
continue
|
||||||
|
|
||||||
# mapping from event_id -> state_dict
|
# mapping from event_id -> state_dict
|
||||||
prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
|
prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
|
||||||
|
|
||||||
|
# Check if we've joined the room? If so we just blindly add all the users to
|
||||||
|
# the "possibly changed" users.
|
||||||
|
for state_dict in prev_state_ids.itervalues():
|
||||||
|
member_event = state_dict.get((EventTypes.Member, user_id), None)
|
||||||
|
if not member_event or member_event != current_member_id:
|
||||||
|
for key, event_id in current_state_ids.iteritems():
|
||||||
|
etype, state_key = key
|
||||||
|
if etype != EventTypes.Member:
|
||||||
|
continue
|
||||||
|
possibly_changed.add(state_key)
|
||||||
|
break
|
||||||
|
|
||||||
# If there has been any change in membership, include them in the
|
# If there has been any change in membership, include them in the
|
||||||
# possibly changed list. We'll check if they are joined below,
|
# possibly changed list. We'll check if they are joined below,
|
||||||
# and we're not toooo worried about spuriously adding users.
|
# and we're not toooo worried about spuriously adding users.
|
||||||
|
@ -320,19 +351,35 @@ class DeviceHandler(BaseHandler):
|
||||||
|
|
||||||
# check if this member has changed since any of the extremities
|
# check if this member has changed since any of the extremities
|
||||||
# at the stream_ordering, and add them to the list if so.
|
# at the stream_ordering, and add them to the list if so.
|
||||||
for state_dict in prev_state_ids.values():
|
for state_dict in prev_state_ids.itervalues():
|
||||||
prev_event_id = state_dict.get(key, None)
|
prev_event_id = state_dict.get(key, None)
|
||||||
if not prev_event_id or prev_event_id != event_id:
|
if not prev_event_id or prev_event_id != event_id:
|
||||||
possibly_changed.add(state_key)
|
possibly_changed.add(state_key)
|
||||||
|
if state_key == user_id:
|
||||||
|
for key, event_id in current_state_ids.iteritems():
|
||||||
|
etype, state_key = key
|
||||||
|
if etype != EventTypes.Member:
|
||||||
|
continue
|
||||||
|
possibly_changed.add(room_id)
|
||||||
break
|
break
|
||||||
|
|
||||||
|
if possibly_changed:
|
||||||
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
|
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
|
||||||
user_id
|
user_id
|
||||||
)
|
)
|
||||||
|
|
||||||
# Take the intersection of the users whose devices may have changed
|
# Take the intersection of the users whose devices may have changed
|
||||||
# and those that actually still share a room with the user
|
# and those that actually still share a room with the user
|
||||||
defer.returnValue(users_who_share_room & possibly_changed)
|
possibly_joined = possibly_changed & users_who_share_room
|
||||||
|
possibly_left = possibly_changed - users_who_share_room
|
||||||
|
else:
|
||||||
|
possibly_joined = []
|
||||||
|
possibly_left = []
|
||||||
|
|
||||||
|
defer.returnValue({
|
||||||
|
"changed": list(possibly_joined),
|
||||||
|
"left": list(possibly_left),
|
||||||
|
})
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_federation_query_user_devices(self, user_id):
|
def on_federation_query_user_devices(self, user_id):
|
||||||
|
|
|
@ -108,6 +108,16 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
class DeviceLists(collections.namedtuple("DeviceLists", [
|
||||||
|
"changed", # list of user_ids whose devices may have changed
|
||||||
|
"left", # list of user_ids whose devices we no longer track
|
||||||
|
])):
|
||||||
|
__slots__ = []
|
||||||
|
|
||||||
|
def __nonzero__(self):
|
||||||
|
return bool(self.changed or self.left)
|
||||||
|
|
||||||
|
|
||||||
class SyncResult(collections.namedtuple("SyncResult", [
|
class SyncResult(collections.namedtuple("SyncResult", [
|
||||||
"next_batch", # Token for the next sync
|
"next_batch", # Token for the next sync
|
||||||
"presence", # List of presence events for the user.
|
"presence", # List of presence events for the user.
|
||||||
|
@ -535,7 +545,8 @@ class SyncHandler(object):
|
||||||
res = yield self._generate_sync_entry_for_rooms(
|
res = yield self._generate_sync_entry_for_rooms(
|
||||||
sync_result_builder, account_data_by_room
|
sync_result_builder, account_data_by_room
|
||||||
)
|
)
|
||||||
newly_joined_rooms, newly_joined_users = res
|
newly_joined_rooms, newly_joined_users, _, _ = res
|
||||||
|
_, _, newly_left_rooms, newly_left_users = res
|
||||||
|
|
||||||
block_all_presence_data = (
|
block_all_presence_data = (
|
||||||
since_token is None and
|
since_token is None and
|
||||||
|
@ -549,7 +560,11 @@ class SyncHandler(object):
|
||||||
yield self._generate_sync_entry_for_to_device(sync_result_builder)
|
yield self._generate_sync_entry_for_to_device(sync_result_builder)
|
||||||
|
|
||||||
device_lists = yield self._generate_sync_entry_for_device_list(
|
device_lists = yield self._generate_sync_entry_for_device_list(
|
||||||
sync_result_builder
|
sync_result_builder,
|
||||||
|
newly_joined_rooms=newly_joined_rooms,
|
||||||
|
newly_joined_users=newly_joined_users,
|
||||||
|
newly_left_rooms=newly_left_rooms,
|
||||||
|
newly_left_users=newly_left_users,
|
||||||
)
|
)
|
||||||
|
|
||||||
device_id = sync_config.device_id
|
device_id = sync_config.device_id
|
||||||
|
@ -574,7 +589,9 @@ class SyncHandler(object):
|
||||||
|
|
||||||
@measure_func("_generate_sync_entry_for_device_list")
|
@measure_func("_generate_sync_entry_for_device_list")
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _generate_sync_entry_for_device_list(self, sync_result_builder):
|
def _generate_sync_entry_for_device_list(self, sync_result_builder,
|
||||||
|
newly_joined_rooms, newly_joined_users,
|
||||||
|
newly_left_rooms, newly_left_users):
|
||||||
user_id = sync_result_builder.sync_config.user.to_string()
|
user_id = sync_result_builder.sync_config.user.to_string()
|
||||||
since_token = sync_result_builder.since_token
|
since_token = sync_result_builder.since_token
|
||||||
|
|
||||||
|
@ -582,16 +599,40 @@ class SyncHandler(object):
|
||||||
changed = yield self.store.get_user_whose_devices_changed(
|
changed = yield self.store.get_user_whose_devices_changed(
|
||||||
since_token.device_list_key
|
since_token.device_list_key
|
||||||
)
|
)
|
||||||
if not changed:
|
|
||||||
defer.returnValue([])
|
# TODO: Be more clever than this, i.e. remove users who we already
|
||||||
|
# share a room with?
|
||||||
|
for room_id in newly_joined_rooms:
|
||||||
|
joined_users = yield self.state.get_current_user_in_room(room_id)
|
||||||
|
newly_joined_users.update(joined_users)
|
||||||
|
|
||||||
|
for room_id in newly_left_rooms:
|
||||||
|
left_users = yield self.state.get_current_user_in_room(room_id)
|
||||||
|
newly_left_users.update(left_users)
|
||||||
|
|
||||||
|
# TODO: Check that these users are actually new, i.e. either they
|
||||||
|
# weren't in the previous sync *or* they left and rejoined.
|
||||||
|
changed.update(newly_joined_users)
|
||||||
|
|
||||||
|
if not changed and not newly_left_users:
|
||||||
|
defer.returnValue(DeviceLists(
|
||||||
|
changed=[],
|
||||||
|
left=newly_left_users,
|
||||||
|
))
|
||||||
|
|
||||||
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
|
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
|
||||||
user_id
|
user_id
|
||||||
)
|
)
|
||||||
|
|
||||||
defer.returnValue(users_who_share_room & changed)
|
defer.returnValue(DeviceLists(
|
||||||
|
changed=users_who_share_room & changed,
|
||||||
|
left=set(newly_left_users) - users_who_share_room,
|
||||||
|
))
|
||||||
else:
|
else:
|
||||||
defer.returnValue([])
|
defer.returnValue(DeviceLists(
|
||||||
|
changed=[],
|
||||||
|
left=[],
|
||||||
|
))
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _generate_sync_entry_for_to_device(self, sync_result_builder):
|
def _generate_sync_entry_for_to_device(self, sync_result_builder):
|
||||||
|
@ -755,8 +796,8 @@ class SyncHandler(object):
|
||||||
account_data_by_room(dict): Dictionary of per room account data
|
account_data_by_room(dict): Dictionary of per room account data
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Deferred(tuple): Returns a 2-tuple of
|
Deferred(tuple): Returns a 4-tuple of
|
||||||
`(newly_joined_rooms, newly_joined_users)`
|
`(newly_joined_rooms, newly_joined_users, newly_left_rooms, newly_left_users)`
|
||||||
"""
|
"""
|
||||||
user_id = sync_result_builder.sync_config.user.to_string()
|
user_id = sync_result_builder.sync_config.user.to_string()
|
||||||
block_all_room_ephemeral = (
|
block_all_room_ephemeral = (
|
||||||
|
@ -787,7 +828,7 @@ class SyncHandler(object):
|
||||||
)
|
)
|
||||||
if not tags_by_room:
|
if not tags_by_room:
|
||||||
logger.debug("no-oping sync")
|
logger.debug("no-oping sync")
|
||||||
defer.returnValue(([], []))
|
defer.returnValue(([], [], [], []))
|
||||||
|
|
||||||
ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
|
ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
|
||||||
"m.ignored_user_list", user_id=user_id,
|
"m.ignored_user_list", user_id=user_id,
|
||||||
|
@ -800,7 +841,7 @@ class SyncHandler(object):
|
||||||
|
|
||||||
if since_token:
|
if since_token:
|
||||||
res = yield self._get_rooms_changed(sync_result_builder, ignored_users)
|
res = yield self._get_rooms_changed(sync_result_builder, ignored_users)
|
||||||
room_entries, invited, newly_joined_rooms = res
|
room_entries, invited, newly_joined_rooms, newly_left_rooms = res
|
||||||
|
|
||||||
tags_by_room = yield self.store.get_updated_tags(
|
tags_by_room = yield self.store.get_updated_tags(
|
||||||
user_id, since_token.account_data_key,
|
user_id, since_token.account_data_key,
|
||||||
|
@ -808,6 +849,7 @@ class SyncHandler(object):
|
||||||
else:
|
else:
|
||||||
res = yield self._get_all_rooms(sync_result_builder, ignored_users)
|
res = yield self._get_all_rooms(sync_result_builder, ignored_users)
|
||||||
room_entries, invited, newly_joined_rooms = res
|
room_entries, invited, newly_joined_rooms = res
|
||||||
|
newly_left_rooms = []
|
||||||
|
|
||||||
tags_by_room = yield self.store.get_tags_for_user(user_id)
|
tags_by_room = yield self.store.get_tags_for_user(user_id)
|
||||||
|
|
||||||
|
@ -828,17 +870,30 @@ class SyncHandler(object):
|
||||||
|
|
||||||
# Now we want to get any newly joined users
|
# Now we want to get any newly joined users
|
||||||
newly_joined_users = set()
|
newly_joined_users = set()
|
||||||
|
newly_left_users = set()
|
||||||
if since_token:
|
if since_token:
|
||||||
for joined_sync in sync_result_builder.joined:
|
for joined_sync in sync_result_builder.joined:
|
||||||
it = itertools.chain(
|
it = itertools.chain(
|
||||||
joined_sync.timeline.events, joined_sync.state.values()
|
joined_sync.timeline.events, joined_sync.state.itervalues()
|
||||||
)
|
)
|
||||||
for event in it:
|
for event in it:
|
||||||
if event.type == EventTypes.Member:
|
if event.type == EventTypes.Member:
|
||||||
if event.membership == Membership.JOIN:
|
if event.membership == Membership.JOIN:
|
||||||
newly_joined_users.add(event.state_key)
|
newly_joined_users.add(event.state_key)
|
||||||
|
else:
|
||||||
|
prev_content = event.unsigned.get("prev_content", {})
|
||||||
|
prev_membership = prev_content.get("membership", None)
|
||||||
|
if prev_membership == Membership.JOIN:
|
||||||
|
newly_left_users.add(event.state_key)
|
||||||
|
|
||||||
defer.returnValue((newly_joined_rooms, newly_joined_users))
|
newly_left_users -= newly_joined_users
|
||||||
|
|
||||||
|
defer.returnValue((
|
||||||
|
newly_joined_rooms,
|
||||||
|
newly_joined_users,
|
||||||
|
newly_left_rooms,
|
||||||
|
newly_left_users,
|
||||||
|
))
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _have_rooms_changed(self, sync_result_builder):
|
def _have_rooms_changed(self, sync_result_builder):
|
||||||
|
@ -908,15 +963,17 @@ class SyncHandler(object):
|
||||||
mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
|
mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
|
||||||
|
|
||||||
newly_joined_rooms = []
|
newly_joined_rooms = []
|
||||||
|
newly_left_rooms = []
|
||||||
room_entries = []
|
room_entries = []
|
||||||
invited = []
|
invited = []
|
||||||
for room_id, events in mem_change_events_by_room_id.items():
|
for room_id, events in mem_change_events_by_room_id.iteritems():
|
||||||
non_joins = [e for e in events if e.membership != Membership.JOIN]
|
non_joins = [e for e in events if e.membership != Membership.JOIN]
|
||||||
has_join = len(non_joins) != len(events)
|
has_join = len(non_joins) != len(events)
|
||||||
|
|
||||||
# We want to figure out if we joined the room at some point since
|
# We want to figure out if we joined the room at some point since
|
||||||
# the last sync (even if we have since left). This is to make sure
|
# the last sync (even if we have since left). This is to make sure
|
||||||
# we do send down the room, and with full state, where necessary
|
# we do send down the room, and with full state, where necessary
|
||||||
|
old_state_ids = None
|
||||||
if room_id in joined_room_ids or has_join:
|
if room_id in joined_room_ids or has_join:
|
||||||
old_state_ids = yield self.get_state_at(room_id, since_token)
|
old_state_ids = yield self.get_state_at(room_id, since_token)
|
||||||
old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None)
|
old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None)
|
||||||
|
@ -934,6 +991,26 @@ class SyncHandler(object):
|
||||||
if not non_joins:
|
if not non_joins:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# Check if we have left the room. This can either be because we were
|
||||||
|
# joined before *or* that we since joined and then left.
|
||||||
|
if events[-1].membership != Membership.JOIN:
|
||||||
|
if has_join:
|
||||||
|
newly_left_rooms.append(room_id)
|
||||||
|
else:
|
||||||
|
if not old_state_ids:
|
||||||
|
old_state_ids = yield self.get_state_at(room_id, since_token)
|
||||||
|
old_mem_ev_id = old_state_ids.get(
|
||||||
|
(EventTypes.Member, user_id),
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
old_mem_ev = None
|
||||||
|
if old_mem_ev_id:
|
||||||
|
old_mem_ev = yield self.store.get_event(
|
||||||
|
old_mem_ev_id, allow_none=True
|
||||||
|
)
|
||||||
|
if old_mem_ev and old_mem_ev.membership == Membership.JOIN:
|
||||||
|
newly_left_rooms.append(room_id)
|
||||||
|
|
||||||
# Only bother if we're still currently invited
|
# Only bother if we're still currently invited
|
||||||
should_invite = non_joins[-1].membership == Membership.INVITE
|
should_invite = non_joins[-1].membership == Membership.INVITE
|
||||||
if should_invite:
|
if should_invite:
|
||||||
|
@ -1011,7 +1088,7 @@ class SyncHandler(object):
|
||||||
upto_token=since_token,
|
upto_token=since_token,
|
||||||
))
|
))
|
||||||
|
|
||||||
defer.returnValue((room_entries, invited, newly_joined_rooms))
|
defer.returnValue((room_entries, invited, newly_joined_rooms, newly_left_rooms))
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _get_all_rooms(self, sync_result_builder, ignored_users):
|
def _get_all_rooms(self, sync_result_builder, ignored_users):
|
||||||
|
@ -1259,6 +1336,7 @@ class SyncResultBuilder(object):
|
||||||
self.invited = []
|
self.invited = []
|
||||||
self.archived = []
|
self.archived = []
|
||||||
self.device = []
|
self.device = []
|
||||||
|
self.to_device = []
|
||||||
|
|
||||||
|
|
||||||
class RoomSyncResultBuilder(object):
|
class RoomSyncResultBuilder(object):
|
||||||
|
|
|
@ -188,13 +188,11 @@ class KeyChangesServlet(RestServlet):
|
||||||
|
|
||||||
user_id = requester.user.to_string()
|
user_id = requester.user.to_string()
|
||||||
|
|
||||||
changed = yield self.device_handler.get_user_ids_changed(
|
results = yield self.device_handler.get_user_ids_changed(
|
||||||
user_id, from_token,
|
user_id, from_token,
|
||||||
)
|
)
|
||||||
|
|
||||||
defer.returnValue((200, {
|
defer.returnValue((200, results))
|
||||||
"changed": list(changed),
|
|
||||||
}))
|
|
||||||
|
|
||||||
|
|
||||||
class OneTimeKeyServlet(RestServlet):
|
class OneTimeKeyServlet(RestServlet):
|
||||||
|
|
|
@ -189,7 +189,8 @@ class SyncRestServlet(RestServlet):
|
||||||
"account_data": {"events": sync_result.account_data},
|
"account_data": {"events": sync_result.account_data},
|
||||||
"to_device": {"events": sync_result.to_device},
|
"to_device": {"events": sync_result.to_device},
|
||||||
"device_lists": {
|
"device_lists": {
|
||||||
"changed": list(sync_result.device_lists),
|
"changed": list(sync_result.device_lists.changed),
|
||||||
|
"left": list(sync_result.device_lists.left),
|
||||||
},
|
},
|
||||||
"presence": SyncRestServlet.encode_presence(
|
"presence": SyncRestServlet.encode_presence(
|
||||||
sync_result.presence, time_now
|
sync_result.presence, time_now
|
||||||
|
|
Loading…
Reference in New Issue