Merge pull request #906 from matrix-org/markjh/faster_events_around
Use a query that postgresql optimises better for get_events_aroundpull/910/head
commit
04dee11e97
|
@ -16,6 +16,8 @@
|
||||||
from ._base import SQLBaseStore
|
from ._base import SQLBaseStore
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
||||||
|
from synapse.types import RoomStreamToken
|
||||||
|
from .stream import lower_bound
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import ujson as json
|
import ujson as json
|
||||||
|
@ -73,6 +75,9 @@ class EventPushActionsStore(SQLBaseStore):
|
||||||
|
|
||||||
stream_ordering = results[0][0]
|
stream_ordering = results[0][0]
|
||||||
topological_ordering = results[0][1]
|
topological_ordering = results[0][1]
|
||||||
|
token = RoomStreamToken(
|
||||||
|
topological_ordering, stream_ordering
|
||||||
|
)
|
||||||
|
|
||||||
sql = (
|
sql = (
|
||||||
"SELECT sum(notif), sum(highlight)"
|
"SELECT sum(notif), sum(highlight)"
|
||||||
|
@ -80,15 +85,10 @@ class EventPushActionsStore(SQLBaseStore):
|
||||||
" WHERE"
|
" WHERE"
|
||||||
" user_id = ?"
|
" user_id = ?"
|
||||||
" AND room_id = ?"
|
" AND room_id = ?"
|
||||||
" AND ("
|
" AND %s"
|
||||||
" topological_ordering > ?"
|
) % (lower_bound(token, self.database_engine, inclusive=False),)
|
||||||
" OR (topological_ordering = ? AND stream_ordering > ?)"
|
|
||||||
")"
|
txn.execute(sql, (user_id, room_id))
|
||||||
)
|
|
||||||
txn.execute(sql, (
|
|
||||||
user_id, room_id,
|
|
||||||
topological_ordering, topological_ordering, stream_ordering
|
|
||||||
))
|
|
||||||
row = txn.fetchone()
|
row = txn.fetchone()
|
||||||
if row:
|
if row:
|
||||||
return {
|
return {
|
||||||
|
|
|
@ -40,6 +40,7 @@ from synapse.util.caches.descriptors import cached
|
||||||
from synapse.api.constants import EventTypes
|
from synapse.api.constants import EventTypes
|
||||||
from synapse.types import RoomStreamToken
|
from synapse.types import RoomStreamToken
|
||||||
from synapse.util.logcontext import preserve_fn
|
from synapse.util.logcontext import preserve_fn
|
||||||
|
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
@ -54,25 +55,43 @@ _STREAM_TOKEN = "stream"
|
||||||
_TOPOLOGICAL_TOKEN = "topological"
|
_TOPOLOGICAL_TOKEN = "topological"
|
||||||
|
|
||||||
|
|
||||||
def lower_bound(token):
|
def lower_bound(token, engine, inclusive=False):
|
||||||
|
inclusive = "=" if inclusive else ""
|
||||||
if token.topological is None:
|
if token.topological is None:
|
||||||
return "(%d < %s)" % (token.stream, "stream_ordering")
|
return "(%d <%s %s)" % (token.stream, inclusive, "stream_ordering")
|
||||||
else:
|
else:
|
||||||
return "(%d < %s OR (%d = %s AND %d < %s))" % (
|
if isinstance(engine, PostgresEngine):
|
||||||
|
# Postgres doesn't optimise ``(x < a) OR (x=a AND y<b)`` as well
|
||||||
|
# as it optimises ``(x,y) < (a,b)`` on multicolumn indexes. So we
|
||||||
|
# use the later form when running against postgres.
|
||||||
|
return "((%d,%d) <%s (%s,%s))" % (
|
||||||
|
token.topological, token.stream, inclusive,
|
||||||
|
"topological_ordering", "stream_ordering",
|
||||||
|
)
|
||||||
|
return "(%d < %s OR (%d = %s AND %d <%s %s))" % (
|
||||||
token.topological, "topological_ordering",
|
token.topological, "topological_ordering",
|
||||||
token.topological, "topological_ordering",
|
token.topological, "topological_ordering",
|
||||||
token.stream, "stream_ordering",
|
token.stream, inclusive, "stream_ordering",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def upper_bound(token):
|
def upper_bound(token, engine, inclusive=True):
|
||||||
|
inclusive = "=" if inclusive else ""
|
||||||
if token.topological is None:
|
if token.topological is None:
|
||||||
return "(%d >= %s)" % (token.stream, "stream_ordering")
|
return "(%d >%s %s)" % (token.stream, inclusive, "stream_ordering")
|
||||||
else:
|
else:
|
||||||
return "(%d > %s OR (%d = %s AND %d >= %s))" % (
|
if isinstance(engine, PostgresEngine):
|
||||||
|
# Postgres doesn't optimise ``(x > a) OR (x=a AND y>b)`` as well
|
||||||
|
# as it optimises ``(x,y) > (a,b)`` on multicolumn indexes. So we
|
||||||
|
# use the later form when running against postgres.
|
||||||
|
return "((%d,%d) >%s (%s,%s))" % (
|
||||||
|
token.topological, token.stream, inclusive,
|
||||||
|
"topological_ordering", "stream_ordering",
|
||||||
|
)
|
||||||
|
return "(%d > %s OR (%d = %s AND %d >%s %s))" % (
|
||||||
token.topological, "topological_ordering",
|
token.topological, "topological_ordering",
|
||||||
token.topological, "topological_ordering",
|
token.topological, "topological_ordering",
|
||||||
token.stream, "stream_ordering",
|
token.stream, inclusive, "stream_ordering",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -308,18 +327,22 @@ class StreamStore(SQLBaseStore):
|
||||||
args = [False, room_id]
|
args = [False, room_id]
|
||||||
if direction == 'b':
|
if direction == 'b':
|
||||||
order = "DESC"
|
order = "DESC"
|
||||||
bounds = upper_bound(RoomStreamToken.parse(from_key))
|
bounds = upper_bound(
|
||||||
|
RoomStreamToken.parse(from_key), self.database_engine
|
||||||
|
)
|
||||||
if to_key:
|
if to_key:
|
||||||
bounds = "%s AND %s" % (
|
bounds = "%s AND %s" % (bounds, lower_bound(
|
||||||
bounds, lower_bound(RoomStreamToken.parse(to_key))
|
RoomStreamToken.parse(to_key), self.database_engine
|
||||||
)
|
))
|
||||||
else:
|
else:
|
||||||
order = "ASC"
|
order = "ASC"
|
||||||
bounds = lower_bound(RoomStreamToken.parse(from_key))
|
bounds = lower_bound(
|
||||||
|
RoomStreamToken.parse(from_key), self.database_engine
|
||||||
|
)
|
||||||
if to_key:
|
if to_key:
|
||||||
bounds = "%s AND %s" % (
|
bounds = "%s AND %s" % (bounds, upper_bound(
|
||||||
bounds, upper_bound(RoomStreamToken.parse(to_key))
|
RoomStreamToken.parse(to_key), self.database_engine
|
||||||
)
|
))
|
||||||
|
|
||||||
if int(limit) > 0:
|
if int(limit) > 0:
|
||||||
args.append(int(limit))
|
args.append(int(limit))
|
||||||
|
@ -586,32 +609,60 @@ class StreamStore(SQLBaseStore):
|
||||||
retcols=["stream_ordering", "topological_ordering"],
|
retcols=["stream_ordering", "topological_ordering"],
|
||||||
)
|
)
|
||||||
|
|
||||||
stream_ordering = results["stream_ordering"]
|
token = RoomStreamToken(
|
||||||
topological_ordering = results["topological_ordering"]
|
results["topological_ordering"],
|
||||||
|
results["stream_ordering"],
|
||||||
query_before = (
|
|
||||||
"SELECT topological_ordering, stream_ordering, event_id FROM events"
|
|
||||||
" WHERE room_id = ? AND (topological_ordering < ?"
|
|
||||||
" OR (topological_ordering = ? AND stream_ordering < ?))"
|
|
||||||
" ORDER BY topological_ordering DESC, stream_ordering DESC"
|
|
||||||
" LIMIT ?"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
query_after = (
|
if isinstance(self.database_engine, Sqlite3Engine):
|
||||||
"SELECT topological_ordering, stream_ordering, event_id FROM events"
|
# SQLite3 doesn't optimise ``(x < a) OR (x = a AND y < b)``
|
||||||
" WHERE room_id = ? AND (topological_ordering > ?"
|
# So we give pass it to SQLite3 as the UNION ALL of the two queries.
|
||||||
" OR (topological_ordering = ? AND stream_ordering > ?))"
|
|
||||||
" ORDER BY topological_ordering ASC, stream_ordering ASC"
|
|
||||||
" LIMIT ?"
|
|
||||||
)
|
|
||||||
|
|
||||||
txn.execute(
|
query_before = (
|
||||||
query_before,
|
"SELECT topological_ordering, stream_ordering, event_id FROM events"
|
||||||
(
|
" WHERE room_id = ? AND topological_ordering < ?"
|
||||||
room_id, topological_ordering, topological_ordering,
|
" UNION ALL"
|
||||||
stream_ordering, before_limit,
|
" SELECT topological_ordering, stream_ordering, event_id FROM events"
|
||||||
|
" WHERE room_id = ? AND topological_ordering = ? AND stream_ordering < ?"
|
||||||
|
" ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
|
||||||
)
|
)
|
||||||
)
|
before_args = (
|
||||||
|
room_id, token.topological,
|
||||||
|
room_id, token.topological, token.stream,
|
||||||
|
before_limit,
|
||||||
|
)
|
||||||
|
|
||||||
|
query_after = (
|
||||||
|
"SELECT topological_ordering, stream_ordering, event_id FROM events"
|
||||||
|
" WHERE room_id = ? AND topological_ordering > ?"
|
||||||
|
" UNION ALL"
|
||||||
|
" SELECT topological_ordering, stream_ordering, event_id FROM events"
|
||||||
|
" WHERE room_id = ? AND topological_ordering = ? AND stream_ordering > ?"
|
||||||
|
" ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?"
|
||||||
|
)
|
||||||
|
after_args = (
|
||||||
|
room_id, token.topological,
|
||||||
|
room_id, token.topological, token.stream,
|
||||||
|
after_limit,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
query_before = (
|
||||||
|
"SELECT topological_ordering, stream_ordering, event_id FROM events"
|
||||||
|
" WHERE room_id = ? AND %s"
|
||||||
|
" ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
|
||||||
|
) % (upper_bound(token, self.database_engine, inclusive=False),)
|
||||||
|
|
||||||
|
before_args = (room_id, before_limit)
|
||||||
|
|
||||||
|
query_after = (
|
||||||
|
"SELECT topological_ordering, stream_ordering, event_id FROM events"
|
||||||
|
" WHERE room_id = ? AND %s"
|
||||||
|
" ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?"
|
||||||
|
) % (lower_bound(token, self.database_engine, inclusive=False),)
|
||||||
|
|
||||||
|
after_args = (room_id, after_limit)
|
||||||
|
|
||||||
|
txn.execute(query_before, before_args)
|
||||||
|
|
||||||
rows = self.cursor_to_dict(txn)
|
rows = self.cursor_to_dict(txn)
|
||||||
events_before = [r["event_id"] for r in rows]
|
events_before = [r["event_id"] for r in rows]
|
||||||
|
@ -623,17 +674,11 @@ class StreamStore(SQLBaseStore):
|
||||||
))
|
))
|
||||||
else:
|
else:
|
||||||
start_token = str(RoomStreamToken(
|
start_token = str(RoomStreamToken(
|
||||||
topological_ordering,
|
token.topological,
|
||||||
stream_ordering - 1,
|
token.stream - 1,
|
||||||
))
|
))
|
||||||
|
|
||||||
txn.execute(
|
txn.execute(query_after, after_args)
|
||||||
query_after,
|
|
||||||
(
|
|
||||||
room_id, topological_ordering, topological_ordering,
|
|
||||||
stream_ordering, after_limit,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
rows = self.cursor_to_dict(txn)
|
rows = self.cursor_to_dict(txn)
|
||||||
events_after = [r["event_id"] for r in rows]
|
events_after = [r["event_id"] for r in rows]
|
||||||
|
@ -644,10 +689,7 @@ class StreamStore(SQLBaseStore):
|
||||||
rows[-1]["stream_ordering"],
|
rows[-1]["stream_ordering"],
|
||||||
))
|
))
|
||||||
else:
|
else:
|
||||||
end_token = str(RoomStreamToken(
|
end_token = str(token)
|
||||||
topological_ordering,
|
|
||||||
stream_ordering,
|
|
||||||
))
|
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"before": {
|
"before": {
|
||||||
|
|
Loading…
Reference in New Issue