Cache tags and account data

pull/536/head
Erik Johnston 2016-01-28 16:37:41 +00:00
parent c23a8c7833
commit 00cb3eb24b
7 changed files with 137 additions and 102 deletions

View File

@ -14,6 +14,7 @@
# limitations under the License.
from ._base import SQLBaseStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
from twisted.internet import defer
import ujson as json
@ -23,6 +24,13 @@ logger = logging.getLogger(__name__)
class AccountDataStore(SQLBaseStore):
def __init__(self, hs):
super(AccountDataStore, self).__init__(hs)
self._account_data_stream_cache = StreamChangeCache(
"AccountDataChangeCache", self._account_data_id_gen.get_max_token(None),
max_size=1000,
)
def get_account_data_for_user(self, user_id):
"""Get all the client account_data for a user.
@ -83,7 +91,7 @@ class AccountDataStore(SQLBaseStore):
"get_account_data_for_room", get_account_data_for_room_txn
)
def get_updated_account_data_for_user(self, user_id, stream_id):
def get_updated_account_data_for_user(self, user_id, stream_id, room_ids=None):
"""Get all the client account_data for a that's changed.
Args:
@ -120,6 +128,12 @@ class AccountDataStore(SQLBaseStore):
return (global_account_data, account_data_by_room)
changed = self._account_data_stream_cache.get_entity_has_changed(
user_id, int(stream_id)
)
if not changed:
defer.returnValue(({}, {}))
return self.runInteraction(
"get_updated_account_data_for_user", get_updated_account_data_for_user_txn
)
@ -186,6 +200,10 @@ class AccountDataStore(SQLBaseStore):
"content": content_json,
}
)
txn.call_after(
self._account_data_stream_cache.entity_has_changed,
user_id, next_id,
)
self._update_max_stream_id(txn, next_id)
with (yield self._account_data_id_gen.get_next(self)) as next_id:

View File

@ -212,7 +212,7 @@ class EventsStore(SQLBaseStore):
if not backfilled:
txn.call_after(
self._events_stream_cache.room_has_changed,
self._events_stream_cache.entity_has_changed,
event.room_id, event.internal_metadata.stream_ordering,
)

View File

@ -15,7 +15,7 @@
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList, cached
from synapse.util.caches.room_change_cache import RoomStreamChangeCache
from synapse.util.caches.stream_change_cache import StreamChangeCache
from twisted.internet import defer
@ -30,7 +30,7 @@ class ReceiptsStore(SQLBaseStore):
def __init__(self, hs):
super(ReceiptsStore, self).__init__(hs)
self._receipts_stream_cache = RoomStreamChangeCache(
self._receipts_stream_cache = StreamChangeCache(
"ReceiptsRoomChangeCache", self._receipts_id_gen.get_max_token(None)
)
@ -77,7 +77,7 @@ class ReceiptsStore(SQLBaseStore):
room_ids = set(room_ids)
if from_key:
room_ids = yield self._receipts_stream_cache.get_rooms_changed(
room_ids = yield self._receipts_stream_cache.get_entities_changed(
room_ids, from_key
)
@ -222,7 +222,7 @@ class ReceiptsStore(SQLBaseStore):
txn.call_after(self.get_linearized_receipts_for_room.invalidate_all)
txn.call_after(
self._receipts_stream_cache.room_has_changed,
self._receipts_stream_cache.entity_has_changed,
room_id, stream_id
)

View File

@ -37,7 +37,7 @@ from twisted.internet import defer
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.caches.room_change_cache import RoomStreamChangeCache
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.api.constants import EventTypes
from synapse.types import RoomStreamToken
from synapse.util.logutils import log_function
@ -81,7 +81,7 @@ class StreamStore(SQLBaseStore):
def __init__(self, hs):
super(StreamStore, self).__init__(hs)
self._events_stream_cache = RoomStreamChangeCache(
self._events_stream_cache = StreamChangeCache(
"EventsRoomStreamChangeCache", self._stream_id_gen.get_max_token(None)
)
@ -168,7 +168,7 @@ class StreamStore(SQLBaseStore):
def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0):
from_id = RoomStreamToken.parse_stream_token(from_key).stream
room_ids = yield self._events_stream_cache.get_rooms_changed(
room_ids = yield self._events_stream_cache.get_entities_changed(
room_ids, from_id
)
@ -200,7 +200,7 @@ class StreamStore(SQLBaseStore):
defer.returnValue(([], from_key))
if from_id:
has_changed = yield self._events_stream_cache.get_room_has_changed(
has_changed = yield self._events_stream_cache.get_entity_has_changed(
room_id, from_id
)

View File

@ -15,6 +15,7 @@
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cached
from synapse.util.caches.stream_change_cache import StreamChangeCache
from twisted.internet import defer
import ujson as json
@ -24,6 +25,13 @@ logger = logging.getLogger(__name__)
class TagsStore(SQLBaseStore):
def __init__(self, hs):
super(TagsStore, self).__init__(hs)
self._tags_stream_cache = StreamChangeCache(
"TagsChangeCache", self._account_data_id_gen.get_max_token(None),
max_size=1000,
)
def get_max_account_data_stream_id(self):
"""Get the current max stream id for the private user data stream
@ -80,6 +88,10 @@ class TagsStore(SQLBaseStore):
room_ids = [row[0] for row in txn.fetchall()]
return room_ids
changed = self._tags_stream_cache.get_entity_has_changed(user_id, int(stream_id))
if not changed:
defer.returnValue({})
room_ids = yield self.runInteraction(
"get_updated_tags", get_updated_tags_txn
)
@ -177,6 +189,8 @@ class TagsStore(SQLBaseStore):
next_id(int): The the revision to advance to.
"""
txn.call_after(self._tags_stream_cache.entity_has_changed, user_id, next_id)
update_max_id_sql = (
"UPDATE account_data_max_stream_id"
" SET stream_id = ?"

View File

@ -1,92 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.util.caches import cache_counter, caches_by_name
from blist import sorteddict
import logging
logger = logging.getLogger(__name__)
class RoomStreamChangeCache(object):
"""Keeps track of the stream_id of the latest change in rooms.
Given a list of rooms and stream key, it will give a subset of rooms that
may have changed since that key. If the key is too old then the cache
will simply return all rooms.
"""
def __init__(self, name, current_key, size_of_cache=10000):
self._size_of_cache = size_of_cache
self._room_to_key = {}
self._cache = sorteddict()
self._earliest_known_key = current_key
self.name = name
caches_by_name[self.name] = self._cache
def get_room_has_changed(self, room_id, key):
assert type(key) is int
if key <= self._earliest_known_key:
return True
room_key = self._room_to_key.get(room_id, None)
if room_key is None:
return True
if key < room_key:
return True
return False
def get_rooms_changed(self, room_ids, key):
"""Returns subset of room ids that have had new things since the
given key. If the key is too old it will just return the given list.
"""
assert type(key) is int
if key > self._earliest_known_key:
keys = self._cache.keys()
i = keys.bisect_right(key)
result = set(
self._cache[k] for k in keys[i:]
).intersection(room_ids)
cache_counter.inc_hits(self.name)
else:
result = room_ids
cache_counter.inc_misses(self.name)
return result
def room_has_changed(self, room_id, key):
"""Informs the cache that the room has been changed at the given key.
"""
assert type(key) is int
if key > self._earliest_known_key:
old_key = self._room_to_key.get(room_id, None)
if old_key:
key = max(key, old_key)
self._cache.pop(old_key, None)
self._cache[key] = room_id
while len(self._cache) > self._size_of_cache:
k, r = self._cache.popitem()
self._earliest_key = max(k, self._earliest_key)
self._room_to_key.pop(r, None)

View File

@ -0,0 +1,95 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.util.caches import cache_counter, caches_by_name
from blist import sorteddict
import logging
logger = logging.getLogger(__name__)
class StreamChangeCache(object):
"""Keeps track of the stream positions of the latest change in a set of entities.
Typically the entity will be a room or user id.
Given a list of entities and a stream position, it will give a subset of
entities that may have changed since that position. If position key is too
old then the cache will simply return all given entities.
"""
def __init__(self, name, current_stream_pos, max_size=10000):
self._max_size = max_size
self._entity_to_key = {}
self._cache = sorteddict()
self._earliest_known_stream_pos = current_stream_pos
self.name = name
caches_by_name[self.name] = self._cache
def get_entity_has_changed(self, entity, stream_pos):
assert type(stream_pos) is int
if stream_pos <= self._earliest_known_stream_pos:
return True
latest_entity_change_pos = self._entity_to_key.get(entity, None)
if latest_entity_change_pos is None:
return True
if stream_pos < latest_entity_change_pos:
return True
return False
def get_entities_changed(self, entities, stream_pos):
"""Returns subset of entities that have had new things since the
given position. If the position is too old it will just return the given list.
"""
assert type(stream_pos) is int
if stream_pos > self._earliest_known_stream_pos:
keys = self._cache.keys()
i = keys.bisect_right(stream_pos)
result = set(
self._cache[k] for k in keys[i:]
).intersection(entities)
cache_counter.inc_hits(self.name)
else:
result = entities
cache_counter.inc_misses(self.name)
return result
def entity_has_changed(self, entitiy, stream_pos):
"""Informs the cache that the entitiy has been changed at the given
position.
"""
assert type(stream_pos) is int
if stream_pos > self._earliest_known_stream_pos:
old_pos = self._entity_to_key.get(entitiy, None)
if old_pos:
stream_pos = max(stream_pos, old_pos)
self._cache.pop(old_pos, None)
self._cache[stream_pos] = entitiy
while len(self._cache) > self._max_size:
k, r = self._cache.popitem()
self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos)
self._entity_to_key.pop(r, None)