From 27ebc5c8f299488ccc0a6f100ec3b248cd81a058 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 25 Aug 2017 11:21:34 +0100 Subject: [PATCH] Add remote profile cache --- synapse/groups/groups_server.py | 18 ++++ synapse/handlers/groups_local.py | 17 +++- synapse/handlers/profile.py | 81 ++++++++++++++- synapse/storage/profile.py | 98 +++++++++++++++++++ .../storage/schema/delta/43/profile_cache.sql | 28 ++++++ 5 files changed, 237 insertions(+), 5 deletions(-) create mode 100644 synapse/storage/schema/delta/43/profile_cache.sql diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index f25f327eb9..6bccae4bfb 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -503,6 +503,13 @@ class GroupsServerHandler(object): get_domain_from_id(user_id), group_id, user_id, content ) + user_profile = res.get("user_profile", {}) + yield self.store.add_remote_profile_cache( + user_id, + displayname=user_profile.get("displayname"), + avatar_url=user_profile.get("avatar_url"), + ) + if res["state"] == "join": if not self.hs.is_mine_id(user_id): remote_attestation = res["attestation"] @@ -627,6 +634,9 @@ class GroupsServerHandler(object): get_domain_from_id(user_id), group_id, user_id, {} ) + if not self.hs.is_mine_id(user_id): + yield self.store.maybe_delete_remote_profile_cache(user_id) + defer.returnValue({}) @defer.inlineCallbacks @@ -647,6 +657,7 @@ class GroupsServerHandler(object): avatar_url = profile.get("avatar_url") short_description = profile.get("short_description") long_description = profile.get("long_description") + user_profile = content.get("user_profile", {}) yield self.store.create_group( group_id, @@ -679,6 +690,13 @@ class GroupsServerHandler(object): remote_attestation=remote_attestation, ) + if not self.hs.is_mine_id(user_id): + yield self.store.add_remote_profile_cache( + user_id, + displayname=user_profile.get("displayname"), + avatar_url=user_profile.get("avatar_url"), + ) + defer.returnValue({ "group_id": group_id, }) diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 274fed9278..bfa10bde5a 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -56,6 +56,9 @@ class GroupsLocalHandler(object): self.notifier = hs.get_notifier() self.attestations = hs.get_groups_attestation_signing() + handlers = hs.get_handlers() + self.profile_handler = handlers.profile_handler + # Ensure attestations get renewed hs.get_groups_attestation_renewer() @@ -123,6 +126,7 @@ class GroupsLocalHandler(object): defer.returnValue(res) + @defer.inlineCallbacks def create_group(self, group_id, user_id, content): """Create a group """ @@ -130,13 +134,16 @@ class GroupsLocalHandler(object): logger.info("Asking to create group with ID: %r", group_id) if self.is_mine_id(group_id): - return self.groups_server_handler.create_group( + res = yield self.groups_server_handler.create_group( group_id, user_id, content ) + defer.returnValue(res) - return self.transport_client.create_group( + content["user_profile"] = yield self.profile_handler.get_profile(user_id) + res = yield self.transport_client.create_group( get_domain_from_id(group_id), group_id, user_id, content, - ) # TODO + ) + defer.returnValue(res) @defer.inlineCallbacks def get_users_in_group(self, group_id, requester_user_id): @@ -265,7 +272,9 @@ class GroupsLocalHandler(object): "groups_key", token, users=[user_id], ) - defer.returnValue({"state": "invite"}) + user_profile = yield self.profile_handler.get_profile(user_id) + + defer.returnValue({"state": "invite", "user_profile": user_profile}) @defer.inlineCallbacks def remove_user_from_group(self, group_id, user_id, requester_user_id, content): diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 7abee98dea..57e22edb0d 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -19,7 +19,7 @@ from twisted.internet import defer import synapse.types from synapse.api.errors import SynapseError, AuthError, CodeMessageException -from synapse.types import UserID +from synapse.types import UserID, get_domain_from_id from ._base import BaseHandler @@ -27,15 +27,53 @@ logger = logging.getLogger(__name__) class ProfileHandler(BaseHandler): + PROFILE_UPDATE_MS = 60 * 1000 + PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000 def __init__(self, hs): super(ProfileHandler, self).__init__(hs) + self.clock = hs.get_clock() + self.federation = hs.get_replication_layer() self.federation.register_query_handler( "profile", self.on_profile_query ) + self.clock.looping_call(self._update_remote_profile_cache, self.PROFILE_UPDATE_MS) + + @defer.inlineCallbacks + def get_profile(self, user_id): + target_user = UserID.from_string(user_id) + if self.hs.is_mine(target_user): + displayname = yield self.store.get_profile_displayname( + target_user.localpart + ) + avatar_url = yield self.store.get_profile_avatar_url( + target_user.localpart + ) + + defer.returnValue({ + "displayname": displayname, + "avatar_url": avatar_url, + }) + else: + try: + result = yield self.federation.make_query( + destination=target_user.domain, + query_type="profile", + args={ + "user_id": user_id, + }, + ignore_backoff=True, + ) + defer.returnValue(result) + except CodeMessageException as e: + if e.code != 404: + logger.exception("Failed to get displayname") + + raise + @defer.inlineCallbacks def get_displayname(self, target_user): if self.hs.is_mine(target_user): @@ -182,3 +220,44 @@ class ProfileHandler(BaseHandler): "Failed to update join event for room %s - %s", room_id, str(e.message) ) + + def _update_remote_profile_cache(self): + """Called periodically to check profiles of remote users we havent' + checked in a while. + """ + entries = yield self.store.get_remote_profile_cache_entries_that_expire( + last_checked=self.clock.time_msec() - self.PROFILE_UPDATE_EVERY_MS + ) + + for user_id, displayname, avatar_url in entries: + is_subcscribed = yield self.store.is_subscribed_remote_profile_for_user( + user_id, + ) + if not is_subcscribed: + yield self.store.maybe_delete_remote_profile_cache(user_id) + continue + + try: + profile = yield self.federation.make_query( + destination=get_domain_from_id(user_id), + query_type="profile", + args={ + "user_id": user_id, + }, + ignore_backoff=True, + ) + except: + logger.exception("Failed to get avatar_url") + + yield self.store.update_remote_profile_cache( + user_id, displayname, avatar_url + ) + continue + + new_name = profile.get("displayname") + new_avatar = profile.get("avatar_url") + + # We always hit update to update the last_check timestamp + yield self.store.update_remote_profile_cache( + user_id, new_name, new_avatar + ) diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py index 26a40905ae..dca6af8a77 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer + from ._base import SQLBaseStore @@ -55,3 +57,99 @@ class ProfileStore(SQLBaseStore): updatevalues={"avatar_url": new_avatar_url}, desc="set_profile_avatar_url", ) + + def get_from_remote_profile_cache(self, user_id): + return self._simple_select_one( + table="remote_profile_cache", + keyvalues={"user_id": user_id}, + retcols=("displayname", "avatar_url", "last_check"), + allow_none=True, + desc="get_from_remote_profile_cache", + ) + + def add_remote_profile_cache(self, user_id, displayname, avatar_url): + """Ensure we are caching the remote user's profiles. + + This should only be called when `is_subscribed_remote_profile_for_user` + would return true for the user. + """ + return self._simple_upsert( + table="remote_profile_cache", + keyvalues={"user_id": user_id}, + values={ + "displayname": displayname, + "avatar_url": avatar_url, + "last_check": self._clock.time_msec(), + }, + desc="add_remote_profile_cache", + ) + + def update_remote_profile_cache(self, user_id, displayname, avatar_url): + return self._simple_update( + table="remote_profile_cache", + keyvalues={"user_id": user_id}, + values={ + "displayname": displayname, + "avatar_url": avatar_url, + "last_check": self._clock.time_msec(), + }, + desc="update_remote_profile_cache", + ) + + @defer.inlineCallbacks + def maybe_delete_remote_profile_cache(self, user_id): + """Check if we still care about the remote user's profile, and if we + don't then remove their profile from the cache + """ + subscribed = yield self.is_subscribed_remote_profile_for_user(user_id) + if not subscribed: + yield self._simple_delete( + table="remote_profile_cache", + keyvalues={"user_id": user_id}, + desc="delete_remote_profile_cache", + ) + + def get_remote_profile_cache_entries_that_expire(self, last_checked): + """Get all users who haven't been checked since `last_checked` + """ + def _get_remote_profile_cache_entries_that_expire_txn(txn): + sql = """ + SELECT user_id, displayname, avatar_url + FROM remote_profile_cache + WHERE last_check < ? + """ + + txn.execute(sql, (last_checked,)) + + return self.cursor_to_dict(txn) + + return self.runInteraction( + "get_remote_profile_cache_entries_that_expire", + _get_remote_profile_cache_entries_that_expire_txn, + ) + + @defer.inlineCallbacks + def is_subscribed_remote_profile_for_user(self, user_id): + """Check whether we are interested in a remote user's profile. + """ + res = yield self._simple_select_one_onecol( + table="group_users", + keyvalues={"user_id": user_id}, + retcol="user_id", + allow_none=True, + desc="should_update_remote_profile_cache_for_user", + ) + + if res: + defer.returnValue(True) + + res = yield self._simple_select_one_onecol( + table="group_invites", + keyvalues={"user_id": user_id}, + retcol="user_id", + allow_none=True, + desc="should_update_remote_profile_cache_for_user", + ) + + if res: + defer.returnValue(True) diff --git a/synapse/storage/schema/delta/43/profile_cache.sql b/synapse/storage/schema/delta/43/profile_cache.sql new file mode 100644 index 0000000000..e5ddc84df0 --- /dev/null +++ b/synapse/storage/schema/delta/43/profile_cache.sql @@ -0,0 +1,28 @@ +/* Copyright 2017 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +-- A subset of remote users whose profiles we have cached. +-- Whether a user is in this table or not is defined by the storage function +-- `is_subscribed_remote_profile_for_user` +CREATE TABLE remote_profile_cache ( + user_id TEXT NOT NULL, + displayname TEXT, + avatar_url TEXT, + last_check BIGINT NOT NULL +); + +CREATE UNIQUE INDEX remote_profile_cache_user_id ON remote_profile_cache(user_id); +CREATE INDEX remote_profile_cache_time ON remote_profile_cache(last_check);