Compare commits
28 Commits
61bb834364
...
334bfdbc90
Author | SHA1 | Date |
---|---|---|
Amber Brown | 334bfdbc90 | |
Andrew Morgan | 07b88c546d | |
Richard van der Hoff | 0f05fd1530 | |
Richard van der Hoff | fd4c975b5b | |
Richard van der Hoff | bae32740da | |
Richard van der Hoff | 6dd6a3557c | |
Richard van der Hoff | 0cbb4808ed | |
Richard van der Hoff | 14a8e71297 | |
Richard van der Hoff | 883ac4b1bb | |
Richard van der Hoff | cb40b0cb80 | |
Richard van der Hoff | 0122ef1037 | |
Richard van der Hoff | 8d4cbdeaa9 | |
Richard van der Hoff | 553c8a9b6b | |
Richard van der Hoff | 29ce90358c | |
Richard van der Hoff | fcc2de7a0c | |
Richard van der Hoff | daa1ac89a0 | |
Richard van der Hoff | 6d7cec7a57 | |
Andrew Morgan | f7d6e849b3 | |
Andrew Morgan | 08edefe694 | |
Andrew Morgan | ec56620ff6 | |
Andrew Morgan | b730480abb | |
Richard van der Hoff | af47264b78 | |
Richard van der Hoff | b413ab8aa6 | |
Richard van der Hoff | 7b608cf468 | |
Richard van der Hoff | b4c2234232 | |
Richard van der Hoff | 51f4d52cb4 | |
Richard van der Hoff | 26d17b9bdc | |
Richard van der Hoff | cfe8c8ab8e |
17
CHANGES.md
17
CHANGES.md
|
@ -6,6 +6,23 @@ Next version
|
||||||
configuration then this template will need to be duplicated into that
|
configuration then this template will need to be duplicated into that
|
||||||
directory.
|
directory.
|
||||||
|
|
||||||
|
Synapse 1.12.3 (2020-04-03)
|
||||||
|
===========================
|
||||||
|
|
||||||
|
- Remove the the pin to Pillow 7.0 which was introduced in Synapse 1.12.2, and
|
||||||
|
correctly fix the issue with building the Debian packages. ([\#7212](https://github.com/matrix-org/synapse/issues/7212))
|
||||||
|
|
||||||
|
Synapse 1.12.2 (2020-04-02)
|
||||||
|
===========================
|
||||||
|
|
||||||
|
This release works around [an
|
||||||
|
issue](https://github.com/matrix-org/synapse/issues/7208) with building the
|
||||||
|
debian packages.
|
||||||
|
|
||||||
|
No other significant changes since 1.12.1.
|
||||||
|
|
||||||
|
>>>>>>> master
|
||||||
|
|
||||||
Synapse 1.12.1 (2020-04-02)
|
Synapse 1.12.1 (2020-04-02)
|
||||||
===========================
|
===========================
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Add benchmarks for LruCache.
|
|
@ -0,0 +1 @@
|
||||||
|
Fix device list update stream ids going backward.
|
|
@ -0,0 +1 @@
|
||||||
|
Clean up some LoggingContext code.
|
|
@ -0,0 +1 @@
|
||||||
|
Fix consistency of HTTP status codes reported in log lines.
|
|
@ -0,0 +1 @@
|
||||||
|
Only run one background database update at a time.
|
|
@ -0,0 +1 @@
|
||||||
|
Fix some worker-mode replication handling not being correctly recorded in CPU usage stats.
|
|
@ -1,3 +1,20 @@
|
||||||
|
matrix-synapse-py3 (1.12.3) stable; urgency=medium
|
||||||
|
|
||||||
|
[ Richard van der Hoff ]
|
||||||
|
* Update the Debian build scripts to handle the new installation paths
|
||||||
|
for the support libraries introduced by Pillow 7.1.1.
|
||||||
|
|
||||||
|
[ Synapse Packaging team ]
|
||||||
|
* New synapse release 1.12.3.
|
||||||
|
|
||||||
|
-- Synapse Packaging team <packages@matrix.org> Fri, 03 Apr 2020 10:55:03 +0100
|
||||||
|
|
||||||
|
matrix-synapse-py3 (1.12.2) stable; urgency=medium
|
||||||
|
|
||||||
|
* New synapse release 1.12.2.
|
||||||
|
|
||||||
|
-- Synapse Packaging team <packages@matrix.org> Mon, 02 Apr 2020 19:02:17 +0000
|
||||||
|
|
||||||
matrix-synapse-py3 (1.12.1) stable; urgency=medium
|
matrix-synapse-py3 (1.12.1) stable; urgency=medium
|
||||||
|
|
||||||
* New synapse release 1.12.1.
|
* New synapse release 1.12.1.
|
||||||
|
|
|
@ -15,17 +15,38 @@ override_dh_installinit:
|
||||||
# we don't really want to strip the symbols from our object files.
|
# we don't really want to strip the symbols from our object files.
|
||||||
override_dh_strip:
|
override_dh_strip:
|
||||||
|
|
||||||
|
# dh_shlibdeps calls dpkg-shlibdeps, which finds all the binary files
|
||||||
|
# (executables and shared libs) in the package, and looks for the shared
|
||||||
|
# libraries that they depend on. It then adds a dependency on the package that
|
||||||
|
# contains that library to the package.
|
||||||
|
#
|
||||||
|
# We make two modifications to that process...
|
||||||
|
#
|
||||||
override_dh_shlibdeps:
|
override_dh_shlibdeps:
|
||||||
# make the postgres package's dependencies a recommendation
|
# Firstly, postgres is not a hard dependency for us, so we want to make
|
||||||
# rather than a hard dependency.
|
# the things that psycopg2 depends on (such as libpq) be
|
||||||
|
# recommendations rather than hard dependencies. We do so by
|
||||||
|
# running dpkg-shlibdeps manually on psycopg2's libs.
|
||||||
|
#
|
||||||
find debian/$(PACKAGE_NAME)/ -path '*/site-packages/psycopg2/*.so' | \
|
find debian/$(PACKAGE_NAME)/ -path '*/site-packages/psycopg2/*.so' | \
|
||||||
xargs dpkg-shlibdeps -Tdebian/$(PACKAGE_NAME).substvars \
|
xargs dpkg-shlibdeps -Tdebian/$(PACKAGE_NAME).substvars \
|
||||||
-pshlibs1 -dRecommends
|
-pshlibs1 -dRecommends
|
||||||
|
|
||||||
# all the other dependencies can be normal 'Depends' requirements,
|
# secondly, we exclude PIL's libraries from the process. They are known
|
||||||
# except for PIL's, which is self-contained and which confuses
|
# to be self-contained, but they have interdependencies and
|
||||||
# dpkg-shlibdeps.
|
# dpkg-shlibdeps doesn't know how to resolve them.
|
||||||
dh_shlibdeps -X site-packages/PIL/.libs -X site-packages/psycopg2
|
#
|
||||||
|
# As of Pillow 7.1.0, these libraries are in
|
||||||
|
# site-packages/Pillow.libs. Previously, they were in
|
||||||
|
# site-packages/PIL/.libs.
|
||||||
|
#
|
||||||
|
# (we also need to exclude psycopg2, of course, since we've already
|
||||||
|
# dealt with that.)
|
||||||
|
#
|
||||||
|
dh_shlibdeps \
|
||||||
|
-X site-packages/PIL/.libs \
|
||||||
|
-X site-packages/Pillow.libs \
|
||||||
|
-X site-packages/psycopg2
|
||||||
|
|
||||||
override_dh_virtualenv:
|
override_dh_virtualenv:
|
||||||
./debian/build_virtualenv
|
./debian/build_virtualenv
|
||||||
|
|
|
@ -36,7 +36,7 @@ try:
|
||||||
except ImportError:
|
except ImportError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
__version__ = "1.12.1"
|
__version__ = "1.12.3"
|
||||||
|
|
||||||
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
|
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
|
||||||
# We import here so that we don't have to install a bunch of deps when
|
# We import here so that we don't have to install a bunch of deps when
|
||||||
|
|
|
@ -86,7 +86,14 @@ class CodeMessageException(RuntimeError):
|
||||||
|
|
||||||
def __init__(self, code, msg):
|
def __init__(self, code, msg):
|
||||||
super(CodeMessageException, self).__init__("%d: %s" % (code, msg))
|
super(CodeMessageException, self).__init__("%d: %s" % (code, msg))
|
||||||
self.code = code
|
|
||||||
|
# Some calls to this method pass instances of http.HTTPStatus for `code`.
|
||||||
|
# While HTTPStatus is a subclass of int, it has magic __str__ methods
|
||||||
|
# which emit `HTTPStatus.FORBIDDEN` when converted to a str, instead of `403`.
|
||||||
|
# This causes inconsistency in our log lines.
|
||||||
|
#
|
||||||
|
# To eliminate this behaviour, we convert them to their integer equivalents here.
|
||||||
|
self.code = int(code)
|
||||||
self.msg = msg
|
self.msg = msg
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -42,7 +42,7 @@ from synapse.handlers.presence import PresenceHandler, get_interested_parties
|
||||||
from synapse.http.server import JsonResource
|
from synapse.http.server import JsonResource
|
||||||
from synapse.http.servlet import RestServlet, parse_json_object_from_request
|
from synapse.http.servlet import RestServlet, parse_json_object_from_request
|
||||||
from synapse.http.site import SynapseSite
|
from synapse.http.site import SynapseSite
|
||||||
from synapse.logging.context import LoggingContext, run_in_background
|
from synapse.logging.context import LoggingContext
|
||||||
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
|
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.replication.slave.storage._base import BaseSlavedStore, __func__
|
from synapse.replication.slave.storage._base import BaseSlavedStore, __func__
|
||||||
|
@ -635,7 +635,7 @@ class GenericWorkerReplicationHandler(ReplicationClientHandler):
|
||||||
await super(GenericWorkerReplicationHandler, self).on_rdata(
|
await super(GenericWorkerReplicationHandler, self).on_rdata(
|
||||||
stream_name, token, rows
|
stream_name, token, rows
|
||||||
)
|
)
|
||||||
run_in_background(self.process_and_notify, stream_name, token, rows)
|
await self.process_and_notify(stream_name, token, rows)
|
||||||
|
|
||||||
def get_streams_to_replicate(self):
|
def get_streams_to_replicate(self):
|
||||||
args = super(GenericWorkerReplicationHandler, self).get_streams_to_replicate()
|
args = super(GenericWorkerReplicationHandler, self).get_streams_to_replicate()
|
||||||
|
@ -650,7 +650,9 @@ class GenericWorkerReplicationHandler(ReplicationClientHandler):
|
||||||
async def process_and_notify(self, stream_name, token, rows):
|
async def process_and_notify(self, stream_name, token, rows):
|
||||||
try:
|
try:
|
||||||
if self.send_handler:
|
if self.send_handler:
|
||||||
self.send_handler.process_replication_rows(stream_name, token, rows)
|
await self.send_handler.process_replication_rows(
|
||||||
|
stream_name, token, rows
|
||||||
|
)
|
||||||
|
|
||||||
if stream_name == EventsStream.NAME:
|
if stream_name == EventsStream.NAME:
|
||||||
# We shouldn't get multiple rows per token for events stream, so
|
# We shouldn't get multiple rows per token for events stream, so
|
||||||
|
@ -782,12 +784,12 @@ class FederationSenderHandler(object):
|
||||||
def stream_positions(self):
|
def stream_positions(self):
|
||||||
return {"federation": self.federation_position}
|
return {"federation": self.federation_position}
|
||||||
|
|
||||||
def process_replication_rows(self, stream_name, token, rows):
|
async def process_replication_rows(self, stream_name, token, rows):
|
||||||
# The federation stream contains things that we want to send out, e.g.
|
# The federation stream contains things that we want to send out, e.g.
|
||||||
# presence, typing, etc.
|
# presence, typing, etc.
|
||||||
if stream_name == "federation":
|
if stream_name == "federation":
|
||||||
send_queue.process_rows_for_federation(self.federation_sender, rows)
|
send_queue.process_rows_for_federation(self.federation_sender, rows)
|
||||||
run_in_background(self.update_token, token)
|
await self.update_token(token)
|
||||||
|
|
||||||
# We also need to poke the federation sender when new events happen
|
# We also need to poke the federation sender when new events happen
|
||||||
elif stream_name == "events":
|
elif stream_name == "events":
|
||||||
|
@ -795,9 +797,7 @@ class FederationSenderHandler(object):
|
||||||
|
|
||||||
# ... and when new receipts happen
|
# ... and when new receipts happen
|
||||||
elif stream_name == ReceiptsStream.NAME:
|
elif stream_name == ReceiptsStream.NAME:
|
||||||
run_as_background_process(
|
await self._on_new_receipts(rows)
|
||||||
"process_receipts_for_federation", self._on_new_receipts, rows
|
|
||||||
)
|
|
||||||
|
|
||||||
# ... as well as device updates and messages
|
# ... as well as device updates and messages
|
||||||
elif stream_name == DeviceListsStream.NAME:
|
elif stream_name == DeviceListsStream.NAME:
|
||||||
|
|
|
@ -51,7 +51,7 @@ try:
|
||||||
|
|
||||||
is_thread_resource_usage_supported = True
|
is_thread_resource_usage_supported = True
|
||||||
|
|
||||||
def get_thread_resource_usage():
|
def get_thread_resource_usage() -> "Optional[resource._RUsage]":
|
||||||
return resource.getrusage(RUSAGE_THREAD)
|
return resource.getrusage(RUSAGE_THREAD)
|
||||||
|
|
||||||
|
|
||||||
|
@ -60,7 +60,7 @@ except Exception:
|
||||||
# won't track resource usage.
|
# won't track resource usage.
|
||||||
is_thread_resource_usage_supported = False
|
is_thread_resource_usage_supported = False
|
||||||
|
|
||||||
def get_thread_resource_usage():
|
def get_thread_resource_usage() -> "Optional[resource._RUsage]":
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
@ -201,10 +201,10 @@ class _Sentinel(object):
|
||||||
record["request"] = None
|
record["request"] = None
|
||||||
record["scope"] = None
|
record["scope"] = None
|
||||||
|
|
||||||
def start(self):
|
def start(self, rusage: "Optional[resource._RUsage]"):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def stop(self):
|
def stop(self, rusage: "Optional[resource._RUsage]"):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def add_database_transaction(self, duration_sec):
|
def add_database_transaction(self, duration_sec):
|
||||||
|
@ -261,7 +261,7 @@ class LoggingContext(object):
|
||||||
|
|
||||||
# The thread resource usage when the logcontext became active. None
|
# The thread resource usage when the logcontext became active. None
|
||||||
# if the context is not currently active.
|
# if the context is not currently active.
|
||||||
self.usage_start = None
|
self.usage_start = None # type: Optional[resource._RUsage]
|
||||||
|
|
||||||
self.main_thread = get_thread_id()
|
self.main_thread = get_thread_id()
|
||||||
self.request = None
|
self.request = None
|
||||||
|
@ -336,7 +336,17 @@ class LoggingContext(object):
|
||||||
record["request"] = self.request
|
record["request"] = self.request
|
||||||
record["scope"] = self.scope
|
record["scope"] = self.scope
|
||||||
|
|
||||||
def start(self) -> None:
|
def start(self, rusage: "Optional[resource._RUsage]") -> None:
|
||||||
|
"""
|
||||||
|
Record that this logcontext is currently running.
|
||||||
|
|
||||||
|
This should not be called directly: use set_current_context
|
||||||
|
|
||||||
|
Args:
|
||||||
|
rusage: the resources used by the current thread, at the point of
|
||||||
|
switching to this logcontext. May be None if this platform doesn't
|
||||||
|
support getrusuage.
|
||||||
|
"""
|
||||||
if get_thread_id() != self.main_thread:
|
if get_thread_id() != self.main_thread:
|
||||||
logger.warning("Started logcontext %s on different thread", self)
|
logger.warning("Started logcontext %s on different thread", self)
|
||||||
return
|
return
|
||||||
|
@ -349,36 +359,48 @@ class LoggingContext(object):
|
||||||
if self.usage_start:
|
if self.usage_start:
|
||||||
logger.warning("Re-starting already-active log context %s", self)
|
logger.warning("Re-starting already-active log context %s", self)
|
||||||
else:
|
else:
|
||||||
self.usage_start = get_thread_resource_usage()
|
self.usage_start = rusage
|
||||||
|
|
||||||
def stop(self) -> None:
|
def stop(self, rusage: "Optional[resource._RUsage]") -> None:
|
||||||
if get_thread_id() != self.main_thread:
|
"""
|
||||||
logger.warning("Stopped logcontext %s on different thread", self)
|
Record that this logcontext is no longer running.
|
||||||
return
|
|
||||||
|
|
||||||
# When we stop, let's record the cpu used since we started
|
This should not be called directly: use set_current_context
|
||||||
if not self.usage_start:
|
|
||||||
# Log a warning on platforms that support thread usage tracking
|
Args:
|
||||||
if is_thread_resource_usage_supported:
|
rusage: the resources used by the current thread, at the point of
|
||||||
|
switching away from this logcontext. May be None if this platform
|
||||||
|
doesn't support getrusuage.
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
if get_thread_id() != self.main_thread:
|
||||||
|
logger.warning("Stopped logcontext %s on different thread", self)
|
||||||
|
return
|
||||||
|
|
||||||
|
if not rusage:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Record the cpu used since we started
|
||||||
|
if not self.usage_start:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"Called stop on logcontext %s without calling start", self
|
"Called stop on logcontext %s without recording a start rusage",
|
||||||
|
self,
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
utime_delta, stime_delta = self._get_cputime()
|
utime_delta, stime_delta = self._get_cputime(rusage)
|
||||||
self._resource_usage.ru_utime += utime_delta
|
self._resource_usage.ru_utime += utime_delta
|
||||||
self._resource_usage.ru_stime += stime_delta
|
self._resource_usage.ru_stime += stime_delta
|
||||||
|
|
||||||
self.usage_start = None
|
# if we have a parent, pass our CPU usage stats on
|
||||||
|
if self.parent_context:
|
||||||
|
self.parent_context._resource_usage += self._resource_usage
|
||||||
|
|
||||||
# if we have a parent, pass our CPU usage stats on
|
# reset them in case we get entered again
|
||||||
if self.parent_context is not None and hasattr(
|
self._resource_usage.reset()
|
||||||
self.parent_context, "_resource_usage"
|
finally:
|
||||||
):
|
self.usage_start = None
|
||||||
self.parent_context._resource_usage += self._resource_usage
|
|
||||||
|
|
||||||
# reset them in case we get entered again
|
|
||||||
self._resource_usage.reset()
|
|
||||||
|
|
||||||
def get_resource_usage(self) -> ContextResourceUsage:
|
def get_resource_usage(self) -> ContextResourceUsage:
|
||||||
"""Get resources used by this logcontext so far.
|
"""Get resources used by this logcontext so far.
|
||||||
|
@ -394,24 +416,24 @@ class LoggingContext(object):
|
||||||
# can include resource usage so far.
|
# can include resource usage so far.
|
||||||
is_main_thread = get_thread_id() == self.main_thread
|
is_main_thread = get_thread_id() == self.main_thread
|
||||||
if self.usage_start and is_main_thread:
|
if self.usage_start and is_main_thread:
|
||||||
utime_delta, stime_delta = self._get_cputime()
|
rusage = get_thread_resource_usage()
|
||||||
|
assert rusage is not None
|
||||||
|
utime_delta, stime_delta = self._get_cputime(rusage)
|
||||||
res.ru_utime += utime_delta
|
res.ru_utime += utime_delta
|
||||||
res.ru_stime += stime_delta
|
res.ru_stime += stime_delta
|
||||||
|
|
||||||
return res
|
return res
|
||||||
|
|
||||||
def _get_cputime(self) -> Tuple[float, float]:
|
def _get_cputime(self, current: "resource._RUsage") -> Tuple[float, float]:
|
||||||
"""Get the cpu usage time so far
|
"""Get the cpu usage time between start() and the given rusage
|
||||||
|
|
||||||
|
Args:
|
||||||
|
rusage: the current resource usage
|
||||||
|
|
||||||
Returns: Tuple[float, float]: seconds in user mode, seconds in system mode
|
Returns: Tuple[float, float]: seconds in user mode, seconds in system mode
|
||||||
"""
|
"""
|
||||||
assert self.usage_start is not None
|
assert self.usage_start is not None
|
||||||
|
|
||||||
current = get_thread_resource_usage()
|
|
||||||
|
|
||||||
# Indicate to mypy that we know that self.usage_start is None.
|
|
||||||
assert self.usage_start is not None
|
|
||||||
|
|
||||||
utime_delta = current.ru_utime - self.usage_start.ru_utime
|
utime_delta = current.ru_utime - self.usage_start.ru_utime
|
||||||
stime_delta = current.ru_stime - self.usage_start.ru_stime
|
stime_delta = current.ru_stime - self.usage_start.ru_stime
|
||||||
|
|
||||||
|
@ -547,9 +569,11 @@ def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSe
|
||||||
current = current_context()
|
current = current_context()
|
||||||
|
|
||||||
if current is not context:
|
if current is not context:
|
||||||
current.stop()
|
rusage = get_thread_resource_usage()
|
||||||
|
current.stop(rusage)
|
||||||
_thread_local.current_context = context
|
_thread_local.current_context = context
|
||||||
context.start()
|
context.start(rusage)
|
||||||
|
|
||||||
return current
|
return current
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -90,8 +90,10 @@ class BackgroundUpdater(object):
|
||||||
self._clock = hs.get_clock()
|
self._clock = hs.get_clock()
|
||||||
self.db = database
|
self.db = database
|
||||||
|
|
||||||
|
# if a background update is currently running, its name.
|
||||||
|
self._current_background_update = None # type: Optional[str]
|
||||||
|
|
||||||
self._background_update_performance = {}
|
self._background_update_performance = {}
|
||||||
self._background_update_queue = []
|
|
||||||
self._background_update_handlers = {}
|
self._background_update_handlers = {}
|
||||||
self._all_done = False
|
self._all_done = False
|
||||||
|
|
||||||
|
@ -111,7 +113,7 @@ class BackgroundUpdater(object):
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Error doing update")
|
logger.exception("Error doing update")
|
||||||
else:
|
else:
|
||||||
if result is None:
|
if result:
|
||||||
logger.info(
|
logger.info(
|
||||||
"No more background updates to do."
|
"No more background updates to do."
|
||||||
" Unscheduling background update task."
|
" Unscheduling background update task."
|
||||||
|
@ -119,26 +121,25 @@ class BackgroundUpdater(object):
|
||||||
self._all_done = True
|
self._all_done = True
|
||||||
return None
|
return None
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
async def has_completed_background_updates(self) -> bool:
|
||||||
def has_completed_background_updates(self):
|
|
||||||
"""Check if all the background updates have completed
|
"""Check if all the background updates have completed
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Deferred[bool]: True if all background updates have completed
|
True if all background updates have completed
|
||||||
"""
|
"""
|
||||||
# if we've previously determined that there is nothing left to do, that
|
# if we've previously determined that there is nothing left to do, that
|
||||||
# is easy
|
# is easy
|
||||||
if self._all_done:
|
if self._all_done:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# obviously, if we have things in our queue, we're not done.
|
# obviously, if we are currently processing an update, we're not done.
|
||||||
if self._background_update_queue:
|
if self._current_background_update:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# otherwise, check if there are updates to be run. This is important,
|
# otherwise, check if there are updates to be run. This is important,
|
||||||
# as we may be running on a worker which doesn't perform the bg updates
|
# as we may be running on a worker which doesn't perform the bg updates
|
||||||
# itself, but still wants to wait for them to happen.
|
# itself, but still wants to wait for them to happen.
|
||||||
updates = yield self.db.simple_select_onecol(
|
updates = await self.db.simple_select_onecol(
|
||||||
"background_updates",
|
"background_updates",
|
||||||
keyvalues=None,
|
keyvalues=None,
|
||||||
retcol="1",
|
retcol="1",
|
||||||
|
@ -153,11 +154,10 @@ class BackgroundUpdater(object):
|
||||||
async def has_completed_background_update(self, update_name) -> bool:
|
async def has_completed_background_update(self, update_name) -> bool:
|
||||||
"""Check if the given background update has finished running.
|
"""Check if the given background update has finished running.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if self._all_done:
|
if self._all_done:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
if update_name in self._background_update_queue:
|
if update_name == self._current_background_update:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
update_exists = await self.db.simple_select_one_onecol(
|
update_exists = await self.db.simple_select_one_onecol(
|
||||||
|
@ -170,9 +170,7 @@ class BackgroundUpdater(object):
|
||||||
|
|
||||||
return not update_exists
|
return not update_exists
|
||||||
|
|
||||||
async def do_next_background_update(
|
async def do_next_background_update(self, desired_duration_ms: float) -> bool:
|
||||||
self, desired_duration_ms: float
|
|
||||||
) -> Optional[int]:
|
|
||||||
"""Does some amount of work on the next queued background update
|
"""Does some amount of work on the next queued background update
|
||||||
|
|
||||||
Returns once some amount of work is done.
|
Returns once some amount of work is done.
|
||||||
|
@ -181,33 +179,51 @@ class BackgroundUpdater(object):
|
||||||
desired_duration_ms(float): How long we want to spend
|
desired_duration_ms(float): How long we want to spend
|
||||||
updating.
|
updating.
|
||||||
Returns:
|
Returns:
|
||||||
None if there is no more work to do, otherwise an int
|
True if we have finished running all the background updates, otherwise False
|
||||||
"""
|
"""
|
||||||
if not self._background_update_queue:
|
|
||||||
updates = await self.db.simple_select_list(
|
def get_background_updates_txn(txn):
|
||||||
"background_updates",
|
txn.execute(
|
||||||
keyvalues=None,
|
"""
|
||||||
retcols=("update_name", "depends_on"),
|
SELECT update_name, depends_on FROM background_updates
|
||||||
|
ORDER BY ordering, update_name
|
||||||
|
"""
|
||||||
)
|
)
|
||||||
in_flight = {update["update_name"] for update in updates}
|
return self.db.cursor_to_dict(txn)
|
||||||
for update in updates:
|
|
||||||
if update["depends_on"] not in in_flight:
|
|
||||||
self._background_update_queue.append(update["update_name"])
|
|
||||||
|
|
||||||
if not self._background_update_queue:
|
if not self._current_background_update:
|
||||||
# no work left to do
|
all_pending_updates = await self.db.runInteraction(
|
||||||
return None
|
"background_updates", get_background_updates_txn,
|
||||||
|
)
|
||||||
|
if not all_pending_updates:
|
||||||
|
# no work left to do
|
||||||
|
return True
|
||||||
|
|
||||||
# pop from the front, and add back to the back
|
# find the first update which isn't dependent on another one in the queue.
|
||||||
update_name = self._background_update_queue.pop(0)
|
pending = {update["update_name"] for update in all_pending_updates}
|
||||||
self._background_update_queue.append(update_name)
|
for upd in all_pending_updates:
|
||||||
|
depends_on = upd["depends_on"]
|
||||||
|
if not depends_on or depends_on not in pending:
|
||||||
|
break
|
||||||
|
logger.info(
|
||||||
|
"Not starting on bg update %s until %s is done",
|
||||||
|
upd["update_name"],
|
||||||
|
depends_on,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# if we get to the end of that for loop, there is a problem
|
||||||
|
raise Exception(
|
||||||
|
"Unable to find a background update which doesn't depend on "
|
||||||
|
"another: dependency cycle?"
|
||||||
|
)
|
||||||
|
|
||||||
res = await self._do_background_update(update_name, desired_duration_ms)
|
self._current_background_update = upd["update_name"]
|
||||||
return res
|
|
||||||
|
|
||||||
async def _do_background_update(
|
await self._do_background_update(desired_duration_ms)
|
||||||
self, update_name: str, desired_duration_ms: float
|
return False
|
||||||
) -> int:
|
|
||||||
|
async def _do_background_update(self, desired_duration_ms: float) -> int:
|
||||||
|
update_name = self._current_background_update
|
||||||
logger.info("Starting update batch on background update '%s'", update_name)
|
logger.info("Starting update batch on background update '%s'", update_name)
|
||||||
|
|
||||||
update_handler = self._background_update_handlers[update_name]
|
update_handler = self._background_update_handlers[update_name]
|
||||||
|
@ -400,27 +416,6 @@ class BackgroundUpdater(object):
|
||||||
|
|
||||||
self.register_background_update_handler(update_name, updater)
|
self.register_background_update_handler(update_name, updater)
|
||||||
|
|
||||||
def start_background_update(self, update_name, progress):
|
|
||||||
"""Starts a background update running.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
update_name: The update to set running.
|
|
||||||
progress: The initial state of the progress of the update.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
A deferred that completes once the task has been added to the
|
|
||||||
queue.
|
|
||||||
"""
|
|
||||||
# Clear the background update queue so that we will pick up the new
|
|
||||||
# task on the next iteration of do_background_update.
|
|
||||||
self._background_update_queue = []
|
|
||||||
progress_json = json.dumps(progress)
|
|
||||||
|
|
||||||
return self.db.simple_insert(
|
|
||||||
"background_updates",
|
|
||||||
{"update_name": update_name, "progress_json": progress_json},
|
|
||||||
)
|
|
||||||
|
|
||||||
def _end_background_update(self, update_name):
|
def _end_background_update(self, update_name):
|
||||||
"""Removes a completed background update task from the queue.
|
"""Removes a completed background update task from the queue.
|
||||||
|
|
||||||
|
@ -429,9 +424,12 @@ class BackgroundUpdater(object):
|
||||||
Returns:
|
Returns:
|
||||||
A deferred that completes once the task is removed.
|
A deferred that completes once the task is removed.
|
||||||
"""
|
"""
|
||||||
self._background_update_queue = [
|
if update_name != self._current_background_update:
|
||||||
name for name in self._background_update_queue if name != update_name
|
raise Exception(
|
||||||
]
|
"Cannot end background update %s which isn't currently running"
|
||||||
|
% update_name
|
||||||
|
)
|
||||||
|
self._current_background_update = None
|
||||||
return self.db.simple_delete_one(
|
return self.db.simple_delete_one(
|
||||||
"background_updates", keyvalues={"update_name": update_name}
|
"background_updates", keyvalues={"update_name": update_name}
|
||||||
)
|
)
|
||||||
|
|
|
@ -165,7 +165,6 @@ class DeviceWorkerStore(SQLBaseStore):
|
||||||
# the max stream_id across each set of duplicate entries
|
# the max stream_id across each set of duplicate entries
|
||||||
#
|
#
|
||||||
# maps (user_id, device_id) -> (stream_id, opentracing_context)
|
# maps (user_id, device_id) -> (stream_id, opentracing_context)
|
||||||
# as long as their stream_id does not match that of the last row
|
|
||||||
#
|
#
|
||||||
# opentracing_context contains the opentracing metadata for the request
|
# opentracing_context contains the opentracing metadata for the request
|
||||||
# that created the poke
|
# that created the poke
|
||||||
|
@ -270,7 +269,14 @@ class DeviceWorkerStore(SQLBaseStore):
|
||||||
prev_id = yield self._get_last_device_update_for_remote_user(
|
prev_id = yield self._get_last_device_update_for_remote_user(
|
||||||
destination, user_id, from_stream_id
|
destination, user_id, from_stream_id
|
||||||
)
|
)
|
||||||
for device_id, device in iteritems(user_devices):
|
|
||||||
|
# make sure we go through the devices in stream order
|
||||||
|
device_ids = sorted(
|
||||||
|
user_devices.keys(), key=lambda i: query_map[(user_id, i)][0],
|
||||||
|
)
|
||||||
|
|
||||||
|
for device_id in device_ids:
|
||||||
|
device = user_devices[device_id]
|
||||||
stream_id, opentracing_context = query_map[(user_id, device_id)]
|
stream_id, opentracing_context = query_map[(user_id, device_id)]
|
||||||
result = {
|
result = {
|
||||||
"user_id": user_id,
|
"user_id": user_id,
|
||||||
|
|
|
@ -29,7 +29,7 @@ logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
# Remember to update this number every time a change is made to database
|
# Remember to update this number every time a change is made to database
|
||||||
# schema files, so the users will be informed on server restarts.
|
# schema files, so the users will be informed on server restarts.
|
||||||
SCHEMA_VERSION = 57
|
SCHEMA_VERSION = 58
|
||||||
|
|
||||||
dir_path = os.path.abspath(os.path.dirname(__file__))
|
dir_path = os.path.abspath(os.path.dirname(__file__))
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,19 @@
|
||||||
|
/* Copyright 2020 The Matrix.org Foundation C.I.C
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/* add an "ordering" column to background_updates, which can be used to sort them
|
||||||
|
to achieve some level of consistency. */
|
||||||
|
|
||||||
|
ALTER TABLE background_updates ADD COLUMN ordering INT NOT NULL DEFAULT 0;
|
|
@ -14,6 +14,7 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
|
from argparse import REMAINDER
|
||||||
from contextlib import redirect_stderr
|
from contextlib import redirect_stderr
|
||||||
from io import StringIO
|
from io import StringIO
|
||||||
|
|
||||||
|
@ -21,7 +22,7 @@ import pyperf
|
||||||
from synmark import make_reactor
|
from synmark import make_reactor
|
||||||
from synmark.suites import SUITES
|
from synmark.suites import SUITES
|
||||||
|
|
||||||
from twisted.internet.defer import ensureDeferred
|
from twisted.internet.defer import Deferred, ensureDeferred
|
||||||
from twisted.logger import globalLogBeginner, textFileLogObserver
|
from twisted.logger import globalLogBeginner, textFileLogObserver
|
||||||
from twisted.python.failure import Failure
|
from twisted.python.failure import Failure
|
||||||
|
|
||||||
|
@ -40,7 +41,8 @@ def make_test(main):
|
||||||
file_out = StringIO()
|
file_out = StringIO()
|
||||||
with redirect_stderr(file_out):
|
with redirect_stderr(file_out):
|
||||||
|
|
||||||
d = ensureDeferred(main(reactor, loops))
|
d = Deferred()
|
||||||
|
d.addCallback(lambda _: ensureDeferred(main(reactor, loops)))
|
||||||
|
|
||||||
def on_done(_):
|
def on_done(_):
|
||||||
if isinstance(_, Failure):
|
if isinstance(_, Failure):
|
||||||
|
@ -50,6 +52,7 @@ def make_test(main):
|
||||||
return _
|
return _
|
||||||
|
|
||||||
d.addBoth(on_done)
|
d.addBoth(on_done)
|
||||||
|
reactor.callWhenRunning(lambda: d.callback(True))
|
||||||
reactor.run()
|
reactor.run()
|
||||||
|
|
||||||
return d.result
|
return d.result
|
||||||
|
@ -62,11 +65,13 @@ if __name__ == "__main__":
|
||||||
def add_cmdline_args(cmd, args):
|
def add_cmdline_args(cmd, args):
|
||||||
if args.log:
|
if args.log:
|
||||||
cmd.extend(["--log"])
|
cmd.extend(["--log"])
|
||||||
|
cmd.extend(args.tests)
|
||||||
|
|
||||||
runner = pyperf.Runner(
|
runner = pyperf.Runner(
|
||||||
processes=3, min_time=2, show_name=True, add_cmdline_args=add_cmdline_args
|
processes=3, min_time=1.5, show_name=True, add_cmdline_args=add_cmdline_args
|
||||||
)
|
)
|
||||||
runner.argparser.add_argument("--log", action="store_true")
|
runner.argparser.add_argument("--log", action="store_true")
|
||||||
|
runner.argparser.add_argument("tests", nargs=REMAINDER)
|
||||||
runner.parse_args()
|
runner.parse_args()
|
||||||
|
|
||||||
orig_loops = runner.args.loops
|
orig_loops = runner.args.loops
|
||||||
|
@ -79,6 +84,11 @@ if __name__ == "__main__":
|
||||||
)
|
)
|
||||||
setupdb()
|
setupdb()
|
||||||
|
|
||||||
|
if runner.args.tests:
|
||||||
|
SUITES = list(
|
||||||
|
filter(lambda x: x[0].__name__.split(".")[-1] in runner.args.tests, SUITES)
|
||||||
|
)
|
||||||
|
|
||||||
for suite, loops in SUITES:
|
for suite, loops in SUITES:
|
||||||
if loops:
|
if loops:
|
||||||
runner.args.loops = loops
|
runner.args.loops = loops
|
||||||
|
|
|
@ -1,3 +1,9 @@
|
||||||
from . import logging
|
from . import logging, lrucache, lrucache_evict
|
||||||
|
|
||||||
SUITES = [(logging, 1000), (logging, 10000), (logging, None)]
|
SUITES = [
|
||||||
|
(logging, 1000),
|
||||||
|
(logging, 10000),
|
||||||
|
(logging, None),
|
||||||
|
(lrucache, None),
|
||||||
|
(lrucache_evict, None),
|
||||||
|
]
|
||||||
|
|
|
@ -0,0 +1,34 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from pyperf import perf_counter
|
||||||
|
|
||||||
|
from synapse.util.caches.lrucache import LruCache
|
||||||
|
|
||||||
|
|
||||||
|
async def main(reactor, loops):
|
||||||
|
"""
|
||||||
|
Benchmark `loops` number of insertions into LruCache without eviction.
|
||||||
|
"""
|
||||||
|
cache = LruCache(loops)
|
||||||
|
|
||||||
|
start = perf_counter()
|
||||||
|
|
||||||
|
for i in range(loops):
|
||||||
|
cache[i] = True
|
||||||
|
|
||||||
|
end = perf_counter() - start
|
||||||
|
|
||||||
|
return end
|
|
@ -0,0 +1,35 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from pyperf import perf_counter
|
||||||
|
|
||||||
|
from synapse.util.caches.lrucache import LruCache
|
||||||
|
|
||||||
|
|
||||||
|
async def main(reactor, loops):
|
||||||
|
"""
|
||||||
|
Benchmark `loops` number of insertions into LruCache where half of them are
|
||||||
|
evicted.
|
||||||
|
"""
|
||||||
|
cache = LruCache(loops // 2)
|
||||||
|
|
||||||
|
start = perf_counter()
|
||||||
|
|
||||||
|
for i in range(loops):
|
||||||
|
cache[i] = True
|
||||||
|
|
||||||
|
end = perf_counter() - start
|
||||||
|
|
||||||
|
return end
|
|
@ -297,6 +297,7 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
|
||||||
c = edu["content"]
|
c = edu["content"]
|
||||||
if stream_id is not None:
|
if stream_id is not None:
|
||||||
self.assertEqual(c["prev_id"], [stream_id])
|
self.assertEqual(c["prev_id"], [stream_id])
|
||||||
|
self.assertGreaterEqual(c["stream_id"], stream_id)
|
||||||
stream_id = c["stream_id"]
|
stream_id = c["stream_id"]
|
||||||
devices = {edu["content"]["device_id"] for edu in self.edus}
|
devices = {edu["content"]["device_id"] for edu in self.edus}
|
||||||
self.assertEqual({"D1", "D2"}, devices)
|
self.assertEqual({"D1", "D2"}, devices)
|
||||||
|
@ -330,6 +331,7 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
|
||||||
c.items(),
|
c.items(),
|
||||||
{"user_id": u1, "prev_id": [stream_id], "deleted": True}.items(),
|
{"user_id": u1, "prev_id": [stream_id], "deleted": True}.items(),
|
||||||
)
|
)
|
||||||
|
self.assertGreaterEqual(c["stream_id"], stream_id)
|
||||||
stream_id = c["stream_id"]
|
stream_id = c["stream_id"]
|
||||||
devices = {edu["content"]["device_id"] for edu in self.edus}
|
devices = {edu["content"]["device_id"] for edu in self.edus}
|
||||||
self.assertEqual({"D1", "D2", "D3"}, devices)
|
self.assertEqual({"D1", "D2", "D3"}, devices)
|
||||||
|
@ -366,6 +368,8 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
|
||||||
self.assertEqual(edu["edu_type"], "m.device_list_update")
|
self.assertEqual(edu["edu_type"], "m.device_list_update")
|
||||||
c = edu["content"]
|
c = edu["content"]
|
||||||
self.assertEqual(c["prev_id"], [stream_id] if stream_id is not None else [])
|
self.assertEqual(c["prev_id"], [stream_id] if stream_id is not None else [])
|
||||||
|
if stream_id is not None:
|
||||||
|
self.assertGreaterEqual(c["stream_id"], stream_id)
|
||||||
stream_id = c["stream_id"]
|
stream_id = c["stream_id"]
|
||||||
devices = {edu["content"]["device_id"] for edu in self.edus}
|
devices = {edu["content"]["device_id"] for edu in self.edus}
|
||||||
self.assertEqual({"D1", "D2", "D3"}, devices)
|
self.assertEqual({"D1", "D2", "D3"}, devices)
|
||||||
|
@ -482,6 +486,8 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
|
||||||
}
|
}
|
||||||
|
|
||||||
self.assertLessEqual(expected.items(), content.items())
|
self.assertLessEqual(expected.items(), content.items())
|
||||||
|
if prev_stream_id is not None:
|
||||||
|
self.assertGreaterEqual(content["stream_id"], prev_stream_id)
|
||||||
return content["stream_id"]
|
return content["stream_id"]
|
||||||
|
|
||||||
def check_signing_key_update_txn(self, txn: JsonDict,) -> None:
|
def check_signing_key_update_txn(self, txn: JsonDict,) -> None:
|
||||||
|
|
|
@ -11,7 +11,9 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
|
||||||
def prepare(self, reactor, clock, homeserver):
|
def prepare(self, reactor, clock, homeserver):
|
||||||
self.updates = self.hs.get_datastore().db.updates # type: BackgroundUpdater
|
self.updates = self.hs.get_datastore().db.updates # type: BackgroundUpdater
|
||||||
# the base test class should have run the real bg updates for us
|
# the base test class should have run the real bg updates for us
|
||||||
self.assertTrue(self.updates.has_completed_background_updates())
|
self.assertTrue(
|
||||||
|
self.get_success(self.updates.has_completed_background_updates())
|
||||||
|
)
|
||||||
|
|
||||||
self.update_handler = Mock()
|
self.update_handler = Mock()
|
||||||
self.updates.register_background_update_handler(
|
self.updates.register_background_update_handler(
|
||||||
|
@ -25,12 +27,20 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
|
||||||
# the target runtime for each bg update
|
# the target runtime for each bg update
|
||||||
target_background_update_duration_ms = 50000
|
target_background_update_duration_ms = 50000
|
||||||
|
|
||||||
|
store = self.hs.get_datastore()
|
||||||
|
self.get_success(
|
||||||
|
store.db.simple_insert(
|
||||||
|
"background_updates",
|
||||||
|
values={"update_name": "test_update", "progress_json": '{"my_key": 1}'},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
# first step: make a bit of progress
|
# first step: make a bit of progress
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def update(progress, count):
|
def update(progress, count):
|
||||||
yield self.clock.sleep((count * duration_ms) / 1000)
|
yield self.clock.sleep((count * duration_ms) / 1000)
|
||||||
progress = {"my_key": progress["my_key"] + 1}
|
progress = {"my_key": progress["my_key"] + 1}
|
||||||
yield self.hs.get_datastore().db.runInteraction(
|
yield store.db.runInteraction(
|
||||||
"update_progress",
|
"update_progress",
|
||||||
self.updates._background_update_progress_txn,
|
self.updates._background_update_progress_txn,
|
||||||
"test_update",
|
"test_update",
|
||||||
|
@ -39,10 +49,6 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
|
||||||
return count
|
return count
|
||||||
|
|
||||||
self.update_handler.side_effect = update
|
self.update_handler.side_effect = update
|
||||||
|
|
||||||
self.get_success(
|
|
||||||
self.updates.start_background_update("test_update", {"my_key": 1})
|
|
||||||
)
|
|
||||||
self.update_handler.reset_mock()
|
self.update_handler.reset_mock()
|
||||||
res = self.get_success(
|
res = self.get_success(
|
||||||
self.updates.do_next_background_update(
|
self.updates.do_next_background_update(
|
||||||
|
@ -50,7 +56,7 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
|
||||||
),
|
),
|
||||||
by=0.1,
|
by=0.1,
|
||||||
)
|
)
|
||||||
self.assertIsNotNone(res)
|
self.assertFalse(res)
|
||||||
|
|
||||||
# on the first call, we should get run with the default background update size
|
# on the first call, we should get run with the default background update size
|
||||||
self.update_handler.assert_called_once_with(
|
self.update_handler.assert_called_once_with(
|
||||||
|
@ -73,7 +79,7 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
|
||||||
result = self.get_success(
|
result = self.get_success(
|
||||||
self.updates.do_next_background_update(target_background_update_duration_ms)
|
self.updates.do_next_background_update(target_background_update_duration_ms)
|
||||||
)
|
)
|
||||||
self.assertIsNotNone(result)
|
self.assertFalse(result)
|
||||||
self.update_handler.assert_called_once()
|
self.update_handler.assert_called_once()
|
||||||
|
|
||||||
# third step: we don't expect to be called any more
|
# third step: we don't expect to be called any more
|
||||||
|
@ -81,5 +87,5 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
|
||||||
result = self.get_success(
|
result = self.get_success(
|
||||||
self.updates.do_next_background_update(target_background_update_duration_ms)
|
self.updates.do_next_background_update(target_background_update_duration_ms)
|
||||||
)
|
)
|
||||||
self.assertIsNone(result)
|
self.assertTrue(result)
|
||||||
self.assertFalse(self.update_handler.called)
|
self.assertFalse(self.update_handler.called)
|
||||||
|
|
|
@ -40,6 +40,7 @@ from synapse.http.server import JsonResource
|
||||||
from synapse.http.site import SynapseRequest, SynapseSite
|
from synapse.http.site import SynapseRequest, SynapseSite
|
||||||
from synapse.logging.context import (
|
from synapse.logging.context import (
|
||||||
SENTINEL_CONTEXT,
|
SENTINEL_CONTEXT,
|
||||||
|
LoggingContext,
|
||||||
current_context,
|
current_context,
|
||||||
set_current_context,
|
set_current_context,
|
||||||
)
|
)
|
||||||
|
@ -419,15 +420,17 @@ class HomeserverTestCase(TestCase):
|
||||||
config_obj.parse_config_dict(config, "", "")
|
config_obj.parse_config_dict(config, "", "")
|
||||||
kwargs["config"] = config_obj
|
kwargs["config"] = config_obj
|
||||||
|
|
||||||
|
async def run_bg_updates():
|
||||||
|
with LoggingContext("run_bg_updates", request="run_bg_updates-1"):
|
||||||
|
while not await stor.db.updates.has_completed_background_updates():
|
||||||
|
await stor.db.updates.do_next_background_update(1)
|
||||||
|
|
||||||
hs = setup_test_homeserver(self.addCleanup, *args, **kwargs)
|
hs = setup_test_homeserver(self.addCleanup, *args, **kwargs)
|
||||||
stor = hs.get_datastore()
|
stor = hs.get_datastore()
|
||||||
|
|
||||||
# Run the database background updates, when running against "master".
|
# Run the database background updates, when running against "master".
|
||||||
if hs.__class__.__name__ == "TestHomeServer":
|
if hs.__class__.__name__ == "TestHomeServer":
|
||||||
while not self.get_success(
|
self.get_success(run_bg_updates())
|
||||||
stor.db.updates.has_completed_background_updates()
|
|
||||||
):
|
|
||||||
self.get_success(stor.db.updates.do_next_background_update(1))
|
|
||||||
|
|
||||||
return hs
|
return hs
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue