Instrument `state` and `state_group` storage related things (tracing) (#15610)
Instrument `state` and `state_group` storage related things (tracing) so it's a little more clear where these database transactions are coming from as there is a lot of wires crossing in these functions. Part of `/messages` performance investigation: https://github.com/matrix-org/synapse/issues/13356pull/15646/head
parent
ca3c07e833
commit
703a8f9c67
|
@ -0,0 +1 @@
|
||||||
|
Instrument `state` and `state_group` storage-related operations to better picture what's happening when tracing.
|
|
@ -19,6 +19,7 @@ from immutabledict import immutabledict
|
||||||
|
|
||||||
from synapse.appservice import ApplicationService
|
from synapse.appservice import ApplicationService
|
||||||
from synapse.events import EventBase
|
from synapse.events import EventBase
|
||||||
|
from synapse.logging.opentracing import tag_args, trace
|
||||||
from synapse.types import JsonDict, StateMap
|
from synapse.types import JsonDict, StateMap
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
@ -242,6 +243,8 @@ class EventContext(UnpersistedEventContextBase):
|
||||||
|
|
||||||
return self._state_group
|
return self._state_group
|
||||||
|
|
||||||
|
@trace
|
||||||
|
@tag_args
|
||||||
async def get_current_state_ids(
|
async def get_current_state_ids(
|
||||||
self, state_filter: Optional["StateFilter"] = None
|
self, state_filter: Optional["StateFilter"] = None
|
||||||
) -> Optional[StateMap[str]]:
|
) -> Optional[StateMap[str]]:
|
||||||
|
@ -275,6 +278,8 @@ class EventContext(UnpersistedEventContextBase):
|
||||||
|
|
||||||
return prev_state_ids
|
return prev_state_ids
|
||||||
|
|
||||||
|
@trace
|
||||||
|
@tag_args
|
||||||
async def get_prev_state_ids(
|
async def get_prev_state_ids(
|
||||||
self, state_filter: Optional["StateFilter"] = None
|
self, state_filter: Optional["StateFilter"] = None
|
||||||
) -> StateMap[str]:
|
) -> StateMap[str]:
|
||||||
|
|
|
@ -45,6 +45,7 @@ from synapse.events.snapshot import (
|
||||||
UnpersistedEventContextBase,
|
UnpersistedEventContextBase,
|
||||||
)
|
)
|
||||||
from synapse.logging.context import ContextResourceUsage
|
from synapse.logging.context import ContextResourceUsage
|
||||||
|
from synapse.logging.opentracing import tag_args, trace
|
||||||
from synapse.replication.http.state import ReplicationUpdateCurrentStateRestServlet
|
from synapse.replication.http.state import ReplicationUpdateCurrentStateRestServlet
|
||||||
from synapse.state import v1, v2
|
from synapse.state import v1, v2
|
||||||
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
||||||
|
@ -270,6 +271,8 @@ class StateHandler:
|
||||||
state = await entry.get_state(self._state_storage_controller, StateFilter.all())
|
state = await entry.get_state(self._state_storage_controller, StateFilter.all())
|
||||||
return await self.store.get_joined_hosts(room_id, state, entry)
|
return await self.store.get_joined_hosts(room_id, state, entry)
|
||||||
|
|
||||||
|
@trace
|
||||||
|
@tag_args
|
||||||
async def calculate_context_info(
|
async def calculate_context_info(
|
||||||
self,
|
self,
|
||||||
event: EventBase,
|
event: EventBase,
|
||||||
|
@ -465,6 +468,7 @@ class StateHandler:
|
||||||
|
|
||||||
return await unpersisted_context.persist(event)
|
return await unpersisted_context.persist(event)
|
||||||
|
|
||||||
|
@trace
|
||||||
@measure_func()
|
@measure_func()
|
||||||
async def resolve_state_groups_for_events(
|
async def resolve_state_groups_for_events(
|
||||||
self, room_id: str, event_ids: Collection[str], await_full_state: bool = True
|
self, room_id: str, event_ids: Collection[str], await_full_state: bool = True
|
||||||
|
|
|
@ -67,6 +67,8 @@ class StateStorageController:
|
||||||
"""
|
"""
|
||||||
self._partial_state_room_tracker.notify_un_partial_stated(room_id)
|
self._partial_state_room_tracker.notify_un_partial_stated(room_id)
|
||||||
|
|
||||||
|
@trace
|
||||||
|
@tag_args
|
||||||
async def get_state_group_delta(
|
async def get_state_group_delta(
|
||||||
self, state_group: int
|
self, state_group: int
|
||||||
) -> Tuple[Optional[int], Optional[StateMap[str]]]:
|
) -> Tuple[Optional[int], Optional[StateMap[str]]]:
|
||||||
|
@ -84,6 +86,8 @@ class StateStorageController:
|
||||||
state_group_delta = await self.stores.state.get_state_group_delta(state_group)
|
state_group_delta = await self.stores.state.get_state_group_delta(state_group)
|
||||||
return state_group_delta.prev_group, state_group_delta.delta_ids
|
return state_group_delta.prev_group, state_group_delta.delta_ids
|
||||||
|
|
||||||
|
@trace
|
||||||
|
@tag_args
|
||||||
async def get_state_groups_ids(
|
async def get_state_groups_ids(
|
||||||
self, _room_id: str, event_ids: Collection[str], await_full_state: bool = True
|
self, _room_id: str, event_ids: Collection[str], await_full_state: bool = True
|
||||||
) -> Dict[int, MutableStateMap[str]]:
|
) -> Dict[int, MutableStateMap[str]]:
|
||||||
|
@ -114,6 +118,8 @@ class StateStorageController:
|
||||||
|
|
||||||
return group_to_state
|
return group_to_state
|
||||||
|
|
||||||
|
@trace
|
||||||
|
@tag_args
|
||||||
async def get_state_ids_for_group(
|
async def get_state_ids_for_group(
|
||||||
self, state_group: int, state_filter: Optional[StateFilter] = None
|
self, state_group: int, state_filter: Optional[StateFilter] = None
|
||||||
) -> StateMap[str]:
|
) -> StateMap[str]:
|
||||||
|
@ -130,6 +136,8 @@ class StateStorageController:
|
||||||
|
|
||||||
return group_to_state[state_group]
|
return group_to_state[state_group]
|
||||||
|
|
||||||
|
@trace
|
||||||
|
@tag_args
|
||||||
async def get_state_groups(
|
async def get_state_groups(
|
||||||
self, room_id: str, event_ids: Collection[str]
|
self, room_id: str, event_ids: Collection[str]
|
||||||
) -> Dict[int, List[EventBase]]:
|
) -> Dict[int, List[EventBase]]:
|
||||||
|
@ -165,6 +173,8 @@ class StateStorageController:
|
||||||
for group, event_id_map in group_to_ids.items()
|
for group, event_id_map in group_to_ids.items()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@trace
|
||||||
|
@tag_args
|
||||||
def _get_state_groups_from_groups(
|
def _get_state_groups_from_groups(
|
||||||
self, groups: List[int], state_filter: StateFilter
|
self, groups: List[int], state_filter: StateFilter
|
||||||
) -> Awaitable[Dict[int, StateMap[str]]]:
|
) -> Awaitable[Dict[int, StateMap[str]]]:
|
||||||
|
@ -183,6 +193,7 @@ class StateStorageController:
|
||||||
return self.stores.state._get_state_groups_from_groups(groups, state_filter)
|
return self.stores.state._get_state_groups_from_groups(groups, state_filter)
|
||||||
|
|
||||||
@trace
|
@trace
|
||||||
|
@tag_args
|
||||||
async def get_state_for_events(
|
async def get_state_for_events(
|
||||||
self, event_ids: Collection[str], state_filter: Optional[StateFilter] = None
|
self, event_ids: Collection[str], state_filter: Optional[StateFilter] = None
|
||||||
) -> Dict[str, StateMap[EventBase]]:
|
) -> Dict[str, StateMap[EventBase]]:
|
||||||
|
@ -280,6 +291,8 @@ class StateStorageController:
|
||||||
|
|
||||||
return {event: event_to_state[event] for event in event_ids}
|
return {event: event_to_state[event] for event in event_ids}
|
||||||
|
|
||||||
|
@trace
|
||||||
|
@tag_args
|
||||||
async def get_state_for_event(
|
async def get_state_for_event(
|
||||||
self, event_id: str, state_filter: Optional[StateFilter] = None
|
self, event_id: str, state_filter: Optional[StateFilter] = None
|
||||||
) -> StateMap[EventBase]:
|
) -> StateMap[EventBase]:
|
||||||
|
@ -303,6 +316,7 @@ class StateStorageController:
|
||||||
return state_map[event_id]
|
return state_map[event_id]
|
||||||
|
|
||||||
@trace
|
@trace
|
||||||
|
@tag_args
|
||||||
async def get_state_ids_for_event(
|
async def get_state_ids_for_event(
|
||||||
self,
|
self,
|
||||||
event_id: str,
|
event_id: str,
|
||||||
|
@ -333,6 +347,8 @@ class StateStorageController:
|
||||||
)
|
)
|
||||||
return state_map[event_id]
|
return state_map[event_id]
|
||||||
|
|
||||||
|
@trace
|
||||||
|
@tag_args
|
||||||
def get_state_for_groups(
|
def get_state_for_groups(
|
||||||
self, groups: Iterable[int], state_filter: Optional[StateFilter] = None
|
self, groups: Iterable[int], state_filter: Optional[StateFilter] = None
|
||||||
) -> Awaitable[Dict[int, MutableStateMap[str]]]:
|
) -> Awaitable[Dict[int, MutableStateMap[str]]]:
|
||||||
|
@ -402,6 +418,8 @@ class StateStorageController:
|
||||||
event_id, room_id, prev_group, delta_ids, current_state_ids
|
event_id, room_id, prev_group, delta_ids, current_state_ids
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@trace
|
||||||
|
@tag_args
|
||||||
@cancellable
|
@cancellable
|
||||||
async def get_current_state_ids(
|
async def get_current_state_ids(
|
||||||
self,
|
self,
|
||||||
|
@ -442,6 +460,8 @@ class StateStorageController:
|
||||||
room_id, on_invalidate=on_invalidate
|
room_id, on_invalidate=on_invalidate
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@trace
|
||||||
|
@tag_args
|
||||||
async def get_canonical_alias_for_room(self, room_id: str) -> Optional[str]:
|
async def get_canonical_alias_for_room(self, room_id: str) -> Optional[str]:
|
||||||
"""Get canonical alias for room, if any
|
"""Get canonical alias for room, if any
|
||||||
|
|
||||||
|
@ -466,6 +486,8 @@ class StateStorageController:
|
||||||
|
|
||||||
return event.content.get("canonical_alias")
|
return event.content.get("canonical_alias")
|
||||||
|
|
||||||
|
@trace
|
||||||
|
@tag_args
|
||||||
async def get_current_state_deltas(
|
async def get_current_state_deltas(
|
||||||
self, prev_stream_id: int, max_stream_id: int
|
self, prev_stream_id: int, max_stream_id: int
|
||||||
) -> Tuple[int, List[Dict[str, Any]]]:
|
) -> Tuple[int, List[Dict[str, Any]]]:
|
||||||
|
@ -500,6 +522,7 @@ class StateStorageController:
|
||||||
)
|
)
|
||||||
|
|
||||||
@trace
|
@trace
|
||||||
|
@tag_args
|
||||||
async def get_current_state(
|
async def get_current_state(
|
||||||
self, room_id: str, state_filter: Optional[StateFilter] = None
|
self, room_id: str, state_filter: Optional[StateFilter] = None
|
||||||
) -> StateMap[EventBase]:
|
) -> StateMap[EventBase]:
|
||||||
|
@ -516,6 +539,8 @@ class StateStorageController:
|
||||||
|
|
||||||
return state_map
|
return state_map
|
||||||
|
|
||||||
|
@trace
|
||||||
|
@tag_args
|
||||||
async def get_current_state_event(
|
async def get_current_state_event(
|
||||||
self, room_id: str, event_type: str, state_key: str
|
self, room_id: str, event_type: str, state_key: str
|
||||||
) -> Optional[EventBase]:
|
) -> Optional[EventBase]:
|
||||||
|
@ -527,6 +552,8 @@ class StateStorageController:
|
||||||
)
|
)
|
||||||
return state_map.get(key)
|
return state_map.get(key)
|
||||||
|
|
||||||
|
@trace
|
||||||
|
@tag_args
|
||||||
async def get_current_hosts_in_room(self, room_id: str) -> AbstractSet[str]:
|
async def get_current_hosts_in_room(self, room_id: str) -> AbstractSet[str]:
|
||||||
"""Get current hosts in room based on current state.
|
"""Get current hosts in room based on current state.
|
||||||
|
|
||||||
|
@ -538,6 +565,8 @@ class StateStorageController:
|
||||||
|
|
||||||
return await self.stores.main.get_current_hosts_in_room(room_id)
|
return await self.stores.main.get_current_hosts_in_room(room_id)
|
||||||
|
|
||||||
|
@trace
|
||||||
|
@tag_args
|
||||||
async def get_current_hosts_in_room_ordered(self, room_id: str) -> List[str]:
|
async def get_current_hosts_in_room_ordered(self, room_id: str) -> List[str]:
|
||||||
"""Get current hosts in room based on current state.
|
"""Get current hosts in room based on current state.
|
||||||
|
|
||||||
|
@ -553,6 +582,8 @@ class StateStorageController:
|
||||||
|
|
||||||
return await self.stores.main.get_current_hosts_in_room_ordered(room_id)
|
return await self.stores.main.get_current_hosts_in_room_ordered(room_id)
|
||||||
|
|
||||||
|
@trace
|
||||||
|
@tag_args
|
||||||
async def get_current_hosts_in_room_or_partial_state_approximation(
|
async def get_current_hosts_in_room_or_partial_state_approximation(
|
||||||
self, room_id: str
|
self, room_id: str
|
||||||
) -> Collection[str]:
|
) -> Collection[str]:
|
||||||
|
@ -582,6 +613,8 @@ class StateStorageController:
|
||||||
|
|
||||||
return hosts
|
return hosts
|
||||||
|
|
||||||
|
@trace
|
||||||
|
@tag_args
|
||||||
async def get_users_in_room_with_profiles(
|
async def get_users_in_room_with_profiles(
|
||||||
self, room_id: str
|
self, room_id: str
|
||||||
) -> Mapping[str, ProfileInfo]:
|
) -> Mapping[str, ProfileInfo]:
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
import logging
|
import logging
|
||||||
from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Tuple, Union
|
from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Tuple, Union
|
||||||
|
|
||||||
|
from synapse.logging.opentracing import tag_args, trace
|
||||||
from synapse.storage._base import SQLBaseStore
|
from synapse.storage._base import SQLBaseStore
|
||||||
from synapse.storage.database import (
|
from synapse.storage.database import (
|
||||||
DatabasePool,
|
DatabasePool,
|
||||||
|
@ -40,6 +41,8 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
|
||||||
updates.
|
updates.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
@trace
|
||||||
|
@tag_args
|
||||||
def _count_state_group_hops_txn(
|
def _count_state_group_hops_txn(
|
||||||
self, txn: LoggingTransaction, state_group: int
|
self, txn: LoggingTransaction, state_group: int
|
||||||
) -> int:
|
) -> int:
|
||||||
|
@ -83,6 +86,8 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
|
||||||
|
|
||||||
return count
|
return count
|
||||||
|
|
||||||
|
@trace
|
||||||
|
@tag_args
|
||||||
def _get_state_groups_from_groups_txn(
|
def _get_state_groups_from_groups_txn(
|
||||||
self,
|
self,
|
||||||
txn: LoggingTransaction,
|
txn: LoggingTransaction,
|
||||||
|
|
|
@ -20,6 +20,7 @@ import attr
|
||||||
from synapse.api.constants import EventTypes
|
from synapse.api.constants import EventTypes
|
||||||
from synapse.events import EventBase
|
from synapse.events import EventBase
|
||||||
from synapse.events.snapshot import UnpersistedEventContext, UnpersistedEventContextBase
|
from synapse.events.snapshot import UnpersistedEventContext, UnpersistedEventContextBase
|
||||||
|
from synapse.logging.opentracing import tag_args, trace
|
||||||
from synapse.storage._base import SQLBaseStore
|
from synapse.storage._base import SQLBaseStore
|
||||||
from synapse.storage.database import (
|
from synapse.storage.database import (
|
||||||
DatabasePool,
|
DatabasePool,
|
||||||
|
@ -159,6 +160,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||||
"get_state_group_delta", _get_state_group_delta_txn
|
"get_state_group_delta", _get_state_group_delta_txn
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@trace
|
||||||
|
@tag_args
|
||||||
@cancellable
|
@cancellable
|
||||||
async def _get_state_groups_from_groups(
|
async def _get_state_groups_from_groups(
|
||||||
self, groups: List[int], state_filter: StateFilter
|
self, groups: List[int], state_filter: StateFilter
|
||||||
|
@ -187,6 +190,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
@trace
|
||||||
|
@tag_args
|
||||||
def _get_state_for_group_using_cache(
|
def _get_state_for_group_using_cache(
|
||||||
self,
|
self,
|
||||||
cache: DictionaryCache[int, StateKey, str],
|
cache: DictionaryCache[int, StateKey, str],
|
||||||
|
@ -239,6 +244,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||||
|
|
||||||
return state_filter.filter_state(state_dict_ids), not missing_types
|
return state_filter.filter_state(state_dict_ids), not missing_types
|
||||||
|
|
||||||
|
@trace
|
||||||
|
@tag_args
|
||||||
@cancellable
|
@cancellable
|
||||||
async def _get_state_for_groups(
|
async def _get_state_for_groups(
|
||||||
self, groups: Iterable[int], state_filter: Optional[StateFilter] = None
|
self, groups: Iterable[int], state_filter: Optional[StateFilter] = None
|
||||||
|
@ -305,6 +312,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||||
|
|
||||||
return state
|
return state
|
||||||
|
|
||||||
|
@trace
|
||||||
|
@tag_args
|
||||||
def _get_state_for_groups_using_cache(
|
def _get_state_for_groups_using_cache(
|
||||||
self,
|
self,
|
||||||
groups: Iterable[int],
|
groups: Iterable[int],
|
||||||
|
@ -403,6 +412,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||||
fetched_keys=non_member_types,
|
fetched_keys=non_member_types,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@trace
|
||||||
|
@tag_args
|
||||||
async def store_state_deltas_for_batched(
|
async def store_state_deltas_for_batched(
|
||||||
self,
|
self,
|
||||||
events_and_context: List[Tuple[EventBase, UnpersistedEventContextBase]],
|
events_and_context: List[Tuple[EventBase, UnpersistedEventContextBase]],
|
||||||
|
@ -520,6 +531,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||||
prev_group,
|
prev_group,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@trace
|
||||||
|
@tag_args
|
||||||
async def store_state_group(
|
async def store_state_group(
|
||||||
self,
|
self,
|
||||||
event_id: str,
|
event_id: str,
|
||||||
|
@ -772,6 +785,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||||
((sg,) for sg in state_groups_to_delete),
|
((sg,) for sg in state_groups_to_delete),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@trace
|
||||||
|
@tag_args
|
||||||
async def get_previous_state_groups(
|
async def get_previous_state_groups(
|
||||||
self, state_groups: Iterable[int]
|
self, state_groups: Iterable[int]
|
||||||
) -> Dict[int, int]:
|
) -> Dict[int, int]:
|
||||||
|
|
Loading…
Reference in New Issue