Merge pull request #2948 from matrix-org/erikj/kill_as_sync
Remove ability for AS users to call /events and /syncpull/2909/merge
commit
7aed50a038
|
@ -475,12 +475,9 @@ class RoomEventSource(object):
|
||||||
user.to_string()
|
user.to_string()
|
||||||
)
|
)
|
||||||
if app_service:
|
if app_service:
|
||||||
events, end_key = yield self.store.get_appservice_room_stream(
|
# We no longer support AS users using /sync directly.
|
||||||
service=app_service,
|
# See https://github.com/matrix-org/matrix-doc/issues/1144
|
||||||
from_key=from_key,
|
raise NotImplementedError()
|
||||||
to_key=to_key,
|
|
||||||
limit=limit,
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
room_events = yield self.store.get_membership_changes_for_user(
|
room_events = yield self.store.get_membership_changes_for_user(
|
||||||
user.to_string(), from_key, to_key
|
user.to_string(), from_key, to_key
|
||||||
|
|
|
@ -998,8 +998,9 @@ class SyncHandler(object):
|
||||||
|
|
||||||
app_service = self.store.get_app_service_by_user_id(user_id)
|
app_service = self.store.get_app_service_by_user_id(user_id)
|
||||||
if app_service:
|
if app_service:
|
||||||
rooms = yield self.store.get_app_service_rooms(app_service)
|
# We no longer support AS users using /sync directly.
|
||||||
joined_room_ids = set(r.room_id for r in rooms)
|
# See https://github.com/matrix-org/matrix-doc/issues/1144
|
||||||
|
raise NotImplementedError()
|
||||||
else:
|
else:
|
||||||
joined_room_ids = yield self.store.get_rooms_for_user(user_id)
|
joined_room_ids = yield self.store.get_rooms_for_user(user_id)
|
||||||
|
|
||||||
|
@ -1030,8 +1031,9 @@ class SyncHandler(object):
|
||||||
|
|
||||||
app_service = self.store.get_app_service_by_user_id(user_id)
|
app_service = self.store.get_app_service_by_user_id(user_id)
|
||||||
if app_service:
|
if app_service:
|
||||||
rooms = yield self.store.get_app_service_rooms(app_service)
|
# We no longer support AS users using /sync directly.
|
||||||
joined_room_ids = set(r.room_id for r in rooms)
|
# See https://github.com/matrix-org/matrix-doc/issues/1144
|
||||||
|
raise NotImplementedError()
|
||||||
else:
|
else:
|
||||||
joined_room_ids = yield self.store.get_rooms_for_user(user_id)
|
joined_room_ids = yield self.store.get_rooms_for_user(user_id)
|
||||||
|
|
||||||
|
|
|
@ -18,11 +18,9 @@ import re
|
||||||
import simplejson as json
|
import simplejson as json
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
from synapse.api.constants import Membership
|
|
||||||
from synapse.appservice import AppServiceTransaction
|
from synapse.appservice import AppServiceTransaction
|
||||||
from synapse.config.appservice import load_appservices
|
from synapse.config.appservice import load_appservices
|
||||||
from synapse.storage.events import EventsWorkerStore
|
from synapse.storage.events import EventsWorkerStore
|
||||||
from synapse.storage.roommember import RoomsForUser
|
|
||||||
from ._base import SQLBaseStore
|
from ._base import SQLBaseStore
|
||||||
|
|
||||||
|
|
||||||
|
@ -115,81 +113,11 @@ class ApplicationServiceWorkerStore(SQLBaseStore):
|
||||||
|
|
||||||
|
|
||||||
class ApplicationServiceStore(ApplicationServiceWorkerStore):
|
class ApplicationServiceStore(ApplicationServiceWorkerStore):
|
||||||
|
# This is currently empty due to there not being any AS storage functions
|
||||||
def __init__(self, db_conn, hs):
|
# that can't be run on the workers. Since this may change in future, and
|
||||||
super(ApplicationServiceStore, self).__init__(db_conn, hs)
|
# to keep consistency with the other stores, we keep this empty class for
|
||||||
self.hostname = hs.hostname
|
# now.
|
||||||
|
pass
|
||||||
def get_app_service_rooms(self, service):
|
|
||||||
"""Get a list of RoomsForUser for this application service.
|
|
||||||
|
|
||||||
Application services may be "interested" in lots of rooms depending on
|
|
||||||
the room ID, the room aliases, or the members in the room. This function
|
|
||||||
takes all of these into account and returns a list of RoomsForUser which
|
|
||||||
represent the entire list of room IDs that this application service
|
|
||||||
wants to know about.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
service: The application service to get a room list for.
|
|
||||||
Returns:
|
|
||||||
A list of RoomsForUser.
|
|
||||||
"""
|
|
||||||
return self.runInteraction(
|
|
||||||
"get_app_service_rooms",
|
|
||||||
self._get_app_service_rooms_txn,
|
|
||||||
service,
|
|
||||||
)
|
|
||||||
|
|
||||||
def _get_app_service_rooms_txn(self, txn, service):
|
|
||||||
# get all rooms matching the room ID regex.
|
|
||||||
room_entries = self._simple_select_list_txn(
|
|
||||||
txn=txn, table="rooms", keyvalues=None, retcols=["room_id"]
|
|
||||||
)
|
|
||||||
matching_room_list = set([
|
|
||||||
r["room_id"] for r in room_entries if
|
|
||||||
service.is_interested_in_room(r["room_id"])
|
|
||||||
])
|
|
||||||
|
|
||||||
# resolve room IDs for matching room alias regex.
|
|
||||||
room_alias_mappings = self._simple_select_list_txn(
|
|
||||||
txn=txn, table="room_aliases", keyvalues=None,
|
|
||||||
retcols=["room_id", "room_alias"]
|
|
||||||
)
|
|
||||||
matching_room_list |= set([
|
|
||||||
r["room_id"] for r in room_alias_mappings if
|
|
||||||
service.is_interested_in_alias(r["room_alias"])
|
|
||||||
])
|
|
||||||
|
|
||||||
# get all rooms for every user for this AS. This is scoped to users on
|
|
||||||
# this HS only.
|
|
||||||
user_list = self._simple_select_list_txn(
|
|
||||||
txn=txn, table="users", keyvalues=None, retcols=["name"]
|
|
||||||
)
|
|
||||||
user_list = [
|
|
||||||
u["name"] for u in user_list if
|
|
||||||
service.is_interested_in_user(u["name"])
|
|
||||||
]
|
|
||||||
rooms_for_user_matching_user_id = set() # RoomsForUser list
|
|
||||||
for user_id in user_list:
|
|
||||||
# FIXME: This assumes this store is linked with RoomMemberStore :(
|
|
||||||
rooms_for_user = self._get_rooms_for_user_where_membership_is_txn(
|
|
||||||
txn=txn,
|
|
||||||
user_id=user_id,
|
|
||||||
membership_list=[Membership.JOIN]
|
|
||||||
)
|
|
||||||
rooms_for_user_matching_user_id |= set(rooms_for_user)
|
|
||||||
|
|
||||||
# make RoomsForUser tuples for room ids and aliases which are not in the
|
|
||||||
# main rooms_for_user_list - e.g. they are rooms which do not have AS
|
|
||||||
# registered users in it.
|
|
||||||
known_room_ids = [r.room_id for r in rooms_for_user_matching_user_id]
|
|
||||||
missing_rooms_for_user = [
|
|
||||||
RoomsForUser(r, service.sender, "join") for r in
|
|
||||||
matching_room_list if r not in known_room_ids
|
|
||||||
]
|
|
||||||
rooms_for_user_matching_user_id |= set(missing_rooms_for_user)
|
|
||||||
|
|
||||||
return rooms_for_user_matching_user_id
|
|
||||||
|
|
||||||
|
|
||||||
class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore,
|
class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore,
|
||||||
|
|
|
@ -39,7 +39,6 @@ from synapse.storage._base import SQLBaseStore
|
||||||
from synapse.storage.events import EventsWorkerStore
|
from synapse.storage.events import EventsWorkerStore
|
||||||
|
|
||||||
from synapse.util.caches.descriptors import cached
|
from synapse.util.caches.descriptors import cached
|
||||||
from synapse.api.constants import EventTypes
|
|
||||||
from synapse.types import RoomStreamToken
|
from synapse.types import RoomStreamToken
|
||||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||||
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
|
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
|
||||||
|
@ -717,81 +716,6 @@ class StreamStore(StreamWorkerStore):
|
||||||
def get_room_min_stream_ordering(self):
|
def get_room_min_stream_ordering(self):
|
||||||
return self._backfill_id_gen.get_current_token()
|
return self._backfill_id_gen.get_current_token()
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
|
|
||||||
# NB this lives here instead of appservice.py so we can reuse the
|
|
||||||
# 'private' StreamToken class in this file.
|
|
||||||
if limit:
|
|
||||||
limit = max(limit, MAX_STREAM_SIZE)
|
|
||||||
else:
|
|
||||||
limit = MAX_STREAM_SIZE
|
|
||||||
|
|
||||||
# From and to keys should be integers from ordering.
|
|
||||||
from_id = RoomStreamToken.parse_stream_token(from_key)
|
|
||||||
to_id = RoomStreamToken.parse_stream_token(to_key)
|
|
||||||
|
|
||||||
if from_key == to_key:
|
|
||||||
defer.returnValue(([], to_key))
|
|
||||||
return
|
|
||||||
|
|
||||||
# select all the events between from/to with a sensible limit
|
|
||||||
sql = (
|
|
||||||
"SELECT e.event_id, e.room_id, e.type, s.state_key, "
|
|
||||||
"e.stream_ordering FROM events AS e "
|
|
||||||
"LEFT JOIN state_events as s ON "
|
|
||||||
"e.event_id = s.event_id "
|
|
||||||
"WHERE e.stream_ordering > ? AND e.stream_ordering <= ? "
|
|
||||||
"ORDER BY stream_ordering ASC LIMIT %(limit)d "
|
|
||||||
) % {
|
|
||||||
"limit": limit
|
|
||||||
}
|
|
||||||
|
|
||||||
def f(txn):
|
|
||||||
# pull out all the events between the tokens
|
|
||||||
txn.execute(sql, (from_id.stream, to_id.stream,))
|
|
||||||
rows = self.cursor_to_dict(txn)
|
|
||||||
|
|
||||||
# Logic:
|
|
||||||
# - We want ALL events which match the AS room_id regex
|
|
||||||
# - We want ALL events which match the rooms represented by the AS
|
|
||||||
# room_alias regex
|
|
||||||
# - We want ALL events for rooms that AS users have joined.
|
|
||||||
# This is currently supported via get_app_service_rooms (which is
|
|
||||||
# used for the Notifier listener rooms). We can't reasonably make a
|
|
||||||
# SQL query for these room IDs, so we'll pull all the events between
|
|
||||||
# from/to and filter in python.
|
|
||||||
rooms_for_as = self._get_app_service_rooms_txn(txn, service)
|
|
||||||
room_ids_for_as = [r.room_id for r in rooms_for_as]
|
|
||||||
|
|
||||||
def app_service_interested(row):
|
|
||||||
if row["room_id"] in room_ids_for_as:
|
|
||||||
return True
|
|
||||||
|
|
||||||
if row["type"] == EventTypes.Member:
|
|
||||||
if service.is_interested_in_user(row.get("state_key")):
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
return [r for r in rows if app_service_interested(r)]
|
|
||||||
|
|
||||||
rows = yield self.runInteraction("get_appservice_room_stream", f)
|
|
||||||
|
|
||||||
ret = yield self._get_events(
|
|
||||||
[r["event_id"] for r in rows],
|
|
||||||
get_prev_content=True
|
|
||||||
)
|
|
||||||
|
|
||||||
self._set_before_and_after(ret, rows, topo_order=from_id is None)
|
|
||||||
|
|
||||||
if rows:
|
|
||||||
key = "s%d" % max(r["stream_ordering"] for r in rows)
|
|
||||||
else:
|
|
||||||
# Assume we didn't get anything because there was nothing to
|
|
||||||
# get.
|
|
||||||
key = to_key
|
|
||||||
|
|
||||||
defer.returnValue((ret, key))
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def paginate_room_events(self, room_id, from_key, to_key=None,
|
def paginate_room_events(self, room_id, from_key, to_key=None,
|
||||||
direction='b', limit=-1, event_filter=None):
|
direction='b', limit=-1, event_filter=None):
|
||||||
|
|
Loading…
Reference in New Issue