Compare commits

...

28 Commits

Author SHA1 Message Date
Amber Brown 334bfdbc90
Add some benchmarks for LruCache (#6446) 2020-04-03 16:31:52 +01:00
Andrew Morgan 07b88c546d
Convert http.HTTPStatus objects to their int equivalent (#7188) 2020-04-03 14:26:07 +01:00
Richard van der Hoff 0f05fd1530
Reduce the number of calls to `resource.getrusage` (#7183)
Let's just call `getrusage` once on each logcontext change, rather than twice.
2020-04-03 13:21:30 +01:00
Richard van der Hoff fd4c975b5b
Merge pull request #7190 from matrix-org/rav/one_bg_update_at_a_time
Only run one background update at a time
2020-04-03 13:17:30 +01:00
Richard van der Hoff bae32740da
Remove some `run_in_background` calls in replication code (#7203)
By running this stuff with `run_in_background`, it won't be correctly reported
against the relevant CPU usage stats.

Fixes #7202
2020-04-03 12:29:30 +01:00
Richard van der Hoff 6dd6a3557c Merge branch 'master' into develop 2020-04-03 11:29:43 +01:00
Richard van der Hoff 0cbb4808ed Revert "Revert "Merge pull request #7153 from matrix-org/babolivier/sso_whitelist_login_fallback""
This reverts commit 0122ef1037.
2020-04-03 11:28:49 +01:00
Richard van der Hoff 14a8e71297 Revert "Revert "Improve the UX of the login fallback when using SSO (#7152)""
This reverts commit 8d4cbdeaa9.
2020-04-03 11:28:43 +01:00
Richard van der Hoff 883ac4b1bb
Merge tag 'v1.12.3'

Synapse 1.12.3 (2020-04-03)
===========================

- Remove the pin to Pillow 7.0 which was introduced in Synapse 1.12.2, and
correctly fix the issue with building the Debian packages. ([\#7212](https://github.com/matrix-org/synapse/issues/7212))
2020-04-03 11:25:56 +01:00
Richard van der Hoff cb40b0cb80
Merge tag 'v1.12.2'

Synapse 1.12.2 (2020-04-02)
===========================

This release fixes [an
issue](https://github.com/matrix-org/synapse/issues/7208) with building the
debian packages.

No other significant changes since 1.12.1.
2020-04-03 11:25:42 +01:00
Richard van der Hoff 0122ef1037 Revert "Merge pull request #7153 from matrix-org/babolivier/sso_whitelist_login_fallback"
This was incorrectly merged to master.

This reverts commit 319c41f573, reversing
changes made to 229eb81498.
2020-04-03 11:17:39 +01:00
Richard van der Hoff 8d4cbdeaa9 Revert "Improve the UX of the login fallback when using SSO (#7152)"
This was incorrectly merged to `master` instead of develop.

This reverts commit 90246344e3.
2020-04-03 11:16:41 +01:00
Richard van der Hoff 553c8a9b6b tweak changelog 2020-04-03 11:00:57 +01:00
Richard van der Hoff 29ce90358c 1.12.3 2020-04-03 10:57:07 +01:00
Richard van der Hoff fcc2de7a0c Update docstring per review comments 2020-04-03 10:51:32 +01:00
Richard van der Hoff daa1ac89a0
Fix device list update stream ids going backward (#7158)
Occasionally we could get a federation device list update transaction which
looked like:

```
[
    {'edu_type': 'm.device_list_update', 'content': {'user_id': '@user:test', 'device_id': 'D2', 'prev_id': [], 'stream_id': 12, 'deleted': True}},
    {'edu_type': 'm.device_list_update', 'content': {'user_id': '@user:test', 'device_id': 'D1', 'prev_id': [12], 'stream_id': 11, 'deleted': True}},
    {'edu_type': 'm.device_list_update', 'content': {'user_id': '@user:test', 'device_id': 'D3', 'prev_id': [11], 'stream_id': 13, 'deleted': True}}
]
```

Having `stream_ids` which are lower than `prev_ids` looks odd. It might work
(I'm not actually sure), but in any case it doesn't seem like a reasonable
thing to expect other implementations to support.
2020-04-03 10:40:22 +01:00
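
As a standalone sketch of the ordering fix described above (illustrative only; `order_devices` is a made-up helper name, and the real change lives in the device store hunk further down): the sender sorts the device ids by their `stream_id` before building the `m.device_list_update` EDUs, so stream ids never go backwards within a transaction. `query_map` is assumed to map `(user_id, device_id) -> (stream_id, opentracing_context)`, as in that hunk.

```
# Illustrative sketch, not the actual Synapse code.
def order_devices(user_id, user_devices, query_map):
    # go through the devices in stream order
    device_ids = sorted(
        user_devices.keys(), key=lambda device_id: query_map[(user_id, device_id)][0],
    )
    return [
        {
            "user_id": user_id,
            "device_id": device_id,
            "stream_id": query_map[(user_id, device_id)][0],
        }
        for device_id in device_ids
    ]
```
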
Richard van der Hoff 6d7cec7a57
Fix the debian build in a better way. (#7212) 2020-04-03 10:23:36 +01:00
Andrew Morgan f7d6e849b3 Fix changelog wording 2020-04-02 19:08:06 +01:00
Andrew Morgan 08edefe694 1.12.2 2020-04-02 19:02:45 +01:00
Andrew Morgan ec56620ff6 Pin Pillow>=4.3.0,<7.1.0 to fix dep issue 2020-04-02 18:58:08 +01:00
Andrew Morgan b730480abb 1.12.1 2020-04-02 18:57:31 +01:00
Richard van der Hoff af47264b78
review comment 2020-04-02 12:04:55 +01:00
Richard van der Hoff b413ab8aa6 changelog 2020-03-31 17:44:36 +01:00
Richard van der Hoff 7b608cf468 Only run one background update at a time 2020-03-31 17:43:58 +01:00
Richard van der Hoff b4c2234232 Make do_next_background_update return a bool
returning a None or an int that we don't use is confusing.
2020-03-31 17:43:58 +01:00
Richard van der Hoff 51f4d52cb4 Set a logging context while running the bg updates
This mostly just reduces the amount of "running from sentinel context" spam
during unittest setup.
2020-03-31 17:43:58 +01:00
Richard van der Hoff 26d17b9bdc Make `has_completed_background_updates` async
(Almost) everywhere that uses it is happy with an awaitable.
2020-03-31 17:43:58 +01:00
Richard van der Hoff cfe8c8ab8e Remove unused `start_background_update`
This was only used in a unit test, so let's just inline it in the test.
2020-03-31 17:24:06 +01:00
24 changed files with 349 additions and 134 deletions

View File

@ -6,6 +6,23 @@ Next version
configuration then this template will need to be duplicated into that
directory.

+Synapse 1.12.3 (2020-04-03)
+===========================
+
+- Remove the pin to Pillow 7.0 which was introduced in Synapse 1.12.2, and
+correctly fix the issue with building the Debian packages. ([\#7212](https://github.com/matrix-org/synapse/issues/7212))
+
+Synapse 1.12.2 (2020-04-02)
+===========================
+
+This release works around [an
+issue](https://github.com/matrix-org/synapse/issues/7208) with building the
+debian packages.
+
+No other significant changes since 1.12.1.
+
+>>>>>>> master

Synapse 1.12.1 (2020-04-02)
===========================

1
changelog.d/6446.misc Normal file
View File

@ -0,0 +1 @@
Add benchmarks for LruCache.

1
changelog.d/7158.misc Normal file
View File

@ -0,0 +1 @@
Fix device list update stream ids going backward.

1
changelog.d/7183.misc Normal file
View File

@ -0,0 +1 @@
Clean up some LoggingContext code.

1
changelog.d/7188.misc Normal file
View File

@ -0,0 +1 @@
Fix consistency of HTTP status codes reported in log lines.

1
changelog.d/7190.misc Normal file
View File

@ -0,0 +1 @@
Only run one background database update at a time.

1
changelog.d/7203.bugfix Normal file
View File

@ -0,0 +1 @@
Fix some worker-mode replication handling not being correctly recorded in CPU usage stats.

17
debian/changelog vendored
View File

@ -1,3 +1,20 @@
matrix-synapse-py3 (1.12.3) stable; urgency=medium
[ Richard van der Hoff ]
* Update the Debian build scripts to handle the new installation paths
for the support libraries introduced by Pillow 7.1.1.
[ Synapse Packaging team ]
* New synapse release 1.12.3.
-- Synapse Packaging team <packages@matrix.org> Fri, 03 Apr 2020 10:55:03 +0100
matrix-synapse-py3 (1.12.2) stable; urgency=medium
* New synapse release 1.12.2.
-- Synapse Packaging team <packages@matrix.org> Mon, 02 Apr 2020 19:02:17 +0000
matrix-synapse-py3 (1.12.1) stable; urgency=medium

  * New synapse release 1.12.1.

33
debian/rules vendored
View File

@ -15,17 +15,38 @@ override_dh_installinit:
# we don't really want to strip the symbols from our object files.
override_dh_strip:

+# dh_shlibdeps calls dpkg-shlibdeps, which finds all the binary files
+# (executables and shared libs) in the package, and looks for the shared
+# libraries that they depend on. It then adds a dependency on the package that
+# contains that library to the package.
+#
+# We make two modifications to that process...
+#
override_dh_shlibdeps:
-        # make the postgres package's dependencies a recommendation
-        # rather than a hard dependency.
+        # Firstly, postgres is not a hard dependency for us, so we want to make
+        # the things that psycopg2 depends on (such as libpq) be
+        # recommendations rather than hard dependencies. We do so by
+        # running dpkg-shlibdeps manually on psycopg2's libs.
+        #
        find debian/$(PACKAGE_NAME)/ -path '*/site-packages/psycopg2/*.so' | \
                xargs dpkg-shlibdeps -Tdebian/$(PACKAGE_NAME).substvars \
                        -pshlibs1 -dRecommends
-        # all the other dependencies can be normal 'Depends' requirements,
-        # except for PIL's, which is self-contained and which confuses
-        # dpkg-shlibdeps.
-        dh_shlibdeps -X site-packages/PIL/.libs -X site-packages/psycopg2
+        # secondly, we exclude PIL's libraries from the process. They are known
+        # to be self-contained, but they have interdependencies and
+        # dpkg-shlibdeps doesn't know how to resolve them.
+        #
+        # As of Pillow 7.1.0, these libraries are in
+        # site-packages/Pillow.libs. Previously, they were in
+        # site-packages/PIL/.libs.
+        #
+        # (we also need to exclude psycopg2, of course, since we've already
+        # dealt with that.)
+        #
+        dh_shlibdeps \
+                -X site-packages/PIL/.libs \
+                -X site-packages/Pillow.libs \
+                -X site-packages/psycopg2

override_dh_virtualenv:
        ./debian/build_virtualenv

View File

@ -36,7 +36,7 @@ try:
except ImportError:
    pass

-__version__ = "1.12.1"
+__version__ = "1.12.3"

if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
    # We import here so that we don't have to install a bunch of deps when

View File

@ -86,7 +86,14 @@ class CodeMessageException(RuntimeError):
    def __init__(self, code, msg):
        super(CodeMessageException, self).__init__("%d: %s" % (code, msg))
-        self.code = code
+
+        # Some calls to this method pass instances of http.HTTPStatus for `code`.
+        # While HTTPStatus is a subclass of int, it has magic __str__ methods
+        # which emit `HTTPStatus.FORBIDDEN` when converted to a str, instead of `403`.
+        # This causes inconsistency in our log lines.
+        #
+        # To eliminate this behaviour, we convert them to their integer equivalents here.
+        self.code = int(code)
        self.msg = msg
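
As an aside, the behaviour described in that comment can be reproduced with the standard library alone; this is illustrative, not Synapse code (and note that `str()` of an `IntEnum` member changed in later Python releases):

```
from http import HTTPStatus

code = HTTPStatus.FORBIDDEN

print(isinstance(code, int))  # True: HTTPStatus is an int subclass
print("%d" % code)            # "403": numeric formatting uses the integer value

# On the Python versions current at the time, str() of an IntEnum member gives
# its name rather than its value:
print(str(code))              # e.g. "HTTPStatus.FORBIDDEN"

# Converting to a plain int up front keeps log lines consistent:
print(str(int(code)))         # "403"
```
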

View File

@ -42,7 +42,7 @@ from synapse.handlers.presence import PresenceHandler, get_interested_parties
from synapse.http.server import JsonResource
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.site import SynapseSite
-from synapse.logging.context import LoggingContext, run_in_background
+from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.slave.storage._base import BaseSlavedStore, __func__
@ -635,7 +635,7 @@ class GenericWorkerReplicationHandler(ReplicationClientHandler):
        await super(GenericWorkerReplicationHandler, self).on_rdata(
            stream_name, token, rows
        )
-        run_in_background(self.process_and_notify, stream_name, token, rows)
+        await self.process_and_notify(stream_name, token, rows)

    def get_streams_to_replicate(self):
        args = super(GenericWorkerReplicationHandler, self).get_streams_to_replicate()
@ -650,7 +650,9 @@ class GenericWorkerReplicationHandler(ReplicationClientHandler):
    async def process_and_notify(self, stream_name, token, rows):
        try:
            if self.send_handler:
-                self.send_handler.process_replication_rows(stream_name, token, rows)
+                await self.send_handler.process_replication_rows(
+                    stream_name, token, rows
+                )

            if stream_name == EventsStream.NAME:
                # We shouldn't get multiple rows per token for events stream, so
@ -782,12 +784,12 @@ class FederationSenderHandler(object):
    def stream_positions(self):
        return {"federation": self.federation_position}

-    def process_replication_rows(self, stream_name, token, rows):
+    async def process_replication_rows(self, stream_name, token, rows):
        # The federation stream contains things that we want to send out, e.g.
        # presence, typing, etc.
        if stream_name == "federation":
            send_queue.process_rows_for_federation(self.federation_sender, rows)
-            run_in_background(self.update_token, token)
+            await self.update_token(token)

        # We also need to poke the federation sender when new events happen
        elif stream_name == "events":
@ -795,9 +797,7 @@ class FederationSenderHandler(object):
        # ... and when new receipts happen
        elif stream_name == ReceiptsStream.NAME:
-            run_as_background_process(
-                "process_receipts_for_federation", self._on_new_receipts, rows
-            )
+            await self._on_new_receipts(rows)

        # ... as well as device updates and messages
        elif stream_name == DeviceListsStream.NAME:
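
As an illustrative aside (not Synapse's actual code): the reason #7203 replaces `run_in_background` with `await` here is that awaited work runs inside the caller's logcontext, so its CPU time is attributed to the replication command that triggered it, whereas fire-and-forget work keeps running after the caller has moved on and is not accounted there. The function names below are hypothetical:

```
# Illustrative sketch only.
from synapse.logging.context import run_in_background


async def on_rdata_fire_and_forget(handler, stream_name, token, rows):
    # returns immediately; process_and_notify's CPU time is not attributed to
    # the replication command that caused the work
    run_in_background(handler.process_and_notify, stream_name, token, rows)


async def on_rdata_awaited(handler, stream_name, token, rows):
    # runs to completion in the current logcontext, so the cost is recorded
    # against the relevant CPU usage stats
    await handler.process_and_notify(stream_name, token, rows)
```
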

View File

@ -51,7 +51,7 @@ try:
    is_thread_resource_usage_supported = True

-    def get_thread_resource_usage():
+    def get_thread_resource_usage() -> "Optional[resource._RUsage]":
        return resource.getrusage(RUSAGE_THREAD)

@ -60,7 +60,7 @@ except Exception:
    # won't track resource usage.
    is_thread_resource_usage_supported = False

-    def get_thread_resource_usage():
+    def get_thread_resource_usage() -> "Optional[resource._RUsage]":
        return None

@ -201,10 +201,10 @@ class _Sentinel(object):
        record["request"] = None
        record["scope"] = None

-    def start(self):
+    def start(self, rusage: "Optional[resource._RUsage]"):
        pass

-    def stop(self):
+    def stop(self, rusage: "Optional[resource._RUsage]"):
        pass

    def add_database_transaction(self, duration_sec):
@ -261,7 +261,7 @@ class LoggingContext(object):
        # The thread resource usage when the logcontext became active. None
        # if the context is not currently active.
-        self.usage_start = None
+        self.usage_start = None  # type: Optional[resource._RUsage]

        self.main_thread = get_thread_id()
        self.request = None
@ -336,7 +336,17 @@
        record["request"] = self.request
        record["scope"] = self.scope

-    def start(self) -> None:
+    def start(self, rusage: "Optional[resource._RUsage]") -> None:
+        """
+        Record that this logcontext is currently running.
+
+        This should not be called directly: use set_current_context
+
+        Args:
+            rusage: the resources used by the current thread, at the point of
+                switching to this logcontext. May be None if this platform doesn't
+                support getrusage.
+        """
        if get_thread_id() != self.main_thread:
            logger.warning("Started logcontext %s on different thread", self)
            return
@ -349,36 +359,48 @@
        if self.usage_start:
            logger.warning("Re-starting already-active log context %s", self)
        else:
-            self.usage_start = get_thread_resource_usage()
+            self.usage_start = rusage

-    def stop(self) -> None:
-        if get_thread_id() != self.main_thread:
-            logger.warning("Stopped logcontext %s on different thread", self)
-            return
-
-        # When we stop, let's record the cpu used since we started
-        if not self.usage_start:
-            # Log a warning on platforms that support thread usage tracking
-            if is_thread_resource_usage_supported:
-                logger.warning(
-                    "Called stop on logcontext %s without calling start", self
-                )
-            return
-
-        utime_delta, stime_delta = self._get_cputime()
-        self._resource_usage.ru_utime += utime_delta
-        self._resource_usage.ru_stime += stime_delta
-
-        self.usage_start = None
-
-        # if we have a parent, pass our CPU usage stats on
-        if self.parent_context is not None and hasattr(
-            self.parent_context, "_resource_usage"
-        ):
-            self.parent_context._resource_usage += self._resource_usage
-
-            # reset them in case we get entered again
-            self._resource_usage.reset()
+    def stop(self, rusage: "Optional[resource._RUsage]") -> None:
+        """
+        Record that this logcontext is no longer running.
+
+        This should not be called directly: use set_current_context
+
+        Args:
+            rusage: the resources used by the current thread, at the point of
+                switching away from this logcontext. May be None if this platform
+                doesn't support getrusage.
+        """
+        try:
+            if get_thread_id() != self.main_thread:
+                logger.warning("Stopped logcontext %s on different thread", self)
+                return
+
+            if not rusage:
+                return
+
+            # Record the cpu used since we started
+            if not self.usage_start:
+                logger.warning(
+                    "Called stop on logcontext %s without recording a start rusage",
+                    self,
+                )
+                return
+
+            utime_delta, stime_delta = self._get_cputime(rusage)
+            self._resource_usage.ru_utime += utime_delta
+            self._resource_usage.ru_stime += stime_delta
+
+            # if we have a parent, pass our CPU usage stats on
+            if self.parent_context:
+                self.parent_context._resource_usage += self._resource_usage
+
+                # reset them in case we get entered again
+                self._resource_usage.reset()
+        finally:
+            self.usage_start = None

    def get_resource_usage(self) -> ContextResourceUsage:
        """Get resources used by this logcontext so far.
@ -394,24 +416,24 @@
        # can include resource usage so far.
        is_main_thread = get_thread_id() == self.main_thread
        if self.usage_start and is_main_thread:
-            utime_delta, stime_delta = self._get_cputime()
+            rusage = get_thread_resource_usage()
+            assert rusage is not None
+            utime_delta, stime_delta = self._get_cputime(rusage)
            res.ru_utime += utime_delta
            res.ru_stime += stime_delta

        return res

-    def _get_cputime(self) -> Tuple[float, float]:
-        """Get the cpu usage time so far
+    def _get_cputime(self, current: "resource._RUsage") -> Tuple[float, float]:
+        """Get the cpu usage time between start() and the given rusage
+
+        Args:
+            rusage: the current resource usage

        Returns: Tuple[float, float]: seconds in user mode, seconds in system mode
        """
        assert self.usage_start is not None

-        current = get_thread_resource_usage()
-
-        # Indicate to mypy that we know that self.usage_start is None.
-        assert self.usage_start is not None
-
        utime_delta = current.ru_utime - self.usage_start.ru_utime
        stime_delta = current.ru_stime - self.usage_start.ru_stime
@ -547,9 +569,11 @@ def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSe
    current = current_context()

    if current is not context:
-        current.stop()
+        rusage = get_thread_resource_usage()
+        current.stop(rusage)
        _thread_local.current_context = context
-        context.start()
+        context.start(rusage)

    return current
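
In short, `set_current_context` now samples `getrusage` once per context switch and hands the same snapshot to both the outgoing and incoming logcontexts, halving the number of syscalls. A minimal standalone sketch of that switching logic (illustrative only; `switch_context` is a made-up name, and `RUSAGE_THREAD` is Linux-only, which is why the real code guards it):

```
# Illustrative sketch of the new switching logic, not the Synapse code itself.
import resource


def switch_context(thread_local, new_context):
    current = thread_local.current_context
    if current is not new_context:
        # one getrusage() call per switch, instead of one in stop() and one in start()
        rusage = resource.getrusage(resource.RUSAGE_THREAD)
        current.stop(rusage)
        thread_local.current_context = new_context
        new_context.start(rusage)
    return current
```
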

View File

@ -90,8 +90,10 @@ class BackgroundUpdater(object):
        self._clock = hs.get_clock()
        self.db = database

+        # if a background update is currently running, its name.
+        self._current_background_update = None  # type: Optional[str]
+
        self._background_update_performance = {}
-        self._background_update_queue = []
        self._background_update_handlers = {}
        self._all_done = False
@ -111,7 +113,7 @@
            except Exception:
                logger.exception("Error doing update")
            else:
-                if result is None:
+                if result:
                    logger.info(
                        "No more background updates to do."
                        " Unscheduling background update task."
@ -119,26 +121,25 @@
                    self._all_done = True
                    return None

-    @defer.inlineCallbacks
-    def has_completed_background_updates(self):
+    async def has_completed_background_updates(self) -> bool:
        """Check if all the background updates have completed

        Returns:
-            Deferred[bool]: True if all background updates have completed
+            True if all background updates have completed
        """
        # if we've previously determined that there is nothing left to do, that
        # is easy
        if self._all_done:
            return True

-        # obviously, if we have things in our queue, we're not done.
-        if self._background_update_queue:
+        # obviously, if we are currently processing an update, we're not done.
+        if self._current_background_update:
            return False

        # otherwise, check if there are updates to be run. This is important,
        # as we may be running on a worker which doesn't perform the bg updates
        # itself, but still wants to wait for them to happen.
-        updates = yield self.db.simple_select_onecol(
+        updates = await self.db.simple_select_onecol(
            "background_updates",
            keyvalues=None,
            retcol="1",
@ -153,11 +154,10 @@
    async def has_completed_background_update(self, update_name) -> bool:
        """Check if the given background update has finished running.
        """
        if self._all_done:
            return True

-        if update_name in self._background_update_queue:
+        if update_name == self._current_background_update:
            return False

        update_exists = await self.db.simple_select_one_onecol(
@ -170,9 +170,7 @@
        return not update_exists

-    async def do_next_background_update(
-        self, desired_duration_ms: float
-    ) -> Optional[int]:
+    async def do_next_background_update(self, desired_duration_ms: float) -> bool:
        """Does some amount of work on the next queued background update

        Returns once some amount of work is done.
@ -181,33 +179,51 @@
            desired_duration_ms(float): How long we want to spend
                updating.
        Returns:
-            None if there is no more work to do, otherwise an int
+            True if we have finished running all the background updates, otherwise False
        """
-        if not self._background_update_queue:
-            updates = await self.db.simple_select_list(
-                "background_updates",
-                keyvalues=None,
-                retcols=("update_name", "depends_on"),
-            )
-            in_flight = {update["update_name"] for update in updates}
-            for update in updates:
-                if update["depends_on"] not in in_flight:
-                    self._background_update_queue.append(update["update_name"])
-
-        if not self._background_update_queue:
-            # no work left to do
-            return None
-
-        # pop from the front, and add back to the back
-        update_name = self._background_update_queue.pop(0)
-        self._background_update_queue.append(update_name)
-
-        res = await self._do_background_update(update_name, desired_duration_ms)
-        return res
-
-    async def _do_background_update(
-        self, update_name: str, desired_duration_ms: float
-    ) -> int:
+
+        def get_background_updates_txn(txn):
+            txn.execute(
+                """
+                SELECT update_name, depends_on FROM background_updates
+                ORDER BY ordering, update_name
+                """
+            )
+            return self.db.cursor_to_dict(txn)
+
+        if not self._current_background_update:
+            all_pending_updates = await self.db.runInteraction(
+                "background_updates", get_background_updates_txn,
+            )
+            if not all_pending_updates:
+                # no work left to do
+                return True
+
+            # find the first update which isn't dependent on another one in the queue.
+            pending = {update["update_name"] for update in all_pending_updates}
+            for upd in all_pending_updates:
+                depends_on = upd["depends_on"]
+                if not depends_on or depends_on not in pending:
+                    break
+                logger.info(
+                    "Not starting on bg update %s until %s is done",
+                    upd["update_name"],
+                    depends_on,
+                )
+            else:
+                # if we get to the end of that for loop, there is a problem
+                raise Exception(
+                    "Unable to find a background update which doesn't depend on "
+                    "another: dependency cycle?"
+                )
+
+            self._current_background_update = upd["update_name"]
+
+        await self._do_background_update(desired_duration_ms)
+        return False
+
+    async def _do_background_update(self, desired_duration_ms: float) -> int:
+        update_name = self._current_background_update
        logger.info("Starting update batch on background update '%s'", update_name)

        update_handler = self._background_update_handlers[update_name]
@ -400,27 +416,6 @@
        self.register_background_update_handler(update_name, updater)

-    def start_background_update(self, update_name, progress):
-        """Starts a background update running.
-
-        Args:
-            update_name: The update to set running.
-            progress: The initial state of the progress of the update.
-
-        Returns:
-            A deferred that completes once the task has been added to the
-            queue.
-        """
-        # Clear the background update queue so that we will pick up the new
-        # task on the next iteration of do_background_update.
-        self._background_update_queue = []
-
-        progress_json = json.dumps(progress)
-
-        return self.db.simple_insert(
-            "background_updates",
-            {"update_name": update_name, "progress_json": progress_json},
-        )
-
    def _end_background_update(self, update_name):
        """Removes a completed background update task from the queue.
@ -429,9 +424,12 @@
        Returns:
            A deferred that completes once the task is removed.
        """
-        self._background_update_queue = [
-            name for name in self._background_update_queue if name != update_name
-        ]
+        if update_name != self._current_background_update:
+            raise Exception(
+                "Cannot end background update %s which isn't currently running"
+                % update_name
+            )
+        self._current_background_update = None
        return self.db.simple_delete_one(
            "background_updates", keyvalues={"update_name": update_name}
        )
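
The key step above is selecting the next runnable update: the updater now tracks a single `_current_background_update`, and when nothing is running it picks the first pending row whose `depends_on` is not itself still pending. A standalone sketch of just that selection step (illustrative; `pick_next_update` is a made-up helper):

```
# Standalone sketch of the selection logic, not the Synapse code itself.
def pick_next_update(all_pending_updates):
    # rows are dicts with "update_name" and "depends_on", already ordered
    pending = {update["update_name"] for update in all_pending_updates}
    for upd in all_pending_updates:
        depends_on = upd["depends_on"]
        if not depends_on or depends_on not in pending:
            return upd["update_name"]
    # every pending update depends on another pending one
    raise Exception("Unable to find a background update which doesn't depend on another")


print(pick_next_update([
    {"update_name": "b", "depends_on": "a"},
    {"update_name": "a", "depends_on": None},
]))  # prints "a"
```
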

View File

@ -165,7 +165,6 @@ class DeviceWorkerStore(SQLBaseStore):
        # the max stream_id across each set of duplicate entries
        #
        # maps (user_id, device_id) -> (stream_id, opentracing_context)
-        # as long as their stream_id does not match that of the last row
        #
        # opentracing_context contains the opentracing metadata for the request
        # that created the poke
@ -270,7 +269,14 @@
            prev_id = yield self._get_last_device_update_for_remote_user(
                destination, user_id, from_stream_id
            )
-            for device_id, device in iteritems(user_devices):
+
+            # make sure we go through the devices in stream order
+            device_ids = sorted(
+                user_devices.keys(), key=lambda i: query_map[(user_id, i)][0],
+            )
+
+            for device_id in device_ids:
+                device = user_devices[device_id]
                stream_id, opentracing_context = query_map[(user_id, device_id)]
                result = {
                    "user_id": user_id,

View File

@ -29,7 +29,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 57
+SCHEMA_VERSION = 58

dir_path = os.path.abspath(os.path.dirname(__file__))

View File

@ -0,0 +1,19 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* add an "ordering" column to background_updates, which can be used to sort them
to achieve some level of consistency. */
ALTER TABLE background_updates ADD COLUMN ordering INT NOT NULL DEFAULT 0;

View File

@ -14,6 +14,7 @@
# limitations under the License.

import sys
+from argparse import REMAINDER
from contextlib import redirect_stderr
from io import StringIO
@ -21,7 +22,7 @@ import pyperf
from synmark import make_reactor
from synmark.suites import SUITES

-from twisted.internet.defer import ensureDeferred
+from twisted.internet.defer import Deferred, ensureDeferred
from twisted.logger import globalLogBeginner, textFileLogObserver
from twisted.python.failure import Failure
@ -40,7 +41,8 @@ def make_test(main):
        file_out = StringIO()
        with redirect_stderr(file_out):

-            d = ensureDeferred(main(reactor, loops))
+            d = Deferred()
+            d.addCallback(lambda _: ensureDeferred(main(reactor, loops)))

            def on_done(_):
                if isinstance(_, Failure):
@ -50,6 +52,7 @@ def make_test(main):
                return _

            d.addBoth(on_done)
+            reactor.callWhenRunning(lambda: d.callback(True))
            reactor.run()

        return d.result
@ -62,11 +65,13 @@ if __name__ == "__main__":
    def add_cmdline_args(cmd, args):
        if args.log:
            cmd.extend(["--log"])
+        cmd.extend(args.tests)

    runner = pyperf.Runner(
-        processes=3, min_time=2, show_name=True, add_cmdline_args=add_cmdline_args
+        processes=3, min_time=1.5, show_name=True, add_cmdline_args=add_cmdline_args
    )
    runner.argparser.add_argument("--log", action="store_true")
+    runner.argparser.add_argument("tests", nargs=REMAINDER)
    runner.parse_args()

    orig_loops = runner.args.loops
@ -79,6 +84,11 @@ if __name__ == "__main__":
    )
    setupdb()

+    if runner.args.tests:
+        SUITES = list(
+            filter(lambda x: x[0].__name__.split(".")[-1] in runner.args.tests, SUITES)
+        )
+
    for suite, loops in SUITES:
        if loops:
            runner.args.loops = loops

View File

@ -1,3 +1,9 @@
-from . import logging
+from . import logging, lrucache, lrucache_evict

-SUITES = [(logging, 1000), (logging, 10000), (logging, None)]
+SUITES = [
+    (logging, 1000),
+    (logging, 10000),
+    (logging, None),
+    (lrucache, None),
+    (lrucache_evict, None),
+]

View File

@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pyperf import perf_counter
from synapse.util.caches.lrucache import LruCache
async def main(reactor, loops):
"""
Benchmark `loops` number of insertions into LruCache without eviction.
"""
cache = LruCache(loops)
start = perf_counter()
for i in range(loops):
cache[i] = True
end = perf_counter() - start
return end

View File

@ -0,0 +1,35 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pyperf import perf_counter
from synapse.util.caches.lrucache import LruCache
async def main(reactor, loops):
"""
Benchmark `loops` number of insertions into LruCache where half of them are
evicted.
"""
cache = LruCache(loops // 2)
start = perf_counter()
for i in range(loops):
cache[i] = True
end = perf_counter() - start
return end
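
For context, a tiny usage sketch of what these two suites measure, based only on what the benchmark code above exercises: a size-bounded `LruCache` with dict-style assignment, where the eviction suite relies on older entries being dropped once the cache is full (it sizes the cache at half the number of insertions).

```
# Illustrative sketch, assuming the dict-style interface used by the benchmarks.
from synapse.util.caches.lrucache import LruCache

cache = LruCache(2)
cache["a"] = 1
cache["b"] = 2
cache["c"] = 3            # "a" is evicted as the least-recently-used entry
print(cache.get("a"))     # None: evicted
print(cache.get("c"))     # 3
```
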

View File

@ -297,6 +297,7 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
            c = edu["content"]
            if stream_id is not None:
                self.assertEqual(c["prev_id"], [stream_id])
+                self.assertGreaterEqual(c["stream_id"], stream_id)
            stream_id = c["stream_id"]
        devices = {edu["content"]["device_id"] for edu in self.edus}
        self.assertEqual({"D1", "D2"}, devices)
@ -330,6 +331,7 @@
                c.items(),
                {"user_id": u1, "prev_id": [stream_id], "deleted": True}.items(),
            )
+            self.assertGreaterEqual(c["stream_id"], stream_id)
            stream_id = c["stream_id"]
        devices = {edu["content"]["device_id"] for edu in self.edus}
        self.assertEqual({"D1", "D2", "D3"}, devices)
@ -366,6 +368,8 @@
            self.assertEqual(edu["edu_type"], "m.device_list_update")
            c = edu["content"]
            self.assertEqual(c["prev_id"], [stream_id] if stream_id is not None else [])
+            if stream_id is not None:
+                self.assertGreaterEqual(c["stream_id"], stream_id)
            stream_id = c["stream_id"]
        devices = {edu["content"]["device_id"] for edu in self.edus}
        self.assertEqual({"D1", "D2", "D3"}, devices)
@ -482,6 +486,8 @@
        }
        self.assertLessEqual(expected.items(), content.items())
+        if prev_stream_id is not None:
+            self.assertGreaterEqual(content["stream_id"], prev_stream_id)
        return content["stream_id"]

    def check_signing_key_update_txn(self, txn: JsonDict,) -> None:

View File

@ -11,7 +11,9 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
    def prepare(self, reactor, clock, homeserver):
        self.updates = self.hs.get_datastore().db.updates  # type: BackgroundUpdater
        # the base test class should have run the real bg updates for us
-        self.assertTrue(self.updates.has_completed_background_updates())
+        self.assertTrue(
+            self.get_success(self.updates.has_completed_background_updates())
+        )

        self.update_handler = Mock()
        self.updates.register_background_update_handler(
@ -25,12 +27,20 @@
        # the target runtime for each bg update
        target_background_update_duration_ms = 50000

+        store = self.hs.get_datastore()
+        self.get_success(
+            store.db.simple_insert(
+                "background_updates",
+                values={"update_name": "test_update", "progress_json": '{"my_key": 1}'},
+            )
+        )
+
        # first step: make a bit of progress
        @defer.inlineCallbacks
        def update(progress, count):
            yield self.clock.sleep((count * duration_ms) / 1000)
            progress = {"my_key": progress["my_key"] + 1}
-            yield self.hs.get_datastore().db.runInteraction(
+            yield store.db.runInteraction(
                "update_progress",
                self.updates._background_update_progress_txn,
                "test_update",
@ -39,10 +49,6 @@
            return count

        self.update_handler.side_effect = update
-        self.get_success(
-            self.updates.start_background_update("test_update", {"my_key": 1})
-        )
        self.update_handler.reset_mock()
        res = self.get_success(
            self.updates.do_next_background_update(
@ -50,7 +56,7 @@
            ),
            by=0.1,
        )
-        self.assertIsNotNone(res)
+        self.assertFalse(res)

        # on the first call, we should get run with the default background update size
        self.update_handler.assert_called_once_with(
@ -73,7 +79,7 @@
        result = self.get_success(
            self.updates.do_next_background_update(target_background_update_duration_ms)
        )
-        self.assertIsNotNone(result)
+        self.assertFalse(result)
        self.update_handler.assert_called_once()

        # third step: we don't expect to be called any more
@ -81,5 +87,5 @@
        result = self.get_success(
            self.updates.do_next_background_update(target_background_update_duration_ms)
        )
-        self.assertIsNone(result)
+        self.assertTrue(result)
        self.assertFalse(self.update_handler.called)

View File

@ -40,6 +40,7 @@ from synapse.http.server import JsonResource
from synapse.http.site import SynapseRequest, SynapseSite
from synapse.logging.context import (
    SENTINEL_CONTEXT,
+    LoggingContext,
    current_context,
    set_current_context,
)
@ -419,15 +420,17 @@ class HomeserverTestCase(TestCase):
            config_obj.parse_config_dict(config, "", "")
            kwargs["config"] = config_obj

+        async def run_bg_updates():
+            with LoggingContext("run_bg_updates", request="run_bg_updates-1"):
+                while not await stor.db.updates.has_completed_background_updates():
+                    await stor.db.updates.do_next_background_update(1)
+
        hs = setup_test_homeserver(self.addCleanup, *args, **kwargs)
        stor = hs.get_datastore()

        # Run the database background updates, when running against "master".
        if hs.__class__.__name__ == "TestHomeServer":
-            while not self.get_success(
-                stor.db.updates.has_completed_background_updates()
-            ):
-                self.get_success(stor.db.updates.do_next_background_update(1))
+            self.get_success(run_bg_updates())

        return hs