Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes
commit
33551be61b
|
@ -0,0 +1 @@
|
||||||
|
Log exceptions in looping calls
|
|
@ -0,0 +1 @@
|
||||||
|
Add metric to count number of non-empty sync responses
|
|
@ -0,0 +1 @@
|
||||||
|
Workers now start on Python 3.
|
|
@ -0,0 +1,2 @@
|
||||||
|
Synapse now starts on Python 3.7.
|
||||||
|
_All_ workers now start on Python 3.
|
|
@ -17,6 +17,7 @@ import gc
|
||||||
import logging
|
import logging
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
|
import psutil
|
||||||
from daemonize import Daemonize
|
from daemonize import Daemonize
|
||||||
|
|
||||||
from twisted.internet import error, reactor
|
from twisted.internet import error, reactor
|
||||||
|
@ -24,12 +25,6 @@ from twisted.internet import error, reactor
|
||||||
from synapse.util import PreserveLoggingContext
|
from synapse.util import PreserveLoggingContext
|
||||||
from synapse.util.rlimit import change_resource_limit
|
from synapse.util.rlimit import change_resource_limit
|
||||||
|
|
||||||
try:
|
|
||||||
import affinity
|
|
||||||
except Exception:
|
|
||||||
affinity = None
|
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@ -89,15 +84,20 @@ def start_reactor(
|
||||||
with PreserveLoggingContext():
|
with PreserveLoggingContext():
|
||||||
logger.info("Running")
|
logger.info("Running")
|
||||||
if cpu_affinity is not None:
|
if cpu_affinity is not None:
|
||||||
if not affinity:
|
# Turn the bitmask into bits, reverse it so we go from 0 up
|
||||||
quit_with_error(
|
mask_to_bits = bin(cpu_affinity)[2:][::-1]
|
||||||
"Missing package 'affinity' required for cpu_affinity\n"
|
|
||||||
"option\n\n"
|
cpus = []
|
||||||
"Install by running:\n\n"
|
cpu_num = 0
|
||||||
" pip install affinity\n\n"
|
|
||||||
)
|
for i in mask_to_bits:
|
||||||
logger.info("Setting CPU affinity to %s" % cpu_affinity)
|
if i == "1":
|
||||||
affinity.set_process_affinity_mask(0, cpu_affinity)
|
cpus.append(cpu_num)
|
||||||
|
cpu_num += 1
|
||||||
|
|
||||||
|
p = psutil.Process()
|
||||||
|
p.cpu_affinity(cpus)
|
||||||
|
|
||||||
change_resource_limit(soft_file_limit)
|
change_resource_limit(soft_file_limit)
|
||||||
if gc_thresholds:
|
if gc_thresholds:
|
||||||
gc.set_threshold(*gc_thresholds)
|
gc.set_threshold(*gc_thresholds)
|
||||||
|
|
|
@ -178,6 +178,9 @@ def start(config_options):
|
||||||
|
|
||||||
setup_logging(config, use_worker_options=True)
|
setup_logging(config, use_worker_options=True)
|
||||||
|
|
||||||
|
# This should only be done on the user directory worker or the master
|
||||||
|
config.update_user_directory = False
|
||||||
|
|
||||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||||
|
|
||||||
database_engine = create_engine(config.database_config)
|
database_engine = create_engine(config.database_config)
|
||||||
|
|
|
@ -68,7 +68,7 @@ class PresenceStatusStubServlet(ClientV1RestServlet):
|
||||||
"Authorization": auth_headers,
|
"Authorization": auth_headers,
|
||||||
}
|
}
|
||||||
result = yield self.http_client.get_json(
|
result = yield self.http_client.get_json(
|
||||||
self.main_uri + request.uri,
|
self.main_uri + request.uri.decode('ascii'),
|
||||||
headers=headers,
|
headers=headers,
|
||||||
)
|
)
|
||||||
defer.returnValue((200, result))
|
defer.returnValue((200, result))
|
||||||
|
@ -125,7 +125,7 @@ class KeyUploadServlet(RestServlet):
|
||||||
"Authorization": auth_headers,
|
"Authorization": auth_headers,
|
||||||
}
|
}
|
||||||
result = yield self.http_client.post_json_get_json(
|
result = yield self.http_client.post_json_get_json(
|
||||||
self.main_uri + request.uri,
|
self.main_uri + request.uri.decode('ascii'),
|
||||||
body,
|
body,
|
||||||
headers=headers,
|
headers=headers,
|
||||||
)
|
)
|
||||||
|
|
|
@ -28,6 +28,7 @@ from synapse.config.logger import setup_logging
|
||||||
from synapse.http.site import SynapseSite
|
from synapse.http.site import SynapseSite
|
||||||
from synapse.metrics import RegistryProxy
|
from synapse.metrics import RegistryProxy
|
||||||
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
|
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
|
||||||
|
from synapse.replication.slave.storage._base import __func__
|
||||||
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
||||||
from synapse.replication.slave.storage.events import SlavedEventStore
|
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||||
from synapse.replication.slave.storage.pushers import SlavedPusherStore
|
from synapse.replication.slave.storage.pushers import SlavedPusherStore
|
||||||
|
@ -49,31 +50,31 @@ class PusherSlaveStore(
|
||||||
SlavedAccountDataStore
|
SlavedAccountDataStore
|
||||||
):
|
):
|
||||||
update_pusher_last_stream_ordering_and_success = (
|
update_pusher_last_stream_ordering_and_success = (
|
||||||
DataStore.update_pusher_last_stream_ordering_and_success.__func__
|
__func__(DataStore.update_pusher_last_stream_ordering_and_success)
|
||||||
)
|
)
|
||||||
|
|
||||||
update_pusher_failing_since = (
|
update_pusher_failing_since = (
|
||||||
DataStore.update_pusher_failing_since.__func__
|
__func__(DataStore.update_pusher_failing_since)
|
||||||
)
|
)
|
||||||
|
|
||||||
update_pusher_last_stream_ordering = (
|
update_pusher_last_stream_ordering = (
|
||||||
DataStore.update_pusher_last_stream_ordering.__func__
|
__func__(DataStore.update_pusher_last_stream_ordering)
|
||||||
)
|
)
|
||||||
|
|
||||||
get_throttle_params_by_room = (
|
get_throttle_params_by_room = (
|
||||||
DataStore.get_throttle_params_by_room.__func__
|
__func__(DataStore.get_throttle_params_by_room)
|
||||||
)
|
)
|
||||||
|
|
||||||
set_throttle_params = (
|
set_throttle_params = (
|
||||||
DataStore.set_throttle_params.__func__
|
__func__(DataStore.set_throttle_params)
|
||||||
)
|
)
|
||||||
|
|
||||||
get_time_of_last_push_action_before = (
|
get_time_of_last_push_action_before = (
|
||||||
DataStore.get_time_of_last_push_action_before.__func__
|
__func__(DataStore.get_time_of_last_push_action_before)
|
||||||
)
|
)
|
||||||
|
|
||||||
get_profile_displayname = (
|
get_profile_displayname = (
|
||||||
DataStore.get_profile_displayname.__func__
|
__func__(DataStore.get_profile_displayname)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -33,7 +33,7 @@ from synapse.http.server import JsonResource
|
||||||
from synapse.http.site import SynapseSite
|
from synapse.http.site import SynapseSite
|
||||||
from synapse.metrics import RegistryProxy
|
from synapse.metrics import RegistryProxy
|
||||||
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
|
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
|
||||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
from synapse.replication.slave.storage._base import BaseSlavedStore, __func__
|
||||||
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
||||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||||
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
||||||
|
@ -147,7 +147,7 @@ class SynchrotronPresence(object):
|
||||||
and haven't come back yet. If there are poke the master about them.
|
and haven't come back yet. If there are poke the master about them.
|
||||||
"""
|
"""
|
||||||
now = self.clock.time_msec()
|
now = self.clock.time_msec()
|
||||||
for user_id, last_sync_ms in self.users_going_offline.items():
|
for user_id, last_sync_ms in list(self.users_going_offline.items()):
|
||||||
if now - last_sync_ms > 10 * 1000:
|
if now - last_sync_ms > 10 * 1000:
|
||||||
self.users_going_offline.pop(user_id, None)
|
self.users_going_offline.pop(user_id, None)
|
||||||
self.send_user_sync(user_id, False, last_sync_ms)
|
self.send_user_sync(user_id, False, last_sync_ms)
|
||||||
|
@ -156,9 +156,9 @@ class SynchrotronPresence(object):
|
||||||
# TODO Hows this supposed to work?
|
# TODO Hows this supposed to work?
|
||||||
pass
|
pass
|
||||||
|
|
||||||
get_states = PresenceHandler.get_states.__func__
|
get_states = __func__(PresenceHandler.get_states)
|
||||||
get_state = PresenceHandler.get_state.__func__
|
get_state = __func__(PresenceHandler.get_state)
|
||||||
current_state_for_users = PresenceHandler.current_state_for_users.__func__
|
current_state_for_users = __func__(PresenceHandler.current_state_for_users)
|
||||||
|
|
||||||
def user_syncing(self, user_id, affect_presence):
|
def user_syncing(self, user_id, affect_presence):
|
||||||
if affect_presence:
|
if affect_presence:
|
||||||
|
@ -208,7 +208,7 @@ class SynchrotronPresence(object):
|
||||||
) for row in rows]
|
) for row in rows]
|
||||||
|
|
||||||
for state in states:
|
for state in states:
|
||||||
self.user_to_current_state[row.user_id] = state
|
self.user_to_current_state[state.user_id] = state
|
||||||
|
|
||||||
stream_id = token
|
stream_id = token
|
||||||
yield self.notify_from_replication(states, stream_id)
|
yield self.notify_from_replication(states, stream_id)
|
||||||
|
|
|
@ -28,6 +28,7 @@ from synapse.metrics import (
|
||||||
event_processing_loop_room_count,
|
event_processing_loop_room_count,
|
||||||
)
|
)
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
|
from synapse.util import log_failure
|
||||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||||
from synapse.util.metrics import Measure
|
from synapse.util.metrics import Measure
|
||||||
|
|
||||||
|
@ -36,17 +37,6 @@ logger = logging.getLogger(__name__)
|
||||||
events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "")
|
events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "")
|
||||||
|
|
||||||
|
|
||||||
def log_failure(failure):
|
|
||||||
logger.error(
|
|
||||||
"Application Services Failure",
|
|
||||||
exc_info=(
|
|
||||||
failure.type,
|
|
||||||
failure.value,
|
|
||||||
failure.getTracebackObject()
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class ApplicationServicesHandler(object):
|
class ApplicationServicesHandler(object):
|
||||||
|
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
|
@ -112,7 +102,10 @@ class ApplicationServicesHandler(object):
|
||||||
|
|
||||||
if not self.started_scheduler:
|
if not self.started_scheduler:
|
||||||
def start_scheduler():
|
def start_scheduler():
|
||||||
return self.scheduler.start().addErrback(log_failure)
|
return self.scheduler.start().addErrback(
|
||||||
|
log_failure, "Application Services Failure",
|
||||||
|
)
|
||||||
|
|
||||||
run_as_background_process("as_scheduler", start_scheduler)
|
run_as_background_process("as_scheduler", start_scheduler)
|
||||||
self.started_scheduler = True
|
self.started_scheduler = True
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,8 @@ import logging
|
||||||
|
|
||||||
from six import iteritems, itervalues
|
from six import iteritems, itervalues
|
||||||
|
|
||||||
|
from prometheus_client import Counter
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
from synapse.api.constants import EventTypes, Membership
|
from synapse.api.constants import EventTypes, Membership
|
||||||
|
@ -38,6 +40,18 @@ logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
SYNC_RESPONSE_CACHE_MS = 2 * 60 * 1000
|
SYNC_RESPONSE_CACHE_MS = 2 * 60 * 1000
|
||||||
|
|
||||||
|
# Counts the number of times we returned a non-empty sync. `type` is one of
|
||||||
|
# "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
|
||||||
|
# "true" or "false" depending on if the request asked for lazy loaded members or
|
||||||
|
# not.
|
||||||
|
non_empty_sync_counter = Counter(
|
||||||
|
"synapse_handlers_sync_nonempty_total",
|
||||||
|
"Count of non empty sync responses. type is initial_sync/full_state_sync"
|
||||||
|
"/incremental_sync. lazy_loaded indicates if lazy loaded members were "
|
||||||
|
"enabled for that request.",
|
||||||
|
["type", "lazy_loaded"],
|
||||||
|
)
|
||||||
|
|
||||||
# Store the cache that tracks which lazy-loaded members have been sent to a given
|
# Store the cache that tracks which lazy-loaded members have been sent to a given
|
||||||
# client for no more than 30 minutes.
|
# client for no more than 30 minutes.
|
||||||
LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
|
LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
|
||||||
|
@ -231,14 +245,16 @@ class SyncHandler(object):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
|
def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
|
||||||
full_state):
|
full_state):
|
||||||
|
if since_token is None:
|
||||||
|
sync_type = "initial_sync"
|
||||||
|
elif full_state:
|
||||||
|
sync_type = "full_state_sync"
|
||||||
|
else:
|
||||||
|
sync_type = "incremental_sync"
|
||||||
|
|
||||||
context = LoggingContext.current_context()
|
context = LoggingContext.current_context()
|
||||||
if context:
|
if context:
|
||||||
if since_token is None:
|
context.tag = sync_type
|
||||||
context.tag = "initial_sync"
|
|
||||||
elif full_state:
|
|
||||||
context.tag = "full_state_sync"
|
|
||||||
else:
|
|
||||||
context.tag = "incremental_sync"
|
|
||||||
|
|
||||||
if timeout == 0 or since_token is None or full_state:
|
if timeout == 0 or since_token is None or full_state:
|
||||||
# we are going to return immediately, so don't bother calling
|
# we are going to return immediately, so don't bother calling
|
||||||
|
@ -246,7 +262,6 @@ class SyncHandler(object):
|
||||||
result = yield self.current_sync_for_user(
|
result = yield self.current_sync_for_user(
|
||||||
sync_config, since_token, full_state=full_state,
|
sync_config, since_token, full_state=full_state,
|
||||||
)
|
)
|
||||||
defer.returnValue(result)
|
|
||||||
else:
|
else:
|
||||||
def current_sync_callback(before_token, after_token):
|
def current_sync_callback(before_token, after_token):
|
||||||
return self.current_sync_for_user(sync_config, since_token)
|
return self.current_sync_for_user(sync_config, since_token)
|
||||||
|
@ -255,6 +270,14 @@ class SyncHandler(object):
|
||||||
sync_config.user.to_string(), timeout, current_sync_callback,
|
sync_config.user.to_string(), timeout, current_sync_callback,
|
||||||
from_token=since_token,
|
from_token=since_token,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if result:
|
||||||
|
if sync_config.filter_collection.lazy_load_members():
|
||||||
|
lazy_loaded = "true"
|
||||||
|
else:
|
||||||
|
lazy_loaded = "false"
|
||||||
|
non_empty_sync_counter.labels(sync_type, lazy_loaded).inc()
|
||||||
|
|
||||||
defer.returnValue(result)
|
defer.returnValue(result)
|
||||||
|
|
||||||
def current_sync_for_user(self, sync_config, since_token=None,
|
def current_sync_for_user(self, sync_config, since_token=None,
|
||||||
|
|
|
@ -82,9 +82,6 @@ CONDITIONAL_REQUIREMENTS = {
|
||||||
"psutil": {
|
"psutil": {
|
||||||
"psutil>=2.0.0": ["psutil>=2.0.0"],
|
"psutil>=2.0.0": ["psutil>=2.0.0"],
|
||||||
},
|
},
|
||||||
"affinity": {
|
|
||||||
"affinity": ["affinity"],
|
|
||||||
},
|
|
||||||
"postgres": {
|
"postgres": {
|
||||||
"psycopg2>=2.6": ["psycopg2"]
|
"psycopg2>=2.6": ["psycopg2"]
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,6 +15,8 @@
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
import six
|
||||||
|
|
||||||
from synapse.storage._base import SQLBaseStore
|
from synapse.storage._base import SQLBaseStore
|
||||||
from synapse.storage.engines import PostgresEngine
|
from synapse.storage.engines import PostgresEngine
|
||||||
|
|
||||||
|
@ -23,6 +25,13 @@ from ._slaved_id_tracker import SlavedIdTracker
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def __func__(inp):
|
||||||
|
if six.PY3:
|
||||||
|
return inp
|
||||||
|
else:
|
||||||
|
return inp.__func__
|
||||||
|
|
||||||
|
|
||||||
class BaseSlavedStore(SQLBaseStore):
|
class BaseSlavedStore(SQLBaseStore):
|
||||||
def __init__(self, db_conn, hs):
|
def __init__(self, db_conn, hs):
|
||||||
super(BaseSlavedStore, self).__init__(db_conn, hs)
|
super(BaseSlavedStore, self).__init__(db_conn, hs)
|
||||||
|
|
|
@ -17,7 +17,7 @@ from synapse.storage import DataStore
|
||||||
from synapse.util.caches.expiringcache import ExpiringCache
|
from synapse.util.caches.expiringcache import ExpiringCache
|
||||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||||
|
|
||||||
from ._base import BaseSlavedStore
|
from ._base import BaseSlavedStore, __func__
|
||||||
from ._slaved_id_tracker import SlavedIdTracker
|
from ._slaved_id_tracker import SlavedIdTracker
|
||||||
|
|
||||||
|
|
||||||
|
@ -43,11 +43,11 @@ class SlavedDeviceInboxStore(BaseSlavedStore):
|
||||||
expiry_ms=30 * 60 * 1000,
|
expiry_ms=30 * 60 * 1000,
|
||||||
)
|
)
|
||||||
|
|
||||||
get_to_device_stream_token = DataStore.get_to_device_stream_token.__func__
|
get_to_device_stream_token = __func__(DataStore.get_to_device_stream_token)
|
||||||
get_new_messages_for_device = DataStore.get_new_messages_for_device.__func__
|
get_new_messages_for_device = __func__(DataStore.get_new_messages_for_device)
|
||||||
get_new_device_msgs_for_remote = DataStore.get_new_device_msgs_for_remote.__func__
|
get_new_device_msgs_for_remote = __func__(DataStore.get_new_device_msgs_for_remote)
|
||||||
delete_messages_for_device = DataStore.delete_messages_for_device.__func__
|
delete_messages_for_device = __func__(DataStore.delete_messages_for_device)
|
||||||
delete_device_msgs_for_remote = DataStore.delete_device_msgs_for_remote.__func__
|
delete_device_msgs_for_remote = __func__(DataStore.delete_device_msgs_for_remote)
|
||||||
|
|
||||||
def stream_positions(self):
|
def stream_positions(self):
|
||||||
result = super(SlavedDeviceInboxStore, self).stream_positions()
|
result = super(SlavedDeviceInboxStore, self).stream_positions()
|
||||||
|
|
|
@ -13,23 +13,14 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import six
|
|
||||||
|
|
||||||
from synapse.storage import DataStore
|
from synapse.storage import DataStore
|
||||||
from synapse.storage.end_to_end_keys import EndToEndKeyStore
|
from synapse.storage.end_to_end_keys import EndToEndKeyStore
|
||||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||||
|
|
||||||
from ._base import BaseSlavedStore
|
from ._base import BaseSlavedStore, __func__
|
||||||
from ._slaved_id_tracker import SlavedIdTracker
|
from ._slaved_id_tracker import SlavedIdTracker
|
||||||
|
|
||||||
|
|
||||||
def __func__(inp):
|
|
||||||
if six.PY3:
|
|
||||||
return inp
|
|
||||||
else:
|
|
||||||
return inp.__func__
|
|
||||||
|
|
||||||
|
|
||||||
class SlavedDeviceStore(BaseSlavedStore):
|
class SlavedDeviceStore(BaseSlavedStore):
|
||||||
def __init__(self, db_conn, hs):
|
def __init__(self, db_conn, hs):
|
||||||
super(SlavedDeviceStore, self).__init__(db_conn, hs)
|
super(SlavedDeviceStore, self).__init__(db_conn, hs)
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
from synapse.storage import DataStore
|
from synapse.storage import DataStore
|
||||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||||
|
|
||||||
from ._base import BaseSlavedStore
|
from ._base import BaseSlavedStore, __func__
|
||||||
from ._slaved_id_tracker import SlavedIdTracker
|
from ._slaved_id_tracker import SlavedIdTracker
|
||||||
|
|
||||||
|
|
||||||
|
@ -33,9 +33,9 @@ class SlavedGroupServerStore(BaseSlavedStore):
|
||||||
"_group_updates_stream_cache", self._group_updates_id_gen.get_current_token(),
|
"_group_updates_stream_cache", self._group_updates_id_gen.get_current_token(),
|
||||||
)
|
)
|
||||||
|
|
||||||
get_groups_changes_for_user = DataStore.get_groups_changes_for_user.__func__
|
get_groups_changes_for_user = __func__(DataStore.get_groups_changes_for_user)
|
||||||
get_group_stream_token = DataStore.get_group_stream_token.__func__
|
get_group_stream_token = __func__(DataStore.get_group_stream_token)
|
||||||
get_all_groups_for_user = DataStore.get_all_groups_for_user.__func__
|
get_all_groups_for_user = __func__(DataStore.get_all_groups_for_user)
|
||||||
|
|
||||||
def stream_positions(self):
|
def stream_positions(self):
|
||||||
result = super(SlavedGroupServerStore, self).stream_positions()
|
result = super(SlavedGroupServerStore, self).stream_positions()
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
from synapse.storage import DataStore
|
from synapse.storage import DataStore
|
||||||
from synapse.storage.keys import KeyStore
|
from synapse.storage.keys import KeyStore
|
||||||
|
|
||||||
from ._base import BaseSlavedStore
|
from ._base import BaseSlavedStore, __func__
|
||||||
|
|
||||||
|
|
||||||
class SlavedKeyStore(BaseSlavedStore):
|
class SlavedKeyStore(BaseSlavedStore):
|
||||||
|
@ -24,11 +24,11 @@ class SlavedKeyStore(BaseSlavedStore):
|
||||||
"_get_server_verify_key"
|
"_get_server_verify_key"
|
||||||
]
|
]
|
||||||
|
|
||||||
get_server_verify_keys = DataStore.get_server_verify_keys.__func__
|
get_server_verify_keys = __func__(DataStore.get_server_verify_keys)
|
||||||
store_server_verify_key = DataStore.store_server_verify_key.__func__
|
store_server_verify_key = __func__(DataStore.store_server_verify_key)
|
||||||
|
|
||||||
get_server_certificate = DataStore.get_server_certificate.__func__
|
get_server_certificate = __func__(DataStore.get_server_certificate)
|
||||||
store_server_certificate = DataStore.store_server_certificate.__func__
|
store_server_certificate = __func__(DataStore.store_server_certificate)
|
||||||
|
|
||||||
get_server_keys_json = DataStore.get_server_keys_json.__func__
|
get_server_keys_json = __func__(DataStore.get_server_keys_json)
|
||||||
store_server_keys_json = DataStore.store_server_keys_json.__func__
|
store_server_keys_json = __func__(DataStore.store_server_keys_json)
|
||||||
|
|
|
@ -17,7 +17,7 @@ from synapse.storage import DataStore
|
||||||
from synapse.storage.presence import PresenceStore
|
from synapse.storage.presence import PresenceStore
|
||||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||||
|
|
||||||
from ._base import BaseSlavedStore
|
from ._base import BaseSlavedStore, __func__
|
||||||
from ._slaved_id_tracker import SlavedIdTracker
|
from ._slaved_id_tracker import SlavedIdTracker
|
||||||
|
|
||||||
|
|
||||||
|
@ -34,8 +34,8 @@ class SlavedPresenceStore(BaseSlavedStore):
|
||||||
"PresenceStreamChangeCache", self._presence_id_gen.get_current_token()
|
"PresenceStreamChangeCache", self._presence_id_gen.get_current_token()
|
||||||
)
|
)
|
||||||
|
|
||||||
_get_active_presence = DataStore._get_active_presence.__func__
|
_get_active_presence = __func__(DataStore._get_active_presence)
|
||||||
take_presence_startup_info = DataStore.take_presence_startup_info.__func__
|
take_presence_startup_info = __func__(DataStore.take_presence_startup_info)
|
||||||
_get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"]
|
_get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"]
|
||||||
get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"]
|
get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"]
|
||||||
|
|
||||||
|
|
|
@ -630,7 +630,21 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_all_new_events_stream(self, from_id, current_id, limit):
|
def get_all_new_events_stream(self, from_id, current_id, limit):
|
||||||
"""Get all new events"""
|
"""Get all new events
|
||||||
|
|
||||||
|
Returns all events with from_id < stream_ordering <= current_id.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
from_id (int): the stream_ordering of the last event we processed
|
||||||
|
current_id (int): the stream_ordering of the most recently processed event
|
||||||
|
limit (int): the maximum number of events to return
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred[Tuple[int, list[FrozenEvent]]]: A tuple of (next_id, events), where
|
||||||
|
`next_id` is the next value to pass as `from_id` (it will either be the
|
||||||
|
stream_ordering of the last returned event, or, if fewer than `limit` events
|
||||||
|
were found, `current_id`.
|
||||||
|
"""
|
||||||
|
|
||||||
def get_all_new_events_stream_txn(txn):
|
def get_all_new_events_stream_txn(txn):
|
||||||
sql = (
|
sql = (
|
||||||
|
|
|
@ -68,7 +68,10 @@ class Clock(object):
|
||||||
"""
|
"""
|
||||||
call = task.LoopingCall(f)
|
call = task.LoopingCall(f)
|
||||||
call.clock = self._reactor
|
call.clock = self._reactor
|
||||||
call.start(msec / 1000.0, now=False)
|
d = call.start(msec / 1000.0, now=False)
|
||||||
|
d.addErrback(
|
||||||
|
log_failure, "Looping call died", consumeErrors=False,
|
||||||
|
)
|
||||||
return call
|
return call
|
||||||
|
|
||||||
def call_later(self, delay, callback, *args, **kwargs):
|
def call_later(self, delay, callback, *args, **kwargs):
|
||||||
|
@ -109,3 +112,29 @@ def batch_iter(iterable, size):
|
||||||
sourceiter = iter(iterable)
|
sourceiter = iter(iterable)
|
||||||
# call islice until it returns an empty tuple
|
# call islice until it returns an empty tuple
|
||||||
return iter(lambda: tuple(islice(sourceiter, size)), ())
|
return iter(lambda: tuple(islice(sourceiter, size)), ())
|
||||||
|
|
||||||
|
|
||||||
|
def log_failure(failure, msg, consumeErrors=True):
|
||||||
|
"""Creates a function suitable for passing to `Deferred.addErrback` that
|
||||||
|
logs any failures that occur.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
msg (str): Message to log
|
||||||
|
consumeErrors (bool): If true consumes the failure, otherwise passes
|
||||||
|
on down the callback chain
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
func(Failure)
|
||||||
|
"""
|
||||||
|
|
||||||
|
logger.error(
|
||||||
|
msg,
|
||||||
|
exc_info=(
|
||||||
|
failure.type,
|
||||||
|
failure.value,
|
||||||
|
failure.getTracebackObject()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
if not consumeErrors:
|
||||||
|
return failure
|
||||||
|
|
|
@ -219,7 +219,7 @@ def filter_events_for_server(store, server_name, events):
|
||||||
# Whatever else we do, we need to check for senders which have requested
|
# Whatever else we do, we need to check for senders which have requested
|
||||||
# erasure of their data.
|
# erasure of their data.
|
||||||
erased_senders = yield store.are_users_erased(
|
erased_senders = yield store.are_users_erased(
|
||||||
e.sender for e in events,
|
(e.sender for e in events),
|
||||||
)
|
)
|
||||||
|
|
||||||
def redact_disallowed(event, state):
|
def redact_disallowed(event, state):
|
||||||
|
|
2
synctl
2
synctl
|
@ -280,7 +280,7 @@ def main():
|
||||||
if worker.cache_factor:
|
if worker.cache_factor:
|
||||||
os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
|
os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
|
||||||
|
|
||||||
for cache_name, factor in worker.cache_factors.iteritems():
|
for cache_name, factor in iteritems(worker.cache_factors):
|
||||||
os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
|
os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
|
||||||
|
|
||||||
start_worker(worker.app, configfile, worker.configfile)
|
start_worker(worker.app, configfile, worker.configfile)
|
||||||
|
|
Loading…
Reference in New Issue