Compare commits
15 Commits
05ee048f2c...e3debf9682
Author | SHA1 | Date |
---|---|---|
Erik Johnston | e3debf9682 | |
Erik Johnston | ec10bdd32b | |
Patrick Cloke | 62894673e6 | |
Richard van der Hoff | 462e681c79 | |
Richard van der Hoff | 9de6e9e249 | |
Richard van der Hoff | 8672642225 | |
Richard van der Hoff | 6a8fd03acb | |
Richard van der Hoff | f6c526ce67 | |
Richard van der Hoff | 73d93039ff | |
Patrick Cloke | 3bd2a2cbb1 | |
Erik Johnston | 695240d34a | |
Patrick Cloke | 34ff8da83b | |
Richard van der Hoff | 3bd3707cb9 | |
Erik Johnston | 6c5d5e507e | |
Patrick Cloke | 61aaf36a1c | |
CHANGES.md (17 changed lines)
@@ -1,3 +1,20 @@
Synapse 1.21.0rc2 (2020-10-02)
==============================

Features
--------

- Convert additional templates from inline HTML to Jinja2 templates. ([\#8444](https://github.com/matrix-org/synapse/issues/8444))

Bugfixes
--------

- Fix a regression in v1.21.0rc1 which broke thumbnails of remote media. ([\#8438](https://github.com/matrix-org/synapse/issues/8438))
- Do not expose the experimental `uk.half-shot.msc2778.login.application_service` flow in the login API, which caused a compatibility problem with Element iOS. ([\#8440](https://github.com/matrix-org/synapse/issues/8440))
- Fix malformed log line in new federation "catch up" logic. ([\#8442](https://github.com/matrix-org/synapse/issues/8442))
- Fix DB query on startup for negative streams which caused long start up times. Introduced in [\#8374](https://github.com/matrix-org/synapse/issues/8374). ([\#8447](https://github.com/matrix-org/synapse/issues/8447))


Synapse 1.21.0rc1 (2020-10-01)
==============================
@@ -0,0 +1 @@
Allow running background tasks in a separate worker process.

@@ -0,0 +1 @@
Add unit test for event persister sharding.

@@ -0,0 +1 @@
Configure `public_baseurl` when using demo scripts.

@@ -0,0 +1 @@
Add SQL logging on queries that happen during startup.

@@ -0,0 +1 @@
Speed up unit tests when using PostgreSQL.
@@ -30,6 +30,8 @@ for port in 8080 8081 8082; do
        if ! grep -F "Customisation made by demo/start.sh" -q $DIR/etc/$port.config; then
            printf '\n\n# Customisation made by demo/start.sh\n' >> $DIR/etc/$port.config

            echo "public_baseurl: http://localhost:$port/" >> $DIR/etc/$port.config

            echo 'enable_registration: true' >> $DIR/etc/$port.config

            # Warning, this heredoc depends on the interaction of tabs and spaces. Please don't
@@ -2504,6 +2504,11 @@ opentracing:
#  events: worker1
#  typing: worker1

# The worker that is used to run background tasks (e.g. cleaning up expired
# data). If not provided this defaults to the main process.
#
#run_background_tasks_on: worker1


# Configuration for Redis when using workers. This *must* be enabled when
# using workers (unless using old style direct TCP configuration).
@@ -319,6 +319,23 @@ stream_writers:
    events: event_persister1
```

#### Background tasks

There is also *experimental* support for moving background tasks to a separate
worker. Background tasks are run periodically or started via replication. Exactly
which tasks are configured to run depends on your Synapse configuration (e.g. if
stats is enabled).

To enable this, the worker must have a `worker_name` and can be configured to run
background tasks. For example, to move background tasks to a dedicated worker,
the shared configuration would include:

```yaml
run_background_tasks_on: background_worker
```

You might also wish to investigate the `update_user_directory` and
`media_instance_running_background_jobs` settings.
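To make the effect of this setting concrete, here is a minimal, self-contained Python sketch of the decision it drives. It mirrors the `WorkerConfig` change included later in this comparison; `should_run_background_tasks` is an illustrative name, not a real Synapse API.

```python
from typing import Optional


def should_run_background_tasks(
    worker_name: Optional[str], run_background_tasks_on: Optional[str]
) -> bool:
    """Decide whether this instance runs the background tasks.

    With no `run_background_tasks_on` setting the main process (the one with
    no worker_name, i.e. "master") keeps the tasks; otherwise only the named
    worker runs them.
    """
    target = run_background_tasks_on or "master"
    return (worker_name is None and target == "master") or worker_name == target


# Default behaviour: no setting, so the main process runs the tasks.
assert should_run_background_tasks(None, None) is True

# With the shared config above (`run_background_tasks_on: background_worker`):
assert should_run_background_tasks(None, "background_worker") is False
assert should_run_background_tasks("background_worker", "background_worker") is True
assert should_run_background_tasks("event_persister1", "background_worker") is False
```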
### `synapse.app.pusher`
mypy.ini (3 changed lines)

@@ -143,3 +143,6 @@ ignore_missing_imports = True

[mypy-nacl.*]
ignore_missing_imports = True

[mypy-hiredis]
ignore_missing_imports = True
@@ -489,7 +489,7 @@ class Porter(object):

        hs = MockHomeserver(self.hs_config)

-       with make_conn(db_config, engine) as db_conn:
+       with make_conn(db_config, engine, "portdb") as db_conn:
            engine.check_database(
                db_conn, allow_outdated_version=allow_outdated_version
            )
@@ -16,7 +16,7 @@
"""Contains *incomplete* type hints for txredisapi.
"""

-from typing import List, Optional, Union
+from typing import List, Optional, Union, Type

class RedisProtocol:
    def publish(self, channel: str, message: bytes): ...

@@ -42,3 +42,21 @@ def lazyConnection(

class SubscriberFactory:
    def buildProtocol(self, addr): ...

class ConnectionHandler: ...

class RedisFactory:
    continueTrying: bool
    handler: RedisProtocol
    def __init__(
        self,
        uuid: str,
        dbid: Optional[int],
        poolsize: int,
        isLazy: bool = False,
        handler: Type = ConnectionHandler,
        charset: str = "utf-8",
        password: Optional[str] = None,
        replyTimeout: Optional[int] = None,
        convertNumbers: Optional[int] = True,
    ): ...
@@ -48,7 +48,7 @@ try:
except ImportError:
    pass

-__version__ = "1.21.0rc1"
+__version__ = "1.21.0rc2"

if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
    # We import here so that we don't have to install a bunch of deps when
@@ -28,6 +28,7 @@ from twisted.protocols.tls import TLSMemoryBIOFactory

import synapse
from synapse.app import check_bind_error
from synapse.app.phone_stats_home import start_phone_stats_home
from synapse.config.server import ListenerConfig
from synapse.crypto import context_factory
from synapse.logging.context import PreserveLoggingContext

@@ -271,9 +272,19 @@ def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
    hs.get_datastore().db_pool.start_profiling()
    hs.get_pusherpool().start()

    # Log when we start the shut down process.
    hs.get_reactor().addSystemEventTrigger(
        "before", "shutdown", logger.info, "Shutting down..."
    )

    setup_sentry(hs)
    setup_sdnotify(hs)

    # If background tasks are running on the main process, start collecting the
    # phone home stats.
    if hs.config.run_background_tasks:
        start_phone_stats_home(hs)

    # We now freeze all allocated objects in the hopes that (almost)
    # everything currently allocated are things that will be used for the
    # rest of time. Doing so means less work each GC (hopefully).
@@ -208,6 +208,7 @@ def start(config_options):

    # Explicitly disable background processes
    config.update_user_directory = False
    config.run_background_tasks = False
    config.start_pushers = False
    config.send_federation = False
@ -128,11 +128,13 @@ from synapse.rest.key.v2 import KeyApiV2Resource
|
|||
from synapse.server import HomeServer, cache_in_self
|
||||
from synapse.storage.databases.main.censor_events import CensorEventsStore
|
||||
from synapse.storage.databases.main.media_repository import MediaRepositoryStore
|
||||
from synapse.storage.databases.main.metrics import ServerMetricsStore
|
||||
from synapse.storage.databases.main.monthly_active_users import (
|
||||
MonthlyActiveUsersWorkerStore,
|
||||
)
|
||||
from synapse.storage.databases.main.presence import UserPresenceState
|
||||
from synapse.storage.databases.main.search import SearchWorkerStore
|
||||
from synapse.storage.databases.main.stats import StatsStore
|
||||
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
|
||||
from synapse.storage.databases.main.user_directory import UserDirectoryStore
|
||||
from synapse.types import ReadReceipt
|
||||
|
@ -454,6 +456,7 @@ class GenericWorkerSlavedStore(
|
|||
# FIXME(#3714): We need to add UserDirectoryStore as we write directly
|
||||
# rather than going via the correct worker.
|
||||
UserDirectoryStore,
|
||||
StatsStore,
|
||||
UIAuthWorkerStore,
|
||||
SlavedDeviceInboxStore,
|
||||
SlavedDeviceStore,
|
||||
|
@ -476,6 +479,7 @@ class GenericWorkerSlavedStore(
|
|||
SlavedFilteringStore,
|
||||
MonthlyActiveUsersWorkerStore,
|
||||
MediaRepositoryStore,
|
||||
ServerMetricsStore,
|
||||
SearchWorkerStore,
|
||||
BaseSlavedStore,
|
||||
):
|
||||
|
|
|
@ -17,14 +17,10 @@
|
|||
|
||||
import gc
|
||||
import logging
|
||||
import math
|
||||
import os
|
||||
import resource
|
||||
import sys
|
||||
from typing import Iterable
|
||||
|
||||
from prometheus_client import Gauge
|
||||
|
||||
from twisted.application import service
|
||||
from twisted.internet import defer, reactor
|
||||
from twisted.python.failure import Failure
|
||||
|
@ -60,7 +56,6 @@ from synapse.http.server import (
|
|||
from synapse.http.site import SynapseSite
|
||||
from synapse.logging.context import LoggingContext
|
||||
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.module_api import ModuleApi
|
||||
from synapse.python_dependencies import check_requirements
|
||||
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
|
||||
|
@ -334,20 +329,6 @@ class SynapseHomeServer(HomeServer):
|
|||
logger.warning("Unrecognized listener type: %s", listener.type)
|
||||
|
||||
|
||||
# Gauges to expose monthly active user control metrics
|
||||
current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
|
||||
current_mau_by_service_gauge = Gauge(
|
||||
"synapse_admin_mau_current_mau_by_service",
|
||||
"Current MAU by service",
|
||||
["app_service"],
|
||||
)
|
||||
max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit")
|
||||
registered_reserved_users_mau_gauge = Gauge(
|
||||
"synapse_admin_mau:registered_reserved_users",
|
||||
"Registered users with reserved threepids",
|
||||
)
|
||||
|
||||
|
||||
def setup(config_options):
|
||||
"""
|
||||
Args:
|
||||
|
@ -389,8 +370,6 @@ def setup(config_options):
|
|||
except UpgradeDatabaseException as e:
|
||||
quit_with_error("Failed to upgrade database: %s" % (e,))
|
||||
|
||||
hs.setup_master()
|
||||
|
||||
async def do_acme() -> bool:
|
||||
"""
|
||||
Reprovision an ACME certificate, if it's required.
|
||||
|
@ -486,92 +465,6 @@ class SynapseService(service.Service):
|
|||
return self._port.stopListening()
|
||||
|
||||
|
||||
# Contains the list of processes we will be monitoring
|
||||
# currently either 0 or 1
|
||||
_stats_process = []
|
||||
|
||||
|
||||
async def phone_stats_home(hs, stats, stats_process=_stats_process):
|
||||
logger.info("Gathering stats for reporting")
|
||||
now = int(hs.get_clock().time())
|
||||
uptime = int(now - hs.start_time)
|
||||
if uptime < 0:
|
||||
uptime = 0
|
||||
|
||||
#
|
||||
# Performance statistics. Keep this early in the function to maintain reliability of `test_performance_100` test.
|
||||
#
|
||||
old = stats_process[0]
|
||||
new = (now, resource.getrusage(resource.RUSAGE_SELF))
|
||||
stats_process[0] = new
|
||||
|
||||
# Get RSS in bytes
|
||||
stats["memory_rss"] = new[1].ru_maxrss
|
||||
|
||||
# Get CPU time in % of a single core, not % of all cores
|
||||
used_cpu_time = (new[1].ru_utime + new[1].ru_stime) - (
|
||||
old[1].ru_utime + old[1].ru_stime
|
||||
)
|
||||
if used_cpu_time == 0 or new[0] == old[0]:
|
||||
stats["cpu_average"] = 0
|
||||
else:
|
||||
stats["cpu_average"] = math.floor(used_cpu_time / (new[0] - old[0]) * 100)
|
||||
|
||||
#
|
||||
# General statistics
|
||||
#
|
||||
|
||||
stats["homeserver"] = hs.config.server_name
|
||||
stats["server_context"] = hs.config.server_context
|
||||
stats["timestamp"] = now
|
||||
stats["uptime_seconds"] = uptime
|
||||
version = sys.version_info
|
||||
stats["python_version"] = "{}.{}.{}".format(
|
||||
version.major, version.minor, version.micro
|
||||
)
|
||||
stats["total_users"] = await hs.get_datastore().count_all_users()
|
||||
|
||||
total_nonbridged_users = await hs.get_datastore().count_nonbridged_users()
|
||||
stats["total_nonbridged_users"] = total_nonbridged_users
|
||||
|
||||
daily_user_type_results = await hs.get_datastore().count_daily_user_type()
|
||||
for name, count in daily_user_type_results.items():
|
||||
stats["daily_user_type_" + name] = count
|
||||
|
||||
room_count = await hs.get_datastore().get_room_count()
|
||||
stats["total_room_count"] = room_count
|
||||
|
||||
stats["daily_active_users"] = await hs.get_datastore().count_daily_users()
|
||||
stats["monthly_active_users"] = await hs.get_datastore().count_monthly_users()
|
||||
stats["daily_active_rooms"] = await hs.get_datastore().count_daily_active_rooms()
|
||||
stats["daily_messages"] = await hs.get_datastore().count_daily_messages()
|
||||
|
||||
r30_results = await hs.get_datastore().count_r30_users()
|
||||
for name, count in r30_results.items():
|
||||
stats["r30_users_" + name] = count
|
||||
|
||||
daily_sent_messages = await hs.get_datastore().count_daily_sent_messages()
|
||||
stats["daily_sent_messages"] = daily_sent_messages
|
||||
stats["cache_factor"] = hs.config.caches.global_factor
|
||||
stats["event_cache_size"] = hs.config.caches.event_cache_size
|
||||
|
||||
#
|
||||
# Database version
|
||||
#
|
||||
|
||||
# This only reports info about the *main* database.
|
||||
stats["database_engine"] = hs.get_datastore().db_pool.engine.module.__name__
|
||||
stats["database_server_version"] = hs.get_datastore().db_pool.engine.server_version
|
||||
|
||||
logger.info("Reporting stats to %s: %s" % (hs.config.report_stats_endpoint, stats))
|
||||
try:
|
||||
await hs.get_proxied_http_client().put_json(
|
||||
hs.config.report_stats_endpoint, stats
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("Error reporting stats: %s", e)
|
||||
|
||||
|
||||
def run(hs):
|
||||
PROFILE_SYNAPSE = False
|
||||
if PROFILE_SYNAPSE:
|
||||
|
@ -597,81 +490,6 @@ def run(hs):
|
|||
ThreadPool._worker = profile(ThreadPool._worker)
|
||||
reactor.run = profile(reactor.run)
|
||||
|
||||
clock = hs.get_clock()
|
||||
|
||||
stats = {}
|
||||
|
||||
def performance_stats_init():
|
||||
_stats_process.clear()
|
||||
_stats_process.append(
|
||||
(int(hs.get_clock().time()), resource.getrusage(resource.RUSAGE_SELF))
|
||||
)
|
||||
|
||||
def start_phone_stats_home():
|
||||
return run_as_background_process(
|
||||
"phone_stats_home", phone_stats_home, hs, stats
|
||||
)
|
||||
|
||||
def generate_user_daily_visit_stats():
|
||||
return run_as_background_process(
|
||||
"generate_user_daily_visits", hs.get_datastore().generate_user_daily_visits
|
||||
)
|
||||
|
||||
# Rather than update on per session basis, batch up the requests.
|
||||
# If you increase the loop period, the accuracy of user_daily_visits
|
||||
# table will decrease
|
||||
clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)
|
||||
|
||||
# monthly active user limiting functionality
|
||||
def reap_monthly_active_users():
|
||||
return run_as_background_process(
|
||||
"reap_monthly_active_users", hs.get_datastore().reap_monthly_active_users
|
||||
)
|
||||
|
||||
clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60)
|
||||
reap_monthly_active_users()
|
||||
|
||||
async def generate_monthly_active_users():
|
||||
current_mau_count = 0
|
||||
current_mau_count_by_service = {}
|
||||
reserved_users = ()
|
||||
store = hs.get_datastore()
|
||||
if hs.config.limit_usage_by_mau or hs.config.mau_stats_only:
|
||||
current_mau_count = await store.get_monthly_active_count()
|
||||
current_mau_count_by_service = (
|
||||
await store.get_monthly_active_count_by_service()
|
||||
)
|
||||
reserved_users = await store.get_registered_reserved_users()
|
||||
current_mau_gauge.set(float(current_mau_count))
|
||||
|
||||
for app_service, count in current_mau_count_by_service.items():
|
||||
current_mau_by_service_gauge.labels(app_service).set(float(count))
|
||||
|
||||
registered_reserved_users_mau_gauge.set(float(len(reserved_users)))
|
||||
max_mau_gauge.set(float(hs.config.max_mau_value))
|
||||
|
||||
def start_generate_monthly_active_users():
|
||||
return run_as_background_process(
|
||||
"generate_monthly_active_users", generate_monthly_active_users
|
||||
)
|
||||
|
||||
start_generate_monthly_active_users()
|
||||
if hs.config.limit_usage_by_mau or hs.config.mau_stats_only:
|
||||
clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000)
|
||||
# End of monthly active user settings
|
||||
|
||||
if hs.config.report_stats:
|
||||
logger.info("Scheduling stats reporting for 3 hour intervals")
|
||||
clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000)
|
||||
|
||||
# We need to defer this init for the cases that we daemonize
|
||||
# otherwise the process ID we get is that of the non-daemon process
|
||||
clock.call_later(0, performance_stats_init)
|
||||
|
||||
# We wait 5 minutes to send the first set of stats as the server can
|
||||
# be quite busy the first few minutes
|
||||
clock.call_later(5 * 60, start_phone_stats_home)
|
||||
|
||||
_base.start_reactor(
|
||||
"synapse-homeserver",
|
||||
soft_file_limit=hs.config.soft_file_limit,
|
||||
|
|
|
@ -0,0 +1,202 @@
|
|||
# Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import math
|
||||
import resource
|
||||
import sys
|
||||
|
||||
from prometheus_client import Gauge
|
||||
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
|
||||
logger = logging.getLogger("synapse.app.homeserver")
|
||||
|
||||
# Contains the list of processes we will be monitoring
|
||||
# currently either 0 or 1
|
||||
_stats_process = []
|
||||
|
||||
# Gauges to expose monthly active user control metrics
|
||||
current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
|
||||
current_mau_by_service_gauge = Gauge(
|
||||
"synapse_admin_mau_current_mau_by_service",
|
||||
"Current MAU by service",
|
||||
["app_service"],
|
||||
)
|
||||
max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit")
|
||||
registered_reserved_users_mau_gauge = Gauge(
|
||||
"synapse_admin_mau:registered_reserved_users",
|
||||
"Registered users with reserved threepids",
|
||||
)
|
||||
|
||||
|
||||
async def phone_stats_home(hs, stats, stats_process=_stats_process):
|
||||
logger.info("Gathering stats for reporting")
|
||||
now = int(hs.get_clock().time())
|
||||
uptime = int(now - hs.start_time)
|
||||
if uptime < 0:
|
||||
uptime = 0
|
||||
|
||||
#
|
||||
# Performance statistics. Keep this early in the function to maintain reliability of `test_performance_100` test.
|
||||
#
|
||||
old = stats_process[0]
|
||||
new = (now, resource.getrusage(resource.RUSAGE_SELF))
|
||||
stats_process[0] = new
|
||||
|
||||
# Get RSS in bytes
|
||||
stats["memory_rss"] = new[1].ru_maxrss
|
||||
|
||||
# Get CPU time in % of a single core, not % of all cores
|
||||
used_cpu_time = (new[1].ru_utime + new[1].ru_stime) - (
|
||||
old[1].ru_utime + old[1].ru_stime
|
||||
)
|
||||
if used_cpu_time == 0 or new[0] == old[0]:
|
||||
stats["cpu_average"] = 0
|
||||
else:
|
||||
stats["cpu_average"] = math.floor(used_cpu_time / (new[0] - old[0]) * 100)
|
||||
|
||||
#
|
||||
# General statistics
|
||||
#
|
||||
|
||||
stats["homeserver"] = hs.config.server_name
|
||||
stats["server_context"] = hs.config.server_context
|
||||
stats["timestamp"] = now
|
||||
stats["uptime_seconds"] = uptime
|
||||
version = sys.version_info
|
||||
stats["python_version"] = "{}.{}.{}".format(
|
||||
version.major, version.minor, version.micro
|
||||
)
|
||||
stats["total_users"] = await hs.get_datastore().count_all_users()
|
||||
|
||||
total_nonbridged_users = await hs.get_datastore().count_nonbridged_users()
|
||||
stats["total_nonbridged_users"] = total_nonbridged_users
|
||||
|
||||
daily_user_type_results = await hs.get_datastore().count_daily_user_type()
|
||||
for name, count in daily_user_type_results.items():
|
||||
stats["daily_user_type_" + name] = count
|
||||
|
||||
room_count = await hs.get_datastore().get_room_count()
|
||||
stats["total_room_count"] = room_count
|
||||
|
||||
stats["daily_active_users"] = await hs.get_datastore().count_daily_users()
|
||||
stats["monthly_active_users"] = await hs.get_datastore().count_monthly_users()
|
||||
stats["daily_active_rooms"] = await hs.get_datastore().count_daily_active_rooms()
|
||||
stats["daily_messages"] = await hs.get_datastore().count_daily_messages()
|
||||
|
||||
r30_results = await hs.get_datastore().count_r30_users()
|
||||
for name, count in r30_results.items():
|
||||
stats["r30_users_" + name] = count
|
||||
|
||||
daily_sent_messages = await hs.get_datastore().count_daily_sent_messages()
|
||||
stats["daily_sent_messages"] = daily_sent_messages
|
||||
stats["cache_factor"] = hs.config.caches.global_factor
|
||||
stats["event_cache_size"] = hs.config.caches.event_cache_size
|
||||
|
||||
#
|
||||
# Database version
|
||||
#
|
||||
|
||||
# This only reports info about the *main* database.
|
||||
stats["database_engine"] = hs.get_datastore().db_pool.engine.module.__name__
|
||||
stats["database_server_version"] = hs.get_datastore().db_pool.engine.server_version
|
||||
|
||||
logger.info("Reporting stats to %s: %s" % (hs.config.report_stats_endpoint, stats))
|
||||
try:
|
||||
await hs.get_proxied_http_client().put_json(
|
||||
hs.config.report_stats_endpoint, stats
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("Error reporting stats: %s", e)
|
||||
|
||||
|
||||
def start_phone_stats_home(hs):
|
||||
"""
|
||||
Start the background tasks which report phone home stats.
|
||||
"""
|
||||
clock = hs.get_clock()
|
||||
|
||||
stats = {}
|
||||
|
||||
def performance_stats_init():
|
||||
_stats_process.clear()
|
||||
_stats_process.append(
|
||||
(int(hs.get_clock().time()), resource.getrusage(resource.RUSAGE_SELF))
|
||||
)
|
||||
|
||||
def start_phone_stats_home():
|
||||
return run_as_background_process(
|
||||
"phone_stats_home", phone_stats_home, hs, stats
|
||||
)
|
||||
|
||||
def generate_user_daily_visit_stats():
|
||||
return run_as_background_process(
|
||||
"generate_user_daily_visits", hs.get_datastore().generate_user_daily_visits
|
||||
)
|
||||
|
||||
# Rather than update on per session basis, batch up the requests.
|
||||
# If you increase the loop period, the accuracy of user_daily_visits
|
||||
# table will decrease
|
||||
clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)
|
||||
|
||||
# monthly active user limiting functionality
|
||||
def reap_monthly_active_users():
|
||||
return run_as_background_process(
|
||||
"reap_monthly_active_users", hs.get_datastore().reap_monthly_active_users
|
||||
)
|
||||
|
||||
clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60)
|
||||
reap_monthly_active_users()
|
||||
|
||||
async def generate_monthly_active_users():
|
||||
current_mau_count = 0
|
||||
current_mau_count_by_service = {}
|
||||
reserved_users = ()
|
||||
store = hs.get_datastore()
|
||||
if hs.config.limit_usage_by_mau or hs.config.mau_stats_only:
|
||||
current_mau_count = await store.get_monthly_active_count()
|
||||
current_mau_count_by_service = (
|
||||
await store.get_monthly_active_count_by_service()
|
||||
)
|
||||
reserved_users = await store.get_registered_reserved_users()
|
||||
current_mau_gauge.set(float(current_mau_count))
|
||||
|
||||
for app_service, count in current_mau_count_by_service.items():
|
||||
current_mau_by_service_gauge.labels(app_service).set(float(count))
|
||||
|
||||
registered_reserved_users_mau_gauge.set(float(len(reserved_users)))
|
||||
max_mau_gauge.set(float(hs.config.max_mau_value))
|
||||
|
||||
def start_generate_monthly_active_users():
|
||||
return run_as_background_process(
|
||||
"generate_monthly_active_users", generate_monthly_active_users
|
||||
)
|
||||
|
||||
if hs.config.limit_usage_by_mau or hs.config.mau_stats_only:
|
||||
start_generate_monthly_active_users()
|
||||
clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000)
|
||||
# End of monthly active user settings
|
||||
|
||||
if hs.config.report_stats:
|
||||
logger.info("Scheduling stats reporting for 3 hour intervals")
|
||||
clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000)
|
||||
|
||||
# We need to defer this init for the cases that we daemonize
|
||||
# otherwise the process ID we get is that of the non-daemon process
|
||||
clock.call_later(0, performance_stats_init)
|
||||
|
||||
# We wait 5 minutes to send the first set of stats as the server can
|
||||
# be quite busy the first few minutes
|
||||
clock.call_later(5 * 60, start_phone_stats_home)
|
|
@@ -242,12 +242,11 @@ class Config:
        env = jinja2.Environment(loader=loader, autoescape=autoescape)

        # Update the environment with our custom filters
-       env.filters.update(
-           {
-               "format_ts": _format_ts_filter,
-               "mxc_to_http": _create_mxc_to_http_filter(self.public_baseurl),
-           }
-       )
+       env.filters.update({"format_ts": _format_ts_filter})
+       if self.public_baseurl:
+           env.filters.update(
+               {"mxc_to_http": _create_mxc_to_http_filter(self.public_baseurl)}
+           )

        for filename in filenames:
            # Load the template
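The change registers `mxc_to_http` only when a `public_baseurl` is configured, since a template that uses a Jinja2 filter which was never added to `Environment.filters` fails to render. A small sketch of the same conditional-registration idea, using an illustrative stand-in filter and a made-up base URL:

```python
import jinja2


def mxc_to_http_stub(value: str) -> str:
    # Illustrative stand-in for Synapse's real mxc_to_http filter, which needs
    # a public_baseurl to turn an mxc:// URI into an HTTP download URL.
    return "https://example.com/_matrix/media/r0/download/" + value[len("mxc://"):]


public_baseurl = "https://example.com/"  # pretend config value

env = jinja2.Environment(loader=jinja2.DictLoader({"demo": "{{ uri | mxc_to_http }}"}))
env.filters["format_ts_stub"] = str  # always-safe filters can be registered unconditionally
if public_baseurl:
    # Only register the filter whose construction depends on optional config.
    env.filters["mxc_to_http"] = mxc_to_http_stub

print(env.get_template("demo").render(uri="mxc://example.com/abc123"))
```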
@ -28,6 +28,9 @@ class CaptchaConfig(Config):
|
|||
"recaptcha_siteverify_api",
|
||||
"https://www.recaptcha.net/recaptcha/api/siteverify",
|
||||
)
|
||||
self.recaptcha_template = self.read_templates(
|
||||
["recaptcha.html"], autoescape=True
|
||||
)[0]
|
||||
|
||||
def generate_config_section(self, **kwargs):
|
||||
return """\
|
||||
|
|
|
@ -89,6 +89,8 @@ class ConsentConfig(Config):
|
|||
|
||||
def read_config(self, config, **kwargs):
|
||||
consent_config = config.get("user_consent")
|
||||
self.terms_template = self.read_templates(["terms.html"], autoescape=True)[0]
|
||||
|
||||
if consent_config is None:
|
||||
return
|
||||
self.user_consent_version = str(consent_config["version"])
|
||||
|
|
|
@ -187,6 +187,11 @@ class RegistrationConfig(Config):
|
|||
session_lifetime = self.parse_duration(session_lifetime)
|
||||
self.session_lifetime = session_lifetime
|
||||
|
||||
# The success template used during fallback auth.
|
||||
self.fallback_success_template = self.read_templates(
|
||||
["auth_success.html"], autoescape=True
|
||||
)[0]
|
||||
|
||||
def generate_config_section(self, generate_secrets=False, **kwargs):
|
||||
if generate_secrets:
|
||||
registration_shared_secret = 'registration_shared_secret: "%s"' % (
|
||||
|
|
|
@@ -132,6 +132,19 @@ class WorkerConfig(Config):

        self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)

        # Whether this worker should run background tasks or not.
        #
        # As a note for developers, the background tasks guarded by this should
        # be able to run on only a single instance (meaning that they don't
        # depend on any in-memory state of a particular worker).
        #
        # No effort is made to ensure only a single instance of these tasks is
        # running.
        background_tasks_instance = config.get("run_background_tasks_on") or "master"
        self.run_background_tasks = (
            self.worker_name is None and background_tasks_instance == "master"
        ) or self.worker_name == background_tasks_instance

    def generate_config_section(self, config_dir_path, server_name, **kwargs):
        return """\
        ## Workers ##

@@ -167,6 +180,11 @@ class WorkerConfig(Config):
        #stream_writers:
        #  events: worker1
        #  typing: worker1

        # The worker that is used to run background tasks (e.g. cleaning up expired
        # data). If not provided this defaults to the main process.
        #
        #run_background_tasks_on: worker1
        """

    def read_arguments(self, args):
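The flag computed above is what individual handlers consult before scheduling their periodic work. Below is a small, dependency-free sketch of that guard pattern; the real version appears in the `AuthHandler` and `StatsHandler` hunks further down, and `FakeClock`/`ExpiryHandler` here are illustrative stand-ins rather than Synapse classes.

```python
class FakeClock:
    """Tiny stand-in for Synapse's clock; it just records scheduled calls."""

    def __init__(self):
        self.calls = []

    def looping_call(self, f, msec):
        self.calls.append((f.__name__, msec))


class ExpiryHandler:
    """Schedules its periodic cleanup only on the instance configured to run
    background tasks, mirroring the guard applied in this changeset."""

    def __init__(self, clock, run_background_tasks: bool):
        if run_background_tasks:
            clock.looping_call(self.expire_old_sessions, 5 * 60 * 1000)

    def expire_old_sessions(self):
        pass


clock = FakeClock()
ExpiryHandler(clock, run_background_tasks=False)  # ordinary worker: nothing scheduled
ExpiryHandler(clock, run_background_tasks=True)   # background-tasks instance
assert clock.calls == [("expire_old_sessions", 300000)]
```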
@@ -490,7 +490,7 @@ class PerDestinationQueue:
            )

            if logger.isEnabledFor(logging.INFO):
-               rooms = (p.room_id for p in catchup_pdus)
+               rooms = [p.room_id for p in catchup_pdus]
                logger.info("Catching up rooms to %s: %r", self._destination, rooms)

            success = await self._transaction_manager.send_new_transaction(
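This is the "malformed log line" fix from the changelog ([\#8442]): logging a generator with `%r` prints the generator object rather than the room IDs it would yield. A short standalone demonstration, using made-up room IDs:

```python
import logging

logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger(__name__)

room_ids = ["!abc:example.com", "!def:example.com"]

# Old behaviour: a generator's repr is its object header, not its contents.
logger.info("Catching up rooms: %r", (r for r in room_ids))
# -> Catching up rooms: <generator object <genexpr> at 0x...>

# New behaviour: materialising the list first logs the actual room IDs.
logger.info("Catching up rooms: %r", [r for r in room_ids])
# -> Catching up rooms: ['!abc:example.com', '!def:example.com']
```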
@@ -212,7 +212,7 @@ class AuthHandler(BaseHandler):
        self._clock = self.hs.get_clock()

        # Expire old UI auth sessions after a period of time.
-       if hs.config.worker_app is None:
+       if hs.config.run_background_tasks:
            self._clock.looping_call(
                run_as_background_process,
                5 * 60 * 1000,

@@ -49,7 +49,7 @@ class StatsHandler:
        # Guard to ensure we only process deltas one at a time
        self._is_processing = False

-       if hs.config.stats_enabled:
+       if self.stats_enabled and hs.config.run_background_tasks:
            self.notifier.add_replication_callback(self.notify_new_event)

            # We kick this off so that we don't have to wait for a change before
@ -251,10 +251,9 @@ class ReplicationCommandHandler:
|
|||
using TCP.
|
||||
"""
|
||||
if hs.config.redis.redis_enabled:
|
||||
import txredisapi
|
||||
|
||||
from synapse.replication.tcp.redis import (
|
||||
RedisDirectTcpReplicationClientFactory,
|
||||
lazyConnection,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
|
@ -271,7 +270,8 @@ class ReplicationCommandHandler:
|
|||
# connection after SUBSCRIBE is called).
|
||||
|
||||
# First create the connection for sending commands.
|
||||
outbound_redis_connection = txredisapi.lazyConnection(
|
||||
outbound_redis_connection = lazyConnection(
|
||||
reactor=hs.get_reactor(),
|
||||
host=hs.config.redis_host,
|
||||
port=hs.config.redis_port,
|
||||
password=hs.config.redis.redis_password,
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
|
||||
import logging
|
||||
from inspect import isawaitable
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
|
||||
import txredisapi
|
||||
|
||||
|
@ -228,3 +228,41 @@ class RedisDirectTcpReplicationClientFactory(txredisapi.SubscriberFactory):
|
|||
p.password = self.password
|
||||
|
||||
return p
|
||||
|
||||
|
||||
def lazyConnection(
|
||||
reactor,
|
||||
host: str = "localhost",
|
||||
port: int = 6379,
|
||||
dbid: Optional[int] = None,
|
||||
reconnect: bool = True,
|
||||
charset: str = "utf-8",
|
||||
password: Optional[str] = None,
|
||||
connectTimeout: Optional[int] = None,
|
||||
replyTimeout: Optional[int] = None,
|
||||
convertNumbers: bool = True,
|
||||
) -> txredisapi.RedisProtocol:
|
||||
"""Equivalent to `txredisapi.lazyConnection`, except allows specifying a
|
||||
reactor.
|
||||
"""
|
||||
|
||||
isLazy = True
|
||||
poolsize = 1
|
||||
|
||||
uuid = "%s:%d" % (host, port)
|
||||
factory = txredisapi.RedisFactory(
|
||||
uuid,
|
||||
dbid,
|
||||
poolsize,
|
||||
isLazy,
|
||||
txredisapi.ConnectionHandler,
|
||||
charset,
|
||||
password,
|
||||
replyTimeout,
|
||||
convertNumbers,
|
||||
)
|
||||
factory.continueTrying = reconnect
|
||||
for x in range(poolsize):
|
||||
reactor.connectTCP(host, port, factory, connectTimeout)
|
||||
|
||||
return factory.handler
|
||||
|
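For context, a hedged sketch of how the new helper might be driven outside Synapse's replication handler, which calls it the same way in the hunk above. It assumes a Redis server on `localhost:6379` and an installed Synapse, so treat it as an illustration rather than a tested snippet.

```python
from twisted.internet import defer, task

from synapse.replication.tcp.redis import lazyConnection  # helper added in this changeset


@defer.inlineCallbacks
def main(reactor):
    # The handler is returned immediately; per txredisapi's lazy-connection
    # behaviour, commands wait for the TCP connection made on the supplied
    # reactor rather than failing outright.
    redis = lazyConnection(reactor=reactor, host="localhost", port=6379)
    yield redis.publish("example-channel", b"hello")


task.react(main)
```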
|
|
@ -0,0 +1,21 @@
|
|||
<html>
|
||||
<head>
|
||||
<title>Success!</title>
|
||||
<meta name='viewport' content='width=device-width, initial-scale=1,
|
||||
user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'>
|
||||
<link rel="stylesheet" href="/_matrix/static/client/register/style.css">
|
||||
<script>
|
||||
if (window.onAuthDone) {
|
||||
window.onAuthDone();
|
||||
} else if (window.opener && window.opener.postMessage) {
|
||||
window.opener.postMessage("authDone", "*");
|
||||
}
|
||||
</script>
|
||||
</head>
|
||||
<body>
|
||||
<div>
|
||||
<p>Thank you</p>
|
||||
<p>You may now close this window and return to the application</p>
|
||||
</div>
|
||||
</body>
|
||||
</html>
|
|
@ -0,0 +1,38 @@
|
|||
<html>
|
||||
<head>
|
||||
<title>Authentication</title>
|
||||
<meta name='viewport' content='width=device-width, initial-scale=1,
|
||||
user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'>
|
||||
<script src="https://www.recaptcha.net/recaptcha/api.js"
|
||||
async defer></script>
|
||||
<script src="//code.jquery.com/jquery-1.11.2.min.js"></script>
|
||||
<link rel="stylesheet" href="/_matrix/static/client/register/style.css">
|
||||
<script>
|
||||
function captchaDone() {
|
||||
$('#registrationForm').submit();
|
||||
}
|
||||
</script>
|
||||
</head>
|
||||
<body>
|
||||
<form id="registrationForm" method="post" action="{{ myurl }}">
|
||||
<div>
|
||||
<p>
|
||||
Hello! We need to prevent computer programs and other automated
|
||||
things from creating accounts on this server.
|
||||
</p>
|
||||
<p>
|
||||
Please verify that you're not a robot.
|
||||
</p>
|
||||
<input type="hidden" name="session" value="{{ session }}" />
|
||||
<div class="g-recaptcha"
|
||||
data-sitekey="{{ sitekey }}"
|
||||
data-callback="captchaDone">
|
||||
</div>
|
||||
<noscript>
|
||||
<input type="submit" value="All Done" />
|
||||
</noscript>
|
||||
</div>
|
||||
</div>
|
||||
</form>
|
||||
</body>
|
||||
</html>
|
|
@ -0,0 +1,20 @@
|
|||
<html>
|
||||
<head>
|
||||
<title>Authentication</title>
|
||||
<meta name='viewport' content='width=device-width, initial-scale=1,
|
||||
user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'>
|
||||
<link rel="stylesheet" href="/_matrix/static/client/register/style.css">
|
||||
</head>
|
||||
<body>
|
||||
<form id="registrationForm" method="post" action="{{ myurl }}">
|
||||
<div>
|
||||
<p>
|
||||
Please click the button below if you agree to the
|
||||
<a href="{{ terms_url }}">privacy policy of this homeserver.</a>
|
||||
</p>
|
||||
<input type="hidden" name="session" value="{{ session }}" />
|
||||
<input type="submit" value="Agree" />
|
||||
</div>
|
||||
</form>
|
||||
</body>
|
||||
</html>
|
|
@ -111,8 +111,6 @@ class LoginRestServlet(RestServlet):
|
|||
({"type": t} for t in self.auth_handler.get_supported_login_types())
|
||||
)
|
||||
|
||||
flows.append({"type": LoginRestServlet.APPSERVICE_TYPE})
|
||||
|
||||
return 200, {"flows": flows}
|
||||
|
||||
def on_OPTIONS(self, request: SynapseRequest):
|
||||
|
|
|
@ -25,94 +25,6 @@ from ._base import client_patterns
|
|||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
RECAPTCHA_TEMPLATE = """
|
||||
<html>
|
||||
<head>
|
||||
<title>Authentication</title>
|
||||
<meta name='viewport' content='width=device-width, initial-scale=1,
|
||||
user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'>
|
||||
<script src="https://www.recaptcha.net/recaptcha/api.js"
|
||||
async defer></script>
|
||||
<script src="//code.jquery.com/jquery-1.11.2.min.js"></script>
|
||||
<link rel="stylesheet" href="/_matrix/static/client/register/style.css">
|
||||
<script>
|
||||
function captchaDone() {
|
||||
$('#registrationForm').submit();
|
||||
}
|
||||
</script>
|
||||
</head>
|
||||
<body>
|
||||
<form id="registrationForm" method="post" action="%(myurl)s">
|
||||
<div>
|
||||
<p>
|
||||
Hello! We need to prevent computer programs and other automated
|
||||
things from creating accounts on this server.
|
||||
</p>
|
||||
<p>
|
||||
Please verify that you're not a robot.
|
||||
</p>
|
||||
<input type="hidden" name="session" value="%(session)s" />
|
||||
<div class="g-recaptcha"
|
||||
data-sitekey="%(sitekey)s"
|
||||
data-callback="captchaDone">
|
||||
</div>
|
||||
<noscript>
|
||||
<input type="submit" value="All Done" />
|
||||
</noscript>
|
||||
</div>
|
||||
</div>
|
||||
</form>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
|
||||
TERMS_TEMPLATE = """
|
||||
<html>
|
||||
<head>
|
||||
<title>Authentication</title>
|
||||
<meta name='viewport' content='width=device-width, initial-scale=1,
|
||||
user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'>
|
||||
<link rel="stylesheet" href="/_matrix/static/client/register/style.css">
|
||||
</head>
|
||||
<body>
|
||||
<form id="registrationForm" method="post" action="%(myurl)s">
|
||||
<div>
|
||||
<p>
|
||||
Please click the button below if you agree to the
|
||||
<a href="%(terms_url)s">privacy policy of this homeserver.</a>
|
||||
</p>
|
||||
<input type="hidden" name="session" value="%(session)s" />
|
||||
<input type="submit" value="Agree" />
|
||||
</div>
|
||||
</form>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
|
||||
SUCCESS_TEMPLATE = """
|
||||
<html>
|
||||
<head>
|
||||
<title>Success!</title>
|
||||
<meta name='viewport' content='width=device-width, initial-scale=1,
|
||||
user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'>
|
||||
<link rel="stylesheet" href="/_matrix/static/client/register/style.css">
|
||||
<script>
|
||||
if (window.onAuthDone) {
|
||||
window.onAuthDone();
|
||||
} else if (window.opener && window.opener.postMessage) {
|
||||
window.opener.postMessage("authDone", "*");
|
||||
}
|
||||
</script>
|
||||
</head>
|
||||
<body>
|
||||
<div>
|
||||
<p>Thank you</p>
|
||||
<p>You may now close this window and return to the application</p>
|
||||
</div>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
|
||||
|
||||
class AuthRestServlet(RestServlet):
|
||||
"""
|
||||
|
@ -145,26 +57,30 @@ class AuthRestServlet(RestServlet):
|
|||
self._cas_server_url = hs.config.cas_server_url
|
||||
self._cas_service_url = hs.config.cas_service_url
|
||||
|
||||
self.recaptcha_template = hs.config.recaptcha_template
|
||||
self.terms_template = hs.config.terms_template
|
||||
self.success_template = hs.config.fallback_success_template
|
||||
|
||||
async def on_GET(self, request, stagetype):
|
||||
session = parse_string(request, "session")
|
||||
if not session:
|
||||
raise SynapseError(400, "No session supplied")
|
||||
|
||||
if stagetype == LoginType.RECAPTCHA:
|
||||
html = RECAPTCHA_TEMPLATE % {
|
||||
"session": session,
|
||||
"myurl": "%s/r0/auth/%s/fallback/web"
|
||||
html = self.recaptcha_template.render(
|
||||
session=session,
|
||||
myurl="%s/r0/auth/%s/fallback/web"
|
||||
% (CLIENT_API_PREFIX, LoginType.RECAPTCHA),
|
||||
"sitekey": self.hs.config.recaptcha_public_key,
|
||||
}
|
||||
sitekey=self.hs.config.recaptcha_public_key,
|
||||
)
|
||||
elif stagetype == LoginType.TERMS:
|
||||
html = TERMS_TEMPLATE % {
|
||||
"session": session,
|
||||
"terms_url": "%s_matrix/consent?v=%s"
|
||||
html = self.terms_template.render(
|
||||
session=session,
|
||||
terms_url="%s_matrix/consent?v=%s"
|
||||
% (self.hs.config.public_baseurl, self.hs.config.user_consent_version),
|
||||
"myurl": "%s/r0/auth/%s/fallback/web"
|
||||
myurl="%s/r0/auth/%s/fallback/web"
|
||||
% (CLIENT_API_PREFIX, LoginType.TERMS),
|
||||
}
|
||||
)
|
||||
|
||||
elif stagetype == LoginType.SSO:
|
||||
# Display a confirmation page which prompts the user to
|
||||
|
@ -222,14 +138,14 @@ class AuthRestServlet(RestServlet):
|
|||
)
|
||||
|
||||
if success:
|
||||
html = SUCCESS_TEMPLATE
|
||||
html = self.success_template.render()
|
||||
else:
|
||||
html = RECAPTCHA_TEMPLATE % {
|
||||
"session": session,
|
||||
"myurl": "%s/r0/auth/%s/fallback/web"
|
||||
html = self.recaptcha_template.render(
|
||||
session=session,
|
||||
myurl="%s/r0/auth/%s/fallback/web"
|
||||
% (CLIENT_API_PREFIX, LoginType.RECAPTCHA),
|
||||
"sitekey": self.hs.config.recaptcha_public_key,
|
||||
}
|
||||
sitekey=self.hs.config.recaptcha_public_key,
|
||||
)
|
||||
elif stagetype == LoginType.TERMS:
|
||||
authdict = {"session": session}
|
||||
|
||||
|
@ -238,18 +154,18 @@ class AuthRestServlet(RestServlet):
|
|||
)
|
||||
|
||||
if success:
|
||||
html = SUCCESS_TEMPLATE
|
||||
html = self.success_template.render()
|
||||
else:
|
||||
html = TERMS_TEMPLATE % {
|
||||
"session": session,
|
||||
"terms_url": "%s_matrix/consent?v=%s"
|
||||
html = self.terms_template.render(
|
||||
session=session,
|
||||
terms_url="%s_matrix/consent?v=%s"
|
||||
% (
|
||||
self.hs.config.public_baseurl,
|
||||
self.hs.config.user_consent_version,
|
||||
),
|
||||
"myurl": "%s/r0/auth/%s/fallback/web"
|
||||
myurl="%s/r0/auth/%s/fallback/web"
|
||||
% (CLIENT_API_PREFIX, LoginType.TERMS),
|
||||
}
|
||||
)
|
||||
elif stagetype == LoginType.SSO:
|
||||
# The SSO fallback workflow should not post here,
|
||||
raise SynapseError(404, "Fallback SSO auth does not support POST requests.")
|
||||
|
|
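The fallback auth pages above now go through `read_templates(..., autoescape=True)` and `.render()` instead of `%`-interpolation into inline HTML. One practical difference, shown with a hypothetical session value: Jinja2's autoescaping HTML-escapes interpolated values, which plain `%`-formatting never did.

```python
import jinja2

# Templates are loaded with autoescape=True (see the CaptchaConfig, ConsentConfig
# and RegistrationConfig changes above), so user-influenced values such as the
# session ID are escaped before they reach the HTML output.
env = jinja2.Environment(autoescape=True)
template = env.from_string('<input type="hidden" name="session" value="{{ session }}" />')

print(template.render(session='abc"><script>alert(1)</script>'))
# Roughly: <input type="hidden" name="session"
#          value="abc&#34;&gt;&lt;script&gt;alert(1)&lt;/script&gt;" />
```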
|
@ -141,31 +141,34 @@ class MediaStorage:
|
|||
Returns:
|
||||
Returns a Responder if the file was found, otherwise None.
|
||||
"""
|
||||
paths = [self._file_info_to_path(file_info)]
|
||||
|
||||
path = self._file_info_to_path(file_info)
|
||||
local_path = os.path.join(self.local_media_directory, path)
|
||||
if os.path.exists(local_path):
|
||||
return FileResponder(open(local_path, "rb"))
|
||||
|
||||
# Fallback for paths without method names
|
||||
# Should be removed in the future
|
||||
# fallback for remote thumbnails with no method in the filename
|
||||
if file_info.thumbnail and file_info.server_name:
|
||||
legacy_path = self.filepaths.remote_media_thumbnail_rel_legacy(
|
||||
server_name=file_info.server_name,
|
||||
file_id=file_info.file_id,
|
||||
width=file_info.thumbnail_width,
|
||||
height=file_info.thumbnail_height,
|
||||
content_type=file_info.thumbnail_type,
|
||||
paths.append(
|
||||
self.filepaths.remote_media_thumbnail_rel_legacy(
|
||||
server_name=file_info.server_name,
|
||||
file_id=file_info.file_id,
|
||||
width=file_info.thumbnail_width,
|
||||
height=file_info.thumbnail_height,
|
||||
content_type=file_info.thumbnail_type,
|
||||
)
|
||||
)
|
||||
legacy_local_path = os.path.join(self.local_media_directory, legacy_path)
|
||||
if os.path.exists(legacy_local_path):
|
||||
return FileResponder(open(legacy_local_path, "rb"))
|
||||
|
||||
for path in paths:
|
||||
local_path = os.path.join(self.local_media_directory, path)
|
||||
if os.path.exists(local_path):
|
||||
logger.debug("responding with local file %s", local_path)
|
||||
return FileResponder(open(local_path, "rb"))
|
||||
logger.debug("local file %s did not exist", local_path)
|
||||
|
||||
for provider in self.storage_providers:
|
||||
res = await provider.fetch(path, file_info) # type: Any
|
||||
if res:
|
||||
logger.debug("Streaming %s from %s", path, provider)
|
||||
return res
|
||||
for path in paths:
|
||||
res = await provider.fetch(path, file_info) # type: Any
|
||||
if res:
|
||||
logger.debug("Streaming %s from %s", path, provider)
|
||||
return res
|
||||
logger.debug("%s not found on %s", path, provider)
|
||||
|
||||
return None
|
||||
|
||||
|
|
|
@ -185,7 +185,10 @@ class HomeServer(metaclass=abc.ABCMeta):
|
|||
we are listening on to provide HTTP services.
|
||||
"""
|
||||
|
||||
REQUIRED_ON_MASTER_STARTUP = ["user_directory_handler", "stats_handler"]
|
||||
REQUIRED_ON_BACKGROUND_TASK_STARTUP = [
|
||||
"auth",
|
||||
"stats",
|
||||
]
|
||||
|
||||
# This is overridden in derived application classes
|
||||
# (such as synapse.app.homeserver.SynapseHomeServer) and gives the class to be
|
||||
|
@ -251,14 +254,20 @@ class HomeServer(metaclass=abc.ABCMeta):
|
|||
self.datastores = Databases(self.DATASTORE_CLASS, self)
|
||||
logger.info("Finished setting up.")
|
||||
|
||||
def setup_master(self) -> None:
|
||||
# Register background tasks required by this server. This must be done
|
||||
# somewhat manually due to the background tasks not being registered
|
||||
# unless handlers are instantiated.
|
||||
if self.config.run_background_tasks:
|
||||
self.setup_background_tasks()
|
||||
|
||||
def setup_background_tasks(self) -> None:
|
||||
"""
|
||||
Some handlers have side effects on instantiation (like registering
|
||||
background updates). This function causes them to be fetched, and
|
||||
therefore instantiated, to run those side effects.
|
||||
"""
|
||||
for i in self.REQUIRED_ON_MASTER_STARTUP:
|
||||
getattr(self, "get_" + i)()
|
||||
for i in self.REQUIRED_ON_BACKGROUND_TASK_STARTUP:
|
||||
getattr(self, "get_" + i + "_handler")()
|
||||
|
||||
def get_reactor(self) -> twisted.internet.base.ReactorBase:
|
||||
"""
|
||||
|
|
|
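The renamed hook relies on handlers registering their background work as a side effect of construction, so it only needs to instantiate the listed handlers by name on the background-tasks instance. A toy, runnable illustration of that pattern; `MiniHomeServer` is not a Synapse class.

```python
class MiniHomeServer:
    """Toy illustration of setup_background_tasks: handlers register their
    periodic work when constructed, so the server simply fetches them."""

    REQUIRED_ON_BACKGROUND_TASK_STARTUP = ["auth", "stats"]

    def __init__(self, run_background_tasks: bool):
        self.started = []
        if run_background_tasks:
            self.setup_background_tasks()

    def setup_background_tasks(self):
        for name in self.REQUIRED_ON_BACKGROUND_TASK_STARTUP:
            getattr(self, "get_" + name + "_handler")()

    def get_auth_handler(self):
        self.started.append("auth")  # a real handler would schedule looping calls here

    def get_stats_handler(self):
        self.started.append("stats")


assert MiniHomeServer(run_background_tasks=True).started == ["auth", "stats"]
assert MiniHomeServer(run_background_tasks=False).started == []
```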
@ -32,6 +32,7 @@ from typing import (
|
|||
overload,
|
||||
)
|
||||
|
||||
import attr
|
||||
from prometheus_client import Histogram
|
||||
from typing_extensions import Literal
|
||||
|
||||
|
@ -90,13 +91,17 @@ def make_pool(
|
|||
return adbapi.ConnectionPool(
|
||||
db_config.config["name"],
|
||||
cp_reactor=reactor,
|
||||
cp_openfun=engine.on_new_connection,
|
||||
cp_openfun=lambda conn: engine.on_new_connection(
|
||||
LoggingDatabaseConnection(conn, engine, "on_new_connection")
|
||||
),
|
||||
**db_config.config.get("args", {})
|
||||
)
|
||||
|
||||
|
||||
def make_conn(
|
||||
db_config: DatabaseConnectionConfig, engine: BaseDatabaseEngine
|
||||
db_config: DatabaseConnectionConfig,
|
||||
engine: BaseDatabaseEngine,
|
||||
default_txn_name: str,
|
||||
) -> Connection:
|
||||
"""Make a new connection to the database and return it.
|
||||
|
||||
|
@ -109,11 +114,60 @@ def make_conn(
|
|||
for k, v in db_config.config.get("args", {}).items()
|
||||
if not k.startswith("cp_")
|
||||
}
|
||||
db_conn = engine.module.connect(**db_params)
|
||||
native_db_conn = engine.module.connect(**db_params)
|
||||
db_conn = LoggingDatabaseConnection(native_db_conn, engine, default_txn_name)
|
||||
|
||||
engine.on_new_connection(db_conn)
|
||||
return db_conn
|
||||
|
||||
|
||||
@attr.s(slots=True)
|
||||
class LoggingDatabaseConnection:
|
||||
"""A wrapper around a database connection that returns `LoggingTransaction`
|
||||
as its cursor class.
|
||||
|
||||
This is mainly used on startup to ensure that queries get logged correctly
|
||||
"""
|
||||
|
||||
conn = attr.ib(type=Connection)
|
||||
engine = attr.ib(type=BaseDatabaseEngine)
|
||||
default_txn_name = attr.ib(type=str)
|
||||
|
||||
def cursor(
|
||||
self, *, txn_name=None, after_callbacks=None, exception_callbacks=None
|
||||
) -> "LoggingTransaction":
|
||||
if not txn_name:
|
||||
txn_name = self.default_txn_name
|
||||
|
||||
return LoggingTransaction(
|
||||
self.conn.cursor(),
|
||||
name=txn_name,
|
||||
database_engine=self.engine,
|
||||
after_callbacks=after_callbacks,
|
||||
exception_callbacks=exception_callbacks,
|
||||
)
|
||||
|
||||
def close(self) -> None:
|
||||
self.conn.close()
|
||||
|
||||
def commit(self) -> None:
|
||||
self.conn.commit()
|
||||
|
||||
def rollback(self, *args, **kwargs) -> None:
|
||||
self.conn.rollback(*args, **kwargs)
|
||||
|
||||
def __enter__(self) -> "Connection":
|
||||
self.conn.__enter__()
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback) -> bool:
|
||||
return self.conn.__exit__(exc_type, exc_value, traceback)
|
||||
|
||||
# Proxy through any unknown lookups to the DB conn class.
|
||||
def __getattr__(self, name):
|
||||
return getattr(self.conn, name)
|
||||
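The wrapper exists so that one-off queries issued at startup (the "Add SQL logging on queries that happen during startup" change in this comparison) go through a named, logged cursor instead of a bare DB-API cursor. A simplified, standalone sketch of the same idea using `sqlite3`; the class and logger names here are illustrative, not Synapse's.

```python
import logging
import sqlite3
import time

logging.basicConfig(level=logging.DEBUG, format="%(message)s")
logger = logging.getLogger("sql_startup")


class LoggingConnection:
    """Simplified stand-in for LoggingDatabaseConnection: every cursor carries
    a transaction name so ad-hoc startup queries get a useful log label."""

    def __init__(self, conn, default_txn_name):
        self.conn = conn
        self.default_txn_name = default_txn_name

    def cursor(self, txn_name=None):
        return LoggingCursor(self.conn.cursor(), txn_name or self.default_txn_name)

    def __getattr__(self, name):
        # Proxy anything else (commit, rollback, ...) to the real connection.
        return getattr(self.conn, name)


class LoggingCursor:
    def __init__(self, cursor, name):
        self.cursor = cursor
        self.name = name

    def execute(self, sql, args=()):
        start = time.monotonic()
        try:
            return self.cursor.execute(sql, args)
        finally:
            logger.debug("[%s] %s (%.3f ms)", self.name, sql, (time.monotonic() - start) * 1000)

    def __getattr__(self, name):
        return getattr(self.cursor, name)


db = LoggingConnection(sqlite3.connect(":memory:"), default_txn_name="startup")
cur = db.cursor(txn_name="get_cache_dict")
cur.execute("SELECT 1")
```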
|
||||
|
||||
# The type of entry which goes on our after_callbacks and exception_callbacks lists.
|
||||
#
|
||||
# Python 3.5.2 doesn't support Callable with an ellipsis, so we wrap it in quotes so
|
||||
|
@ -247,6 +301,12 @@ class LoggingTransaction:
|
|||
def close(self) -> None:
|
||||
self.txn.close()
|
||||
|
||||
def __enter__(self) -> "LoggingTransaction":
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.close()
|
||||
|
||||
|
||||
class PerformanceCounters:
|
||||
def __init__(self):
|
||||
|
@ -395,7 +455,7 @@ class DatabasePool:
|
|||
|
||||
def new_transaction(
|
||||
self,
|
||||
conn: Connection,
|
||||
conn: LoggingDatabaseConnection,
|
||||
desc: str,
|
||||
after_callbacks: List[_CallbackListEntry],
|
||||
exception_callbacks: List[_CallbackListEntry],
|
||||
|
@ -418,12 +478,10 @@ class DatabasePool:
|
|||
i = 0
|
||||
N = 5
|
||||
while True:
|
||||
cursor = LoggingTransaction(
|
||||
conn.cursor(),
|
||||
name,
|
||||
self.engine,
|
||||
after_callbacks,
|
||||
exception_callbacks,
|
||||
cursor = conn.cursor(
|
||||
txn_name=name,
|
||||
after_callbacks=after_callbacks,
|
||||
exception_callbacks=exception_callbacks,
|
||||
)
|
||||
try:
|
||||
r = func(cursor, *args, **kwargs)
|
||||
|
@ -584,7 +642,10 @@ class DatabasePool:
|
|||
logger.debug("Reconnecting closed database connection")
|
||||
conn.reconnect()
|
||||
|
||||
return func(conn, *args, **kwargs)
|
||||
db_conn = LoggingDatabaseConnection(
|
||||
conn, self.engine, "runWithConnection"
|
||||
)
|
||||
return func(db_conn, *args, **kwargs)
|
||||
|
||||
return await make_deferred_yieldable(
|
||||
self._db_pool.runWithConnection(inner_func, *args, **kwargs)
|
||||
|
@ -1621,7 +1682,7 @@ class DatabasePool:
|
|||
|
||||
def get_cache_dict(
|
||||
self,
|
||||
db_conn: Connection,
|
||||
db_conn: LoggingDatabaseConnection,
|
||||
table: str,
|
||||
entity_column: str,
|
||||
stream_column: str,
|
||||
|
@ -1642,9 +1703,7 @@ class DatabasePool:
|
|||
"limit": limit,
|
||||
}
|
||||
|
||||
sql = self.engine.convert_param_style(sql)
|
||||
|
||||
txn = db_conn.cursor()
|
||||
txn = db_conn.cursor(txn_name="get_cache_dict")
|
||||
txn.execute(sql, (int(max_value),))
|
||||
|
||||
cache = {row[0]: int(row[1]) for row in txn}
|
||||
|
|
|
@ -46,7 +46,7 @@ class Databases:
|
|||
db_name = database_config.name
|
||||
engine = create_engine(database_config.config)
|
||||
|
||||
with make_conn(database_config, engine) as db_conn:
|
||||
with make_conn(database_config, engine, "startup") as db_conn:
|
||||
logger.info("[database config %r]: Checking database server", db_name)
|
||||
engine.check_database(db_conn)
|
||||
|
||||
|
|
|
@ -15,9 +15,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import calendar
|
||||
import logging
|
||||
import time
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
from synapse.api.constants import PresenceState
|
||||
|
@ -268,9 +266,6 @@ class DataStore(
|
|||
self._stream_order_on_start = self.get_room_max_stream_ordering()
|
||||
self._min_stream_order_on_start = self.get_room_min_stream_ordering()
|
||||
|
||||
# Used in _generate_user_daily_visits to keep track of progress
|
||||
self._last_user_visit_update = self._get_start_of_day()
|
||||
|
||||
def get_device_stream_token(self) -> int:
|
||||
return self._device_list_id_gen.get_current_token()
|
||||
|
||||
|
@ -289,7 +284,6 @@ class DataStore(
|
|||
" last_user_sync_ts, status_msg, currently_active FROM presence_stream"
|
||||
" WHERE state != ?"
|
||||
)
|
||||
sql = self.database_engine.convert_param_style(sql)
|
||||
|
||||
txn = db_conn.cursor()
|
||||
txn.execute(sql, (PresenceState.OFFLINE,))
|
||||
|
@ -301,192 +295,6 @@ class DataStore(
|
|||
|
||||
return [UserPresenceState(**row) for row in rows]
|
||||
|
||||
async def count_daily_users(self) -> int:
|
||||
"""
|
||||
Counts the number of users who used this homeserver in the last 24 hours.
|
||||
"""
|
||||
yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
|
||||
return await self.db_pool.runInteraction(
|
||||
"count_daily_users", self._count_users, yesterday
|
||||
)
|
||||
|
||||
async def count_monthly_users(self) -> int:
|
||||
"""
|
||||
Counts the number of users who used this homeserver in the last 30 days.
|
||||
Note this method is intended for phonehome metrics only and is different
|
||||
from the mau figure in synapse.storage.monthly_active_users which,
|
||||
amongst other things, includes a 3 day grace period before a user counts.
|
||||
"""
|
||||
thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
|
||||
return await self.db_pool.runInteraction(
|
||||
"count_monthly_users", self._count_users, thirty_days_ago
|
||||
)
|
||||
|
||||
def _count_users(self, txn, time_from):
|
||||
"""
|
||||
Returns number of users seen in the past time_from period
|
||||
"""
|
||||
sql = """
|
||||
SELECT COALESCE(count(*), 0) FROM (
|
||||
SELECT user_id FROM user_ips
|
||||
WHERE last_seen > ?
|
||||
GROUP BY user_id
|
||||
) u
|
||||
"""
|
||||
txn.execute(sql, (time_from,))
|
||||
(count,) = txn.fetchone()
|
||||
return count
|
||||
|
||||
async def count_r30_users(self) -> Dict[str, int]:
|
||||
"""
|
||||
Counts the number of 30 day retained users, defined as:-
|
||||
* Users who have created their accounts more than 30 days ago
|
||||
* Where last seen at most 30 days ago
|
||||
* Where account creation and last_seen are > 30 days apart
|
||||
|
||||
Returns:
|
||||
A mapping of counts globally as well as broken out by platform.
|
||||
"""
|
||||
|
||||
def _count_r30_users(txn):
|
||||
thirty_days_in_secs = 86400 * 30
|
||||
now = int(self._clock.time())
|
||||
thirty_days_ago_in_secs = now - thirty_days_in_secs
|
||||
|
||||
sql = """
|
||||
SELECT platform, COALESCE(count(*), 0) FROM (
|
||||
SELECT
|
||||
users.name, platform, users.creation_ts * 1000,
|
||||
MAX(uip.last_seen)
|
||||
FROM users
|
||||
INNER JOIN (
|
||||
SELECT
|
||||
user_id,
|
||||
last_seen,
|
||||
CASE
|
||||
WHEN user_agent LIKE '%%Android%%' THEN 'android'
|
||||
WHEN user_agent LIKE '%%iOS%%' THEN 'ios'
|
||||
WHEN user_agent LIKE '%%Electron%%' THEN 'electron'
|
||||
WHEN user_agent LIKE '%%Mozilla%%' THEN 'web'
|
||||
WHEN user_agent LIKE '%%Gecko%%' THEN 'web'
|
||||
ELSE 'unknown'
|
||||
END
|
||||
AS platform
|
||||
FROM user_ips
|
||||
) uip
|
||||
ON users.name = uip.user_id
|
||||
AND users.appservice_id is NULL
|
||||
AND users.creation_ts < ?
|
||||
AND uip.last_seen/1000 > ?
|
||||
AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
|
||||
GROUP BY users.name, platform, users.creation_ts
|
||||
) u GROUP BY platform
|
||||
"""
|
||||
|
||||
results = {}
|
||||
txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
|
||||
|
||||
for row in txn:
|
||||
if row[0] == "unknown":
|
||||
pass
|
||||
results[row[0]] = row[1]
|
||||
|
||||
sql = """
|
||||
SELECT COALESCE(count(*), 0) FROM (
|
||||
SELECT users.name, users.creation_ts * 1000,
|
||||
MAX(uip.last_seen)
|
||||
FROM users
|
||||
INNER JOIN (
|
||||
SELECT
|
||||
user_id,
|
||||
last_seen
|
||||
FROM user_ips
|
||||
) uip
|
||||
ON users.name = uip.user_id
|
||||
AND appservice_id is NULL
|
||||
AND users.creation_ts < ?
|
||||
AND uip.last_seen/1000 > ?
|
||||
AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
|
||||
GROUP BY users.name, users.creation_ts
|
||||
) u
|
||||
"""
|
||||
|
||||
txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
|
||||
|
||||
(count,) = txn.fetchone()
|
||||
results["all"] = count
|
||||
|
||||
return results
|
||||
|
||||
return await self.db_pool.runInteraction("count_r30_users", _count_r30_users)
|
||||
|
||||
def _get_start_of_day(self):
|
||||
"""
|
||||
Returns millisecond unixtime for start of UTC day.
|
||||
"""
|
||||
now = time.gmtime()
|
||||
today_start = calendar.timegm((now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0))
|
||||
return today_start * 1000
|
||||
|
||||
async def generate_user_daily_visits(self) -> None:
|
||||
"""
|
||||
Generates daily visit data for use in cohort/ retention analysis
|
||||
"""
|
||||
|
||||
def _generate_user_daily_visits(txn):
|
||||
logger.info("Calling _generate_user_daily_visits")
|
||||
today_start = self._get_start_of_day()
|
||||
a_day_in_milliseconds = 24 * 60 * 60 * 1000
|
||||
now = self.clock.time_msec()
|
||||
|
||||
sql = """
|
||||
INSERT INTO user_daily_visits (user_id, device_id, timestamp)
|
||||
SELECT u.user_id, u.device_id, ?
|
||||
FROM user_ips AS u
|
||||
LEFT JOIN (
|
||||
SELECT user_id, device_id, timestamp FROM user_daily_visits
|
||||
WHERE timestamp = ?
|
||||
) udv
|
||||
ON u.user_id = udv.user_id AND u.device_id=udv.device_id
|
||||
INNER JOIN users ON users.name=u.user_id
|
||||
WHERE last_seen > ? AND last_seen <= ?
|
||||
AND udv.timestamp IS NULL AND users.is_guest=0
|
||||
AND users.appservice_id IS NULL
|
||||
GROUP BY u.user_id, u.device_id
|
||||
"""
|
||||
|
||||
# This means that the day has rolled over but there could still
|
||||
# be entries from the previous day. There is an edge case
|
||||
# where if the user logs in at 23:59 and overwrites their
|
||||
# last_seen at 00:01 then they will not be counted in the
|
||||
# previous day's stats - it is important that the query is run
|
||||
# often to minimise this case.
|
||||
if today_start > self._last_user_visit_update:
|
||||
yesterday_start = today_start - a_day_in_milliseconds
|
||||
txn.execute(
|
||||
sql,
|
||||
(
|
||||
yesterday_start,
|
||||
yesterday_start,
|
||||
self._last_user_visit_update,
|
||||
today_start,
|
||||
),
|
||||
)
|
||||
self._last_user_visit_update = today_start
|
||||
|
||||
txn.execute(
|
||||
sql, (today_start, today_start, self._last_user_visit_update, now)
|
||||
)
|
||||
# Update _last_user_visit_update to now. The reason to do this
|
||||
# rather than just clamping to the beginning of the day is to limit
|
||||
# the size of the join - meaning that the query can be run more
|
||||
# frequently
|
||||
self._last_user_visit_update = now
|
||||
|
||||
await self.db_pool.runInteraction(
|
||||
"generate_user_daily_visits", _generate_user_daily_visits
|
||||
)
|
||||
|
||||
async def get_users(self) -> List[Dict[str, Any]]:
|
||||
"""Function to retrieve a list of users in users table.
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ from typing import Dict, List, Optional, Tuple, Union
|
|||
import attr
|
||||
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.storage._base import LoggingTransaction, SQLBaseStore, db_to_json
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.util import json_encoder
|
||||
from synapse.util.caches.descriptors import cached
|
||||
|
@ -74,11 +74,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
|
|||
self.stream_ordering_month_ago = None
|
||||
self.stream_ordering_day_ago = None
|
||||
|
||||
cur = LoggingTransaction(
|
||||
db_conn.cursor(),
|
||||
name="_find_stream_orderings_for_times_txn",
|
||||
database_engine=self.database_engine,
|
||||
)
|
||||
cur = db_conn.cursor(txn_name="_find_stream_orderings_for_times_txn")
|
||||
self._find_stream_orderings_for_times_txn(cur)
|
||||
cur.close()
|
||||
|
||||
|
|
|
@ -74,6 +74,13 @@ class EventRedactBehaviour(Names):
|
|||
|
||||
|
||||
class EventsWorkerStore(SQLBaseStore):
|
||||
# Whether to use dedicated DB threads for event fetching. This is only used
|
||||
# if there are multiple DB threads available. When used will lock the DB
|
||||
# thread for periods of time (so unit tests want to disable this when they
|
||||
# run DB transactions on the main thread). See EVENT_QUEUE_* for more
|
||||
# options controlling this.
|
||||
USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING = True
|
||||
|
||||
def __init__(self, database: DatabasePool, db_conn, hs):
|
||||
super().__init__(database, db_conn, hs)
|
||||
|
||||
|
@ -522,7 +529,11 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
|
||||
if not event_list:
|
||||
single_threaded = self.database_engine.single_threaded
|
||||
if single_threaded or i > EVENT_QUEUE_ITERATIONS:
|
||||
if (
|
||||
not self.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING
|
||||
or single_threaded
|
||||
or i > EVENT_QUEUE_ITERATIONS
|
||||
):
|
||||
self._event_fetch_ongoing -= 1
|
||||
return
|
||||
else:
|
||||
|
|
|
@ -12,6 +12,10 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import calendar
|
||||
import logging
|
||||
import time
|
||||
from typing import Dict
|
||||
|
||||
from synapse.metrics import GaugeBucketCollector
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
|
@ -21,6 +25,8 @@ from synapse.storage.databases.main.event_push_actions import (
|
|||
EventPushActionsWorkerStore,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Collect metrics on the number of forward extremities that exist.
|
||||
_extremities_collecter = GaugeBucketCollector(
|
||||
"synapse_forward_extremities",
|
||||
|
@ -60,6 +66,9 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
|
|||
|
||||
hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000)
|
||||
|
||||
# Used in _generate_user_daily_visits to keep track of progress
|
||||
self._last_user_visit_update = self._get_start_of_day()
|
||||
|
||||
async def _read_forward_extremities(self):
|
||||
def fetch(txn):
|
||||
txn.execute(
|
||||
|
@ -137,3 +146,189 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
|
|||
return count
|
||||
|
||||
return await self.db_pool.runInteraction("count_daily_active_rooms", _count)
|
||||
|
||||
async def count_daily_users(self) -> int:
|
||||
"""
|
||||
Counts the number of users who used this homeserver in the last 24 hours.
|
||||
"""
|
||||
yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
|
||||
return await self.db_pool.runInteraction(
|
||||
"count_daily_users", self._count_users, yesterday
|
||||
)
|
||||
|
||||
async def count_monthly_users(self) -> int:
|
||||
"""
|
||||
Counts the number of users who used this homeserver in the last 30 days.
|
||||
Note this method is intended for phonehome metrics only and is different
|
||||
from the mau figure in synapse.storage.monthly_active_users which,
|
||||
amongst other things, includes a 3 day grace period before a user counts.
|
||||
"""
|
||||
thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
|
||||
return await self.db_pool.runInteraction(
|
||||
"count_monthly_users", self._count_users, thirty_days_ago
|
||||
)
|
||||
|
||||
def _count_users(self, txn, time_from):
|
||||
"""
|
||||
Returns number of users seen in the past time_from period
|
||||
"""
|
||||
sql = """
|
||||
SELECT COALESCE(count(*), 0) FROM (
|
||||
SELECT user_id FROM user_ips
|
||||
WHERE last_seen > ?
|
||||
GROUP BY user_id
|
||||
) u
|
||||
"""
|
||||
txn.execute(sql, (time_from,))
|
||||
(count,) = txn.fetchone()
|
||||
return count
|
||||
|
||||
async def count_r30_users(self) -> Dict[str, int]:
|
||||
"""
|
||||
Counts the number of 30 day retained users, defined as:-
|
||||
* Users who have created their accounts more than 30 days ago
|
||||
* Where last seen at most 30 days ago
|
||||
* Where account creation and last_seen are > 30 days apart
|
||||
|
||||
Returns:
|
||||
A mapping of counts globally as well as broken out by platform.
|
||||
"""
|
||||
|
||||
def _count_r30_users(txn):
|
||||
thirty_days_in_secs = 86400 * 30
|
||||
now = int(self._clock.time())
|
||||
thirty_days_ago_in_secs = now - thirty_days_in_secs
|
||||
|
||||
sql = """
|
||||
SELECT platform, COALESCE(count(*), 0) FROM (
|
||||
SELECT
|
||||
users.name, platform, users.creation_ts * 1000,
|
||||
MAX(uip.last_seen)
|
||||
FROM users
|
||||
INNER JOIN (
|
||||
SELECT
|
||||
user_id,
|
||||
last_seen,
|
||||
CASE
|
||||
WHEN user_agent LIKE '%%Android%%' THEN 'android'
|
||||
WHEN user_agent LIKE '%%iOS%%' THEN 'ios'
|
||||
WHEN user_agent LIKE '%%Electron%%' THEN 'electron'
|
||||
WHEN user_agent LIKE '%%Mozilla%%' THEN 'web'
|
||||
WHEN user_agent LIKE '%%Gecko%%' THEN 'web'
|
||||
ELSE 'unknown'
|
||||
END
|
||||
AS platform
|
||||
FROM user_ips
|
||||
) uip
|
||||
ON users.name = uip.user_id
|
||||
AND users.appservice_id is NULL
|
||||
AND users.creation_ts < ?
|
||||
AND uip.last_seen/1000 > ?
|
||||
AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
|
||||
GROUP BY users.name, platform, users.creation_ts
|
||||
) u GROUP BY platform
|
||||
"""
|
||||
|
||||
results = {}
|
||||
txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
|
||||
|
||||
for row in txn:
|
||||
if row[0] == "unknown":
|
||||
pass
|
||||
results[row[0]] = row[1]
|
||||
|
||||
sql = """
|
||||
SELECT COALESCE(count(*), 0) FROM (
|
||||
SELECT users.name, users.creation_ts * 1000,
|
||||
MAX(uip.last_seen)
|
||||
FROM users
|
||||
INNER JOIN (
|
||||
SELECT
|
||||
user_id,
|
||||
last_seen
|
||||
FROM user_ips
|
||||
) uip
|
||||
ON users.name = uip.user_id
|
||||
AND appservice_id is NULL
|
||||
AND users.creation_ts < ?
|
||||
AND uip.last_seen/1000 > ?
|
||||
AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
|
||||
GROUP BY users.name, users.creation_ts
|
||||
) u
|
||||
"""
|
||||
|
||||
txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
|
||||
|
||||
(count,) = txn.fetchone()
|
||||
results["all"] = count
|
||||
|
||||
return results
|
||||
|
||||
return await self.db_pool.runInteraction("count_r30_users", _count_r30_users)
|
||||
|
||||
def _get_start_of_day(self):
|
||||
"""
|
||||
Returns millisecond unixtime for start of UTC day.
|
||||
"""
|
||||
now = time.gmtime()
|
||||
today_start = calendar.timegm((now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0))
|
||||
return today_start * 1000
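As a quick sanity check of the arithmetic in `_get_start_of_day` (a standalone sketch, not part of the change; the fixed timestamp is an arbitrary example):

```
import calendar
import time

now = time.gmtime(1601632800)  # 2020-10-02 10:00:00 UTC, an arbitrary instant
midnight = calendar.timegm((now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0))
assert midnight == 1601596800            # 2020-10-02 00:00:00 UTC, in seconds
assert midnight * 1000 == 1601596800000  # the millisecond value the method returns
```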
|
||||
|
||||
async def generate_user_daily_visits(self) -> None:
|
||||
"""
|
||||
Generates daily visit data for use in cohort/retention analysis
|
||||
"""
|
||||
|
||||
def _generate_user_daily_visits(txn):
|
||||
logger.info("Calling _generate_user_daily_visits")
|
||||
today_start = self._get_start_of_day()
|
||||
a_day_in_milliseconds = 24 * 60 * 60 * 1000
|
||||
now = self._clock.time_msec()
|
||||
|
||||
sql = """
|
||||
INSERT INTO user_daily_visits (user_id, device_id, timestamp)
|
||||
SELECT u.user_id, u.device_id, ?
|
||||
FROM user_ips AS u
|
||||
LEFT JOIN (
|
||||
SELECT user_id, device_id, timestamp FROM user_daily_visits
|
||||
WHERE timestamp = ?
|
||||
) udv
|
||||
ON u.user_id = udv.user_id AND u.device_id=udv.device_id
|
||||
INNER JOIN users ON users.name=u.user_id
|
||||
WHERE last_seen > ? AND last_seen <= ?
|
||||
AND udv.timestamp IS NULL AND users.is_guest=0
|
||||
AND users.appservice_id IS NULL
|
||||
GROUP BY u.user_id, u.device_id
|
||||
"""
|
||||
|
||||
# This means that the day has rolled over but there could still
|
||||
# be entries from the previous day. There is an edge case
|
||||
# where if the user logs in at 23:59 and overwrites their
|
||||
# last_seen at 00:01 then they will not be counted in the
|
||||
# previous day's stats - it is important that the query is run
|
||||
# often to minimise this case.
|
||||
if today_start > self._last_user_visit_update:
|
||||
yesterday_start = today_start - a_day_in_milliseconds
|
||||
txn.execute(
|
||||
sql,
|
||||
(
|
||||
yesterday_start,
|
||||
yesterday_start,
|
||||
self._last_user_visit_update,
|
||||
today_start,
|
||||
),
|
||||
)
|
||||
self._last_user_visit_update = today_start
|
||||
|
||||
txn.execute(
|
||||
sql, (today_start, today_start, self._last_user_visit_update, now)
|
||||
)
|
||||
# Update _last_user_visit_update to now. The reason to do this
|
||||
# rather than just clamping to the beginning of the day is to limit
|
||||
# the size of the join - meaning that the query can be run more
|
||||
# frequently
|
||||
self._last_user_visit_update = now
|
||||
|
||||
await self.db_pool.runInteraction(
|
||||
"generate_user_daily_visits", _generate_user_daily_visits
|
||||
)
|
||||
|
|
|
@ -32,6 +32,9 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore):
|
|||
self._clock = hs.get_clock()
|
||||
self.hs = hs
|
||||
|
||||
self._limit_usage_by_mau = hs.config.limit_usage_by_mau
|
||||
self._max_mau_value = hs.config.max_mau_value
|
||||
|
||||
@cached(num_args=0)
|
||||
async def get_monthly_active_count(self) -> int:
|
||||
"""Generates current count of monthly active users
|
||||
|
@ -124,60 +127,6 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore):
|
|||
desc="user_last_seen_monthly_active",
|
||||
)
|
||||
|
||||
|
||||
class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
|
||||
def __init__(self, database: DatabasePool, db_conn, hs):
|
||||
super().__init__(database, db_conn, hs)
|
||||
|
||||
self._limit_usage_by_mau = hs.config.limit_usage_by_mau
|
||||
self._mau_stats_only = hs.config.mau_stats_only
|
||||
self._max_mau_value = hs.config.max_mau_value
|
||||
|
||||
# Do not add more reserved users than the total allowable number
|
||||
# cur = LoggingTransaction(
|
||||
self.db_pool.new_transaction(
|
||||
db_conn,
|
||||
"initialise_mau_threepids",
|
||||
[],
|
||||
[],
|
||||
self._initialise_reserved_users,
|
||||
hs.config.mau_limits_reserved_threepids[: self._max_mau_value],
|
||||
)
|
||||
|
||||
def _initialise_reserved_users(self, txn, threepids):
|
||||
"""Ensures that reserved threepids are accounted for in the MAU table, should
|
||||
be called on start up.
|
||||
|
||||
Args:
|
||||
txn (cursor):
|
||||
threepids (list[dict]): List of threepid dicts to reserve
|
||||
"""
|
||||
|
||||
# XXX what is this function trying to achieve? It upserts into
|
||||
# monthly_active_users for each *registered* reserved mau user, but why?
|
||||
#
|
||||
# - shouldn't there already be an entry for each reserved user (at least
|
||||
# if they have been active recently)?
|
||||
#
|
||||
# - if it's important that the timestamp is kept up to date, why do we only
|
||||
# run this at startup?
|
||||
|
||||
for tp in threepids:
|
||||
user_id = self.get_user_id_by_threepid_txn(txn, tp["medium"], tp["address"])
|
||||
|
||||
if user_id:
|
||||
is_support = self.is_support_user_txn(txn, user_id)
|
||||
if not is_support:
|
||||
# We do this manually here to avoid hitting #6791
|
||||
self.db_pool.simple_upsert_txn(
|
||||
txn,
|
||||
table="monthly_active_users",
|
||||
keyvalues={"user_id": user_id},
|
||||
values={"timestamp": int(self._clock.time_msec())},
|
||||
)
|
||||
else:
|
||||
logger.warning("mau limit reserved threepid %s not found in db" % tp)
|
||||
|
||||
async def reap_monthly_active_users(self):
|
||||
"""Cleans out monthly active user table to ensure that no stale
|
||||
entries exist.
|
||||
|
@ -257,6 +206,57 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
|
|||
"reap_monthly_active_users", _reap_users, reserved_users
|
||||
)
|
||||
|
||||
|
||||
class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
|
||||
def __init__(self, database: DatabasePool, db_conn, hs):
|
||||
super().__init__(database, db_conn, hs)
|
||||
|
||||
self._mau_stats_only = hs.config.mau_stats_only
|
||||
|
||||
# Do not add more reserved users than the total allowable number
|
||||
self.db_pool.new_transaction(
|
||||
db_conn,
|
||||
"initialise_mau_threepids",
|
||||
[],
|
||||
[],
|
||||
self._initialise_reserved_users,
|
||||
hs.config.mau_limits_reserved_threepids[: self._max_mau_value],
|
||||
)
|
||||
|
||||
def _initialise_reserved_users(self, txn, threepids):
|
||||
"""Ensures that reserved threepids are accounted for in the MAU table, should
|
||||
be called on start up.
|
||||
|
||||
Args:
|
||||
txn (cursor):
|
||||
threepids (list[dict]): List of threepid dicts to reserve
|
||||
"""
|
||||
|
||||
# XXX what is this function trying to achieve? It upserts into
|
||||
# monthly_active_users for each *registered* reserved mau user, but why?
|
||||
#
|
||||
# - shouldn't there already be an entry for each reserved user (at least
|
||||
# if they have been active recently)?
|
||||
#
|
||||
# - if it's important that the timestamp is kept up to date, why do we only
|
||||
# run this at startup?
|
||||
|
||||
for tp in threepids:
|
||||
user_id = self.get_user_id_by_threepid_txn(txn, tp["medium"], tp["address"])
|
||||
|
||||
if user_id:
|
||||
is_support = self.is_support_user_txn(txn, user_id)
|
||||
if not is_support:
|
||||
# We do this manually here to avoid hitting #6791
|
||||
self.db_pool.simple_upsert_txn(
|
||||
txn,
|
||||
table="monthly_active_users",
|
||||
keyvalues={"user_id": user_id},
|
||||
values={"timestamp": int(self._clock.time_msec())},
|
||||
)
|
||||
else:
|
||||
logger.warning("mau limit reserved threepid %s not found in db" % tp)
|
||||
|
||||
async def upsert_monthly_active_user(self, user_id: str) -> None:
|
||||
"""Updates or inserts the user into the monthly active user table, which
|
||||
is used to track the current MAU usage of the server
|
||||
|
|
|
@ -192,6 +192,18 @@ class RoomWorkerStore(SQLBaseStore):
|
|||
"count_public_rooms", _count_public_rooms_txn
|
||||
)
|
||||
|
||||
async def get_room_count(self) -> int:
|
||||
"""Retrieve the total number of rooms.
|
||||
"""
|
||||
|
||||
def f(txn):
|
||||
sql = "SELECT count(*) FROM rooms"
|
||||
txn.execute(sql)
|
||||
row = txn.fetchone()
|
||||
return row[0] or 0
|
||||
|
||||
return await self.db_pool.runInteraction("get_rooms", f)
|
||||
|
||||
async def get_largest_public_rooms(
|
||||
self,
|
||||
network_tuple: Optional[ThirdPartyInstanceID],
|
||||
|
@ -1292,18 +1304,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
|
|||
)
|
||||
self.hs.get_notifier().on_new_replication_data()
|
||||
|
||||
async def get_room_count(self) -> int:
|
||||
"""Retrieve the total number of rooms.
|
||||
"""
|
||||
|
||||
def f(txn):
|
||||
sql = "SELECT count(*) FROM rooms"
|
||||
txn.execute(sql)
|
||||
row = txn.fetchone()
|
||||
return row[0] or 0
|
||||
|
||||
return await self.db_pool.runInteraction("get_rooms", f)
|
||||
|
||||
async def add_event_report(
|
||||
self,
|
||||
room_id: str,
|
||||
|
|
|
@ -21,12 +21,7 @@ from synapse.events import EventBase
|
|||
from synapse.events.snapshot import EventContext
|
||||
from synapse.metrics import LaterGauge
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.storage._base import (
|
||||
LoggingTransaction,
|
||||
SQLBaseStore,
|
||||
db_to_json,
|
||||
make_in_list_sql_clause,
|
||||
)
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
||||
from synapse.storage.engines import Sqlite3Engine
|
||||
|
@ -60,10 +55,8 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
|||
# background update still running?
|
||||
self._current_state_events_membership_up_to_date = False
|
||||
|
||||
txn = LoggingTransaction(
|
||||
db_conn.cursor(),
|
||||
name="_check_safe_current_state_events_membership_updated",
|
||||
database_engine=self.database_engine,
|
||||
txn = db_conn.cursor(
|
||||
txn_name="_check_safe_current_state_events_membership_updated"
|
||||
)
|
||||
self._check_safe_current_state_events_membership_updated_txn(txn)
|
||||
txn.close()
|
||||
|
|
|
@ -66,16 +66,15 @@ def run_create(cur, database_engine, *args, **kwargs):
|
|||
row[8] = bytes(row[8]).decode("utf-8")
|
||||
row[11] = bytes(row[11]).decode("utf-8")
|
||||
cur.execute(
|
||||
database_engine.convert_param_style(
|
||||
"""
|
||||
INSERT into pushers2 (
|
||||
id, user_name, access_token, profile_tag, kind,
|
||||
app_id, app_display_name, device_display_name,
|
||||
pushkey, ts, lang, data, last_token, last_success,
|
||||
failing_since
|
||||
) values (%s)"""
|
||||
% (",".join(["?" for _ in range(len(row))]))
|
||||
),
|
||||
"""
|
||||
INSERT into pushers2 (
|
||||
id, user_name, access_token, profile_tag, kind,
|
||||
app_id, app_display_name, device_display_name,
|
||||
pushkey, ts, lang, data, last_token, last_success,
|
||||
failing_since
|
||||
) values (%s)
|
||||
"""
|
||||
% (",".join(["?" for _ in range(len(row))])),
|
||||
row,
|
||||
)
|
||||
count += 1
|
||||
|
|
|
@ -71,8 +71,6 @@ def run_create(cur, database_engine, *args, **kwargs):
|
|||
" VALUES (?, ?)"
|
||||
)
|
||||
|
||||
sql = database_engine.convert_param_style(sql)
|
||||
|
||||
cur.execute(sql, ("event_search", progress_json))
|
||||
|
||||
|
||||
|
|
|
@ -50,8 +50,6 @@ def run_create(cur, database_engine, *args, **kwargs):
|
|||
" VALUES (?, ?)"
|
||||
)
|
||||
|
||||
sql = database_engine.convert_param_style(sql)
|
||||
|
||||
cur.execute(sql, ("event_origin_server_ts", progress_json))
|
||||
|
||||
|
||||
|
|
|
@ -59,9 +59,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
|
|||
user_chunks = (user_ids[i : i + 100] for i in range(0, len(user_ids), n))
|
||||
for chunk in user_chunks:
|
||||
cur.execute(
|
||||
database_engine.convert_param_style(
|
||||
"UPDATE users SET appservice_id = ? WHERE name IN (%s)"
|
||||
% (",".join("?" for _ in chunk),)
|
||||
),
|
||||
"UPDATE users SET appservice_id = ? WHERE name IN (%s)"
|
||||
% (",".join("?" for _ in chunk),),
|
||||
[as_id] + chunk,
|
||||
)
|
||||
|
|
|
@ -65,16 +65,15 @@ def run_create(cur, database_engine, *args, **kwargs):
|
|||
row = list(row)
|
||||
row[12] = token_to_stream_ordering(row[12])
|
||||
cur.execute(
|
||||
database_engine.convert_param_style(
|
||||
"""
|
||||
INSERT into pushers2 (
|
||||
id, user_name, access_token, profile_tag, kind,
|
||||
app_id, app_display_name, device_display_name,
|
||||
pushkey, ts, lang, data, last_stream_ordering, last_success,
|
||||
failing_since
|
||||
) values (%s)"""
|
||||
% (",".join(["?" for _ in range(len(row))]))
|
||||
),
|
||||
"""
|
||||
INSERT into pushers2 (
|
||||
id, user_name, access_token, profile_tag, kind,
|
||||
app_id, app_display_name, device_display_name,
|
||||
pushkey, ts, lang, data, last_stream_ordering, last_success,
|
||||
failing_since
|
||||
) values (%s)
|
||||
"""
|
||||
% (",".join(["?" for _ in range(len(row))])),
|
||||
row,
|
||||
)
|
||||
count += 1
|
||||
|
|
|
@ -55,8 +55,6 @@ def run_create(cur, database_engine, *args, **kwargs):
|
|||
" VALUES (?, ?)"
|
||||
)
|
||||
|
||||
sql = database_engine.convert_param_style(sql)
|
||||
|
||||
cur.execute(sql, ("event_search_order", progress_json))
|
||||
|
||||
|
||||
|
|
|
@ -50,8 +50,6 @@ def run_create(cur, database_engine, *args, **kwargs):
|
|||
" VALUES (?, ?)"
|
||||
)
|
||||
|
||||
sql = database_engine.convert_param_style(sql)
|
||||
|
||||
cur.execute(sql, ("event_fields_sender_url", progress_json))
|
||||
|
||||
|
||||
|
|
|
@ -23,8 +23,5 @@ def run_create(cur, database_engine, *args, **kwargs):
|
|||
|
||||
def run_upgrade(cur, database_engine, *args, **kwargs):
|
||||
cur.execute(
|
||||
database_engine.convert_param_style(
|
||||
"UPDATE remote_media_cache SET last_access_ts = ?"
|
||||
),
|
||||
(int(time.time() * 1000),),
|
||||
"UPDATE remote_media_cache SET last_access_ts = ?", (int(time.time() * 1000),),
|
||||
)
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
import logging
|
||||
from io import StringIO
|
||||
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.storage.prepare_database import execute_statements_from_stream
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -46,7 +48,4 @@ def run_create(cur, database_engine, *args, **kwargs):
|
|||
select_clause,
|
||||
)
|
||||
|
||||
if isinstance(database_engine, PostgresEngine):
|
||||
cur.execute(sql)
|
||||
else:
|
||||
cur.executescript(sql)
|
||||
execute_statements_from_stream(cur, StringIO(sql))
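This swaps the SQLite-only `cur.executescript(...)` for `execute_statements_from_stream`, so each statement is executed through the cursor individually on either database engine. A rough, illustrative approximation of such a helper is sketched below; it is not Synapse's actual implementation, which handles quoting and comments more carefully:

```
from typing import Iterator, TextIO

def naive_statements(stream: TextIO) -> Iterator[str]:
    # Naive split on ';' - real code must respect string literals and comments.
    buf = ""
    for line in stream:
        buf += line
        while ";" in buf:
            statement, buf = buf.split(";", 1)
            if statement.strip():
                yield statement
    if buf.strip():
        yield buf

def run_statements(cur, stream: TextIO) -> None:
    # Execute each statement individually so the cursor wrapper sees every query.
    for statement in naive_statements(stream):
        cur.execute(statement)
```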
|
||||
|
|
|
@ -68,7 +68,6 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
|
|||
INNER JOIN room_memberships AS r USING (event_id)
|
||||
WHERE type = 'm.room.member' AND state_key LIKE ?
|
||||
"""
|
||||
sql = database_engine.convert_param_style(sql)
|
||||
cur.execute(sql, ("%:" + config.server_name,))
|
||||
|
||||
cur.execute(
|
||||
|
|
|
@ -288,8 +288,6 @@ class UIAuthWorkerStore(SQLBaseStore):
|
|||
)
|
||||
return [(row["user_agent"], row["ip"]) for row in rows]
|
||||
|
||||
|
||||
class UIAuthStore(UIAuthWorkerStore):
|
||||
async def delete_old_ui_auth_sessions(self, expiration_time: int) -> None:
|
||||
"""
|
||||
Remove sessions which were last used earlier than the expiration time.
|
||||
|
@ -339,3 +337,7 @@ class UIAuthStore(UIAuthWorkerStore):
|
|||
iterable=session_ids,
|
||||
keyvalues={},
|
||||
)
|
||||
|
||||
|
||||
class UIAuthStore(UIAuthWorkerStore):
|
||||
pass
|
||||
|
|
|
@ -13,7 +13,6 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import imp
|
||||
import logging
|
||||
import os
|
||||
|
@ -24,9 +23,10 @@ from typing import Optional, TextIO
|
|||
import attr
|
||||
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.storage.database import LoggingDatabaseConnection
|
||||
from synapse.storage.engines import BaseDatabaseEngine
|
||||
from synapse.storage.engines.postgres import PostgresEngine
|
||||
from synapse.storage.types import Connection, Cursor
|
||||
from synapse.storage.types import Cursor
|
||||
from synapse.types import Collection
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -67,7 +67,7 @@ UNAPPLIED_DELTA_ON_WORKER_ERROR = (
|
|||
|
||||
|
||||
def prepare_database(
|
||||
db_conn: Connection,
|
||||
db_conn: LoggingDatabaseConnection,
|
||||
database_engine: BaseDatabaseEngine,
|
||||
config: Optional[HomeServerConfig],
|
||||
databases: Collection[str] = ["main", "state"],
|
||||
|
@ -89,7 +89,7 @@ def prepare_database(
|
|||
"""
|
||||
|
||||
try:
|
||||
cur = db_conn.cursor()
|
||||
cur = db_conn.cursor(txn_name="prepare_database")
|
||||
|
||||
# sqlite does not automatically start transactions for DDL / SELECT statements,
|
||||
# so we start one before running anything. This ensures that any upgrades
|
||||
|
@ -258,9 +258,7 @@ def _setup_new_database(cur, database_engine, databases):
|
|||
executescript(cur, entry.absolute_path)
|
||||
|
||||
cur.execute(
|
||||
database_engine.convert_param_style(
|
||||
"INSERT INTO schema_version (version, upgraded) VALUES (?,?)"
|
||||
),
|
||||
"INSERT INTO schema_version (version, upgraded) VALUES (?,?)",
|
||||
(max_current_ver, False),
|
||||
)
|
||||
|
||||
|
@ -486,17 +484,13 @@ def _upgrade_existing_database(
|
|||
|
||||
# Mark as done.
|
||||
cur.execute(
|
||||
database_engine.convert_param_style(
|
||||
"INSERT INTO applied_schema_deltas (version, file) VALUES (?,?)"
|
||||
),
|
||||
"INSERT INTO applied_schema_deltas (version, file) VALUES (?,?)",
|
||||
(v, relative_path),
|
||||
)
|
||||
|
||||
cur.execute("DELETE FROM schema_version")
|
||||
cur.execute(
|
||||
database_engine.convert_param_style(
|
||||
"INSERT INTO schema_version (version, upgraded) VALUES (?,?)"
|
||||
),
|
||||
"INSERT INTO schema_version (version, upgraded) VALUES (?,?)",
|
||||
(v, True),
|
||||
)
|
||||
|
||||
|
@ -532,10 +526,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams)
|
|||
schemas to be applied
|
||||
"""
|
||||
cur.execute(
|
||||
database_engine.convert_param_style(
|
||||
"SELECT file FROM applied_module_schemas WHERE module_name = ?"
|
||||
),
|
||||
(modname,),
|
||||
"SELECT file FROM applied_module_schemas WHERE module_name = ?", (modname,),
|
||||
)
|
||||
applied_deltas = {d for d, in cur}
|
||||
for (name, stream) in names_and_streams:
|
||||
|
@ -553,9 +544,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams)
|
|||
|
||||
# Mark as done.
|
||||
cur.execute(
|
||||
database_engine.convert_param_style(
|
||||
"INSERT INTO applied_module_schemas (module_name, file) VALUES (?,?)"
|
||||
),
|
||||
"INSERT INTO applied_module_schemas (module_name, file) VALUES (?,?)",
|
||||
(modname, name),
|
||||
)
|
||||
|
||||
|
@ -627,9 +616,7 @@ def _get_or_create_schema_state(txn, database_engine):
|
|||
|
||||
if current_version:
|
||||
txn.execute(
|
||||
database_engine.convert_param_style(
|
||||
"SELECT file FROM applied_schema_deltas WHERE version >= ?"
|
||||
),
|
||||
"SELECT file FROM applied_schema_deltas WHERE version >= ?",
|
||||
(current_version,),
|
||||
)
|
||||
applied_deltas = [d for d, in txn]
|
||||
|
|
|
@ -61,3 +61,9 @@ class Connection(Protocol):
|
|||
|
||||
def rollback(self, *args, **kwargs) -> None:
|
||||
...
|
||||
|
||||
def __enter__(self) -> "Connection":
|
||||
...
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback) -> bool:
|
||||
...
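The new `__enter__`/`__exit__` methods let anything satisfying the `Connection` protocol be used as a context manager. A minimal usage sketch (assuming `conn` is such a connection; commit-on-success is typical DB-API behaviour but not mandated by the protocol itself):

```
with conn:                   # __enter__ / __exit__ scope the transaction
    cur = conn.cursor()
    cur.execute("SELECT 1")
```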
|
||||
|
|
|
@ -54,7 +54,7 @@ def _load_current_id(db_conn, table, column, step=1):
|
|||
"""
|
||||
# debug logging for https://github.com/matrix-org/synapse/issues/7968
|
||||
logger.info("initialising stream generator for %s(%s)", table, column)
|
||||
cur = db_conn.cursor()
|
||||
cur = db_conn.cursor(txn_name="_load_current_id")
|
||||
if step == 1:
|
||||
cur.execute("SELECT MAX(%s) FROM %s" % (column, table))
|
||||
else:
|
||||
|
@ -269,7 +269,7 @@ class MultiWriterIdGenerator:
|
|||
def _load_current_ids(
|
||||
self, db_conn, table: str, instance_column: str, id_column: str
|
||||
):
|
||||
cur = db_conn.cursor()
|
||||
cur = db_conn.cursor(txn_name="_load_current_ids")
|
||||
|
||||
# Load the current positions of all writers for the stream.
|
||||
if self._writers:
|
||||
|
@ -283,15 +283,12 @@ class MultiWriterIdGenerator:
|
|||
stream_name = ?
|
||||
AND instance_name != ALL(?)
|
||||
"""
|
||||
sql = self._db.engine.convert_param_style(sql)
|
||||
cur.execute(sql, (self._stream_name, self._writers))
|
||||
|
||||
sql = """
|
||||
SELECT instance_name, stream_id FROM stream_positions
|
||||
WHERE stream_name = ?
|
||||
"""
|
||||
sql = self._db.engine.convert_param_style(sql)
|
||||
|
||||
cur.execute(sql, (self._stream_name,))
|
||||
|
||||
self._current_positions = {
|
||||
|
@ -340,8 +337,7 @@ class MultiWriterIdGenerator:
|
|||
"instance": instance_column,
|
||||
"cmp": "<=" if self._positive else ">=",
|
||||
}
|
||||
sql = self._db.engine.convert_param_style(sql)
|
||||
cur.execute(sql, (min_stream_id,))
|
||||
cur.execute(sql, (min_stream_id * self._return_factor,))
|
||||
|
||||
self._persisted_upto_position = min_stream_id
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@ import logging
|
|||
import threading
|
||||
from typing import Callable, List, Optional
|
||||
|
||||
from synapse.storage.database import LoggingDatabaseConnection
|
||||
from synapse.storage.engines import (
|
||||
BaseDatabaseEngine,
|
||||
IncorrectDatabaseSetup,
|
||||
|
@ -53,7 +54,11 @@ class SequenceGenerator(metaclass=abc.ABCMeta):
|
|||
|
||||
@abc.abstractmethod
|
||||
def check_consistency(
|
||||
self, db_conn: Connection, table: str, id_column: str, positive: bool = True
|
||||
self,
|
||||
db_conn: LoggingDatabaseConnection,
|
||||
table: str,
|
||||
id_column: str,
|
||||
positive: bool = True,
|
||||
):
|
||||
"""Should be called during start up to test that the current value of
|
||||
the sequence is greater than or equal to the maximum ID in the table.
|
||||
|
@ -82,9 +87,13 @@ class PostgresSequenceGenerator(SequenceGenerator):
|
|||
return [i for (i,) in txn]
|
||||
|
||||
def check_consistency(
|
||||
self, db_conn: Connection, table: str, id_column: str, positive: bool = True
|
||||
self,
|
||||
db_conn: LoggingDatabaseConnection,
|
||||
table: str,
|
||||
id_column: str,
|
||||
positive: bool = True,
|
||||
):
|
||||
txn = db_conn.cursor()
|
||||
txn = db_conn.cursor(txn_name="sequence.check_consistency")
|
||||
|
||||
# First we get the current max ID from the table.
|
||||
table_sql = "SELECT GREATEST(%(agg)s(%(id)s), 0) FROM %(table)s" % {
|
||||
|
|
|
@ -12,13 +12,14 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from typing import Any, Callable, List, Optional, Tuple
|
||||
|
||||
import attr
|
||||
import hiredis
|
||||
|
||||
from twisted.internet.interfaces import IConsumer, IPullProducer, IReactorTime
|
||||
from twisted.internet.protocol import Protocol
|
||||
from twisted.internet.task import LoopingCall
|
||||
from twisted.web.http import HTTPChannel
|
||||
|
||||
|
@ -27,7 +28,7 @@ from synapse.app.generic_worker import (
|
|||
GenericWorkerServer,
|
||||
)
|
||||
from synapse.http.server import JsonResource
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.http.site import SynapseRequest, SynapseSite
|
||||
from synapse.replication.http import ReplicationRestResource, streams
|
||||
from synapse.replication.tcp.handler import ReplicationCommandHandler
|
||||
from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
|
||||
|
@ -197,19 +198,37 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
|
|||
self.server_factory = ReplicationStreamProtocolFactory(self.hs)
|
||||
self.streamer = self.hs.get_replication_streamer()
|
||||
|
||||
# Fake in-memory Redis server that servers can connect to.
|
||||
self._redis_server = FakeRedisPubSubServer()
|
||||
|
||||
store = self.hs.get_datastore()
|
||||
self.database_pool = store.db_pool
|
||||
|
||||
self.reactor.lookups["testserv"] = "1.2.3.4"
|
||||
self.reactor.lookups["localhost"] = "127.0.0.1"
|
||||
|
||||
self._worker_hs_to_resource = {}
|
||||
# A map from a HS instance to the associated HTTP Site to use for
|
||||
# handling inbound HTTP requests to that instance.
|
||||
self._hs_to_site = {self.hs: self.site}
|
||||
|
||||
if self.hs.config.redis.redis_enabled:
|
||||
# Handle attempts to connect to fake redis server.
|
||||
self.reactor.add_tcp_client_callback(
|
||||
"localhost", 6379, self.connect_any_redis_attempts,
|
||||
)
|
||||
|
||||
self.hs.get_tcp_replication().start_replication(self.hs)
|
||||
|
||||
# When we see a connection attempt to the master replication listener we
|
||||
# automatically set up the connection. This is so that tests don't
|
||||
# manually have to go and explicitly set it up each time (plus sometimes
|
||||
# it is impossible to write the handling explicitly in the tests).
|
||||
#
|
||||
# Register the master replication listener:
|
||||
self.reactor.add_tcp_client_callback(
|
||||
"1.2.3.4", 8765, self._handle_http_replication_attempt
|
||||
"1.2.3.4",
|
||||
8765,
|
||||
lambda: self._handle_http_replication_attempt(self.hs, 8765),
|
||||
)
|
||||
|
||||
def create_test_json_resource(self):
|
||||
|
@ -253,28 +272,63 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
|
|||
**kwargs
|
||||
)
|
||||
|
||||
# If the instance is in the `instance_map` config then workers may try
|
||||
# and send HTTP requests to it, so we register it with
|
||||
# `_handle_http_replication_attempt` like we do with the master HS.
|
||||
instance_name = worker_hs.get_instance_name()
|
||||
instance_loc = worker_hs.config.worker.instance_map.get(instance_name)
|
||||
if instance_loc:
|
||||
# Ensure the host is one that has a fake DNS entry.
|
||||
if instance_loc.host not in self.reactor.lookups:
|
||||
raise Exception(
|
||||
"Host does not have an IP for instance_map[%r].host = %r"
|
||||
% (instance_name, instance_loc.host,)
|
||||
)
|
||||
|
||||
self.reactor.add_tcp_client_callback(
|
||||
self.reactor.lookups[instance_loc.host],
|
||||
instance_loc.port,
|
||||
lambda: self._handle_http_replication_attempt(
|
||||
worker_hs, instance_loc.port
|
||||
),
|
||||
)
|
||||
|
||||
store = worker_hs.get_datastore()
|
||||
store.db_pool._db_pool = self.database_pool._db_pool
|
||||
|
||||
repl_handler = ReplicationCommandHandler(worker_hs)
|
||||
client = ClientReplicationStreamProtocol(
|
||||
worker_hs, "client", "test", self.clock, repl_handler,
|
||||
)
|
||||
server = self.server_factory.buildProtocol(None)
|
||||
# Set up TCP replication between master and the new worker if we don't
|
||||
# have Redis support enabled.
|
||||
if not worker_hs.config.redis_enabled:
|
||||
repl_handler = ReplicationCommandHandler(worker_hs)
|
||||
client = ClientReplicationStreamProtocol(
|
||||
worker_hs, "client", "test", self.clock, repl_handler,
|
||||
)
|
||||
server = self.server_factory.buildProtocol(None)
|
||||
|
||||
client_transport = FakeTransport(server, self.reactor)
|
||||
client.makeConnection(client_transport)
|
||||
client_transport = FakeTransport(server, self.reactor)
|
||||
client.makeConnection(client_transport)
|
||||
|
||||
server_transport = FakeTransport(client, self.reactor)
|
||||
server.makeConnection(server_transport)
|
||||
server_transport = FakeTransport(client, self.reactor)
|
||||
server.makeConnection(server_transport)
|
||||
|
||||
# Set up a resource for the worker
|
||||
resource = ReplicationRestResource(self.hs)
|
||||
resource = ReplicationRestResource(worker_hs)
|
||||
|
||||
for servlet in self.servlets:
|
||||
servlet(worker_hs, resource)
|
||||
|
||||
self._worker_hs_to_resource[worker_hs] = resource
|
||||
self._hs_to_site[worker_hs] = SynapseSite(
|
||||
logger_name="synapse.access.http.fake",
|
||||
site_tag="{}-{}".format(
|
||||
worker_hs.config.server.server_name, worker_hs.get_instance_name()
|
||||
),
|
||||
config=worker_hs.config.server.listeners[0],
|
||||
resource=resource,
|
||||
server_version_string="1",
|
||||
)
|
||||
|
||||
if worker_hs.config.redis.redis_enabled:
|
||||
worker_hs.get_tcp_replication().start_replication(worker_hs)
|
||||
|
||||
return worker_hs
|
||||
|
||||
|
@ -285,7 +339,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
|
|||
return config
|
||||
|
||||
def render_on_worker(self, worker_hs: HomeServer, request: SynapseRequest):
|
||||
render(request, self._worker_hs_to_resource[worker_hs], self.reactor)
|
||||
render(request, self._hs_to_site[worker_hs].resource, self.reactor)
|
||||
|
||||
def replicate(self):
|
||||
"""Tell the master side of replication that something has happened, and then
|
||||
|
@ -294,9 +348,9 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
|
|||
self.streamer.on_notifier_poke()
|
||||
self.pump()
|
||||
|
||||
def _handle_http_replication_attempt(self):
|
||||
"""Handles a connection attempt to the master replication HTTP
|
||||
listener.
|
||||
def _handle_http_replication_attempt(self, hs, repl_port):
|
||||
"""Handles a connection attempt to the given HS replication HTTP
|
||||
listener on the given port.
|
||||
"""
|
||||
|
||||
# We should have at least one outbound connection attempt, where the
|
||||
|
@ -305,7 +359,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
|
|||
self.assertGreaterEqual(len(clients), 1)
|
||||
(host, port, client_factory, _timeout, _bindAddress) = clients.pop()
|
||||
self.assertEqual(host, "1.2.3.4")
|
||||
self.assertEqual(port, 8765)
|
||||
self.assertEqual(port, repl_port)
|
||||
|
||||
# Set up client side protocol
|
||||
client_protocol = client_factory.buildProtocol(None)
|
||||
|
@ -315,7 +369,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
|
|||
# Set up the server side protocol
|
||||
channel = _PushHTTPChannel(self.reactor)
|
||||
channel.requestFactory = request_factory
|
||||
channel.site = self.site
|
||||
channel.site = self._hs_to_site[hs]
|
||||
|
||||
# Connect client to server and vice versa.
|
||||
client_to_server_transport = FakeTransport(
|
||||
|
@ -333,6 +387,32 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
|
|||
# inside `connectTCP` before the connection has been passed back to the
|
||||
# code that requested the TCP connection.
|
||||
|
||||
def connect_any_redis_attempts(self):
|
||||
"""If redis is enabled we need to deal with workers connecting to a
|
||||
redis server. We don't want to use a real Redis server so we use a
|
||||
fake one.
|
||||
"""
|
||||
clients = self.reactor.tcpClients
|
||||
self.assertEqual(len(clients), 1)
|
||||
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
|
||||
self.assertEqual(host, "localhost")
|
||||
self.assertEqual(port, 6379)
|
||||
|
||||
client_protocol = client_factory.buildProtocol(None)
|
||||
server_protocol = self._redis_server.buildProtocol(None)
|
||||
|
||||
client_to_server_transport = FakeTransport(
|
||||
server_protocol, self.reactor, client_protocol
|
||||
)
|
||||
client_protocol.makeConnection(client_to_server_transport)
|
||||
|
||||
server_to_client_transport = FakeTransport(
|
||||
client_protocol, self.reactor, server_protocol
|
||||
)
|
||||
server_protocol.makeConnection(server_to_client_transport)
|
||||
|
||||
return client_to_server_transport, server_to_client_transport
|
||||
|
||||
|
||||
class TestReplicationDataHandler(GenericWorkerReplicationHandler):
|
||||
"""Drop-in for ReplicationDataHandler which just collects RDATA rows"""
|
||||
|
@ -467,3 +547,105 @@ class _PullToPushProducer:
|
|||
pass
|
||||
|
||||
self.stopProducing()
|
||||
|
||||
|
||||
class FakeRedisPubSubServer:
|
||||
"""A fake Redis server for pub/sub.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self._subscribers = set()
|
||||
|
||||
def add_subscriber(self, conn):
|
||||
"""A connection has called SUBSCRIBE
|
||||
"""
|
||||
self._subscribers.add(conn)
|
||||
|
||||
def remove_subscriber(self, conn):
|
||||
"""A connection has called UNSUBSCRIBE
|
||||
"""
|
||||
self._subscribers.discard(conn)
|
||||
|
||||
def publish(self, conn, channel, msg) -> int:
|
||||
"""A connection want to publish a message to subscribers.
|
||||
"""
|
||||
for sub in self._subscribers:
|
||||
sub.send(["message", channel, msg])
|
||||
|
||||
return len(self._subscribers)
|
||||
|
||||
def buildProtocol(self, addr):
|
||||
return FakeRedisPubSubProtocol(self)
|
||||
|
||||
|
||||
class FakeRedisPubSubProtocol(Protocol):
|
||||
"""A connection from a client talking to the fake Redis server.
|
||||
"""
|
||||
|
||||
def __init__(self, server: FakeRedisPubSubServer):
|
||||
self._server = server
|
||||
self._reader = hiredis.Reader()
|
||||
|
||||
def dataReceived(self, data):
|
||||
self._reader.feed(data)
|
||||
|
||||
# We might get multiple messages in one packet.
|
||||
while True:
|
||||
msg = self._reader.gets()
|
||||
|
||||
if msg is False:
|
||||
# No more messages.
|
||||
return
|
||||
|
||||
if not isinstance(msg, list):
|
||||
# Inbound commands should always be a list
|
||||
raise Exception("Expected redis list")
|
||||
|
||||
self.handle_command(msg[0], *msg[1:])
|
||||
|
||||
def handle_command(self, command, *args):
|
||||
"""Received a Redis command from the client.
|
||||
"""
|
||||
|
||||
# We currently only support pub/sub.
|
||||
if command == b"PUBLISH":
|
||||
channel, message = args
|
||||
num_subscribers = self._server.publish(self, channel, message)
|
||||
self.send(num_subscribers)
|
||||
elif command == b"SUBSCRIBE":
|
||||
(channel,) = args
|
||||
self._server.add_subscriber(self)
|
||||
self.send(["subscribe", channel, 1])
|
||||
else:
|
||||
raise Exception("Unknown command")
|
||||
|
||||
def send(self, msg):
|
||||
"""Send a message back to the client.
|
||||
"""
|
||||
raw = self.encode(msg).encode("utf-8")
|
||||
|
||||
self.transport.write(raw)
|
||||
self.transport.flush()
|
||||
|
||||
def encode(self, obj):
|
||||
"""Encode an object to its Redis format.
|
||||
|
||||
Supports: strings/bytes, integers and lists/tuples.
|
||||
"""
|
||||
|
||||
if isinstance(obj, bytes):
|
||||
# We assume bytes are just unicode strings.
|
||||
obj = obj.decode("utf-8")
|
||||
|
||||
if isinstance(obj, str):
|
||||
return "${len}\r\n{str}\r\n".format(len=len(obj), str=obj)
|
||||
if isinstance(obj, int):
|
||||
return ":{val}\r\n".format(val=obj)
|
||||
if isinstance(obj, (list, tuple)):
|
||||
items = "".join(self.encode(a) for a in obj)
|
||||
return "*{len}\r\n{items}".format(len=len(obj), items=items)
|
||||
|
||||
raise Exception("Unrecognized type for encoding redis: %r: %r", type(obj), obj)
|
||||
|
||||
def connectionLost(self, reason):
|
||||
self._server.remove_subscriber(self)
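For reference, `encode` emits the Redis serialisation protocol (RESP) that `hiredis` parses on the other end. Applying the format strings above to the pub/sub payload sent by `publish` gives, for example:

```
encode(["message", "channel_1", "hello"])
# -> "*3\r\n$7\r\nmessage\r\n$9\r\nchannel_1\r\n$5\r\nhello\r\n"
encode(2)
# -> ":2\r\n"
```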
|
||||
|
|
|
@ -0,0 +1,102 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
|
||||
from synapse.rest import admin
|
||||
from synapse.rest.client.v1 import login, room
|
||||
|
||||
from tests.replication._base import BaseMultiWorkerStreamTestCase
|
||||
from tests.utils import USE_POSTGRES_FOR_TESTS
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase):
|
||||
"""Checks event persisting sharding works
|
||||
"""
|
||||
|
||||
# Event persister sharding requires postgres (due to needing
|
||||
# `MultiWriterIdGenerator`).
|
||||
if not USE_POSTGRES_FOR_TESTS:
|
||||
skip = "Requires Postgres"
|
||||
|
||||
servlets = [
|
||||
admin.register_servlets_for_client_rest_resource,
|
||||
room.register_servlets,
|
||||
login.register_servlets,
|
||||
]
|
||||
|
||||
def prepare(self, reactor, clock, hs):
|
||||
# Register a user who sends a message that we'll get notified about
|
||||
self.other_user_id = self.register_user("otheruser", "pass")
|
||||
self.other_access_token = self.login("otheruser", "pass")
|
||||
|
||||
def default_config(self):
|
||||
conf = super().default_config()
|
||||
conf["redis"] = {"enabled": "true"}
|
||||
conf["stream_writers"] = {"events": ["worker1", "worker2"]}
|
||||
conf["instance_map"] = {
|
||||
"worker1": {"host": "testserv", "port": 1001},
|
||||
"worker2": {"host": "testserv", "port": 1002},
|
||||
}
|
||||
return conf
|
||||
|
||||
def test_basic(self):
|
||||
"""Simple test to ensure that multiple rooms can be created and joined,
|
||||
and that different rooms get handled by different instances.
|
||||
"""
|
||||
|
||||
self.make_worker_hs(
|
||||
"synapse.app.generic_worker", {"worker_name": "worker1"},
|
||||
)
|
||||
|
||||
self.make_worker_hs(
|
||||
"synapse.app.generic_worker", {"worker_name": "worker2"},
|
||||
)
|
||||
|
||||
persisted_on_1 = False
|
||||
persisted_on_2 = False
|
||||
|
||||
store = self.hs.get_datastore()
|
||||
|
||||
user_id = self.register_user("user", "pass")
|
||||
access_token = self.login("user", "pass")
|
||||
|
||||
# Keep making new rooms until we see rooms being persisted on both
|
||||
# workers.
|
||||
for _ in range(10):
|
||||
# Create a room
|
||||
room = self.helper.create_room_as(user_id, tok=access_token)
|
||||
|
||||
# The other user joins
|
||||
self.helper.join(
|
||||
room=room, user=self.other_user_id, tok=self.other_access_token
|
||||
)
|
||||
|
||||
# The other user sends some messages
|
||||
response = self.helper.send(room, body="Hi!", tok=self.other_access_token)
event_id = response["event_id"]
|
||||
|
||||
# The event position includes which instance persisted the event.
|
||||
pos = self.get_success(store.get_position_for_event(event_id))
|
||||
|
||||
persisted_on_1 |= pos.instance_name == "worker1"
|
||||
persisted_on_2 |= pos.instance_name == "worker2"
|
||||
|
||||
if persisted_on_1 and persisted_on_2:
|
||||
break
|
||||
|
||||
self.assertTrue(persisted_on_1)
|
||||
self.assertTrue(persisted_on_2)
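Outside the test harness, the sharded set-up that `default_config` builds above corresponds roughly to a homeserver configuration along these lines (a hedged sketch; worker names, hosts and ports are placeholders, and Redis must be enabled when using workers):

```
stream_writers:
    events:
        - event_persister1
        - event_persister2

instance_map:
    event_persister1:
        host: localhost
        port: 8034
    event_persister2:
        host: localhost
        port: 8035

redis:
    enabled: true
```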
|
|
@ -372,6 +372,10 @@ def setup_test_homeserver(cleanup_func, *args, **kwargs):
|
|||
pool.threadpool = ThreadPool(clock._reactor)
|
||||
pool.running = True
|
||||
|
||||
# We've just changed the Databases to run DB transactions on the same
|
||||
# thread, so we need to disable the dedicated thread behaviour.
|
||||
server.get_datastores().main.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING = False
|
||||
|
||||
return server
|
||||
|
||||
|
||||
|
|
|
@ -58,7 +58,7 @@ class ApplicationServiceStoreTestCase(unittest.TestCase):
|
|||
# must be done after inserts
|
||||
database = hs.get_datastores().databases[0]
|
||||
self.store = ApplicationServiceStore(
|
||||
database, make_conn(database._database_config, database.engine), hs
|
||||
database, make_conn(database._database_config, database.engine, "test"), hs
|
||||
)
|
||||
|
||||
def tearDown(self):
|
||||
|
@ -132,7 +132,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
|
|||
|
||||
db_config = hs.config.get_single_database()
|
||||
self.store = TestTransactionStore(
|
||||
database, make_conn(db_config, self.engine), hs
|
||||
database, make_conn(db_config, self.engine, "test"), hs
|
||||
)
|
||||
|
||||
def _add_service(self, url, as_token, id):
|
||||
|
@ -448,7 +448,7 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase):
|
|||
|
||||
database = hs.get_datastores().databases[0]
|
||||
ApplicationServiceStore(
|
||||
database, make_conn(database._database_config, database.engine), hs
|
||||
database, make_conn(database._database_config, database.engine, "test"), hs
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
@ -467,7 +467,9 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase):
|
|||
with self.assertRaises(ConfigError) as cm:
|
||||
database = hs.get_datastores().databases[0]
|
||||
ApplicationServiceStore(
|
||||
database, make_conn(database._database_config, database.engine), hs
|
||||
database,
|
||||
make_conn(database._database_config, database.engine, "test"),
|
||||
hs,
|
||||
)
|
||||
|
||||
e = cm.exception
|
||||
|
@ -491,7 +493,9 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase):
|
|||
with self.assertRaises(ConfigError) as cm:
|
||||
database = hs.get_datastores().databases[0]
|
||||
ApplicationServiceStore(
|
||||
database, make_conn(database._database_config, database.engine), hs
|
||||
database,
|
||||
make_conn(database._database_config, database.engine, "test"),
|
||||
hs,
|
||||
)
|
||||
|
||||
e = cm.exception
|
||||
|
|
|
@ -17,7 +17,7 @@ import resource
|
|||
|
||||
import mock
|
||||
|
||||
from synapse.app.homeserver import phone_stats_home
|
||||
from synapse.app.phone_stats_home import phone_stats_home
|
||||
|
||||
from tests.unittest import HomeserverTestCase
|
||||
|
||||
|
|
|
@ -241,7 +241,7 @@ class HomeserverTestCase(TestCase):
|
|||
# create a site to wrap the resource.
|
||||
self.site = SynapseSite(
|
||||
logger_name="synapse.access.http.fake",
|
||||
site_tag="test",
|
||||
site_tag=self.hs.config.server.server_name,
|
||||
config=self.hs.config.server.listeners[0],
|
||||
resource=self.resource,
|
||||
server_version_string="1",
|
||||
|
|
|
@ -38,6 +38,7 @@ from synapse.http.server import HttpServer
|
|||
from synapse.logging.context import current_context, set_current_context
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage import DataStore
|
||||
from synapse.storage.database import LoggingDatabaseConnection
|
||||
from synapse.storage.engines import PostgresEngine, create_engine
|
||||
from synapse.storage.prepare_database import prepare_database
|
||||
from synapse.util.ratelimitutils import FederationRateLimiter
|
||||
|
@ -88,6 +89,7 @@ def setupdb():
|
|||
host=POSTGRES_HOST,
|
||||
password=POSTGRES_PASSWORD,
|
||||
)
|
||||
db_conn = LoggingDatabaseConnection(db_conn, db_engine, "tests")
|
||||
prepare_database(db_conn, db_engine, None)
|
||||
db_conn.close()
|
||||
|
||||
|
@ -276,7 +278,7 @@ def setup_test_homeserver(
|
|||
|
||||
hs.setup()
|
||||
if homeserverToUse.__name__ == "TestHomeServer":
|
||||
hs.setup_master()
|
||||
hs.setup_background_tasks()
|
||||
|
||||
if isinstance(db_engine, PostgresEngine):
|
||||
database = hs.get_datastores().databases[0]
|
||||
|
|