Move fetching of events into their own transactions
parent
cdb3757942
commit
f6f902d459
|
@ -13,6 +13,8 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from ._base import SQLBaseStore, cached
|
||||
from syutil.base64util import encode_base64
|
||||
|
||||
|
@ -33,16 +35,7 @@ class EventFederationStore(SQLBaseStore):
|
|||
"""
|
||||
|
||||
def get_auth_chain(self, event_ids):
|
||||
return self.runInteraction(
|
||||
"get_auth_chain",
|
||||
self._get_auth_chain_txn,
|
||||
event_ids
|
||||
)
|
||||
|
||||
def _get_auth_chain_txn(self, txn, event_ids):
|
||||
results = self._get_auth_chain_ids_txn(txn, event_ids)
|
||||
|
||||
return self._get_events_txn(txn, results)
|
||||
return self.get_auth_chain_ids(event_ids).addCallback(self._get_events)
|
||||
|
||||
def get_auth_chain_ids(self, event_ids):
|
||||
return self.runInteraction(
|
||||
|
@ -369,7 +362,7 @@ class EventFederationStore(SQLBaseStore):
|
|||
return self.runInteraction(
|
||||
"get_backfill_events",
|
||||
self._get_backfill_events, room_id, event_list, limit
|
||||
)
|
||||
).addCallback(self._get_events)
|
||||
|
||||
def _get_backfill_events(self, txn, room_id, event_list, limit):
|
||||
logger.debug(
|
||||
|
@ -415,16 +408,26 @@ class EventFederationStore(SQLBaseStore):
|
|||
front = new_front
|
||||
event_results += new_front
|
||||
|
||||
return self._get_events_txn(txn, event_results)
|
||||
return event_results
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_missing_events(self, room_id, earliest_events, latest_events,
|
||||
limit, min_depth):
|
||||
return self.runInteraction(
|
||||
ids = yield self.runInteraction(
|
||||
"get_missing_events",
|
||||
self._get_missing_events,
|
||||
room_id, earliest_events, latest_events, limit, min_depth
|
||||
)
|
||||
|
||||
events = yield self._get_events(ids)
|
||||
|
||||
events = sorted(
|
||||
[ev for ev in events if ev.depth >= min_depth],
|
||||
key=lambda e: e.depth,
|
||||
)
|
||||
|
||||
defer.returnValue(events[:limit])
|
||||
|
||||
def _get_missing_events(self, txn, room_id, earliest_events, latest_events,
|
||||
limit, min_depth):
|
||||
|
||||
|
@ -456,14 +459,7 @@ class EventFederationStore(SQLBaseStore):
|
|||
front = new_front
|
||||
event_results |= new_front
|
||||
|
||||
events = self._get_events_txn(txn, event_results)
|
||||
|
||||
events = sorted(
|
||||
[ev for ev in events if ev.depth >= min_depth],
|
||||
key=lambda e: e.depth,
|
||||
)
|
||||
|
||||
return events[:limit]
|
||||
return event_results
|
||||
|
||||
def clean_room_for_join(self, room_id):
|
||||
return self.runInteraction(
|
||||
|
|
|
@ -76,17 +76,17 @@ class RoomMemberStore(SQLBaseStore):
|
|||
Returns:
|
||||
Deferred: Results in a MembershipEvent or None.
|
||||
"""
|
||||
def f(txn):
|
||||
events = self._get_members_events_txn(
|
||||
txn,
|
||||
return self.runInteraction(
|
||||
"get_room_member",
|
||||
self._get_members_events_txn,
|
||||
room_id,
|
||||
user_id=user_id,
|
||||
).addCallback(
|
||||
self._get_events
|
||||
).addCallback(
|
||||
lambda events: events[0] if events else None
|
||||
)
|
||||
|
||||
return events[0] if events else None
|
||||
|
||||
return self.runInteraction("get_room_member", f)
|
||||
|
||||
def get_users_in_room(self, room_id):
|
||||
def f(txn):
|
||||
|
||||
|
@ -110,15 +110,12 @@ class RoomMemberStore(SQLBaseStore):
|
|||
Returns:
|
||||
list of namedtuples representing the members in this room.
|
||||
"""
|
||||
|
||||
def f(txn):
|
||||
return self._get_members_events_txn(
|
||||
txn,
|
||||
return self.runInteraction(
|
||||
"get_room_members",
|
||||
self._get_members_events_txn,
|
||||
room_id,
|
||||
membership=membership,
|
||||
)
|
||||
|
||||
return self.runInteraction("get_room_members", f)
|
||||
).addCallback(self._get_events)
|
||||
|
||||
def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
|
||||
""" Get all the rooms for this user where the membership for this user
|
||||
|
@ -190,14 +187,14 @@ class RoomMemberStore(SQLBaseStore):
|
|||
return self.runInteraction(
|
||||
"get_members_query", self._get_members_events_txn,
|
||||
where_clause, where_values
|
||||
)
|
||||
).addCallbacks(self._get_events)
|
||||
|
||||
def _get_members_events_txn(self, txn, room_id, membership=None, user_id=None):
|
||||
rows = self._get_members_rows_txn(
|
||||
txn,
|
||||
room_id, membership, user_id,
|
||||
)
|
||||
return self._get_events_txn(txn, [r["event_id"] for r in rows])
|
||||
return [r["event_id"] for r in rows]
|
||||
|
||||
def _get_members_rows_txn(self, txn, room_id, membership=None, user_id=None):
|
||||
where_clause = "c.room_id = ?"
|
||||
|
|
|
@ -72,8 +72,6 @@ class StateStore(SQLBaseStore):
|
|||
retcol="event_id",
|
||||
)
|
||||
|
||||
# state = self._get_events_txn(txn, state_ids)
|
||||
|
||||
res[group] = state_ids
|
||||
|
||||
return res
|
||||
|
|
|
@ -224,7 +224,7 @@ class StreamStore(SQLBaseStore):
|
|||
|
||||
return self.runInteraction("get_room_events_stream", f)
|
||||
|
||||
@log_function
|
||||
@defer.inlineCallbacks
|
||||
def paginate_room_events(self, room_id, from_key, to_key=None,
|
||||
direction='b', limit=-1,
|
||||
with_feedback=False):
|
||||
|
@ -286,17 +286,18 @@ class StreamStore(SQLBaseStore):
|
|||
# TODO (erikj): We should work out what to do here instead.
|
||||
next_token = to_key if to_key else from_key
|
||||
|
||||
events = self._get_events_txn(
|
||||
txn,
|
||||
return rows, next_token,
|
||||
|
||||
rows, token = yield self.runInteraction("paginate_room_events", f)
|
||||
|
||||
events = yield self._get_events(
|
||||
[r["event_id"] for r in rows],
|
||||
get_prev_content=True
|
||||
)
|
||||
|
||||
self._set_before_and_after(events, rows)
|
||||
|
||||
return events, next_token,
|
||||
|
||||
return self.runInteraction("paginate_room_events", f)
|
||||
defer.returnValue((events, token))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_recent_events_for_room(self, room_id, limit, end_token,
|
||||
|
|
Loading…
Reference in New Issue