Convert user directory handler and related classes to async/await. (#7640)

pull/7649/head
Patrick Cloke 2020-06-05 14:42:55 -04:00 committed by GitHub
parent 09099313e6
commit 737b4a936e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 78 additions and 111 deletions

1
changelog.d/7640.misc Normal file
View File

@ -0,0 +1 @@
Convert user directory, state deltas, and stats handlers to async/await.

View File

@ -207,8 +207,10 @@ class RegistrationHandler(BaseHandler):
if self.hs.config.user_directory_search_all_users:
profile = yield self.store.get_profileinfo(localpart)
yield self.user_directory_handler.handle_local_profile_change(
user_id, profile
yield defer.ensureDeferred(
self.user_directory_handler.handle_local_profile_change(
user_id, profile
)
)
else:

View File

@ -15,8 +15,6 @@
import logging
from twisted.internet import defer
logger = logging.getLogger(__name__)
@ -24,8 +22,7 @@ class StateDeltasHandler(object):
def __init__(self, hs):
self.store = hs.get_datastore()
@defer.inlineCallbacks
def _get_key_change(self, prev_event_id, event_id, key_name, public_value):
async def _get_key_change(self, prev_event_id, event_id, key_name, public_value):
"""Given two events check if the `key_name` field in content changed
from not matching `public_value` to doing so.
@ -41,10 +38,10 @@ class StateDeltasHandler(object):
prev_event = None
event = None
if prev_event_id:
prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
prev_event = await self.store.get_event(prev_event_id, allow_none=True)
if event_id:
event = yield self.store.get_event(event_id, allow_none=True)
event = await self.store.get_event(event_id, allow_none=True)
if not event and not prev_event:
logger.debug("Neither event exists: %r %r", prev_event_id, event_id)

View File

@ -16,17 +16,14 @@
import logging
from collections import Counter
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.handlers.state_deltas import StateDeltasHandler
from synapse.metrics import event_processing_positions
from synapse.metrics.background_process_metrics import run_as_background_process
logger = logging.getLogger(__name__)
class StatsHandler(StateDeltasHandler):
class StatsHandler:
"""Handles keeping the *_stats tables updated with a simple time-series of
information about the users, rooms and media on the server, such that admins
have some idea of who is consuming their resources.
@ -35,7 +32,6 @@ class StatsHandler(StateDeltasHandler):
"""
def __init__(self, hs):
super(StatsHandler, self).__init__(hs)
self.hs = hs
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
@ -68,20 +64,18 @@ class StatsHandler(StateDeltasHandler):
self._is_processing = True
@defer.inlineCallbacks
def process():
async def process():
try:
yield self._unsafe_process()
await self._unsafe_process()
finally:
self._is_processing = False
run_as_background_process("stats.notify_new_event", process)
@defer.inlineCallbacks
def _unsafe_process(self):
async def _unsafe_process(self):
# If self.pos is None then means we haven't fetched it from DB
if self.pos is None:
self.pos = yield self.store.get_stats_positions()
self.pos = await self.store.get_stats_positions()
# Loop round handling deltas until we're up to date
@ -96,13 +90,13 @@ class StatsHandler(StateDeltasHandler):
logger.debug(
"Processing room stats %s->%s", self.pos, room_max_stream_ordering
)
max_pos, deltas = yield self.store.get_current_state_deltas(
max_pos, deltas = await self.store.get_current_state_deltas(
self.pos, room_max_stream_ordering
)
if deltas:
logger.debug("Handling %d state deltas", len(deltas))
room_deltas, user_deltas = yield self._handle_deltas(deltas)
room_deltas, user_deltas = await self._handle_deltas(deltas)
else:
room_deltas = {}
user_deltas = {}
@ -111,7 +105,7 @@ class StatsHandler(StateDeltasHandler):
(
room_count,
user_count,
) = yield self.store.get_changes_room_total_events_and_bytes(
) = await self.store.get_changes_room_total_events_and_bytes(
self.pos, max_pos
)
@ -125,7 +119,7 @@ class StatsHandler(StateDeltasHandler):
logger.debug("user_deltas: %s", user_deltas)
# Always call this so that we update the stats position.
yield self.store.bulk_update_stats_delta(
await self.store.bulk_update_stats_delta(
self.clock.time_msec(),
updates={"room": room_deltas, "user": user_deltas},
stream_id=max_pos,
@ -137,13 +131,12 @@ class StatsHandler(StateDeltasHandler):
self.pos = max_pos
@defer.inlineCallbacks
def _handle_deltas(self, deltas):
async def _handle_deltas(self, deltas):
"""Called with the state deltas to process
Returns:
Deferred[tuple[dict[str, Counter], dict[str, counter]]]
Resovles to two dicts, the room deltas and the user deltas,
tuple[dict[str, Counter], dict[str, counter]]
Two dicts: the room deltas and the user deltas,
mapping from room/user ID to changes in the various fields.
"""
@ -162,7 +155,7 @@ class StatsHandler(StateDeltasHandler):
logger.debug("Handling: %r, %r %r, %s", room_id, typ, state_key, event_id)
token = yield self.store.get_earliest_token_for_stats("room", room_id)
token = await self.store.get_earliest_token_for_stats("room", room_id)
# If the earliest token to begin from is larger than our current
# stream ID, skip processing this delta.
@ -184,7 +177,7 @@ class StatsHandler(StateDeltasHandler):
sender = None
if event_id is not None:
event = yield self.store.get_event(event_id, allow_none=True)
event = await self.store.get_event(event_id, allow_none=True)
if event:
event_content = event.content or {}
sender = event.sender
@ -200,16 +193,16 @@ class StatsHandler(StateDeltasHandler):
room_stats_delta["current_state_events"] += 1
if typ == EventTypes.Member:
# we could use _get_key_change here but it's a bit inefficient
# given we're not testing for a specific result; might as well
# just grab the prev_membership and membership strings and
# compare them.
# we could use StateDeltasHandler._get_key_change here but it's
# a bit inefficient given we're not testing for a specific
# result; might as well just grab the prev_membership and
# membership strings and compare them.
# We take None rather than leave as a previous membership
# in the absence of a previous event because we do not want to
# reduce the leave count when a new-to-the-room user joins.
prev_membership = None
if prev_event_id is not None:
prev_event = yield self.store.get_event(
prev_event = await self.store.get_event(
prev_event_id, allow_none=True
)
if prev_event:
@ -301,6 +294,6 @@ class StatsHandler(StateDeltasHandler):
for room_id, state in room_to_state_updates.items():
logger.debug("Updating room_stats_state for %s: %s", room_id, state)
yield self.store.update_room_state(room_id, state)
await self.store.update_room_state(room_id, state)
return room_to_stats_deltas, user_to_stats_deltas

View File

@ -17,14 +17,11 @@ import logging
from six import iteritems, iterkeys
from twisted.internet import defer
import synapse.metrics
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.handlers.state_deltas import StateDeltasHandler
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.roommember import ProfileInfo
from synapse.types import get_localpart_from_id
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
@ -103,43 +100,39 @@ class UserDirectoryHandler(StateDeltasHandler):
if self._is_processing:
return
@defer.inlineCallbacks
def process():
async def process():
try:
yield self._unsafe_process()
await self._unsafe_process()
finally:
self._is_processing = False
self._is_processing = True
run_as_background_process("user_directory.notify_new_event", process)
@defer.inlineCallbacks
def handle_local_profile_change(self, user_id, profile):
async def handle_local_profile_change(self, user_id, profile):
"""Called to update index of our local user profiles when they change
irrespective of any rooms the user may be in.
"""
# FIXME(#3714): We should probably do this in the same worker as all
# the other changes.
is_support = yield self.store.is_support_user(user_id)
is_support = await self.store.is_support_user(user_id)
# Support users are for diagnostics and should not appear in the user directory.
if not is_support:
yield self.store.update_profile_in_user_dir(
await self.store.update_profile_in_user_dir(
user_id, profile.display_name, profile.avatar_url
)
@defer.inlineCallbacks
def handle_user_deactivated(self, user_id):
async def handle_user_deactivated(self, user_id):
"""Called when a user ID is deactivated
"""
# FIXME(#3714): We should probably do this in the same worker as all
# the other changes.
yield self.store.remove_from_user_dir(user_id)
await self.store.remove_from_user_dir(user_id)
@defer.inlineCallbacks
def _unsafe_process(self):
async def _unsafe_process(self):
# If self.pos is None then means we haven't fetched it from DB
if self.pos is None:
self.pos = yield self.store.get_user_directory_stream_pos()
self.pos = await self.store.get_user_directory_stream_pos()
# If still None then the initial background update hasn't happened yet
if self.pos is None:
@ -155,12 +148,12 @@ class UserDirectoryHandler(StateDeltasHandler):
logger.debug(
"Processing user stats %s->%s", self.pos, room_max_stream_ordering
)
max_pos, deltas = yield self.store.get_current_state_deltas(
max_pos, deltas = await self.store.get_current_state_deltas(
self.pos, room_max_stream_ordering
)
logger.debug("Handling %d state deltas", len(deltas))
yield self._handle_deltas(deltas)
await self._handle_deltas(deltas)
self.pos = max_pos
@ -169,10 +162,9 @@ class UserDirectoryHandler(StateDeltasHandler):
max_pos
)
yield self.store.update_user_directory_stream_pos(max_pos)
await self.store.update_user_directory_stream_pos(max_pos)
@defer.inlineCallbacks
def _handle_deltas(self, deltas):
async def _handle_deltas(self, deltas):
"""Called with the state deltas to process
"""
for delta in deltas:
@ -187,11 +179,11 @@ class UserDirectoryHandler(StateDeltasHandler):
# For join rule and visibility changes we need to check if the room
# may have become public or not and add/remove the users in said room
if typ in (EventTypes.RoomHistoryVisibility, EventTypes.JoinRules):
yield self._handle_room_publicity_change(
await self._handle_room_publicity_change(
room_id, prev_event_id, event_id, typ
)
elif typ == EventTypes.Member:
change = yield self._get_key_change(
change = await self._get_key_change(
prev_event_id,
event_id,
key_name="membership",
@ -201,7 +193,7 @@ class UserDirectoryHandler(StateDeltasHandler):
if change is False:
# Need to check if the server left the room entirely, if so
# we might need to remove all the users in that room
is_in_room = yield self.store.is_host_joined(
is_in_room = await self.store.is_host_joined(
room_id, self.server_name
)
if not is_in_room:
@ -209,40 +201,41 @@ class UserDirectoryHandler(StateDeltasHandler):
# Fetch all the users that we marked as being in user
# directory due to being in the room and then check if
# need to remove those users or not
user_ids = yield self.store.get_users_in_dir_due_to_room(
user_ids = await self.store.get_users_in_dir_due_to_room(
room_id
)
for user_id in user_ids:
yield self._handle_remove_user(room_id, user_id)
await self._handle_remove_user(room_id, user_id)
return
else:
logger.debug("Server is still in room: %r", room_id)
is_support = yield self.store.is_support_user(state_key)
is_support = await self.store.is_support_user(state_key)
if not is_support:
if change is None:
# Handle any profile changes
yield self._handle_profile_change(
await self._handle_profile_change(
state_key, room_id, prev_event_id, event_id
)
continue
if change: # The user joined
event = yield self.store.get_event(event_id, allow_none=True)
event = await self.store.get_event(event_id, allow_none=True)
profile = ProfileInfo(
avatar_url=event.content.get("avatar_url"),
display_name=event.content.get("displayname"),
)
yield self._handle_new_user(room_id, state_key, profile)
await self._handle_new_user(room_id, state_key, profile)
else: # The user left
yield self._handle_remove_user(room_id, state_key)
await self._handle_remove_user(room_id, state_key)
else:
logger.debug("Ignoring irrelevant type: %r", typ)
@defer.inlineCallbacks
def _handle_room_publicity_change(self, room_id, prev_event_id, event_id, typ):
async def _handle_room_publicity_change(
self, room_id, prev_event_id, event_id, typ
):
"""Handle a room having potentially changed from/to world_readable/publically
joinable.
@ -255,14 +248,14 @@ class UserDirectoryHandler(StateDeltasHandler):
logger.debug("Handling change for %s: %s", typ, room_id)
if typ == EventTypes.RoomHistoryVisibility:
change = yield self._get_key_change(
change = await self._get_key_change(
prev_event_id,
event_id,
key_name="history_visibility",
public_value="world_readable",
)
elif typ == EventTypes.JoinRules:
change = yield self._get_key_change(
change = await self._get_key_change(
prev_event_id,
event_id,
key_name="join_rule",
@ -278,7 +271,7 @@ class UserDirectoryHandler(StateDeltasHandler):
# There's been a change to or from being world readable.
is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
is_public = await self.store.is_room_world_readable_or_publicly_joinable(
room_id
)
@ -293,11 +286,11 @@ class UserDirectoryHandler(StateDeltasHandler):
# ignore the change
return
users_with_profile = yield self.state.get_current_users_in_room(room_id)
users_with_profile = await self.state.get_current_users_in_room(room_id)
# Remove every user from the sharing tables for that room.
for user_id in iterkeys(users_with_profile):
yield self.store.remove_user_who_share_room(user_id, room_id)
await self.store.remove_user_who_share_room(user_id, room_id)
# Then, re-add them to the tables.
# NOTE: this is not the most efficient method, as handle_new_user sets
@ -306,26 +299,9 @@ class UserDirectoryHandler(StateDeltasHandler):
# being added multiple times. The batching upserts shouldn't make this
# too bad, though.
for user_id, profile in iteritems(users_with_profile):
yield self._handle_new_user(room_id, user_id, profile)
await self._handle_new_user(room_id, user_id, profile)
@defer.inlineCallbacks
def _handle_local_user(self, user_id):
"""Adds a new local roomless user into the user_directory_search table.
Used to populate up the user index when we have an
user_directory_search_all_users specified.
"""
logger.debug("Adding new local user to dir, %r", user_id)
profile = yield self.store.get_profileinfo(get_localpart_from_id(user_id))
row = yield self.store.get_user_in_directory(user_id)
if not row:
yield self.store.update_profile_in_user_dir(
user_id, profile.display_name, profile.avatar_url
)
@defer.inlineCallbacks
def _handle_new_user(self, room_id, user_id, profile):
async def _handle_new_user(self, room_id, user_id, profile):
"""Called when we might need to add user to directory
Args:
@ -334,18 +310,18 @@ class UserDirectoryHandler(StateDeltasHandler):
"""
logger.debug("Adding new user to dir, %r", user_id)
yield self.store.update_profile_in_user_dir(
await self.store.update_profile_in_user_dir(
user_id, profile.display_name, profile.avatar_url
)
is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
is_public = await self.store.is_room_world_readable_or_publicly_joinable(
room_id
)
# Now we update users who share rooms with users.
users_with_profile = yield self.state.get_current_users_in_room(room_id)
users_with_profile = await self.state.get_current_users_in_room(room_id)
if is_public:
yield self.store.add_users_in_public_rooms(room_id, (user_id,))
await self.store.add_users_in_public_rooms(room_id, (user_id,))
else:
to_insert = set()
@ -376,10 +352,9 @@ class UserDirectoryHandler(StateDeltasHandler):
to_insert.add((other_user_id, user_id))
if to_insert:
yield self.store.add_users_who_share_private_room(room_id, to_insert)
await self.store.add_users_who_share_private_room(room_id, to_insert)
@defer.inlineCallbacks
def _handle_remove_user(self, room_id, user_id):
async def _handle_remove_user(self, room_id, user_id):
"""Called when we might need to remove user from directory
Args:
@ -389,24 +364,23 @@ class UserDirectoryHandler(StateDeltasHandler):
logger.debug("Removing user %r", user_id)
# Remove user from sharing tables
yield self.store.remove_user_who_share_room(user_id, room_id)
await self.store.remove_user_who_share_room(user_id, room_id)
# Are they still in any rooms? If not, remove them entirely.
rooms_user_is_in = yield self.store.get_user_dir_rooms_user_is_in(user_id)
rooms_user_is_in = await self.store.get_user_dir_rooms_user_is_in(user_id)
if len(rooms_user_is_in) == 0:
yield self.store.remove_from_user_dir(user_id)
await self.store.remove_from_user_dir(user_id)
@defer.inlineCallbacks
def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id):
async def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id):
"""Check member event changes for any profile changes and update the
database if there are.
"""
if not prev_event_id or not event_id:
return
prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
event = yield self.store.get_event(event_id, allow_none=True)
prev_event = await self.store.get_event(prev_event_id, allow_none=True)
event = await self.store.get_event(event_id, allow_none=True)
if not prev_event or not event:
return
@ -421,4 +395,4 @@ class UserDirectoryHandler(StateDeltasHandler):
new_avatar = event.content.get("avatar_url")
if prev_name != new_name or prev_avatar != new_avatar:
yield self.store.update_profile_in_user_dir(user_id, new_name, new_avatar)
await self.store.update_profile_in_user_dir(user_id, new_name, new_avatar)

View File

@ -14,6 +14,8 @@
# limitations under the License.
from mock import Mock
from twisted.internet import defer
import synapse.rest.admin
from synapse.api.constants import UserTypes
from synapse.rest.client.v1 import login, room
@ -75,18 +77,16 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
)
)
self.store.remove_from_user_dir = Mock()
self.store.remove_from_user_in_public_room = Mock()
self.store.remove_from_user_dir = Mock(return_value=defer.succeed(None))
self.get_success(self.handler.handle_user_deactivated(s_user_id))
self.store.remove_from_user_dir.not_called()
self.store.remove_from_user_in_public_room.not_called()
def test_handle_user_deactivated_regular_user(self):
r_user_id = "@regular:test"
self.get_success(
self.store.register_user(user_id=r_user_id, password_hash=None)
)
self.store.remove_from_user_dir = Mock()
self.store.remove_from_user_dir = Mock(return_value=defer.succeed(None))
self.get_success(self.handler.handle_user_deactivated(r_user_id))
self.store.remove_from_user_dir.called_once_with(r_user_id)