Merge pull request #1964 from matrix-org/erikj/device_list_update_fix
Fix device list update to not constantly resyncpull/1967/head
commit
82f7f1543b
|
@ -16,6 +16,7 @@ from synapse.api import errors
|
||||||
from synapse.api.constants import EventTypes
|
from synapse.api.constants import EventTypes
|
||||||
from synapse.util import stringutils
|
from synapse.util import stringutils
|
||||||
from synapse.util.async import Linearizer
|
from synapse.util.async import Linearizer
|
||||||
|
from synapse.util.caches.expiringcache import ExpiringCache
|
||||||
from synapse.util.metrics import measure_func
|
from synapse.util.metrics import measure_func
|
||||||
from synapse.types import get_domain_from_id, RoomStreamToken
|
from synapse.types import get_domain_from_id, RoomStreamToken
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
@ -34,10 +35,11 @@ class DeviceHandler(BaseHandler):
|
||||||
self.state = hs.get_state_handler()
|
self.state = hs.get_state_handler()
|
||||||
self.federation_sender = hs.get_federation_sender()
|
self.federation_sender = hs.get_federation_sender()
|
||||||
self.federation = hs.get_replication_layer()
|
self.federation = hs.get_replication_layer()
|
||||||
self._remote_edue_linearizer = Linearizer(name="remote_device_list")
|
|
||||||
|
self._edu_updater = DeviceListEduUpdater(hs, self)
|
||||||
|
|
||||||
self.federation.register_edu_handler(
|
self.federation.register_edu_handler(
|
||||||
"m.device_list_update", self._incoming_device_list_update,
|
"m.device_list_update", self._edu_updater.incoming_device_list_update,
|
||||||
)
|
)
|
||||||
self.federation.register_query_handler(
|
self.federation.register_query_handler(
|
||||||
"user_devices", self.on_federation_query_user_devices,
|
"user_devices", self.on_federation_query_user_devices,
|
||||||
|
@ -299,58 +301,6 @@ class DeviceHandler(BaseHandler):
|
||||||
# and those that actually still share a room with the user
|
# and those that actually still share a room with the user
|
||||||
defer.returnValue(users_who_share_room & possibly_changed)
|
defer.returnValue(users_who_share_room & possibly_changed)
|
||||||
|
|
||||||
@measure_func("_incoming_device_list_update")
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def _incoming_device_list_update(self, origin, edu_content):
|
|
||||||
user_id = edu_content["user_id"]
|
|
||||||
device_id = edu_content["device_id"]
|
|
||||||
stream_id = edu_content["stream_id"]
|
|
||||||
prev_ids = edu_content.get("prev_id", [])
|
|
||||||
|
|
||||||
if get_domain_from_id(user_id) != origin:
|
|
||||||
# TODO: Raise?
|
|
||||||
logger.warning("Got device list update edu for %r from %r", user_id, origin)
|
|
||||||
return
|
|
||||||
|
|
||||||
rooms = yield self.store.get_rooms_for_user(user_id)
|
|
||||||
if not rooms:
|
|
||||||
# We don't share any rooms with this user. Ignore update, as we
|
|
||||||
# probably won't get any further updates.
|
|
||||||
return
|
|
||||||
|
|
||||||
with (yield self._remote_edue_linearizer.queue(user_id)):
|
|
||||||
# If the prev id matches whats in our cache table, then we don't need
|
|
||||||
# to resync the users device list, otherwise we do.
|
|
||||||
resync = True
|
|
||||||
if len(prev_ids) == 1:
|
|
||||||
extremity = yield self.store.get_device_list_last_stream_id_for_remote(
|
|
||||||
user_id
|
|
||||||
)
|
|
||||||
logger.info("Extrem: %r, prev_ids: %r", extremity, prev_ids)
|
|
||||||
if str(extremity) == str(prev_ids[0]):
|
|
||||||
resync = False
|
|
||||||
|
|
||||||
if resync:
|
|
||||||
# Fetch all devices for the user.
|
|
||||||
result = yield self.federation.query_user_devices(origin, user_id)
|
|
||||||
stream_id = result["stream_id"]
|
|
||||||
devices = result["devices"]
|
|
||||||
yield self.store.update_remote_device_list_cache(
|
|
||||||
user_id, devices, stream_id,
|
|
||||||
)
|
|
||||||
device_ids = [device["device_id"] for device in devices]
|
|
||||||
yield self.notify_device_update(user_id, device_ids)
|
|
||||||
else:
|
|
||||||
# Simply update the single device, since we know that is the only
|
|
||||||
# change (becuase of the single prev_id matching the current cache)
|
|
||||||
content = dict(edu_content)
|
|
||||||
for key in ("user_id", "device_id", "stream_id", "prev_ids"):
|
|
||||||
content.pop(key, None)
|
|
||||||
yield self.store.update_remote_device_list_cache_entry(
|
|
||||||
user_id, device_id, content, stream_id,
|
|
||||||
)
|
|
||||||
yield self.notify_device_update(user_id, [device_id])
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_federation_query_user_devices(self, user_id):
|
def on_federation_query_user_devices(self, user_id):
|
||||||
stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
|
stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
|
||||||
|
@ -376,3 +326,129 @@ def _update_device_from_client_ips(device, client_ips):
|
||||||
"last_seen_ts": ip.get("last_seen"),
|
"last_seen_ts": ip.get("last_seen"),
|
||||||
"last_seen_ip": ip.get("ip"),
|
"last_seen_ip": ip.get("ip"),
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
||||||
|
class DeviceListEduUpdater(object):
|
||||||
|
"Handles incoming device list updates from federation and updates the DB"
|
||||||
|
|
||||||
|
def __init__(self, hs, device_handler):
|
||||||
|
self.store = hs.get_datastore()
|
||||||
|
self.federation = hs.get_replication_layer()
|
||||||
|
self.clock = hs.get_clock()
|
||||||
|
self.device_handler = device_handler
|
||||||
|
|
||||||
|
self._remote_edu_linearizer = Linearizer(name="remote_device_list")
|
||||||
|
|
||||||
|
# user_id -> list of updates waiting to be handled.
|
||||||
|
self._pending_updates = {}
|
||||||
|
|
||||||
|
# Recently seen stream ids. We don't bother keeping these in the DB,
|
||||||
|
# but they're useful to have them about to reduce the number of spurious
|
||||||
|
# resyncs.
|
||||||
|
self._seen_updates = ExpiringCache(
|
||||||
|
cache_name="device_update_edu",
|
||||||
|
clock=self.clock,
|
||||||
|
max_len=10000,
|
||||||
|
expiry_ms=30 * 60 * 1000,
|
||||||
|
iterable=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def incoming_device_list_update(self, origin, edu_content):
|
||||||
|
"""Called on incoming device list update from federation. Responsible
|
||||||
|
for parsing the EDU and adding to pending updates list.
|
||||||
|
"""
|
||||||
|
|
||||||
|
user_id = edu_content.pop("user_id")
|
||||||
|
device_id = edu_content.pop("device_id")
|
||||||
|
stream_id = str(edu_content.pop("stream_id")) # They may come as ints
|
||||||
|
prev_ids = edu_content.pop("prev_id", [])
|
||||||
|
prev_ids = [str(p) for p in prev_ids] # They may come as ints
|
||||||
|
|
||||||
|
if get_domain_from_id(user_id) != origin:
|
||||||
|
# TODO: Raise?
|
||||||
|
logger.warning("Got device list update edu for %r from %r", user_id, origin)
|
||||||
|
return
|
||||||
|
|
||||||
|
rooms = yield self.store.get_rooms_for_user(user_id)
|
||||||
|
if not rooms:
|
||||||
|
# We don't share any rooms with this user. Ignore update, as we
|
||||||
|
# probably won't get any further updates.
|
||||||
|
return
|
||||||
|
|
||||||
|
self._pending_updates.setdefault(user_id, []).append(
|
||||||
|
(device_id, stream_id, prev_ids, edu_content)
|
||||||
|
)
|
||||||
|
|
||||||
|
yield self._handle_device_updates(user_id)
|
||||||
|
|
||||||
|
@measure_func("_incoming_device_list_update")
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _handle_device_updates(self, user_id):
|
||||||
|
"Actually handle pending updates."
|
||||||
|
|
||||||
|
with (yield self._remote_edu_linearizer.queue(user_id)):
|
||||||
|
pending_updates = self._pending_updates.pop(user_id, [])
|
||||||
|
if not pending_updates:
|
||||||
|
# This can happen since we batch updates
|
||||||
|
return
|
||||||
|
|
||||||
|
resync = yield self._need_to_do_resync(user_id, pending_updates)
|
||||||
|
|
||||||
|
if resync:
|
||||||
|
# Fetch all devices for the user.
|
||||||
|
origin = get_domain_from_id(user_id)
|
||||||
|
result = yield self.federation.query_user_devices(origin, user_id)
|
||||||
|
stream_id = result["stream_id"]
|
||||||
|
devices = result["devices"]
|
||||||
|
yield self.store.update_remote_device_list_cache(
|
||||||
|
user_id, devices, stream_id,
|
||||||
|
)
|
||||||
|
device_ids = [device["device_id"] for device in devices]
|
||||||
|
yield self.device_handler.notify_device_update(user_id, device_ids)
|
||||||
|
else:
|
||||||
|
# Simply update the single device, since we know that is the only
|
||||||
|
# change (becuase of the single prev_id matching the current cache)
|
||||||
|
for device_id, stream_id, prev_ids, content in pending_updates:
|
||||||
|
yield self.store.update_remote_device_list_cache_entry(
|
||||||
|
user_id, device_id, content, stream_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
yield self.device_handler.notify_device_update(
|
||||||
|
user_id, [device_id for device_id, _, _, _ in pending_updates]
|
||||||
|
)
|
||||||
|
|
||||||
|
self._seen_updates.setdefault(user_id, set()).update(
|
||||||
|
stream_id for _, stream_id, _, _ in pending_updates
|
||||||
|
)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _need_to_do_resync(self, user_id, updates):
|
||||||
|
"""Given a list of updates for a user figure out if we need to do a full
|
||||||
|
resync, or whether we have enough data that we can just apply the delta.
|
||||||
|
"""
|
||||||
|
seen_updates = self._seen_updates.get(user_id, set())
|
||||||
|
|
||||||
|
extremity = yield self.store.get_device_list_last_stream_id_for_remote(
|
||||||
|
user_id
|
||||||
|
)
|
||||||
|
|
||||||
|
stream_id_in_updates = set() # stream_ids in updates list
|
||||||
|
for _, stream_id, prev_ids, _ in updates:
|
||||||
|
if not prev_ids:
|
||||||
|
# We always do a resync if there are no previous IDs
|
||||||
|
defer.returnValue(True)
|
||||||
|
|
||||||
|
for prev_id in prev_ids:
|
||||||
|
if prev_id == extremity:
|
||||||
|
continue
|
||||||
|
elif prev_id in seen_updates:
|
||||||
|
continue
|
||||||
|
elif prev_id in stream_id_in_updates:
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
defer.returnValue(True)
|
||||||
|
|
||||||
|
stream_id_in_updates.add(stream_id)
|
||||||
|
|
||||||
|
defer.returnValue(False)
|
||||||
|
|
Loading…
Reference in New Issue