Merge remote-tracking branch 'origin/develop' into markjh/synchrotron

markjh/synchrotron
Mark Haines 2016-05-26 16:41:06 +01:00
commit e50861eeb7
15 changed files with 223 additions and 56 deletions

View File

@ -32,5 +32,4 @@ The format of the AS configuration file is as follows:
See the spec_ for further details on how application services work.
.. _spec: https://github.com/matrix-org/matrix-doc/blob/master/specification/25_application_service_api.rst#application-service-api
.. _spec: https://matrix.org/docs/spec/application_service/unstable.html

View File

@ -145,6 +145,11 @@ pre, code {
text-decoration: none;
}
.debug {
font-size: 10px;
color: #888;
}
.footer {
margin-top: 20px;
text-align: center;

View File

@ -30,18 +30,20 @@
{% include 'room.html' with context %}
{% endfor %}
<div class="footer">
<small>
Sending email at {{ reason.now|format_ts("%c") }} due to activity in room '{{ reason.room_name }}' because:<br/>
1. An event was received at {{ reason.received_at|format_ts("%c") }}
which is more than {{ "%.1f"|format(reason.delay_before_mail_ms / (60*1000)) }} (delay_before_mail_ms) mins ago.<br/>
<a href="{{ unsubscribe_link }}">Unsubscribe</a>
<br/>
<br/>
<div class="debug">
Sending email at {{ reason.now|format_ts("%c") }} due to activity in room {{ reason.room_name }} because
an event was received at {{ reason.received_at|format_ts("%c") }}
which is more than {{ "%.1f"|format(reason.delay_before_mail_ms / (60*1000)) }} (delay_before_mail_ms) mins ago,
{% if reason.last_sent_ts %}
2. The last time we sent a mail for this room was {{ reason.last_sent_ts|format_ts("%c") }},
and the last time we sent a mail for this room was {{ reason.last_sent_ts|format_ts("%c") }},
which is more than {{ "%.1f"|format(reason.throttle_ms / (60*1000)) }} (current throttle_ms) mins ago.
{% else %}
2. We can't remember the last time we sent a mail for this room.
and we don't have a last time we sent a mail for this room.
{% endif %}
</small>
<a href="{{ unsubscribe_link }}">Unsubscribe</a>
</div>
</div>
</td>
<td> </td>

View File

@ -16,7 +16,7 @@
"""Contains functions for registering clients."""
from twisted.internet import defer
from synapse.types import UserID
from synapse.types import UserID, Requester
from synapse.api.errors import (
AuthError, Codes, SynapseError, RegistrationError, InvalidCaptchaError
)
@ -360,7 +360,8 @@ class RegistrationHandler(BaseHandler):
@defer.inlineCallbacks
def get_or_create_user(self, localpart, displayname, duration_seconds):
"""Creates a new user or returns an access token for an existing one
"""Creates a new user if the user does not exist,
else revokes all previous access tokens and generates a new one.
Args:
localpart : The local part of the user ID to register. If None,
@ -399,14 +400,14 @@ class RegistrationHandler(BaseHandler):
yield registered_user(self.distributor, user)
else:
yield self.store.flush_user(user_id=user_id)
yield self.store.user_delete_access_tokens(user_id=user_id)
yield self.store.add_access_token_to_user(user_id=user_id, token=token)
if displayname is not None:
logger.info("setting user display name: %s -> %s", user_id, displayname)
profile_handler = self.hs.get_handlers().profile_handler
yield profile_handler.set_displayname(
user, user, displayname
user, Requester(user, token, False), displayname
)
defer.returnValue((user_id, token))

View File

@ -32,12 +32,19 @@ DELAY_BEFORE_MAIL_MS = 10 * 60 * 1000
# Each room maintains its own throttle counter, but each new mail notification
# sends the pending notifications for all rooms.
THROTTLE_START_MS = 10 * 60 * 1000
THROTTLE_MAX_MS = 24 * 60 * 60 * 1000 # (2 * 60 * 1000) * (2 ** 11) # ~3 days
THROTTLE_MULTIPLIER = 6 # 10 mins, 1 hour, 6 hours, 24 hours
THROTTLE_MAX_MS = 24 * 60 * 60 * 1000 # 24h
# THROTTLE_MULTIPLIER = 6 # 10 mins, 1 hour, 6 hours, 24 hours
THROTTLE_MULTIPLIER = 144 # 10 mins, 24 hours - i.e. jump straight to 1 day
# If no event triggers a notification for this long after the previous,
# the throttle is released.
THROTTLE_RESET_AFTER_MS = (2 * 60 * 1000) * (2 ** 11) # ~3 days
# 12 hours - a gap of 12 hours in conversation is surely enough to merit a new
# notification when things get going again...
THROTTLE_RESET_AFTER_MS = (12 * 60 * 60 * 1000)
# does each email include all unread notifs, or just the ones which have happened
# since the last mail?
INCLUDE_ALL_UNREAD_NOTIFS = True
class EmailPusher(object):
@ -126,8 +133,9 @@ class EmailPusher(object):
up logging, measures and guards against multiple instances of it
being run.
"""
start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering
unprocessed = yield self.store.get_unread_push_actions_for_user_in_range(
self.user_id, self.last_stream_ordering, self.max_stream_ordering
self.user_id, start, self.max_stream_ordering
)
soonest_due_at = None
@ -150,7 +158,6 @@ class EmailPusher(object):
# we then consider all previously outstanding notifications
# to be delivered.
# debugging:
reason = {
'room_id': push_action['room_id'],
'now': self.clock.time_msec(),
@ -165,9 +172,12 @@ class EmailPusher(object):
yield self.save_last_stream_ordering_and_success(max([
ea['stream_ordering'] for ea in unprocessed
]))
yield self.sent_notif_update_throttle(
push_action['room_id'], push_action
)
# we update the throttle on all the possible unprocessed push actions
for ea in unprocessed:
yield self.sent_notif_update_throttle(
ea['room_id'], ea
)
break
else:
if soonest_due_at is None or should_notify_at < soonest_due_at:

View File

@ -44,8 +44,11 @@ MESSAGE_FROM_PERSON_IN_ROOM = "You have a message on %(app)s from %(person)s " \
"in the %s room..."
MESSAGE_FROM_PERSON = "You have a message on %(app)s from %(person)s..."
MESSAGES_FROM_PERSON = "You have messages on %(app)s from %(person)s..."
MESSAGES_IN_ROOM = "There are some messages on %(app)s for you in the %(room)s room..."
MESSAGES_IN_ROOMS = "Here are some messages on %(app)s you may have missed..."
MESSAGES_IN_ROOM = "You have messages on %(app)s in the %(room)s room..."
MESSAGES_IN_ROOM_AND_OTHERS = \
"You have messages on %(app)s in the %(room)s room and others..."
MESSAGES_FROM_PERSON_AND_OTHERS = \
"You have messages on %(app)s from %(person)s and others..."
INVITE_FROM_PERSON_TO_ROOM = "%(person)s has invited you to join the " \
"%(room)s room on %(app)s..."
INVITE_FROM_PERSON = "%(person)s has invited you to chat on %(app)s..."
@ -128,9 +131,14 @@ class Mailer(object):
state_by_room[room_id] = room_state
# Run at most 3 of these at once: sync does 10 at a time but email
# notifs are much realtime than sync so we can afford to wait a bit.
# notifs are much less realtime than sync so we can afford to wait a bit.
yield concurrently_execute(_fetch_room_state, rooms_in_order, 3)
# actually sort our so-called rooms_in_order list, most recent room first
rooms_in_order.sort(
key=lambda r: -(notifs_by_room[r][-1]['received_ts'] or 0)
)
rooms = []
for r in rooms_in_order:
@ -139,12 +147,12 @@ class Mailer(object):
)
rooms.append(roomvars)
summary_text = self.make_summary_text(
notifs_by_room, state_by_room, notif_events, user_id
reason['room_name'] = calculate_room_name(
state_by_room[reason['room_id']], user_id, fallback_to_members=True
)
reason['room_name'] = calculate_room_name(
state_by_room[reason['room_id']], user_id, fallback_to_members=False
summary_text = self.make_summary_text(
notifs_by_room, state_by_room, notif_events, user_id, reason
)
template_vars = {
@ -251,7 +259,9 @@ class Mailer(object):
sender_state_event = room_state[("m.room.member", event.sender)]
sender_name = name_from_member_event(sender_state_event)
sender_avatar_url = sender_state_event.content["avatar_url"]
sender_avatar_url = None
if "avatar_url" in sender_state_event.content:
sender_avatar_url = sender_state_event.content["avatar_url"]
# 'hash' for deterministically picking default images: use
# sender_hash % the number of default images to choose from
@ -296,7 +306,8 @@ class Mailer(object):
return messagevars
def make_summary_text(self, notifs_by_room, state_by_room, notif_events, user_id):
def make_summary_text(self, notifs_by_room, state_by_room,
notif_events, user_id, reason):
if len(notifs_by_room) == 1:
# Only one room has new stuff
room_id = notifs_by_room.keys()[0]
@ -371,9 +382,28 @@ class Mailer(object):
}
else:
# Stuff's happened in multiple different rooms
return MESSAGES_IN_ROOMS % {
"app": self.app_name,
}
# ...but we still refer to the 'reason' room which triggered the mail
if reason['room_name'] is not None:
return MESSAGES_IN_ROOM_AND_OTHERS % {
"room": reason['room_name'],
"app": self.app_name,
}
else:
# If the reason room doesn't have a name, say who the messages
# are from explicitly to avoid, "messages in the Bob room"
sender_ids = list(set([
notif_events[n['event_id']].sender
for n in notifs_by_room[reason['room_id']]
]))
return MESSAGES_FROM_PERSON_AND_OTHERS % {
"person": descriptor_from_member_events([
state_by_room[reason['room_id']][("m.room.member", s)]
for s in sender_ids
]),
"app": self.app_name,
}
def make_room_link(self, room_id):
# need /beta for Universal Links to work on iOS

View File

@ -38,7 +38,9 @@ def get_badge_count(store, user_id):
r.room_id, user_id, last_unread_event_id
)
)
badge += notifs["notify_count"]
# return one badge count per conversation, as count per
# message is so noisy as to be almost useless
badge += 1 if notifs["notify_count"] else 0
defer.returnValue(badge)

View File

@ -232,7 +232,10 @@ class JoinRoomAliasServlet(ClientV1RestServlet):
if RoomID.is_valid(room_identifier):
room_id = room_identifier
remote_room_hosts = None
try:
remote_room_hosts = request.args["server_name"]
except:
remote_room_hosts = None
elif RoomAlias.is_valid(room_identifier):
handler = self.handlers.room_member_handler
room_alias = RoomAlias.from_string(room_identifier)

View File

@ -88,6 +88,7 @@ class DataStore(RoomMemberStore, RoomStore,
def __init__(self, db_conn, hs):
self.hs = hs
self._clock = hs.get_clock()
self.database_engine = hs.database_engine
self.client_ip_last_seen = Cache(
@ -173,6 +174,14 @@ class DataStore(RoomMemberStore, RoomStore,
prefilled_cache=push_rules_prefill,
)
cur = db_conn.cursor()
self._find_stream_orderings_for_times_txn(cur)
cur.close()
self.find_stream_orderings_looping_call = self._clock.looping_call(
self._find_stream_orderings_for_times, 60 * 60 * 1000
)
super(DataStore, self).__init__(hs)
def take_presence_startup_info(self):

View File

@ -153,7 +153,6 @@ class SQLBaseStore(object):
def __init__(self, hs):
self.hs = hs
self._db_pool = hs.get_db_pool()
self._clock = hs.get_clock()
self._previous_txn_total_time = 0
self._current_txn_total_time = 0

View File

@ -24,6 +24,10 @@ logger = logging.getLogger(__name__)
class EventPushActionsStore(SQLBaseStore):
def __init__(self, hs):
self.stream_ordering_month_ago = None
super(EventPushActionsStore, self).__init__(hs)
def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
"""
Args:
@ -224,18 +228,93 @@ class EventPushActionsStore(SQLBaseStore):
(room_id, event_id)
)
def _remove_push_actions_before_txn(self, txn, room_id, user_id,
topological_ordering):
def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
topological_ordering):
"""
Purges old, stale push actions for a user and room before a given
topological_ordering
Args:
txn: The transcation
room_id: Room ID to delete from
user_id: user ID to delete for
topological_ordering: The lowest topological ordering which will
not be deleted.
"""
txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
(room_id, user_id, )
)
# We need to join on the events table to get the received_ts for
# event_push_actions and sqlite won't let us use a join in a delete so
# we can't just delete where received_ts < x. Furthermore we can
# only identify event_push_actions by a tuple of room_id, event_id
# we we can't use a subquery.
# Instead, we look up the stream ordering for the last event in that
# room received before the threshold time and delete event_push_actions
# in the room with a stream_odering before that.
txn.execute(
"DELETE FROM event_push_actions"
" WHERE room_id = ? AND user_id = ? AND topological_ordering < ?",
(room_id, user_id, topological_ordering,)
"DELETE FROM event_push_actions "
" WHERE user_id = ? AND room_id = ? AND "
" topological_ordering < ? AND stream_ordering < ?",
(user_id, room_id, topological_ordering, self.stream_ordering_month_ago)
)
@defer.inlineCallbacks
def _find_stream_orderings_for_times(self):
yield self.runInteraction(
"_find_stream_orderings_for_times",
self._find_stream_orderings_for_times_txn
)
def _find_stream_orderings_for_times_txn(self, txn):
logger.info("Searching for stream ordering 1 month ago")
self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn(
txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
)
logger.info(
"Found stream ordering 1 month ago: it's %d",
self.stream_ordering_month_ago
)
def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
"""
Find the stream_ordering of the first event that was received after
a given timestamp. This is relatively slow as there is no index on
received_ts but we can then use this to delete push actions before
this.
received_ts must necessarily be in the same order as stream_ordering
and stream_ordering is indexed, so we manually binary search using
stream_ordering
"""
txn.execute("SELECT MAX(stream_ordering) FROM events")
max_stream_ordering = txn.fetchone()[0]
if max_stream_ordering is None:
return 0
range_start = 0
range_end = max_stream_ordering
sql = (
"SELECT received_ts FROM events"
" WHERE stream_ordering > ?"
" ORDER BY stream_ordering"
" LIMIT 1"
)
while range_end - range_start > 1:
middle = int((range_end + range_start) / 2)
txn.execute(sql, (middle,))
middle_ts = txn.fetchone()[0]
if ts > middle_ts:
range_start = middle
else:
range_end = middle
return range_end
def _action_has_highlight(actions):
for action in actions:

View File

@ -149,6 +149,7 @@ class PresenceStore(SQLBaseStore):
"status_msg",
"currently_active",
),
desc="get_presence_for_users",
)
for row in rows:

View File

@ -297,7 +297,7 @@ class ReceiptsStore(SQLBaseStore):
)
if receipt_type == "m.read" and topological_ordering:
self._remove_push_actions_before_txn(
self._remove_old_push_actions_before_txn(
txn,
room_id=room_id,
user_id=user_id,

View File

@ -14,6 +14,9 @@
# limitations under the License.
import re
import logging
logger = logging.getLogger(__name__)
# intentionally looser than what aliases we allow to be registered since
# other HSes may allow aliases that we would not
@ -105,13 +108,21 @@ def calculate_room_name(room_state, user_id, fallback_to_members=True):
# or inbound invite, or outbound 3PID invite.
if all_members[0].sender == user_id:
if "m.room.third_party_invite" in room_state_bytype:
third_party_invites = room_state_bytype["m.room.third_party_invite"]
third_party_invites = (
room_state_bytype["m.room.third_party_invite"].values()
)
if len(third_party_invites) > 0:
# technically third party invite events are not member
# events, but they are close enough
return "Inviting %s" (
descriptor_from_member_events(third_party_invites)
)
# FIXME: no they're not - they look nothing like a member;
# they have a great big encrypted thing as their name to
# prevent leaking the 3PID name...
# return "Inviting %s" % (
# descriptor_from_member_events(third_party_invites)
# )
return "Inviting email address"
else:
return ALL_ALONE
else:

View File

@ -17,6 +17,7 @@ from twisted.internet import defer
from .. import unittest
from synapse.handlers.register import RegistrationHandler
from synapse.types import UserID
from tests.utils import setup_test_homeserver
@ -36,25 +37,21 @@ class RegistrationTestCase(unittest.TestCase):
self.mock_distributor = Mock()
self.mock_distributor.declare("registered_user")
self.mock_captcha_client = Mock()
hs = yield setup_test_homeserver(
self.hs = yield setup_test_homeserver(
handlers=None,
http_client=None,
expire_access_token=True)
hs.handlers = RegistrationHandlers(hs)
self.handler = hs.get_handlers().registration_handler
hs.get_handlers().profile_handler = Mock()
self.hs.handlers = RegistrationHandlers(self.hs)
self.handler = self.hs.get_handlers().registration_handler
self.hs.get_handlers().profile_handler = Mock()
self.mock_handler = Mock(spec=[
"generate_short_term_login_token",
])
hs.get_handlers().auth_handler = self.mock_handler
self.hs.get_handlers().auth_handler = self.mock_handler
@defer.inlineCallbacks
def test_user_is_created_and_logged_in_if_doesnt_exist(self):
"""
Returns:
The user doess not exist in this case so it will register and log it in
"""
duration_ms = 200
local_part = "someone"
display_name = "someone"
@ -65,3 +62,22 @@ class RegistrationTestCase(unittest.TestCase):
local_part, display_name, duration_ms)
self.assertEquals(result_user_id, user_id)
self.assertEquals(result_token, 'secret')
@defer.inlineCallbacks
def test_if_user_exists(self):
store = self.hs.get_datastore()
frank = UserID.from_string("@frank:test")
yield store.register(
user_id=frank.to_string(),
token="jkv;g498752-43gj['eamb!-5",
password_hash=None)
duration_ms = 200
local_part = "frank"
display_name = "Frank"
user_id = "@frank:test"
mock_token = self.mock_handler.generate_short_term_login_token
mock_token.return_value = 'secret'
result_user_id, result_token = yield self.handler.get_or_create_user(
local_part, display_name, duration_ms)
self.assertEquals(result_user_id, user_id)
self.assertEquals(result_token, 'secret')