Merge branch 'develop' of github.com:matrix-org/synapse into erikj/timings

erikj/timings
Erik Johnston 2016-06-01 16:46:46 +01:00
commit 40c5fffba1
22 changed files with 319 additions and 60 deletions

View File

@ -17,11 +17,15 @@
</td> </td>
<td class="message_contents"> <td class="message_contents">
{% if loop.index0 == 0 or notif.messages[loop.index0 - 1].sender_name != notif.messages[loop.index0].sender_name %} {% if loop.index0 == 0 or notif.messages[loop.index0 - 1].sender_name != notif.messages[loop.index0].sender_name %}
<div class="sender_name">{{ message.sender_name }}</div> <div class="sender_name">{% if message.msgtype == "m.emote" %}*{% endif %} {{ message.sender_name }}</div>
{% endif %} {% endif %}
<div class="message_body"> <div class="message_body">
{% if message.msgtype == "m.text" %} {% if message.msgtype == "m.text" %}
{{ message.body_text_html }} {{ message.body_text_html }}
{% elif message.msgtype == "m.emote" %}
{{ message.body_text_html }}
{% elif message.msgtype == "m.notice" %}
{{ message.body_text_html }}
{% elif message.msgtype == "m.image" %} {% elif message.msgtype == "m.image" %}
<img src="{{ message.image_url|mxc_to_http(640, 480, scale) }}" /> <img src="{{ message.image_url|mxc_to_http(640, 480, scale) }}" />
{% elif message.msgtype == "m.file" %} {% elif message.msgtype == "m.file" %}

View File

@ -1,7 +1,11 @@
{% for message in notif.messages %} {% for message in notif.messages %}
{{ message.sender_name }} ({{ message.ts|format_ts("%H:%M") }}) {% if message.msgtype == "m.emote" %}* {% endif %}{{ message.sender_name }} ({{ message.ts|format_ts("%H:%M") }})
{% if message.msgtype == "m.text" %} {% if message.msgtype == "m.text" %}
{{ message.body_text_plain }} {{ message.body_text_plain }}
{% elif message.msgtype == "m.emote" %}
{{ message.body_text_plain }}
{% elif message.msgtype == "m.notice" %}
{{ message.body_text_plain }}
{% elif message.msgtype == "m.image" %} {% elif message.msgtype == "m.image" %}
{{ message.body_text_plain }} {{ message.body_text_plain }}
{% elif message.msgtype == "m.file" %} {% elif message.msgtype == "m.file" %}

View File

@ -56,22 +56,22 @@ import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class AppServiceScheduler(object): class ApplicationServiceScheduler(object):
""" Public facing API for this module. Does the required DI to tie the """ Public facing API for this module. Does the required DI to tie the
components together. This also serves as the "event_pool", which in this components together. This also serves as the "event_pool", which in this
case is a simple array. case is a simple array.
""" """
def __init__(self, clock, store, as_api): def __init__(self, hs):
self.clock = clock self.clock = hs.get_clock()
self.store = store self.store = hs.get_datastore()
self.as_api = as_api self.as_api = hs.get_application_service_api()
def create_recoverer(service, callback): def create_recoverer(service, callback):
return _Recoverer(clock, store, as_api, service, callback) return _Recoverer(self.clock, self.store, self.as_api, service, callback)
self.txn_ctrl = _TransactionController( self.txn_ctrl = _TransactionController(
clock, store, as_api, create_recoverer self.clock, self.store, self.as_api, create_recoverer
) )
self.queuer = _ServiceQueuer(self.txn_ctrl) self.queuer = _ServiceQueuer(self.txn_ctrl)

View File

@ -29,6 +29,7 @@ class ServerConfig(Config):
self.user_agent_suffix = config.get("user_agent_suffix") self.user_agent_suffix = config.get("user_agent_suffix")
self.use_frozen_dicts = config.get("use_frozen_dicts", True) self.use_frozen_dicts = config.get("use_frozen_dicts", True)
self.public_baseurl = config.get("public_baseurl") self.public_baseurl = config.get("public_baseurl")
self.secondary_directory_servers = config.get("secondary_directory_servers", [])
if self.public_baseurl is not None: if self.public_baseurl is not None:
if self.public_baseurl[-1] != '/': if self.public_baseurl[-1] != '/':
@ -156,6 +157,15 @@ class ServerConfig(Config):
# hard limit. # hard limit.
soft_file_limit: 0 soft_file_limit: 0
# A list of other Home Servers to fetch the public room directory from
# and include in the public room directory of this home server
# This is a temporary stopgap solution to populate new server with a
# list of rooms until there exists a good solution of a decentralized
# room directory.
# secondary_directory_servers:
# - matrix.org
# - vector.im
# List of ports that Synapse should listen on, their purpose and their # List of ports that Synapse should listen on, their purpose and their
# configuration. # configuration.
listeners: listeners:

View File

@ -24,6 +24,7 @@ from synapse.api.errors import (
CodeMessageException, HttpResponseException, SynapseError, CodeMessageException, HttpResponseException, SynapseError,
) )
from synapse.util import unwrapFirstError from synapse.util import unwrapFirstError
from synapse.util.async import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.events import FrozenEvent from synapse.events import FrozenEvent
@ -550,6 +551,25 @@ class FederationClient(FederationBase):
raise RuntimeError("Failed to send to any server.") raise RuntimeError("Failed to send to any server.")
@defer.inlineCallbacks
def get_public_rooms(self, destinations):
results_by_server = {}
@defer.inlineCallbacks
def _get_result(s):
if s == self.server_name:
defer.returnValue()
try:
result = yield self.transport_layer.get_public_rooms(s)
results_by_server[s] = result
except:
logger.exception("Error getting room list from server %r", s)
yield concurrently_execute(_get_result, destinations, 3)
defer.returnValue(results_by_server)
@defer.inlineCallbacks @defer.inlineCallbacks
def query_auth(self, destination, room_id, event_id, local_auth): def query_auth(self, destination, room_id, event_id, local_auth):
""" """

View File

@ -224,6 +224,18 @@ class TransportLayerClient(object):
defer.returnValue(response) defer.returnValue(response)
@defer.inlineCallbacks
@log_function
def get_public_rooms(self, remote_server):
path = PREFIX + "/publicRooms"
response = yield self.client.get_json(
destination=remote_server,
path=path,
)
defer.returnValue(response)
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def exchange_third_party_invite(self, destination, room_id, event_dict): def exchange_third_party_invite(self, destination, room_id, event_dict):

View File

@ -134,10 +134,12 @@ class Authenticator(object):
class BaseFederationServlet(object): class BaseFederationServlet(object):
def __init__(self, handler, authenticator, ratelimiter, server_name): def __init__(self, handler, authenticator, ratelimiter, server_name,
room_list_handler):
self.handler = handler self.handler = handler
self.authenticator = authenticator self.authenticator = authenticator
self.ratelimiter = ratelimiter self.ratelimiter = ratelimiter
self.room_list_handler = room_list_handler
def _wrap(self, code): def _wrap(self, code):
authenticator = self.authenticator authenticator = self.authenticator
@ -492,6 +494,50 @@ class OpenIdUserInfo(BaseFederationServlet):
return code return code
class PublicRoomList(BaseFederationServlet):
"""
Fetch the public room list for this server.
This API returns information in the same format as /publicRooms on the
client API, but will only ever include local public rooms and hence is
intended for consumption by other home servers.
GET /publicRooms HTTP/1.1
HTTP/1.1 200 OK
Content-Type: application/json
{
"chunk": [
{
"aliases": [
"#test:localhost"
],
"guest_can_join": false,
"name": "test room",
"num_joined_members": 3,
"room_id": "!whkydVegtvatLfXmPN:localhost",
"world_readable": false
}
],
"end": "END",
"start": "START"
}
"""
PATH = "/publicRooms"
@defer.inlineCallbacks
def on_GET(self, request):
data = yield self.room_list_handler.get_local_public_room_list()
defer.returnValue((200, data))
# Avoid doing remote HS authorization checks which are done by default by
# BaseFederationServlet.
def _wrap(self, code):
return code
SERVLET_CLASSES = ( SERVLET_CLASSES = (
FederationSendServlet, FederationSendServlet,
FederationPullServlet, FederationPullServlet,
@ -513,6 +559,7 @@ SERVLET_CLASSES = (
FederationThirdPartyInviteExchangeServlet, FederationThirdPartyInviteExchangeServlet,
On3pidBindServlet, On3pidBindServlet,
OpenIdUserInfo, OpenIdUserInfo,
PublicRoomList,
) )
@ -523,4 +570,5 @@ def register_servlets(hs, resource, authenticator, ratelimiter):
authenticator=authenticator, authenticator=authenticator,
ratelimiter=ratelimiter, ratelimiter=ratelimiter,
server_name=hs.hostname, server_name=hs.hostname,
room_list_handler=hs.get_room_list_handler(),
).register(resource) ).register(resource)

View File

@ -13,11 +13,9 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from synapse.appservice.scheduler import AppServiceScheduler
from synapse.appservice.api import ApplicationServiceApi
from .register import RegistrationHandler from .register import RegistrationHandler
from .room import ( from .room import (
RoomCreationHandler, RoomListHandler, RoomContextHandler, RoomCreationHandler, RoomContextHandler,
) )
from .room_member import RoomMemberHandler from .room_member import RoomMemberHandler
from .message import MessageHandler from .message import MessageHandler
@ -26,7 +24,6 @@ from .federation import FederationHandler
from .profile import ProfileHandler from .profile import ProfileHandler
from .directory import DirectoryHandler from .directory import DirectoryHandler
from .admin import AdminHandler from .admin import AdminHandler
from .appservice import ApplicationServicesHandler
from .auth import AuthHandler from .auth import AuthHandler
from .identity import IdentityHandler from .identity import IdentityHandler
from .receipts import ReceiptsHandler from .receipts import ReceiptsHandler
@ -50,18 +47,9 @@ class Handlers(object):
self.event_handler = EventHandler(hs) self.event_handler = EventHandler(hs)
self.federation_handler = FederationHandler(hs) self.federation_handler = FederationHandler(hs)
self.profile_handler = ProfileHandler(hs) self.profile_handler = ProfileHandler(hs)
self.room_list_handler = RoomListHandler(hs)
self.directory_handler = DirectoryHandler(hs) self.directory_handler = DirectoryHandler(hs)
self.admin_handler = AdminHandler(hs) self.admin_handler = AdminHandler(hs)
self.receipts_handler = ReceiptsHandler(hs) self.receipts_handler = ReceiptsHandler(hs)
asapi = ApplicationServiceApi(hs)
self.appservice_handler = ApplicationServicesHandler(
hs, asapi, AppServiceScheduler(
clock=hs.get_clock(),
store=hs.get_datastore(),
as_api=asapi
)
)
self.auth_handler = AuthHandler(hs) self.auth_handler = AuthHandler(hs)
self.identity_handler = IdentityHandler(hs) self.identity_handler = IdentityHandler(hs)
self.search_handler = SearchHandler(hs) self.search_handler = SearchHandler(hs)

View File

@ -17,7 +17,6 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes
from synapse.appservice import ApplicationService from synapse.appservice import ApplicationService
from synapse.types import UserID
import logging import logging
@ -35,16 +34,13 @@ def log_failure(failure):
) )
# NB: Purposefully not inheriting BaseHandler since that contains way too much
# setup code which this handler does not need or use. This makes testing a lot
# easier.
class ApplicationServicesHandler(object): class ApplicationServicesHandler(object):
def __init__(self, hs, appservice_api, appservice_scheduler): def __init__(self, hs):
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.hs = hs self.is_mine_id = hs.is_mine_id
self.appservice_api = appservice_api self.appservice_api = hs.get_application_service_api()
self.scheduler = appservice_scheduler self.scheduler = hs.get_application_service_scheduler()
self.started_scheduler = False self.started_scheduler = False
@defer.inlineCallbacks @defer.inlineCallbacks
@ -169,8 +165,7 @@ class ApplicationServicesHandler(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def _is_unknown_user(self, user_id): def _is_unknown_user(self, user_id):
user = UserID.from_string(user_id) if not self.is_mine_id(user_id):
if not self.hs.is_mine(user):
# we don't know if they are unknown or not since it isn't one of our # we don't know if they are unknown or not since it isn't one of our
# users. We can't poke ASes. # users. We can't poke ASes.
defer.returnValue(False) defer.returnValue(False)

View File

@ -18,7 +18,7 @@ from twisted.internet import defer
from ._base import BaseHandler from ._base import BaseHandler
from synapse.api.constants import LoginType from synapse.api.constants import LoginType
from synapse.types import UserID from synapse.types import UserID
from synapse.api.errors import AuthError, LoginError, Codes from synapse.api.errors import AuthError, LoginError, Codes, StoreError, SynapseError
from synapse.util.async import run_on_reactor from synapse.util.async import run_on_reactor
from twisted.web.client import PartialDownloadError from twisted.web.client import PartialDownloadError
@ -563,7 +563,12 @@ class AuthHandler(BaseHandler):
except_access_token_ids = [requester.access_token_id] if requester else [] except_access_token_ids = [requester.access_token_id] if requester else []
try:
yield self.store.user_set_password_hash(user_id, password_hash) yield self.store.user_set_password_hash(user_id, password_hash)
except StoreError as e:
if e.code == 404:
raise SynapseError(404, "Unknown user", Codes.NOT_FOUND)
raise e
yield self.store.user_delete_access_tokens( yield self.store.user_delete_access_tokens(
user_id, except_access_token_ids user_id, except_access_token_ids
) )

View File

@ -33,6 +33,7 @@ class DirectoryHandler(BaseHandler):
super(DirectoryHandler, self).__init__(hs) super(DirectoryHandler, self).__init__(hs)
self.state = hs.get_state_handler() self.state = hs.get_state_handler()
self.appservice_handler = hs.get_application_service_handler()
self.federation = hs.get_replication_layer() self.federation = hs.get_replication_layer()
self.federation.register_query_handler( self.federation.register_query_handler(
@ -281,7 +282,7 @@ class DirectoryHandler(BaseHandler):
) )
if not result: if not result:
# Query AS to see if it exists # Query AS to see if it exists
as_handler = self.hs.get_handlers().appservice_handler as_handler = self.appservice_handler
result = yield as_handler.query_room_alias_exists(room_alias) result = yield as_handler.query_room_alias_exists(room_alias)
defer.returnValue(result) defer.returnValue(result)

View File

@ -36,6 +36,8 @@ import string
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000
id_server_scheme = "https://" id_server_scheme = "https://"
@ -344,8 +346,14 @@ class RoomListHandler(BaseHandler):
def __init__(self, hs): def __init__(self, hs):
super(RoomListHandler, self).__init__(hs) super(RoomListHandler, self).__init__(hs)
self.response_cache = ResponseCache() self.response_cache = ResponseCache()
self.remote_list_request_cache = ResponseCache()
self.remote_list_cache = {}
self.fetch_looping_call = hs.get_clock().looping_call(
self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL
)
self.fetch_all_remote_lists()
def get_public_room_list(self): def get_local_public_room_list(self):
result = self.response_cache.get(()) result = self.response_cache.get(())
if not result: if not result:
result = self.response_cache.set((), self._get_public_room_list()) result = self.response_cache.set((), self._get_public_room_list())
@ -427,6 +435,55 @@ class RoomListHandler(BaseHandler):
# FIXME (erikj): START is no longer a valid value # FIXME (erikj): START is no longer a valid value
defer.returnValue({"start": "START", "end": "END", "chunk": results}) defer.returnValue({"start": "START", "end": "END", "chunk": results})
@defer.inlineCallbacks
def fetch_all_remote_lists(self):
deferred = self.hs.get_replication_layer().get_public_rooms(
self.hs.config.secondary_directory_servers
)
self.remote_list_request_cache.set((), deferred)
yield deferred
@defer.inlineCallbacks
def get_aggregated_public_room_list(self):
"""
Get the public room list from this server and the servers
specified in the secondary_directory_servers config option.
XXX: Pagination...
"""
# We return the results from out cache which is updated by a looping call,
# unless we're missing a cache entry, in which case wait for the result
# of the fetch if there's one in progress. If not, omit that server.
wait = False
for s in self.hs.config.secondary_directory_servers:
if s not in self.remote_list_cache:
logger.warn("No cached room list from %s: waiting for fetch", s)
wait = True
break
if wait and self.remote_list_request_cache.get(()):
yield self.remote_list_request_cache.get(())
public_rooms = yield self.get_local_public_room_list()
# keep track of which room IDs we've seen so we can de-dup
room_ids = set()
# tag all the ones in our list with our server name.
# Also add the them to the de-deping set
for room in public_rooms['chunk']:
room["server_name"] = self.hs.hostname
room_ids.add(room["room_id"])
# Now add the results from federation
for server_name, server_result in self.remote_list_cache.items():
for room in server_result["chunk"]:
if room["room_id"] not in room_ids:
room["server_name"] = server_name
public_rooms["chunk"].append(room)
room_ids.add(room["room_id"])
defer.returnValue(public_rooms)
class RoomContextHandler(BaseHandler): class RoomContextHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks

View File

@ -140,8 +140,6 @@ class Notifier(object):
UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000 UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000
def __init__(self, hs): def __init__(self, hs):
self.hs = hs
self.user_to_user_stream = {} self.user_to_user_stream = {}
self.room_to_user_streams = {} self.room_to_user_streams = {}
self.appservice_to_user_streams = {} self.appservice_to_user_streams = {}
@ -151,6 +149,8 @@ class Notifier(object):
self.pending_new_room_events = [] self.pending_new_room_events = []
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.appservice_handler = hs.get_application_service_handler()
self.state_handler = hs.get_state_handler()
hs.get_distributor().observe( hs.get_distributor().observe(
"user_joined_room", self._user_joined_room "user_joined_room", self._user_joined_room
@ -232,9 +232,7 @@ class Notifier(object):
def _on_new_room_event(self, event, room_stream_id, extra_users=[]): def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
"""Notify any user streams that are interested in this room event""" """Notify any user streams that are interested in this room event"""
# poke any interested application service. # poke any interested application service.
self.hs.get_handlers().appservice_handler.notify_interested_services( self.appservice_handler.notify_interested_services(event)
event
)
app_streams = set() app_streams = set()
@ -449,7 +447,7 @@ class Notifier(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def _is_world_readable(self, room_id): def _is_world_readable(self, room_id):
state = yield self.hs.get_state_handler().get_current_state( state = yield self.state_handler.get_current_state(
room_id, room_id,
EventTypes.RoomHistoryVisibility EventTypes.RoomHistoryVisibility
) )

View File

@ -279,8 +279,9 @@ class PublicRoomListRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks @defer.inlineCallbacks
def on_GET(self, request): def on_GET(self, request):
handler = self.handlers.room_list_handler handler = self.hs.get_room_list_handler()
data = yield handler.get_public_room_list() data = yield handler.get_aggregated_public_room_list()
defer.returnValue((200, data)) defer.returnValue((200, data))

View File

@ -22,6 +22,8 @@
from twisted.web.client import BrowserLikePolicyForHTTPS from twisted.web.client import BrowserLikePolicyForHTTPS
from twisted.enterprise import adbapi from twisted.enterprise import adbapi
from synapse.appservice.scheduler import ApplicationServiceScheduler
from synapse.appservice.api import ApplicationServiceApi
from synapse.federation import initialize_http_replication from synapse.federation import initialize_http_replication
from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
from synapse.notifier import Notifier from synapse.notifier import Notifier
@ -30,6 +32,8 @@ from synapse.handlers import Handlers
from synapse.handlers.presence import PresenceHandler from synapse.handlers.presence import PresenceHandler
from synapse.handlers.sync import SyncHandler from synapse.handlers.sync import SyncHandler
from synapse.handlers.typing import TypingHandler from synapse.handlers.typing import TypingHandler
from synapse.handlers.room import RoomListHandler
from synapse.handlers.appservice import ApplicationServicesHandler
from synapse.state import StateHandler from synapse.state import StateHandler
from synapse.storage import DataStore from synapse.storage import DataStore
from synapse.util import Clock from synapse.util import Clock
@ -84,6 +88,10 @@ class HomeServer(object):
'presence_handler', 'presence_handler',
'sync_handler', 'sync_handler',
'typing_handler', 'typing_handler',
'room_list_handler',
'application_service_api',
'application_service_scheduler',
'application_service_handler',
'notifier', 'notifier',
'distributor', 'distributor',
'client_resource', 'client_resource',
@ -179,6 +187,18 @@ class HomeServer(object):
def build_sync_handler(self): def build_sync_handler(self):
return SyncHandler(self) return SyncHandler(self)
def build_room_list_handler(self):
return RoomListHandler(self)
def build_application_service_api(self):
return ApplicationServiceApi(self)
def build_application_service_scheduler(self):
return ApplicationServiceScheduler(self)
def build_application_service_handler(self):
return ApplicationServicesHandler(self)
def build_event_sources(self): def build_event_sources(self):
return EventSources(self) return EventSources(self)

View File

@ -17,7 +17,7 @@ from twisted.internet import defer
from .appservice import ( from .appservice import (
ApplicationServiceStore, ApplicationServiceTransactionStore ApplicationServiceStore, ApplicationServiceTransactionStore
) )
from ._base import Cache from ._base import Cache, LoggingTransaction
from .directory import DirectoryStore from .directory import DirectoryStore
from .events import EventsStore from .events import EventsStore
from .presence import PresenceStore, UserPresenceState from .presence import PresenceStore, UserPresenceState
@ -88,6 +88,7 @@ class DataStore(RoomMemberStore, RoomStore,
def __init__(self, db_conn, hs): def __init__(self, db_conn, hs):
self.hs = hs self.hs = hs
self._clock = hs.get_clock()
self.database_engine = hs.database_engine self.database_engine = hs.database_engine
self.client_ip_last_seen = Cache( self.client_ip_last_seen = Cache(
@ -173,6 +174,19 @@ class DataStore(RoomMemberStore, RoomStore,
prefilled_cache=push_rules_prefill, prefilled_cache=push_rules_prefill,
) )
cur = LoggingTransaction(
db_conn.cursor(),
name="_find_stream_orderings_for_times_txn",
database_engine=self.database_engine,
after_callbacks=[]
)
self._find_stream_orderings_for_times_txn(cur)
cur.close()
self.find_stream_orderings_looping_call = self._clock.looping_call(
self._find_stream_orderings_for_times, 60 * 60 * 1000
)
super(DataStore, self).__init__(hs) super(DataStore, self).__init__(hs)
def take_presence_startup_info(self): def take_presence_startup_info(self):

View File

@ -153,7 +153,6 @@ class SQLBaseStore(object):
def __init__(self, hs): def __init__(self, hs):
self.hs = hs self.hs = hs
self._db_pool = hs.get_db_pool() self._db_pool = hs.get_db_pool()
self._clock = hs.get_clock()
self._previous_txn_total_time = 0 self._previous_txn_total_time = 0
self._current_txn_total_time = 0 self._current_txn_total_time = 0

View File

@ -24,6 +24,10 @@ logger = logging.getLogger(__name__)
class EventPushActionsStore(SQLBaseStore): class EventPushActionsStore(SQLBaseStore):
def __init__(self, hs):
self.stream_ordering_month_ago = None
super(EventPushActionsStore, self).__init__(hs)
def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples): def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
""" """
Args: Args:
@ -115,7 +119,8 @@ class EventPushActionsStore(SQLBaseStore):
@defer.inlineCallbacks @defer.inlineCallbacks
def get_unread_push_actions_for_user_in_range(self, user_id, def get_unread_push_actions_for_user_in_range(self, user_id,
min_stream_ordering, min_stream_ordering,
max_stream_ordering=None): max_stream_ordering=None,
limit=20):
def get_after_receipt(txn): def get_after_receipt(txn):
sql = ( sql = (
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, " "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, "
@ -147,7 +152,8 @@ class EventPushActionsStore(SQLBaseStore):
if max_stream_ordering is not None: if max_stream_ordering is not None:
sql += " AND ep.stream_ordering <= ?" sql += " AND ep.stream_ordering <= ?"
args.append(max_stream_ordering) args.append(max_stream_ordering)
sql += " ORDER BY ep.stream_ordering ASC" sql += " ORDER BY ep.stream_ordering ASC LIMIT ?"
args.append(limit)
txn.execute(sql, args) txn.execute(sql, args)
return txn.fetchall() return txn.fetchall()
after_read_receipt = yield self.runInteraction( after_read_receipt = yield self.runInteraction(
@ -224,18 +230,93 @@ class EventPushActionsStore(SQLBaseStore):
(room_id, event_id) (room_id, event_id)
) )
def _remove_push_actions_before_txn(self, txn, room_id, user_id, def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
topological_ordering): topological_ordering):
"""
Purges old, stale push actions for a user and room before a given
topological_ordering
Args:
txn: The transcation
room_id: Room ID to delete from
user_id: user ID to delete for
topological_ordering: The lowest topological ordering which will
not be deleted.
"""
txn.call_after( txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate_many, self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
(room_id, user_id, ) (room_id, user_id, )
) )
# We need to join on the events table to get the received_ts for
# event_push_actions and sqlite won't let us use a join in a delete so
# we can't just delete where received_ts < x. Furthermore we can
# only identify event_push_actions by a tuple of room_id, event_id
# we we can't use a subquery.
# Instead, we look up the stream ordering for the last event in that
# room received before the threshold time and delete event_push_actions
# in the room with a stream_odering before that.
txn.execute( txn.execute(
"DELETE FROM event_push_actions " "DELETE FROM event_push_actions "
" WHERE room_id = ? AND user_id = ? AND topological_ordering < ?", " WHERE user_id = ? AND room_id = ? AND "
(room_id, user_id, topological_ordering,) " topological_ordering < ? AND stream_ordering < ?",
(user_id, room_id, topological_ordering, self.stream_ordering_month_ago)
) )
@defer.inlineCallbacks
def _find_stream_orderings_for_times(self):
yield self.runInteraction(
"_find_stream_orderings_for_times",
self._find_stream_orderings_for_times_txn
)
def _find_stream_orderings_for_times_txn(self, txn):
logger.info("Searching for stream ordering 1 month ago")
self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn(
txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
)
logger.info(
"Found stream ordering 1 month ago: it's %d",
self.stream_ordering_month_ago
)
def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
"""
Find the stream_ordering of the first event that was received after
a given timestamp. This is relatively slow as there is no index on
received_ts but we can then use this to delete push actions before
this.
received_ts must necessarily be in the same order as stream_ordering
and stream_ordering is indexed, so we manually binary search using
stream_ordering
"""
txn.execute("SELECT MAX(stream_ordering) FROM events")
max_stream_ordering = txn.fetchone()[0]
if max_stream_ordering is None:
return 0
range_start = 0
range_end = max_stream_ordering
sql = (
"SELECT received_ts FROM events"
" WHERE stream_ordering > ?"
" ORDER BY stream_ordering"
" LIMIT 1"
)
while range_end - range_start > 1:
middle = int((range_end + range_start) / 2)
txn.execute(sql, (middle,))
middle_ts = txn.fetchone()[0]
if ts > middle_ts:
range_start = middle
else:
range_end = middle
return range_end
def _action_has_highlight(actions): def _action_has_highlight(actions):
for action in actions: for action in actions:

View File

@ -321,7 +321,7 @@ class ReceiptsStore(SQLBaseStore):
) )
if receipt_type == "m.read" and topological_ordering: if receipt_type == "m.read" and topological_ordering:
self._remove_push_actions_before_txn( self._remove_old_push_actions_before_txn(
txn, txn,
room_id=room_id, room_id=room_id,
user_id=user_id, user_id=user_id,

View File

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging import logging
from synapse.storage.appservice import ApplicationServiceStore from synapse.config.appservice import load_appservices
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -38,7 +38,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
logger.warning("Could not get app_service_config_files from config") logger.warning("Could not get app_service_config_files from config")
pass pass
appservices = ApplicationServiceStore.load_appservices( appservices = load_appservices(
config.server_name, config_files config.server_name, config_files
) )

View File

@ -30,9 +30,9 @@ class AppServiceHandlerTestCase(unittest.TestCase):
self.mock_scheduler = Mock() self.mock_scheduler = Mock()
hs = Mock() hs = Mock()
hs.get_datastore = Mock(return_value=self.mock_store) hs.get_datastore = Mock(return_value=self.mock_store)
self.handler = ApplicationServicesHandler( hs.get_application_service_api = Mock(return_value=self.mock_as_api)
hs, self.mock_as_api, self.mock_scheduler hs.get_application_service_scheduler = Mock(return_value=self.mock_scheduler)
) self.handler = ApplicationServicesHandler(hs)
@defer.inlineCallbacks @defer.inlineCallbacks
def test_notify_interested_services(self): def test_notify_interested_services(self):

View File

@ -67,6 +67,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
version_string="Synapse/tests", version_string="Synapse/tests",
database_engine=create_engine(config.database_config), database_engine=create_engine(config.database_config),
get_db_conn=db_pool.get_db_conn, get_db_conn=db_pool.get_db_conn,
room_list_handler=object(),
**kargs **kargs
) )
hs.setup() hs.setup()
@ -75,6 +76,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
name, db_pool=None, datastore=datastore, config=config, name, db_pool=None, datastore=datastore, config=config,
version_string="Synapse/tests", version_string="Synapse/tests",
database_engine=create_engine(config.database_config), database_engine=create_engine(config.database_config),
room_list_handler=object(),
**kargs **kargs
) )