Allow moving group read APIs to workers (#6866)

pull/6873/head
Erik Johnston 2020-02-07 11:14:19 +00:00 committed by GitHub
parent 56ca93ef59
commit de2d267375
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 723 additions and 688 deletions

1
changelog.d/6866.feature Normal file
View File

@ -0,0 +1 @@
Add ability to run some group APIs on workers.

View File

@ -177,8 +177,13 @@ endpoints matching the following regular expressions:
^/_matrix/federation/v1/event_auth/
^/_matrix/federation/v1/exchange_third_party_invite/
^/_matrix/federation/v1/send/
^/_matrix/federation/v1/get_groups_publicised$
^/_matrix/key/v2/query
Additionally, the following REST endpoints can be handled for GET requests:
^/_matrix/federation/v1/groups/
The above endpoints should all be routed to the federation_reader worker by the
reverse-proxy configuration.
@ -254,10 +259,13 @@ following regular expressions:
^/_matrix/client/(api/v1|r0|unstable)/keys/changes$
^/_matrix/client/versions$
^/_matrix/client/(api/v1|r0|unstable)/voip/turnServer$
^/_matrix/client/(api/v1|r0|unstable)/joined_groups$
^/_matrix/client/(api/v1|r0|unstable)/get_groups_publicised$
Additionally, the following REST endpoints can be handled for GET requests:
^/_matrix/client/(api/v1|r0|unstable)/pushrules/.*$
^/_matrix/client/(api/v1|r0|unstable)/groups/.*$
Additionally, the following REST endpoints can be handled, but all requests must
be routed to the same instance:

View File

@ -57,6 +57,7 @@ from synapse.rest.client.v1.room import (
RoomStateRestServlet,
)
from synapse.rest.client.v1.voip import VoipRestServlet
from synapse.rest.client.v2_alpha import groups
from synapse.rest.client.v2_alpha.account import ThreepidRestServlet
from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
@ -124,6 +125,8 @@ class ClientReaderServer(HomeServer):
PushRuleRestServlet(self).register(resource)
VersionsRestServlet(self).register(resource)
groups.register_servlets(self, resource)
resources.update({"/_matrix/client": resource})
root_resource = create_resource_tree(resources, NoResource())

View File

@ -35,6 +35,7 @@ from synapse.replication.slave.storage.account_data import SlavedAccountDataStor
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.groups import SlavedGroupServerStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
from synapse.replication.slave.storage.profile import SlavedProfileStore
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
@ -66,6 +67,7 @@ class FederationReaderSlavedStore(
SlavedEventStore,
SlavedKeyStore,
SlavedRegistrationStore,
SlavedGroupServerStore,
RoomStore,
DirectoryStore,
SlavedTransactionStore,

View File

@ -36,7 +36,7 @@ logger = logging.getLogger(__name__)
# TODO: Flairs
class GroupsServerHandler(object):
class GroupsServerWorkerHandler(object):
def __init__(self, hs):
self.hs = hs
self.store = hs.get_datastore()
@ -51,9 +51,6 @@ class GroupsServerHandler(object):
self.transport_client = hs.get_federation_transport_client()
self.profile_handler = hs.get_profile_handler()
# Ensure attestations get renewed
hs.get_groups_attestation_renewer()
@defer.inlineCallbacks
def check_group_is_ours(
self, group_id, requester_user_id, and_exists=False, and_is_admin=None
@ -167,68 +164,6 @@ class GroupsServerHandler(object):
"user": membership_info,
}
@defer.inlineCallbacks
def update_group_summary_room(
self, group_id, requester_user_id, room_id, category_id, content
):
"""Add/update a room to the group summary
"""
yield self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
)
RoomID.from_string(room_id) # Ensure valid room id
order = content.get("order", None)
is_public = _parse_visibility_from_contents(content)
yield self.store.add_room_to_summary(
group_id=group_id,
room_id=room_id,
category_id=category_id,
order=order,
is_public=is_public,
)
return {}
@defer.inlineCallbacks
def delete_group_summary_room(
self, group_id, requester_user_id, room_id, category_id
):
"""Remove a room from the summary
"""
yield self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
)
yield self.store.remove_room_from_summary(
group_id=group_id, room_id=room_id, category_id=category_id
)
return {}
@defer.inlineCallbacks
def set_group_join_policy(self, group_id, requester_user_id, content):
"""Sets the group join policy.
Currently supported policies are:
- "invite": an invite must be received and accepted in order to join.
- "open": anyone can join.
"""
yield self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
)
join_policy = _parse_join_policy_from_contents(content)
if join_policy is None:
raise SynapseError(400, "No value specified for 'm.join_policy'")
yield self.store.set_group_join_policy(group_id, join_policy=join_policy)
return {}
@defer.inlineCallbacks
def get_group_categories(self, group_id, requester_user_id):
"""Get all categories in a group (as seen by user)
@ -248,42 +183,10 @@ class GroupsServerHandler(object):
group_id=group_id, category_id=category_id
)
logger.info("group %s", res)
return res
@defer.inlineCallbacks
def update_group_category(self, group_id, requester_user_id, category_id, content):
"""Add/Update a group category
"""
yield self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
)
is_public = _parse_visibility_from_contents(content)
profile = content.get("profile")
yield self.store.upsert_group_category(
group_id=group_id,
category_id=category_id,
is_public=is_public,
profile=profile,
)
return {}
@defer.inlineCallbacks
def delete_group_category(self, group_id, requester_user_id, category_id):
"""Delete a group category
"""
yield self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
)
yield self.store.remove_group_category(
group_id=group_id, category_id=category_id
)
return {}
@defer.inlineCallbacks
def get_group_roles(self, group_id, requester_user_id):
"""Get all roles in a group (as seen by user)
@ -302,74 +205,6 @@ class GroupsServerHandler(object):
res = yield self.store.get_group_role(group_id=group_id, role_id=role_id)
return res
@defer.inlineCallbacks
def update_group_role(self, group_id, requester_user_id, role_id, content):
"""Add/update a role in a group
"""
yield self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
)
is_public = _parse_visibility_from_contents(content)
profile = content.get("profile")
yield self.store.upsert_group_role(
group_id=group_id, role_id=role_id, is_public=is_public, profile=profile
)
return {}
@defer.inlineCallbacks
def delete_group_role(self, group_id, requester_user_id, role_id):
"""Remove role from group
"""
yield self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
)
yield self.store.remove_group_role(group_id=group_id, role_id=role_id)
return {}
@defer.inlineCallbacks
def update_group_summary_user(
self, group_id, requester_user_id, user_id, role_id, content
):
"""Add/update a users entry in the group summary
"""
yield self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
)
order = content.get("order", None)
is_public = _parse_visibility_from_contents(content)
yield self.store.add_user_to_summary(
group_id=group_id,
user_id=user_id,
role_id=role_id,
order=order,
is_public=is_public,
)
return {}
@defer.inlineCallbacks
def delete_group_summary_user(self, group_id, requester_user_id, user_id, role_id):
"""Remove a user from the group summary
"""
yield self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
)
yield self.store.remove_user_from_summary(
group_id=group_id, user_id=user_id, role_id=role_id
)
return {}
@defer.inlineCallbacks
def get_group_profile(self, group_id, requester_user_id):
"""Get the group profile as seen by requester_user_id
@ -394,24 +229,6 @@ class GroupsServerHandler(object):
else:
raise SynapseError(404, "Unknown group")
@defer.inlineCallbacks
def update_group_profile(self, group_id, requester_user_id, content):
"""Update the group profile
"""
yield self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
)
profile = {}
for keyname in ("name", "avatar_url", "short_description", "long_description"):
if keyname in content:
value = content[keyname]
if not isinstance(value, string_types):
raise SynapseError(400, "%r value is not a string" % (keyname,))
profile[keyname] = value
yield self.store.update_group_profile(group_id, profile)
@defer.inlineCallbacks
def get_users_in_group(self, group_id, requester_user_id):
"""Get the users in group as seen by requester_user_id.
@ -530,6 +347,196 @@ class GroupsServerHandler(object):
return {"chunk": chunk, "total_room_count_estimate": len(room_results)}
class GroupsServerHandler(GroupsServerWorkerHandler):
def __init__(self, hs):
super(GroupsServerHandler, self).__init__(hs)
# Ensure attestations get renewed
hs.get_groups_attestation_renewer()
@defer.inlineCallbacks
def update_group_summary_room(
self, group_id, requester_user_id, room_id, category_id, content
):
"""Add/update a room to the group summary
"""
yield self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
)
RoomID.from_string(room_id) # Ensure valid room id
order = content.get("order", None)
is_public = _parse_visibility_from_contents(content)
yield self.store.add_room_to_summary(
group_id=group_id,
room_id=room_id,
category_id=category_id,
order=order,
is_public=is_public,
)
return {}
@defer.inlineCallbacks
def delete_group_summary_room(
self, group_id, requester_user_id, room_id, category_id
):
"""Remove a room from the summary
"""
yield self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
)
yield self.store.remove_room_from_summary(
group_id=group_id, room_id=room_id, category_id=category_id
)
return {}
@defer.inlineCallbacks
def set_group_join_policy(self, group_id, requester_user_id, content):
"""Sets the group join policy.
Currently supported policies are:
- "invite": an invite must be received and accepted in order to join.
- "open": anyone can join.
"""
yield self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
)
join_policy = _parse_join_policy_from_contents(content)
if join_policy is None:
raise SynapseError(400, "No value specified for 'm.join_policy'")
yield self.store.set_group_join_policy(group_id, join_policy=join_policy)
return {}
@defer.inlineCallbacks
def update_group_category(self, group_id, requester_user_id, category_id, content):
"""Add/Update a group category
"""
yield self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
)
is_public = _parse_visibility_from_contents(content)
profile = content.get("profile")
yield self.store.upsert_group_category(
group_id=group_id,
category_id=category_id,
is_public=is_public,
profile=profile,
)
return {}
@defer.inlineCallbacks
def delete_group_category(self, group_id, requester_user_id, category_id):
"""Delete a group category
"""
yield self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
)
yield self.store.remove_group_category(
group_id=group_id, category_id=category_id
)
return {}
@defer.inlineCallbacks
def update_group_role(self, group_id, requester_user_id, role_id, content):
"""Add/update a role in a group
"""
yield self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
)
is_public = _parse_visibility_from_contents(content)
profile = content.get("profile")
yield self.store.upsert_group_role(
group_id=group_id, role_id=role_id, is_public=is_public, profile=profile
)
return {}
@defer.inlineCallbacks
def delete_group_role(self, group_id, requester_user_id, role_id):
"""Remove role from group
"""
yield self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
)
yield self.store.remove_group_role(group_id=group_id, role_id=role_id)
return {}
@defer.inlineCallbacks
def update_group_summary_user(
self, group_id, requester_user_id, user_id, role_id, content
):
"""Add/update a users entry in the group summary
"""
yield self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
)
order = content.get("order", None)
is_public = _parse_visibility_from_contents(content)
yield self.store.add_user_to_summary(
group_id=group_id,
user_id=user_id,
role_id=role_id,
order=order,
is_public=is_public,
)
return {}
@defer.inlineCallbacks
def delete_group_summary_user(self, group_id, requester_user_id, user_id, role_id):
"""Remove a user from the group summary
"""
yield self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
)
yield self.store.remove_user_from_summary(
group_id=group_id, user_id=user_id, role_id=role_id
)
return {}
@defer.inlineCallbacks
def update_group_profile(self, group_id, requester_user_id, content):
"""Update the group profile
"""
yield self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
)
profile = {}
for keyname in ("name", "avatar_url", "short_description", "long_description"):
if keyname in content:
value = content[keyname]
if not isinstance(value, string_types):
raise SynapseError(400, "%r value is not a string" % (keyname,))
profile[keyname] = value
yield self.store.update_group_profile(group_id, profile)
@defer.inlineCallbacks
def add_room_to_group(self, group_id, requester_user_id, room_id, content):
"""Add room to group

View File

@ -63,7 +63,7 @@ def _create_rerouter(func_name):
return f
class GroupsLocalHandler(object):
class GroupsLocalWorkerHandler(object):
def __init__(self, hs):
self.hs = hs
self.store = hs.get_datastore()
@ -81,40 +81,17 @@ class GroupsLocalHandler(object):
self.profile_handler = hs.get_profile_handler()
# Ensure attestations get renewed
hs.get_groups_attestation_renewer()
# The following functions merely route the query to the local groups server
# or federation depending on if the group is local or remote
get_group_profile = _create_rerouter("get_group_profile")
update_group_profile = _create_rerouter("update_group_profile")
get_rooms_in_group = _create_rerouter("get_rooms_in_group")
get_invited_users_in_group = _create_rerouter("get_invited_users_in_group")
add_room_to_group = _create_rerouter("add_room_to_group")
update_room_in_group = _create_rerouter("update_room_in_group")
remove_room_from_group = _create_rerouter("remove_room_from_group")
update_group_summary_room = _create_rerouter("update_group_summary_room")
delete_group_summary_room = _create_rerouter("delete_group_summary_room")
update_group_category = _create_rerouter("update_group_category")
delete_group_category = _create_rerouter("delete_group_category")
get_group_category = _create_rerouter("get_group_category")
get_group_categories = _create_rerouter("get_group_categories")
update_group_summary_user = _create_rerouter("update_group_summary_user")
delete_group_summary_user = _create_rerouter("delete_group_summary_user")
update_group_role = _create_rerouter("update_group_role")
delete_group_role = _create_rerouter("delete_group_role")
get_group_role = _create_rerouter("get_group_role")
get_group_roles = _create_rerouter("get_group_roles")
set_group_join_policy = _create_rerouter("set_group_join_policy")
@defer.inlineCallbacks
def get_group_summary(self, group_id, requester_user_id):
"""Get the group summary for a group.
@ -169,6 +146,144 @@ class GroupsLocalHandler(object):
return res
@defer.inlineCallbacks
def get_users_in_group(self, group_id, requester_user_id):
"""Get users in a group
"""
if self.is_mine_id(group_id):
res = yield self.groups_server_handler.get_users_in_group(
group_id, requester_user_id
)
return res
group_server_name = get_domain_from_id(group_id)
try:
res = yield self.transport_client.get_users_in_group(
get_domain_from_id(group_id), group_id, requester_user_id
)
except HttpResponseException as e:
raise e.to_synapse_error()
except RequestSendFailed:
raise SynapseError(502, "Failed to contact group server")
chunk = res["chunk"]
valid_entries = []
for entry in chunk:
g_user_id = entry["user_id"]
attestation = entry.pop("attestation", {})
try:
if get_domain_from_id(g_user_id) != group_server_name:
yield self.attestations.verify_attestation(
attestation,
group_id=group_id,
user_id=g_user_id,
server_name=get_domain_from_id(g_user_id),
)
valid_entries.append(entry)
except Exception as e:
logger.info("Failed to verify user is in group: %s", e)
res["chunk"] = valid_entries
return res
@defer.inlineCallbacks
def get_joined_groups(self, user_id):
group_ids = yield self.store.get_joined_groups(user_id)
return {"groups": group_ids}
@defer.inlineCallbacks
def get_publicised_groups_for_user(self, user_id):
if self.hs.is_mine_id(user_id):
result = yield self.store.get_publicised_groups_for_user(user_id)
# Check AS associated groups for this user - this depends on the
# RegExps in the AS registration file (under `users`)
for app_service in self.store.get_app_services():
result.extend(app_service.get_groups_for_user(user_id))
return {"groups": result}
else:
try:
bulk_result = yield self.transport_client.bulk_get_publicised_groups(
get_domain_from_id(user_id), [user_id]
)
except HttpResponseException as e:
raise e.to_synapse_error()
except RequestSendFailed:
raise SynapseError(502, "Failed to contact group server")
result = bulk_result.get("users", {}).get(user_id)
# TODO: Verify attestations
return {"groups": result}
@defer.inlineCallbacks
def bulk_get_publicised_groups(self, user_ids, proxy=True):
destinations = {}
local_users = set()
for user_id in user_ids:
if self.hs.is_mine_id(user_id):
local_users.add(user_id)
else:
destinations.setdefault(get_domain_from_id(user_id), set()).add(user_id)
if not proxy and destinations:
raise SynapseError(400, "Some user_ids are not local")
results = {}
failed_results = []
for destination, dest_user_ids in iteritems(destinations):
try:
r = yield self.transport_client.bulk_get_publicised_groups(
destination, list(dest_user_ids)
)
results.update(r["users"])
except Exception:
failed_results.extend(dest_user_ids)
for uid in local_users:
results[uid] = yield self.store.get_publicised_groups_for_user(uid)
# Check AS associated groups for this user - this depends on the
# RegExps in the AS registration file (under `users`)
for app_service in self.store.get_app_services():
results[uid].extend(app_service.get_groups_for_user(uid))
return {"users": results}
class GroupsLocalHandler(GroupsLocalWorkerHandler):
def __init__(self, hs):
super(GroupsLocalHandler, self).__init__(hs)
# Ensure attestations get renewed
hs.get_groups_attestation_renewer()
# The following functions merely route the query to the local groups server
# or federation depending on if the group is local or remote
update_group_profile = _create_rerouter("update_group_profile")
add_room_to_group = _create_rerouter("add_room_to_group")
update_room_in_group = _create_rerouter("update_room_in_group")
remove_room_from_group = _create_rerouter("remove_room_from_group")
update_group_summary_room = _create_rerouter("update_group_summary_room")
delete_group_summary_room = _create_rerouter("delete_group_summary_room")
update_group_category = _create_rerouter("update_group_category")
delete_group_category = _create_rerouter("delete_group_category")
update_group_summary_user = _create_rerouter("update_group_summary_user")
delete_group_summary_user = _create_rerouter("delete_group_summary_user")
update_group_role = _create_rerouter("update_group_role")
delete_group_role = _create_rerouter("delete_group_role")
set_group_join_policy = _create_rerouter("set_group_join_policy")
@defer.inlineCallbacks
def create_group(self, group_id, user_id, content):
"""Create a group
@ -219,48 +334,6 @@ class GroupsLocalHandler(object):
return res
@defer.inlineCallbacks
def get_users_in_group(self, group_id, requester_user_id):
"""Get users in a group
"""
if self.is_mine_id(group_id):
res = yield self.groups_server_handler.get_users_in_group(
group_id, requester_user_id
)
return res
group_server_name = get_domain_from_id(group_id)
try:
res = yield self.transport_client.get_users_in_group(
get_domain_from_id(group_id), group_id, requester_user_id
)
except HttpResponseException as e:
raise e.to_synapse_error()
except RequestSendFailed:
raise SynapseError(502, "Failed to contact group server")
chunk = res["chunk"]
valid_entries = []
for entry in chunk:
g_user_id = entry["user_id"]
attestation = entry.pop("attestation", {})
try:
if get_domain_from_id(g_user_id) != group_server_name:
yield self.attestations.verify_attestation(
attestation,
group_id=group_id,
user_id=g_user_id,
server_name=get_domain_from_id(g_user_id),
)
valid_entries.append(entry)
except Exception as e:
logger.info("Failed to verify user is in group: %s", e)
res["chunk"] = valid_entries
return res
@defer.inlineCallbacks
def join_group(self, group_id, user_id, content):
"""Request to join a group
@ -452,68 +525,3 @@ class GroupsLocalHandler(object):
group_id, user_id, membership="leave"
)
self.notifier.on_new_event("groups_key", token, users=[user_id])
@defer.inlineCallbacks
def get_joined_groups(self, user_id):
group_ids = yield self.store.get_joined_groups(user_id)
return {"groups": group_ids}
@defer.inlineCallbacks
def get_publicised_groups_for_user(self, user_id):
if self.hs.is_mine_id(user_id):
result = yield self.store.get_publicised_groups_for_user(user_id)
# Check AS associated groups for this user - this depends on the
# RegExps in the AS registration file (under `users`)
for app_service in self.store.get_app_services():
result.extend(app_service.get_groups_for_user(user_id))
return {"groups": result}
else:
try:
bulk_result = yield self.transport_client.bulk_get_publicised_groups(
get_domain_from_id(user_id), [user_id]
)
except HttpResponseException as e:
raise e.to_synapse_error()
except RequestSendFailed:
raise SynapseError(502, "Failed to contact group server")
result = bulk_result.get("users", {}).get(user_id)
# TODO: Verify attestations
return {"groups": result}
@defer.inlineCallbacks
def bulk_get_publicised_groups(self, user_ids, proxy=True):
destinations = {}
local_users = set()
for user_id in user_ids:
if self.hs.is_mine_id(user_id):
local_users.add(user_id)
else:
destinations.setdefault(get_domain_from_id(user_id), set()).add(user_id)
if not proxy and destinations:
raise SynapseError(400, "Some user_ids are not local")
results = {}
failed_results = []
for destination, dest_user_ids in iteritems(destinations):
try:
r = yield self.transport_client.bulk_get_publicised_groups(
destination, list(dest_user_ids)
)
results.update(r["users"])
except Exception:
failed_results.extend(dest_user_ids)
for uid in local_users:
results[uid] = yield self.store.get_publicised_groups_for_user(uid)
# Check AS associated groups for this user - this depends on the
# RegExps in the AS registration file (under `users`)
for app_service in self.store.get_app_services():
results[uid].extend(app_service.get_groups_for_user(uid))
return {"users": results}

View File

@ -13,15 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.storage import DataStore
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.storage.data_stores.main.group_server import GroupServerWorkerStore
from synapse.storage.database import Database
from synapse.util.caches.stream_change_cache import StreamChangeCache
from ._base import BaseSlavedStore, __func__
from ._slaved_id_tracker import SlavedIdTracker
class SlavedGroupServerStore(BaseSlavedStore):
class SlavedGroupServerStore(GroupServerWorkerStore, BaseSlavedStore):
def __init__(self, database: Database, db_conn, hs):
super(SlavedGroupServerStore, self).__init__(database, db_conn, hs)
@ -35,9 +34,8 @@ class SlavedGroupServerStore(BaseSlavedStore):
self._group_updates_id_gen.get_current_token(),
)
get_groups_changes_for_user = __func__(DataStore.get_groups_changes_for_user)
get_group_stream_token = __func__(DataStore.get_group_stream_token)
get_all_groups_for_user = __func__(DataStore.get_all_groups_for_user)
def get_group_stream_token(self):
return self._group_updates_id_gen.get_current_token()
def stream_positions(self):
result = super(SlavedGroupServerStore, self).stream_positions()

View File

@ -50,7 +50,7 @@ from synapse.federation.send_queue import FederationRemoteSendQueue
from synapse.federation.sender import FederationSender
from synapse.federation.transport.client import TransportLayerClient
from synapse.groups.attestations import GroupAttestationSigning, GroupAttestionRenewer
from synapse.groups.groups_server import GroupsServerHandler
from synapse.groups.groups_server import GroupsServerHandler, GroupsServerWorkerHandler
from synapse.handlers import Handlers
from synapse.handlers.account_validity import AccountValidityHandler
from synapse.handlers.acme import AcmeHandler
@ -62,7 +62,7 @@ from synapse.handlers.devicemessage import DeviceMessageHandler
from synapse.handlers.e2e_keys import E2eKeysHandler
from synapse.handlers.e2e_room_keys import E2eRoomKeysHandler
from synapse.handlers.events import EventHandler, EventStreamHandler
from synapse.handlers.groups_local import GroupsLocalHandler
from synapse.handlers.groups_local import GroupsLocalHandler, GroupsLocalWorkerHandler
from synapse.handlers.initial_sync import InitialSyncHandler
from synapse.handlers.message import EventCreationHandler, MessageHandler
from synapse.handlers.pagination import PaginationHandler
@ -460,10 +460,16 @@ class HomeServer(object):
return UserDirectoryHandler(self)
def build_groups_local_handler(self):
return GroupsLocalHandler(self)
if self.config.worker_app:
return GroupsLocalWorkerHandler(self)
else:
return GroupsLocalHandler(self)
def build_groups_server_handler(self):
return GroupsServerHandler(self)
if self.config.worker_app:
return GroupsServerWorkerHandler(self)
else:
return GroupsServerHandler(self)
def build_groups_attestation_signing(self):
return GroupAttestationSigning(self)

View File

@ -27,21 +27,7 @@ _DEFAULT_CATEGORY_ID = ""
_DEFAULT_ROLE_ID = ""
class GroupServerStore(SQLBaseStore):
def set_group_join_policy(self, group_id, join_policy):
"""Set the join policy of a group.
join_policy can be one of:
* "invite"
* "open"
"""
return self.db.simple_update_one(
table="groups",
keyvalues={"group_id": group_id},
updatevalues={"join_policy": join_policy},
desc="set_group_join_policy",
)
class GroupServerWorkerStore(SQLBaseStore):
def get_group(self, group_id):
return self.db.simple_select_one(
table="groups",
@ -157,6 +143,366 @@ class GroupServerStore(SQLBaseStore):
"get_rooms_for_summary", _get_rooms_for_summary_txn
)
@defer.inlineCallbacks
def get_group_categories(self, group_id):
rows = yield self.db.simple_select_list(
table="group_room_categories",
keyvalues={"group_id": group_id},
retcols=("category_id", "is_public", "profile"),
desc="get_group_categories",
)
return {
row["category_id"]: {
"is_public": row["is_public"],
"profile": json.loads(row["profile"]),
}
for row in rows
}
@defer.inlineCallbacks
def get_group_category(self, group_id, category_id):
category = yield self.db.simple_select_one(
table="group_room_categories",
keyvalues={"group_id": group_id, "category_id": category_id},
retcols=("is_public", "profile"),
desc="get_group_category",
)
category["profile"] = json.loads(category["profile"])
return category
@defer.inlineCallbacks
def get_group_roles(self, group_id):
rows = yield self.db.simple_select_list(
table="group_roles",
keyvalues={"group_id": group_id},
retcols=("role_id", "is_public", "profile"),
desc="get_group_roles",
)
return {
row["role_id"]: {
"is_public": row["is_public"],
"profile": json.loads(row["profile"]),
}
for row in rows
}
@defer.inlineCallbacks
def get_group_role(self, group_id, role_id):
role = yield self.db.simple_select_one(
table="group_roles",
keyvalues={"group_id": group_id, "role_id": role_id},
retcols=("is_public", "profile"),
desc="get_group_role",
)
role["profile"] = json.loads(role["profile"])
return role
def get_local_groups_for_room(self, room_id):
"""Get all of the local group that contain a given room
Args:
room_id (str): The ID of a room
Returns:
Deferred[list[str]]: A twisted.Deferred containing a list of group ids
containing this room
"""
return self.db.simple_select_onecol(
table="group_rooms",
keyvalues={"room_id": room_id},
retcol="group_id",
desc="get_local_groups_for_room",
)
def get_users_for_summary_by_role(self, group_id, include_private=False):
"""Get the users and roles that should be included in a summary request
Returns ([users], [roles])
"""
def _get_users_for_summary_txn(txn):
keyvalues = {"group_id": group_id}
if not include_private:
keyvalues["is_public"] = True
sql = """
SELECT user_id, is_public, role_id, user_order
FROM group_summary_users
WHERE group_id = ?
"""
if not include_private:
sql += " AND is_public = ?"
txn.execute(sql, (group_id, True))
else:
txn.execute(sql, (group_id,))
users = [
{
"user_id": row[0],
"is_public": row[1],
"role_id": row[2] if row[2] != _DEFAULT_ROLE_ID else None,
"order": row[3],
}
for row in txn
]
sql = """
SELECT role_id, is_public, profile, role_order
FROM group_summary_roles
INNER JOIN group_roles USING (group_id, role_id)
WHERE group_id = ?
"""
if not include_private:
sql += " AND is_public = ?"
txn.execute(sql, (group_id, True))
else:
txn.execute(sql, (group_id,))
roles = {
row[0]: {
"is_public": row[1],
"profile": json.loads(row[2]),
"order": row[3],
}
for row in txn
}
return users, roles
return self.db.runInteraction(
"get_users_for_summary_by_role", _get_users_for_summary_txn
)
def is_user_in_group(self, user_id, group_id):
return self.db.simple_select_one_onecol(
table="group_users",
keyvalues={"group_id": group_id, "user_id": user_id},
retcol="user_id",
allow_none=True,
desc="is_user_in_group",
).addCallback(lambda r: bool(r))
def is_user_admin_in_group(self, group_id, user_id):
return self.db.simple_select_one_onecol(
table="group_users",
keyvalues={"group_id": group_id, "user_id": user_id},
retcol="is_admin",
allow_none=True,
desc="is_user_admin_in_group",
)
def is_user_invited_to_local_group(self, group_id, user_id):
"""Has the group server invited a user?
"""
return self.db.simple_select_one_onecol(
table="group_invites",
keyvalues={"group_id": group_id, "user_id": user_id},
retcol="user_id",
desc="is_user_invited_to_local_group",
allow_none=True,
)
def get_users_membership_info_in_group(self, group_id, user_id):
"""Get a dict describing the membership of a user in a group.
Example if joined:
{
"membership": "join",
"is_public": True,
"is_privileged": False,
}
Returns an empty dict if the user is not join/invite/etc
"""
def _get_users_membership_in_group_txn(txn):
row = self.db.simple_select_one_txn(
txn,
table="group_users",
keyvalues={"group_id": group_id, "user_id": user_id},
retcols=("is_admin", "is_public"),
allow_none=True,
)
if row:
return {
"membership": "join",
"is_public": row["is_public"],
"is_privileged": row["is_admin"],
}
row = self.db.simple_select_one_onecol_txn(
txn,
table="group_invites",
keyvalues={"group_id": group_id, "user_id": user_id},
retcol="user_id",
allow_none=True,
)
if row:
return {"membership": "invite"}
return {}
return self.db.runInteraction(
"get_users_membership_info_in_group", _get_users_membership_in_group_txn
)
def get_publicised_groups_for_user(self, user_id):
"""Get all groups a user is publicising
"""
return self.db.simple_select_onecol(
table="local_group_membership",
keyvalues={"user_id": user_id, "membership": "join", "is_publicised": True},
retcol="group_id",
desc="get_publicised_groups_for_user",
)
def get_attestations_need_renewals(self, valid_until_ms):
"""Get all attestations that need to be renewed until givent time
"""
def _get_attestations_need_renewals_txn(txn):
sql = """
SELECT group_id, user_id FROM group_attestations_renewals
WHERE valid_until_ms <= ?
"""
txn.execute(sql, (valid_until_ms,))
return self.db.cursor_to_dict(txn)
return self.db.runInteraction(
"get_attestations_need_renewals", _get_attestations_need_renewals_txn
)
@defer.inlineCallbacks
def get_remote_attestation(self, group_id, user_id):
"""Get the attestation that proves the remote agrees that the user is
in the group.
"""
row = yield self.db.simple_select_one(
table="group_attestations_remote",
keyvalues={"group_id": group_id, "user_id": user_id},
retcols=("valid_until_ms", "attestation_json"),
desc="get_remote_attestation",
allow_none=True,
)
now = int(self._clock.time_msec())
if row and now < row["valid_until_ms"]:
return json.loads(row["attestation_json"])
return None
def get_joined_groups(self, user_id):
return self.db.simple_select_onecol(
table="local_group_membership",
keyvalues={"user_id": user_id, "membership": "join"},
retcol="group_id",
desc="get_joined_groups",
)
def get_all_groups_for_user(self, user_id, now_token):
def _get_all_groups_for_user_txn(txn):
sql = """
SELECT group_id, type, membership, u.content
FROM local_group_updates AS u
INNER JOIN local_group_membership USING (group_id, user_id)
WHERE user_id = ? AND membership != 'leave'
AND stream_id <= ?
"""
txn.execute(sql, (user_id, now_token))
return [
{
"group_id": row[0],
"type": row[1],
"membership": row[2],
"content": json.loads(row[3]),
}
for row in txn
]
return self.db.runInteraction(
"get_all_groups_for_user", _get_all_groups_for_user_txn
)
def get_groups_changes_for_user(self, user_id, from_token, to_token):
from_token = int(from_token)
has_changed = self._group_updates_stream_cache.has_entity_changed(
user_id, from_token
)
if not has_changed:
return defer.succeed([])
def _get_groups_changes_for_user_txn(txn):
sql = """
SELECT group_id, membership, type, u.content
FROM local_group_updates AS u
INNER JOIN local_group_membership USING (group_id, user_id)
WHERE user_id = ? AND ? < stream_id AND stream_id <= ?
"""
txn.execute(sql, (user_id, from_token, to_token))
return [
{
"group_id": group_id,
"membership": membership,
"type": gtype,
"content": json.loads(content_json),
}
for group_id, membership, gtype, content_json in txn
]
return self.db.runInteraction(
"get_groups_changes_for_user", _get_groups_changes_for_user_txn
)
def get_all_groups_changes(self, from_token, to_token, limit):
from_token = int(from_token)
has_changed = self._group_updates_stream_cache.has_any_entity_changed(
from_token
)
if not has_changed:
return defer.succeed([])
def _get_all_groups_changes_txn(txn):
sql = """
SELECT stream_id, group_id, user_id, type, content
FROM local_group_updates
WHERE ? < stream_id AND stream_id <= ?
LIMIT ?
"""
txn.execute(sql, (from_token, to_token, limit))
return [
(stream_id, group_id, user_id, gtype, json.loads(content_json))
for stream_id, group_id, user_id, gtype, content_json in txn
]
return self.db.runInteraction(
"get_all_groups_changes", _get_all_groups_changes_txn
)
class GroupServerStore(GroupServerWorkerStore):
def set_group_join_policy(self, group_id, join_policy):
"""Set the join policy of a group.
join_policy can be one of:
* "invite"
* "open"
"""
return self.db.simple_update_one(
table="groups",
keyvalues={"group_id": group_id},
updatevalues={"join_policy": join_policy},
desc="set_group_join_policy",
)
def add_room_to_summary(self, group_id, room_id, category_id, order, is_public):
return self.db.runInteraction(
"add_room_to_summary",
@ -299,36 +645,6 @@ class GroupServerStore(SQLBaseStore):
desc="remove_room_from_summary",
)
@defer.inlineCallbacks
def get_group_categories(self, group_id):
rows = yield self.db.simple_select_list(
table="group_room_categories",
keyvalues={"group_id": group_id},
retcols=("category_id", "is_public", "profile"),
desc="get_group_categories",
)
return {
row["category_id"]: {
"is_public": row["is_public"],
"profile": json.loads(row["profile"]),
}
for row in rows
}
@defer.inlineCallbacks
def get_group_category(self, group_id, category_id):
category = yield self.db.simple_select_one(
table="group_room_categories",
keyvalues={"group_id": group_id, "category_id": category_id},
retcols=("is_public", "profile"),
desc="get_group_category",
)
category["profile"] = json.loads(category["profile"])
return category
def upsert_group_category(self, group_id, category_id, profile, is_public):
"""Add/update room category for group
"""
@ -360,36 +676,6 @@ class GroupServerStore(SQLBaseStore):
desc="remove_group_category",
)
@defer.inlineCallbacks
def get_group_roles(self, group_id):
rows = yield self.db.simple_select_list(
table="group_roles",
keyvalues={"group_id": group_id},
retcols=("role_id", "is_public", "profile"),
desc="get_group_roles",
)
return {
row["role_id"]: {
"is_public": row["is_public"],
"profile": json.loads(row["profile"]),
}
for row in rows
}
@defer.inlineCallbacks
def get_group_role(self, group_id, role_id):
role = yield self.db.simple_select_one(
table="group_roles",
keyvalues={"group_id": group_id, "role_id": role_id},
retcols=("is_public", "profile"),
desc="get_group_role",
)
role["profile"] = json.loads(role["profile"])
return role
def upsert_group_role(self, group_id, role_id, profile, is_public):
"""Add/remove user role
"""
@ -555,100 +841,6 @@ class GroupServerStore(SQLBaseStore):
desc="remove_user_from_summary",
)
def get_local_groups_for_room(self, room_id):
"""Get all of the local group that contain a given room
Args:
room_id (str): The ID of a room
Returns:
Deferred[list[str]]: A twisted.Deferred containing a list of group ids
containing this room
"""
return self.db.simple_select_onecol(
table="group_rooms",
keyvalues={"room_id": room_id},
retcol="group_id",
desc="get_local_groups_for_room",
)
def get_users_for_summary_by_role(self, group_id, include_private=False):
"""Get the users and roles that should be included in a summary request
Returns ([users], [roles])
"""
def _get_users_for_summary_txn(txn):
keyvalues = {"group_id": group_id}
if not include_private:
keyvalues["is_public"] = True
sql = """
SELECT user_id, is_public, role_id, user_order
FROM group_summary_users
WHERE group_id = ?
"""
if not include_private:
sql += " AND is_public = ?"
txn.execute(sql, (group_id, True))
else:
txn.execute(sql, (group_id,))
users = [
{
"user_id": row[0],
"is_public": row[1],
"role_id": row[2] if row[2] != _DEFAULT_ROLE_ID else None,
"order": row[3],
}
for row in txn
]
sql = """
SELECT role_id, is_public, profile, role_order
FROM group_summary_roles
INNER JOIN group_roles USING (group_id, role_id)
WHERE group_id = ?
"""
if not include_private:
sql += " AND is_public = ?"
txn.execute(sql, (group_id, True))
else:
txn.execute(sql, (group_id,))
roles = {
row[0]: {
"is_public": row[1],
"profile": json.loads(row[2]),
"order": row[3],
}
for row in txn
}
return users, roles
return self.db.runInteraction(
"get_users_for_summary_by_role", _get_users_for_summary_txn
)
def is_user_in_group(self, user_id, group_id):
return self.db.simple_select_one_onecol(
table="group_users",
keyvalues={"group_id": group_id, "user_id": user_id},
retcol="user_id",
allow_none=True,
desc="is_user_in_group",
).addCallback(lambda r: bool(r))
def is_user_admin_in_group(self, group_id, user_id):
return self.db.simple_select_one_onecol(
table="group_users",
keyvalues={"group_id": group_id, "user_id": user_id},
retcol="is_admin",
allow_none=True,
desc="is_user_admin_in_group",
)
def add_group_invite(self, group_id, user_id):
"""Record that the group server has invited a user
"""
@ -658,64 +850,6 @@ class GroupServerStore(SQLBaseStore):
desc="add_group_invite",
)
def is_user_invited_to_local_group(self, group_id, user_id):
"""Has the group server invited a user?
"""
return self.db.simple_select_one_onecol(
table="group_invites",
keyvalues={"group_id": group_id, "user_id": user_id},
retcol="user_id",
desc="is_user_invited_to_local_group",
allow_none=True,
)
def get_users_membership_info_in_group(self, group_id, user_id):
"""Get a dict describing the membership of a user in a group.
Example if joined:
{
"membership": "join",
"is_public": True,
"is_privileged": False,
}
Returns an empty dict if the user is not join/invite/etc
"""
def _get_users_membership_in_group_txn(txn):
row = self.db.simple_select_one_txn(
txn,
table="group_users",
keyvalues={"group_id": group_id, "user_id": user_id},
retcols=("is_admin", "is_public"),
allow_none=True,
)
if row:
return {
"membership": "join",
"is_public": row["is_public"],
"is_privileged": row["is_admin"],
}
row = self.db.simple_select_one_onecol_txn(
txn,
table="group_invites",
keyvalues={"group_id": group_id, "user_id": user_id},
retcol="user_id",
allow_none=True,
)
if row:
return {"membership": "invite"}
return {}
return self.db.runInteraction(
"get_users_membership_info_in_group", _get_users_membership_in_group_txn
)
def add_user_to_group(
self,
group_id,
@ -846,16 +980,6 @@ class GroupServerStore(SQLBaseStore):
"remove_room_from_group", _remove_room_from_group_txn
)
def get_publicised_groups_for_user(self, user_id):
"""Get all groups a user is publicising
"""
return self.db.simple_select_onecol(
table="local_group_membership",
keyvalues={"user_id": user_id, "membership": "join", "is_publicised": True},
retcol="group_id",
desc="get_publicised_groups_for_user",
)
def update_group_publicity(self, group_id, user_id, publicise):
"""Update whether the user is publicising their membership of the group
"""
@ -1000,22 +1124,6 @@ class GroupServerStore(SQLBaseStore):
desc="update_group_profile",
)
def get_attestations_need_renewals(self, valid_until_ms):
"""Get all attestations that need to be renewed until givent time
"""
def _get_attestations_need_renewals_txn(txn):
sql = """
SELECT group_id, user_id FROM group_attestations_renewals
WHERE valid_until_ms <= ?
"""
txn.execute(sql, (valid_until_ms,))
return self.db.cursor_to_dict(txn)
return self.db.runInteraction(
"get_attestations_need_renewals", _get_attestations_need_renewals_txn
)
def update_attestation_renewal(self, group_id, user_id, attestation):
"""Update an attestation that we have renewed
"""
@ -1054,112 +1162,6 @@ class GroupServerStore(SQLBaseStore):
desc="remove_attestation_renewal",
)
@defer.inlineCallbacks
def get_remote_attestation(self, group_id, user_id):
"""Get the attestation that proves the remote agrees that the user is
in the group.
"""
row = yield self.db.simple_select_one(
table="group_attestations_remote",
keyvalues={"group_id": group_id, "user_id": user_id},
retcols=("valid_until_ms", "attestation_json"),
desc="get_remote_attestation",
allow_none=True,
)
now = int(self._clock.time_msec())
if row and now < row["valid_until_ms"]:
return json.loads(row["attestation_json"])
return None
def get_joined_groups(self, user_id):
return self.db.simple_select_onecol(
table="local_group_membership",
keyvalues={"user_id": user_id, "membership": "join"},
retcol="group_id",
desc="get_joined_groups",
)
def get_all_groups_for_user(self, user_id, now_token):
def _get_all_groups_for_user_txn(txn):
sql = """
SELECT group_id, type, membership, u.content
FROM local_group_updates AS u
INNER JOIN local_group_membership USING (group_id, user_id)
WHERE user_id = ? AND membership != 'leave'
AND stream_id <= ?
"""
txn.execute(sql, (user_id, now_token))
return [
{
"group_id": row[0],
"type": row[1],
"membership": row[2],
"content": json.loads(row[3]),
}
for row in txn
]
return self.db.runInteraction(
"get_all_groups_for_user", _get_all_groups_for_user_txn
)
def get_groups_changes_for_user(self, user_id, from_token, to_token):
from_token = int(from_token)
has_changed = self._group_updates_stream_cache.has_entity_changed(
user_id, from_token
)
if not has_changed:
return defer.succeed([])
def _get_groups_changes_for_user_txn(txn):
sql = """
SELECT group_id, membership, type, u.content
FROM local_group_updates AS u
INNER JOIN local_group_membership USING (group_id, user_id)
WHERE user_id = ? AND ? < stream_id AND stream_id <= ?
"""
txn.execute(sql, (user_id, from_token, to_token))
return [
{
"group_id": group_id,
"membership": membership,
"type": gtype,
"content": json.loads(content_json),
}
for group_id, membership, gtype, content_json in txn
]
return self.db.runInteraction(
"get_groups_changes_for_user", _get_groups_changes_for_user_txn
)
def get_all_groups_changes(self, from_token, to_token, limit):
from_token = int(from_token)
has_changed = self._group_updates_stream_cache.has_any_entity_changed(
from_token
)
if not has_changed:
return defer.succeed([])
def _get_all_groups_changes_txn(txn):
sql = """
SELECT stream_id, group_id, user_id, type, content
FROM local_group_updates
WHERE ? < stream_id AND stream_id <= ?
LIMIT ?
"""
txn.execute(sql, (from_token, to_token, limit))
return [
(stream_id, group_id, user_id, gtype, json.loads(content_json))
for stream_id, group_id, user_id, gtype, content_json in txn
]
return self.db.runInteraction(
"get_all_groups_changes", _get_all_groups_changes_txn
)
def get_group_stream_token(self):
return self._group_updates_id_gen.get_current_token()