Add missing type hints to config classes. (#12402)

pull/12438/head
Patrick Cloke 2022-04-11 12:07:23 -04:00 committed by GitHub
parent 214f3b7d21
commit 4586119f0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
51 changed files with 263 additions and 151 deletions

1
changelog.d/12402.misc Normal file
View File

@ -0,0 +1 @@
Add missing type hints to configuration classes.

View File

@ -105,7 +105,7 @@ disallow_untyped_defs = True
[mypy-synapse.appservice.*]
disallow_untyped_defs = True
[mypy-synapse.config._base]
[mypy-synapse.config.*]
disallow_untyped_defs = True
[mypy-synapse.crypto.*]

View File

@ -138,9 +138,7 @@ def main() -> None:
config_args = parser.parse_args(sys.argv[1:])
config_files = find_config_files(search_paths=config_args.config_path)
config_dict = read_config_files(config_files)
config.parse_config_dict(
config_dict,
)
config.parse_config_dict(config_dict, "", "")
since_ms = time.time() * 1000 - Config.parse_duration(config_args.since)
exclude_users_with_email = config_args.exclude_emails

View File

@ -130,7 +130,7 @@ def start_reactor(
appname: str,
soft_file_limit: int,
gc_thresholds: Optional[Tuple[int, int, int]],
pid_file: str,
pid_file: Optional[str],
daemonize: bool,
print_pidfile: bool,
logger: logging.Logger,
@ -171,6 +171,8 @@ def start_reactor(
# appearing to go backwards.
with PreserveLoggingContext():
if daemonize:
assert pid_file is not None
if print_pidfile:
print(pid_file)

View File

@ -702,10 +702,7 @@ class RootConfig:
return obj
def parse_config_dict(
self,
config_dict: Dict[str, Any],
config_dir_path: Optional[str] = None,
data_dir_path: Optional[str] = None,
self, config_dict: Dict[str, Any], config_dir_path: str, data_dir_path: str
) -> None:
"""Read the information from the config dict into this Config object.

View File

@ -124,10 +124,7 @@ class RootConfig:
@classmethod
def invoke_all_static(cls, func_name: str, *args: Any, **kwargs: Any) -> None: ...
def parse_config_dict(
self,
config_dict: Dict[str, Any],
config_dir_path: Optional[str] = ...,
data_dir_path: Optional[str] = ...,
self, config_dict: Dict[str, Any], config_dir_path: str, data_dir_path: str
) -> None: ...
def generate_config(
self,

View File

@ -12,8 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Any
from synapse.config._base import Config, ConfigError
from synapse.types import JsonDict
logger = logging.getLogger(__name__)
@ -29,7 +31,7 @@ https://matrix-org.github.io/synapse/latest/templates.html
class AccountValidityConfig(Config):
section = "account_validity"
def read_config(self, config, **kwargs):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
"""Parses the old account validity config. The config format looks like this:
account_validity:

View File

@ -13,7 +13,7 @@
# limitations under the License.
import logging
from typing import Iterable
from typing import Any, Iterable
from synapse.api.constants import EventTypes
from synapse.config._base import Config, ConfigError
@ -26,12 +26,12 @@ logger = logging.getLogger(__name__)
class ApiConfig(Config):
section = "api"
def read_config(self, config: JsonDict, **kwargs):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
validate_config(_MAIN_SCHEMA, config, ())
self.room_prejoin_state = list(self._get_prejoin_state_types(config))
self.track_puppeted_user_ips = config.get("track_puppeted_user_ips", False)
def generate_config_section(cls, **kwargs) -> str:
def generate_config_section(cls, **kwargs: Any) -> str:
formatted_default_state_types = "\n".join(
" # - %s" % (t,) for t in _DEFAULT_PREJOIN_STATE_TYPES
)

View File

@ -14,7 +14,7 @@
# limitations under the License.
import logging
from typing import Dict, List
from typing import Any, Dict, List
from urllib import parse as urlparse
import yaml
@ -31,12 +31,12 @@ logger = logging.getLogger(__name__)
class AppServiceConfig(Config):
section = "appservice"
def read_config(self, config, **kwargs) -> None:
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
self.app_service_config_files = config.get("app_service_config_files", [])
self.notify_appservices = config.get("notify_appservices", True)
self.track_appservice_user_ips = config.get("track_appservice_user_ips", False)
def generate_config_section(cls, **kwargs) -> str:
def generate_config_section(cls, **kwargs: Any) -> str:
return """\
# A list of application service config files to use
#

View File

@ -12,6 +12,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any
from synapse.types import JsonDict
from ._base import Config
@ -21,7 +24,7 @@ class AuthConfig(Config):
section = "auth"
def read_config(self, config, **kwargs):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
password_config = config.get("password_config", {})
if password_config is None:
password_config = {}
@ -40,7 +43,7 @@ class AuthConfig(Config):
ui_auth.get("session_timeout", 0)
)
def generate_config_section(self, config_dir_path, server_name, **kwargs):
def generate_config_section(self, **kwargs: Any) -> str:
return """\
password_config:
# Uncomment to disable password login

View File

@ -11,6 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any
from synapse.types import JsonDict
from ._base import Config
@ -18,7 +21,7 @@ from ._base import Config
class BackgroundUpdateConfig(Config):
section = "background_updates"
def generate_config_section(self, **kwargs) -> str:
def generate_config_section(self, **kwargs: Any) -> str:
return """\
## Background Updates ##
@ -52,7 +55,7 @@ class BackgroundUpdateConfig(Config):
#default_batch_size: 50
"""
def read_config(self, config, **kwargs) -> None:
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
bg_update_config = config.get("background_updates") or {}
self.update_duration_ms = bg_update_config.get(

View File

@ -16,10 +16,11 @@ import logging
import os
import re
import threading
from typing import Callable, Dict, Optional
from typing import Any, Callable, Dict, Optional
import attr
from synapse.types import JsonDict
from synapse.util.check_dependencies import DependencyException, check_requirements
from ._base import Config, ConfigError
@ -105,7 +106,7 @@ class CacheConfig(Config):
with _CACHES_LOCK:
_CACHES.clear()
def generate_config_section(self, **kwargs) -> str:
def generate_config_section(self, **kwargs: Any) -> str:
return """\
## Caching ##
@ -172,7 +173,7 @@ class CacheConfig(Config):
#sync_response_cache_duration: 2m
"""
def read_config(self, config, **kwargs) -> None:
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
self.event_cache_size = self.parse_size(
config.get("event_cache_size", _DEFAULT_EVENT_CACHE_SIZE)
)

View File

@ -12,15 +12,31 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import Config
from typing import Any
from synapse.types import JsonDict
from ._base import Config, ConfigError
class CaptchaConfig(Config):
section = "captcha"
def read_config(self, config, **kwargs):
self.recaptcha_private_key = config.get("recaptcha_private_key")
self.recaptcha_public_key = config.get("recaptcha_public_key")
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
recaptcha_private_key = config.get("recaptcha_private_key")
if recaptcha_private_key is not None and not isinstance(
recaptcha_private_key, str
):
raise ConfigError("recaptcha_private_key must be a string.")
self.recaptcha_private_key = recaptcha_private_key
recaptcha_public_key = config.get("recaptcha_public_key")
if recaptcha_public_key is not None and not isinstance(
recaptcha_public_key, str
):
raise ConfigError("recaptcha_public_key must be a string.")
self.recaptcha_public_key = recaptcha_public_key
self.enable_registration_captcha = config.get(
"enable_registration_captcha", False
)
@ -30,7 +46,7 @@ class CaptchaConfig(Config):
)
self.recaptcha_template = self.read_template("recaptcha.html")
def generate_config_section(self, **kwargs):
def generate_config_section(self, **kwargs: Any) -> str:
return """\
## Captcha ##
# See docs/CAPTCHA_SETUP.md for full details of configuring this.

View File

@ -16,6 +16,7 @@
from typing import Any, List
from synapse.config.sso import SsoAttributeRequirement
from synapse.types import JsonDict
from ._base import Config
from ._util import validate_config
@ -29,7 +30,7 @@ class CasConfig(Config):
section = "cas"
def read_config(self, config, **kwargs) -> None:
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
cas_config = config.get("cas_config", None)
self.cas_enabled = cas_config and cas_config.get("enabled", True)
@ -52,7 +53,7 @@ class CasConfig(Config):
self.cas_displayname_attribute = None
self.cas_required_attributes = []
def generate_config_section(self, config_dir_path, server_name, **kwargs) -> str:
def generate_config_section(self, **kwargs: Any) -> str:
return """\
# Enable Central Authentication Service (CAS) for registration and login.
#

View File

@ -13,9 +13,10 @@
# limitations under the License.
from os import path
from typing import Optional
from typing import Any, Optional
from synapse.config import ConfigError
from synapse.types import JsonDict
from ._base import Config
@ -76,18 +77,18 @@ class ConsentConfig(Config):
section = "consent"
def __init__(self, *args):
def __init__(self, *args: Any):
super().__init__(*args)
self.user_consent_version: Optional[str] = None
self.user_consent_template_dir: Optional[str] = None
self.user_consent_server_notice_content = None
self.user_consent_server_notice_content: Optional[JsonDict] = None
self.user_consent_server_notice_to_guests = False
self.block_events_without_consent_error = None
self.block_events_without_consent_error: Optional[str] = None
self.user_consent_at_registration = False
self.user_consent_policy_name = "Privacy Policy"
def read_config(self, config, **kwargs):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
consent_config = config.get("user_consent")
self.terms_template = self.read_template("terms.html")
@ -118,5 +119,5 @@ class ConsentConfig(Config):
"policy_name", "Privacy Policy"
)
def generate_config_section(self, **kwargs):
def generate_config_section(self, **kwargs: Any) -> str:
return DEFAULT_CONFIG

View File

@ -15,8 +15,10 @@
import argparse
import logging
import os
from typing import Any, List
from synapse.config._base import Config, ConfigError
from synapse.types import JsonDict
logger = logging.getLogger(__name__)
@ -121,12 +123,12 @@ class DatabaseConnectionConfig:
class DatabaseConfig(Config):
section = "database"
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def __init__(self, *args: Any):
super().__init__(*args)
self.databases = []
self.databases: List[DatabaseConnectionConfig] = []
def read_config(self, config, **kwargs) -> None:
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
# We *experimentally* support specifying multiple databases via the
# `databases` key. This is a map from a label to database config in the
# same format as the `database` config option, plus an extra
@ -170,7 +172,7 @@ class DatabaseConfig(Config):
self.databases = [DatabaseConnectionConfig("master", database_config)]
self.set_databasepath(database_path)
def generate_config_section(self, data_dir_path, **kwargs) -> str:
def generate_config_section(self, data_dir_path: str, **kwargs: Any) -> str:
return DEFAULT_CONFIG % {
"database_path": os.path.join(data_dir_path, "homeserver.db")
}

View File

@ -19,9 +19,12 @@ import email.utils
import logging
import os
from enum import Enum
from typing import Any
import attr
from synapse.types import JsonDict
from ._base import Config, ConfigError
logger = logging.getLogger(__name__)
@ -73,7 +76,7 @@ class EmailSubjectConfig:
class EmailConfig(Config):
section = "email"
def read_config(self, config, **kwargs):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
# TODO: We should separate better the email configuration from the notification
# and account validity config.
@ -354,7 +357,7 @@ class EmailConfig(Config):
path=("email", "invite_client_location"),
)
def generate_config_section(self, config_dir_path, server_name, **kwargs):
def generate_config_section(self, **kwargs: Any) -> str:
return (
"""\
# Configuration for sending emails from Synapse.

View File

@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any
from synapse.config._base import Config
from synapse.types import JsonDict
@ -21,7 +23,7 @@ class ExperimentalConfig(Config):
section = "experimental"
def read_config(self, config: JsonDict, **kwargs):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
experimental = config.get("experimental_features") or {}
# MSC3440 (thread relation)

View File

@ -11,16 +11,17 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional
from typing import Any, Optional
from synapse.config._base import Config
from synapse.config._util import validate_config
from synapse.types import JsonDict
class FederationConfig(Config):
section = "federation"
def read_config(self, config, **kwargs):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
# FIXME: federation_domain_whitelist needs sytests
self.federation_domain_whitelist: Optional[dict] = None
federation_domain_whitelist = config.get("federation_domain_whitelist", None)
@ -48,7 +49,7 @@ class FederationConfig(Config):
"allow_device_name_lookup_over_federation", True
)
def generate_config_section(self, config_dir_path, server_name, **kwargs):
def generate_config_section(self, **kwargs: Any) -> str:
return """\
## Federation ##

View File

@ -12,17 +12,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any
from synapse.types import JsonDict
from ._base import Config
class GroupsConfig(Config):
section = "groups"
def read_config(self, config, **kwargs):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
self.enable_group_creation = config.get("enable_group_creation", False)
self.group_creation_prefix = config.get("group_creation_prefix", "")
def generate_config_section(self, **kwargs):
def generate_config_section(self, **kwargs: Any) -> str:
return """\
# Uncomment to allow non-server-admin users to create groups on this server
#

View File

@ -12,6 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any
from synapse.types import JsonDict
from ._base import Config, ConfigError
MISSING_JWT = """Missing jwt library. This is required for jwt login.
@ -24,7 +28,7 @@ MISSING_JWT = """Missing jwt library. This is required for jwt login.
class JWTConfig(Config):
section = "jwt"
def read_config(self, config, **kwargs):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
jwt_config = config.get("jwt_config", None)
if jwt_config:
self.jwt_enabled = jwt_config.get("enabled", False)
@ -52,7 +56,7 @@ class JWTConfig(Config):
self.jwt_issuer = None
self.jwt_audiences = None
def generate_config_section(self, **kwargs):
def generate_config_section(self, **kwargs: Any) -> str:
return """\
# JSON web token integration. The following settings can be used to make
# Synapse JSON web tokens for authentication, instead of its internal

View File

@ -99,11 +99,14 @@ class TrustedKeyServer:
class KeyConfig(Config):
section = "key"
def read_config(self, config, config_dir_path, **kwargs):
def read_config(
self, config: JsonDict, config_dir_path: str, **kwargs: Any
) -> None:
# the signing key can be specified inline or in a separate file
if "signing_key" in config:
self.signing_key = read_signing_keys([config["signing_key"]])
else:
assert config_dir_path is not None
signing_key_path = config.get("signing_key_path")
if signing_key_path is None:
signing_key_path = os.path.join(
@ -172,8 +175,12 @@ class KeyConfig(Config):
self.form_secret = config.get("form_secret", None)
def generate_config_section(
self, config_dir_path, server_name, generate_secrets=False, **kwargs
):
self,
config_dir_path: str,
server_name: str,
generate_secrets: bool = False,
**kwargs: Any,
) -> str:
base_key_name = os.path.join(config_dir_path, server_name)
if generate_secrets:

View File

@ -35,6 +35,7 @@ from twisted.logger import (
from synapse.logging.context import LoggingContextFilter
from synapse.logging.filter import MetadataFilter
from synapse.types import JsonDict
from ._base import Config, ConfigError
@ -147,13 +148,15 @@ https://matrix-org.github.io/synapse/v1.54/structured_logging.html
class LoggingConfig(Config):
section = "logging"
def read_config(self, config, **kwargs) -> None:
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
if config.get("log_file"):
raise ConfigError(LOG_FILE_ERROR)
self.log_config = self.abspath(config.get("log_config"))
self.no_redirect_stdio = config.get("no_redirect_stdio", False)
def generate_config_section(self, config_dir_path, server_name, **kwargs) -> str:
def generate_config_section(
self, config_dir_path: str, server_name: str, **kwargs: Any
) -> str:
log_config = os.path.join(config_dir_path, server_name + ".log.config")
return (
"""\

View File

@ -13,8 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, Optional
import attr
from synapse.types import JsonDict
from synapse.util.check_dependencies import DependencyException, check_requirements
from ._base import Config, ConfigError
@ -37,7 +40,7 @@ class MetricsFlags:
class MetricsConfig(Config):
section = "metrics"
def read_config(self, config, **kwargs):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
self.enable_metrics = config.get("enable_metrics", False)
self.report_stats = config.get("report_stats", None)
self.report_stats_endpoint = config.get(
@ -67,7 +70,9 @@ class MetricsConfig(Config):
"sentry.dsn field is required when sentry integration is enabled"
)
def generate_config_section(self, report_stats=None, **kwargs):
def generate_config_section(
self, report_stats: Optional[bool] = None, **kwargs: Any
) -> str:
res = """\
## Metrics ###

View File

@ -14,13 +14,14 @@
from typing import Any, Dict, List, Tuple
from synapse.config._base import Config, ConfigError
from synapse.types import JsonDict
from synapse.util.module_loader import load_module
class ModulesConfig(Config):
section = "modules"
def read_config(self, config: dict, **kwargs):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
self.loaded_modules: List[Tuple[Any, Dict]] = []
configured_modules = config.get("modules") or []
@ -31,7 +32,7 @@ class ModulesConfig(Config):
self.loaded_modules.append(load_module(module, config_path))
def generate_config_section(self, **kwargs):
def generate_config_section(self, **kwargs: Any) -> str:
return """
## Modules ##

View File

@ -40,7 +40,7 @@ class OembedConfig(Config):
section = "oembed"
def read_config(self, config, **kwargs):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
oembed_config: Dict[str, Any] = config.get("oembed") or {}
# A list of patterns which will be used.
@ -143,7 +143,7 @@ class OembedConfig(Config):
)
return re.compile(pattern)
def generate_config_section(self, **kwargs):
def generate_config_section(self, **kwargs: Any) -> str:
return """\
# oEmbed allows for easier embedding content from a website. It can be
# used for generating URLs previews of services which support it.

View File

@ -36,7 +36,7 @@ LEGACY_USER_MAPPING_PROVIDER = "synapse.handlers.oidc_handler.JinjaOidcMappingPr
class OIDCConfig(Config):
section = "oidc"
def read_config(self, config, **kwargs) -> None:
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
self.oidc_providers = tuple(_parse_oidc_provider_configs(config))
if not self.oidc_providers:
return
@ -66,7 +66,7 @@ class OIDCConfig(Config):
# OIDC is enabled if we have a provider
return bool(self.oidc_providers)
def generate_config_section(self, config_dir_path, server_name, **kwargs) -> str:
def generate_config_section(self, **kwargs: Any) -> str:
return """\
# List of OpenID Connect (OIDC) / OAuth 2.0 identity providers, for registration
# and login.

View File

@ -14,6 +14,7 @@
from typing import Any, List, Tuple, Type
from synapse.types import JsonDict
from synapse.util.module_loader import load_module
from ._base import Config
@ -24,7 +25,7 @@ LDAP_PROVIDER = "ldap_auth_provider.LdapAuthProvider"
class PasswordAuthProviderConfig(Config):
section = "authproviders"
def read_config(self, config, **kwargs):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
"""Parses the old password auth providers config. The config format looks like this:
password_providers:

View File

@ -13,13 +13,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any
from synapse.types import JsonDict
from ._base import Config
class PushConfig(Config):
section = "push"
def read_config(self, config, **kwargs):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
push_config = config.get("push") or {}
self.push_include_content = push_config.get("include_content", True)
self.push_group_unread_count_by_room = push_config.get(
@ -46,7 +50,7 @@ class PushConfig(Config):
)
self.push_include_content = not redact_content
def generate_config_section(self, config_dir_path, server_name, **kwargs):
def generate_config_section(self, **kwargs: Any) -> str:
return """
## Push ##

View File

@ -12,10 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict, Optional
from typing import Any, Dict, Optional
import attr
from synapse.types import JsonDict
from ._base import Config
@ -43,7 +45,7 @@ class FederationRateLimitConfig:
class RatelimitConfig(Config):
section = "ratelimiting"
def read_config(self, config, **kwargs):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
# Load the new-style messages config if it exists. Otherwise fall back
# to the old method.
@ -142,7 +144,7 @@ class RatelimitConfig(Config):
},
)
def generate_config_section(self, **kwargs):
def generate_config_section(self, **kwargs: Any) -> str:
return """\
## Ratelimiting ##

View File

@ -12,14 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any
from synapse.config._base import Config
from synapse.types import JsonDict
from synapse.util.check_dependencies import check_requirements
class RedisConfig(Config):
section = "redis"
def read_config(self, config, **kwargs):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
redis_config = config.get("redis") or {}
self.redis_enabled = redis_config.get("enabled", False)
@ -32,7 +35,7 @@ class RedisConfig(Config):
self.redis_port = redis_config.get("port", 6379)
self.redis_password = redis_config.get("password")
def generate_config_section(self, config_dir_path, server_name, **kwargs):
def generate_config_section(self, **kwargs: Any) -> str:
return """\
# Configuration for Redis when using workers. This *must* be enabled when
# using workers (unless using old style direct TCP configuration).

View File

@ -13,18 +13,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
from typing import Optional
from typing import Any, Optional
from synapse.api.constants import RoomCreationPreset
from synapse.config._base import Config, ConfigError
from synapse.types import RoomAlias, UserID
from synapse.types import JsonDict, RoomAlias, UserID
from synapse.util.stringutils import random_string_with_symbols, strtobool
class RegistrationConfig(Config):
section = "registration"
def read_config(self, config, **kwargs):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
self.enable_registration = strtobool(
str(config.get("enable_registration", False))
)
@ -196,7 +196,9 @@ class RegistrationConfig(Config):
self.inhibit_user_in_use_error = config.get("inhibit_user_in_use_error", False)
def generate_config_section(self, generate_secrets=False, **kwargs):
def generate_config_section(
self, generate_secrets: bool = False, **kwargs: Any
) -> str:
if generate_secrets:
registration_shared_secret = 'registration_shared_secret: "%s"' % (
random_string_with_symbols(50),

View File

@ -14,7 +14,7 @@
import logging
import os
from typing import Dict, List, Tuple
from typing import Any, Dict, List, Tuple
from urllib.request import getproxies_environment # type: ignore
import attr
@ -94,7 +94,7 @@ def parse_thumbnail_requirements(
class ContentRepositoryConfig(Config):
section = "media"
def read_config(self, config, **kwargs):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
# Only enable the media repo if either the media repo is enabled or the
# current worker app is the media repo.
@ -223,7 +223,8 @@ class ContentRepositoryConfig(Config):
"url_preview_accept_language"
) or ["en"]
def generate_config_section(self, data_dir_path, **kwargs):
def generate_config_section(self, data_dir_path: str, **kwargs: Any) -> str:
assert data_dir_path is not None
media_store = os.path.join(data_dir_path, "media_store")
formatted_thumbnail_sizes = "".join(

View File

@ -13,11 +13,12 @@
# limitations under the License.
import logging
from typing import List, Optional
from typing import Any, List, Optional
import attr
from synapse.config._base import Config, ConfigError
from synapse.types import JsonDict
logger = logging.getLogger(__name__)
@ -34,7 +35,7 @@ class RetentionPurgeJob:
class RetentionConfig(Config):
section = "retention"
def read_config(self, config, **kwargs):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
retention_config = config.get("retention")
if retention_config is None:
retention_config = {}
@ -153,7 +154,7 @@ class RetentionConfig(Config):
RetentionPurgeJob(self.parse_duration("1d"), None, None)
]
def generate_config_section(self, config_dir_path, server_name, **kwargs):
def generate_config_section(self, **kwargs: Any) -> str:
return """\
# Message retention policy at the server level.
#

View File

@ -13,8 +13,10 @@
# limitations under the License.
import logging
from typing import Any
from synapse.api.constants import RoomCreationPreset
from synapse.types import JsonDict
from ._base import Config, ConfigError
@ -32,7 +34,7 @@ class RoomDefaultEncryptionTypes:
class RoomConfig(Config):
section = "room"
def read_config(self, config, **kwargs):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
# Whether new, locally-created rooms should have encryption enabled
encryption_for_room_type = config.get(
"encryption_enabled_by_default_for_room_type",
@ -61,7 +63,7 @@ class RoomConfig(Config):
"Invalid value for encryption_enabled_by_default_for_room_type"
)
def generate_config_section(self, **kwargs):
def generate_config_section(self, **kwargs: Any) -> str:
return """\
## Rooms ##

View File

@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List
from typing import Any, List
from matrix_common.regex import glob_to_regex
@ -25,7 +25,7 @@ from ._base import Config, ConfigError
class RoomDirectoryConfig(Config):
section = "roomdirectory"
def read_config(self, config, **kwargs) -> None:
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
self.enable_room_list_search = config.get("enable_room_list_search", True)
alias_creation_rules = config.get("alias_creation_rules")
@ -52,7 +52,7 @@ class RoomDirectoryConfig(Config):
_RoomDirectoryRule("room_list_publication_rules", {"action": "allow"})
]
def generate_config_section(self, config_dir_path, server_name, **kwargs) -> str:
def generate_config_section(self, **kwargs: Any) -> str:
return """
# Uncomment to disable searching the public room list. When disabled
# blocks searching local and remote room lists for local and remote

View File

@ -65,7 +65,7 @@ def _dict_merge(merge_dict: dict, into_dict: dict) -> None:
class SAML2Config(Config):
section = "saml2"
def read_config(self, config, **kwargs) -> None:
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
self.saml2_enabled = False
saml2_config = config.get("saml2_config")
@ -165,13 +165,13 @@ class SAML2Config(Config):
config_path = saml2_config.get("config_path", None)
if config_path is not None:
mod = load_python_module(config_path)
config = getattr(mod, "CONFIG", None)
if config is None:
config_dict_from_file = getattr(mod, "CONFIG", None)
if config_dict_from_file is None:
raise ConfigError(
"Config path specified by saml2_config.config_path does not "
"have a CONFIG property."
)
_dict_merge(merge_dict=config, into_dict=saml2_config_dict)
_dict_merge(merge_dict=config_dict_from_file, into_dict=saml2_config_dict)
import saml2.config
@ -223,7 +223,7 @@ class SAML2Config(Config):
},
}
def generate_config_section(self, config_dir_path, server_name, **kwargs) -> str:
def generate_config_section(self, config_dir_path: str, **kwargs: Any) -> str:
return """\
## Single sign-on integration ##

View File

@ -248,7 +248,7 @@ class LimitRemoteRoomsConfig:
class ServerConfig(Config):
section = "server"
def read_config(self, config, **kwargs):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
self.server_name = config["server_name"]
self.server_context = config.get("server_context", None)
@ -259,8 +259,8 @@ class ServerConfig(Config):
self.pid_file = self.abspath(config.get("pid_file"))
self.soft_file_limit = config.get("soft_file_limit", 0)
self.daemonize = config.get("daemonize")
self.print_pidfile = config.get("print_pidfile")
self.daemonize = bool(config.get("daemonize"))
self.print_pidfile = bool(config.get("print_pidfile"))
self.user_agent_suffix = config.get("user_agent_suffix")
self.use_frozen_dicts = config.get("use_frozen_dicts", False)
self.serve_server_wellknown = config.get("serve_server_wellknown", False)
@ -697,13 +697,13 @@ class ServerConfig(Config):
def generate_config_section(
self,
server_name,
data_dir_path,
open_private_ports,
listeners,
config_dir_path,
**kwargs,
):
config_dir_path: str,
data_dir_path: str,
server_name: str,
open_private_ports: bool,
listeners: Optional[List[dict]],
**kwargs: Any,
) -> str:
ip_range_blacklist = "\n".join(
" # - '%s'" % ip for ip in DEFAULT_IP_RANGE_BLACKLIST
)

View File

@ -11,7 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.types import UserID
from typing import Any, Optional
from synapse.types import JsonDict, UserID
from ._base import Config
@ -60,14 +63,14 @@ class ServerNoticesConfig(Config):
section = "servernotices"
def __init__(self, *args):
def __init__(self, *args: Any):
super().__init__(*args)
self.server_notices_mxid = None
self.server_notices_mxid_display_name = None
self.server_notices_mxid_avatar_url = None
self.server_notices_room_name = None
self.server_notices_mxid: Optional[str] = None
self.server_notices_mxid_display_name: Optional[str] = None
self.server_notices_mxid_avatar_url: Optional[str] = None
self.server_notices_room_name: Optional[str] = None
def read_config(self, config, **kwargs):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
c = config.get("server_notices")
if c is None:
return
@ -81,5 +84,5 @@ class ServerNoticesConfig(Config):
# todo: i18n
self.server_notices_room_name = c.get("room_name", "Server Notices")
def generate_config_section(self, **kwargs):
def generate_config_section(self, **kwargs: Any) -> str:
return DEFAULT_CONFIG

View File

@ -16,6 +16,7 @@ import logging
from typing import Any, Dict, List, Tuple
from synapse.config import ConfigError
from synapse.types import JsonDict
from synapse.util.module_loader import load_module
from ._base import Config
@ -33,7 +34,7 @@ see https://matrix-org.github.io/synapse/latest/modules/index.html
class SpamCheckerConfig(Config):
section = "spamchecker"
def read_config(self, config, **kwargs):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
self.spam_checkers: List[Tuple[Any, Dict]] = []
spam_checkers = config.get("spam_checker") or []

View File

@ -16,6 +16,8 @@ from typing import Any, Dict, Optional
import attr
from synapse.types import JsonDict
from ._base import Config
logger = logging.getLogger(__name__)
@ -49,7 +51,7 @@ class SSOConfig(Config):
section = "sso"
def read_config(self, config, **kwargs) -> None:
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
sso_config: Dict[str, Any] = config.get("sso") or {}
# The sso-specific template_dir
@ -106,7 +108,7 @@ class SSOConfig(Config):
)
self.sso_client_whitelist.append(login_fallback_url)
def generate_config_section(self, **kwargs) -> str:
def generate_config_section(self, **kwargs: Any) -> str:
return """\
# Additional settings to use with single-sign on systems such as OpenID Connect,
# SAML2 and CAS.

View File

@ -13,6 +13,9 @@
# limitations under the License.
import logging
from typing import Any
from synapse.types import JsonDict
from ._base import Config
@ -36,7 +39,7 @@ class StatsConfig(Config):
section = "stats"
def read_config(self, config, **kwargs):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
self.stats_enabled = True
stats_config = config.get("stats", None)
if stats_config:
@ -44,7 +47,7 @@ class StatsConfig(Config):
if not self.stats_enabled:
logger.warning(ROOM_STATS_DISABLED_WARN)
def generate_config_section(self, config_dir_path, server_name, **kwargs):
def generate_config_section(self, **kwargs: Any) -> str:
return """
# Settings for local room and user statistics collection. See
# https://matrix-org.github.io/synapse/latest/room_and_user_statistics.html.

View File

@ -12,6 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any
from synapse.types import JsonDict
from synapse.util.module_loader import load_module
from ._base import Config
@ -20,7 +23,7 @@ from ._base import Config
class ThirdPartyRulesConfig(Config):
section = "thirdpartyrules"
def read_config(self, config, **kwargs):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
self.third_party_event_rules = None
provider = config.get("third_party_event_rules", None)

View File

@ -14,7 +14,7 @@
import logging
import os
from typing import List, Optional, Pattern
from typing import Any, List, Optional, Pattern
from matrix_common.regex import glob_to_regex
@ -22,6 +22,7 @@ from OpenSSL import SSL, crypto
from twisted.internet._sslverify import Certificate, trustRootFromCertificates
from synapse.config._base import Config, ConfigError
from synapse.types import JsonDict
logger = logging.getLogger(__name__)
@ -29,7 +30,7 @@ logger = logging.getLogger(__name__)
class TlsConfig(Config):
section = "tls"
def read_config(self, config: dict, config_dir_path: str, **kwargs):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
self.tls_certificate_file = self.abspath(config.get("tls_certificate_path"))
self.tls_private_key_file = self.abspath(config.get("tls_private_key_path"))
@ -142,13 +143,13 @@ class TlsConfig(Config):
def generate_config_section(
self,
config_dir_path,
server_name,
data_dir_path,
tls_certificate_path,
tls_private_key_path,
**kwargs,
):
config_dir_path: str,
data_dir_path: str,
server_name: str,
tls_certificate_path: Optional[str],
tls_private_key_path: Optional[str],
**kwargs: Any,
) -> str:
"""If the TLS paths are not specified the default will be certs in the
config directory"""

View File

@ -12,8 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Set
from typing import Any, Set
from synapse.types import JsonDict
from synapse.util.check_dependencies import DependencyException, check_requirements
from ._base import Config, ConfigError
@ -22,7 +23,7 @@ from ._base import Config, ConfigError
class TracerConfig(Config):
section = "tracing"
def read_config(self, config, **kwargs):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
opentracing_config = config.get("opentracing")
if opentracing_config is None:
opentracing_config = {}
@ -65,7 +66,7 @@ class TracerConfig(Config):
)
self.force_tracing_for_users.add(u)
def generate_config_section(cls, **kwargs):
def generate_config_section(cls, **kwargs: Any) -> str:
return """\
## Opentracing ##

View File

@ -12,6 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any
from synapse.types import JsonDict
from ._base import Config
@ -22,7 +26,7 @@ class UserDirectoryConfig(Config):
section = "userdirectory"
def read_config(self, config, **kwargs):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
user_directory_config = config.get("user_directory") or {}
self.user_directory_search_enabled = user_directory_config.get("enabled", True)
self.user_directory_search_all_users = user_directory_config.get(
@ -32,7 +36,7 @@ class UserDirectoryConfig(Config):
"prefer_local_users", False
)
def generate_config_section(self, config_dir_path, server_name, **kwargs):
def generate_config_section(self, **kwargs: Any) -> str:
return """
# User Directory configuration
#

View File

@ -12,13 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any
from synapse.types import JsonDict
from ._base import Config
class VoipConfig(Config):
section = "voip"
def read_config(self, config, **kwargs):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
self.turn_uris = config.get("turn_uris", [])
self.turn_shared_secret = config.get("turn_shared_secret")
self.turn_username = config.get("turn_username")
@ -28,7 +32,7 @@ class VoipConfig(Config):
)
self.turn_allow_guests = config.get("turn_allow_guests", True)
def generate_config_section(self, **kwargs):
def generate_config_section(self, **kwargs: Any) -> str:
return """\
## TURN ##

View File

@ -14,10 +14,12 @@
# limitations under the License.
import argparse
from typing import List, Union
from typing import Any, List, Union
import attr
from synapse.types import JsonDict
from ._base import (
Config,
ConfigError,
@ -110,7 +112,7 @@ class WorkerConfig(Config):
section = "worker"
def read_config(self, config, **kwargs):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
self.worker_app = config.get("worker_app")
# Canonicalise worker_app so that master always has None
@ -120,9 +122,13 @@ class WorkerConfig(Config):
self.worker_listeners = [
parse_listener_def(x) for x in config.get("worker_listeners", [])
]
self.worker_daemonize = config.get("worker_daemonize")
self.worker_daemonize = bool(config.get("worker_daemonize"))
self.worker_pid_file = config.get("worker_pid_file")
self.worker_log_config = config.get("worker_log_config")
worker_log_config = config.get("worker_log_config")
if worker_log_config is not None and not isinstance(worker_log_config, str):
raise ConfigError("worker_log_config must be a string")
self.worker_log_config = worker_log_config
# The host used to connect to the main synapse
self.worker_replication_host = config.get("worker_replication_host", None)
@ -290,7 +296,7 @@ class WorkerConfig(Config):
self.worker_name is None and background_tasks_instance == "master"
) or self.worker_name == background_tasks_instance
def generate_config_section(self, config_dir_path, server_name, **kwargs):
def generate_config_section(self, **kwargs: Any) -> str:
return """\
## Workers ##

View File

@ -107,6 +107,8 @@ class RecaptchaAuthChecker(UserInteractiveAuthChecker):
# TODO: get this from the homeserver rather than creating a new one for
# each request
try:
assert self._secret is not None
resp_body = await self._http_client.post_urlencoded_get_json(
self._url,
args={

View File

@ -37,7 +37,9 @@ class RegistrationConfigTestCase(ConfigFileTestCase):
"session_lifetime": "30m",
"nonrefreshable_access_token_lifetime": "31m",
**config_dict,
}
},
"",
"",
)
with self.assertRaises(ConfigError):
@ -46,7 +48,9 @@ class RegistrationConfigTestCase(ConfigFileTestCase):
"session_lifetime": "30m",
"refreshable_access_token_lifetime": "31m",
**config_dict,
}
},
"",
"",
)
with self.assertRaises(ConfigError):
@ -55,7 +59,9 @@ class RegistrationConfigTestCase(ConfigFileTestCase):
"session_lifetime": "30m",
"refresh_token_lifetime": "31m",
**config_dict,
}
},
"",
"",
)
# Then test all the fine conditions
@ -64,7 +70,9 @@ class RegistrationConfigTestCase(ConfigFileTestCase):
"session_lifetime": "31m",
"nonrefreshable_access_token_lifetime": "31m",
**config_dict,
}
},
"",
"",
)
HomeServerConfig().parse_config_dict(
@ -72,11 +80,15 @@ class RegistrationConfigTestCase(ConfigFileTestCase):
"session_lifetime": "31m",
"refreshable_access_token_lifetime": "31m",
**config_dict,
}
},
"",
"",
)
HomeServerConfig().parse_config_dict(
{"session_lifetime": "31m", "refresh_token_lifetime": "31m", **config_dict}
{"session_lifetime": "31m", "refresh_token_lifetime": "31m", **config_dict},
"",
"",
)
def test_refuse_to_start_if_open_registration_and_no_verification(self):

View File

@ -35,7 +35,7 @@ class ServerConfigTestCase(unittest.TestCase):
def test_unsecure_listener_no_listeners_open_private_ports_false(self):
conf = yaml.safe_load(
ServerConfig().generate_config_section(
"che.org", "/data_dir_path", False, None, config_dir_path="CONFDIR"
"CONFDIR", "/data_dir_path", "che.org", False, None
)
)
@ -55,7 +55,7 @@ class ServerConfigTestCase(unittest.TestCase):
def test_unsecure_listener_no_listeners_open_private_ports_true(self):
conf = yaml.safe_load(
ServerConfig().generate_config_section(
"che.org", "/data_dir_path", True, None, config_dir_path="CONFDIR"
"CONFDIR", "/data_dir_path", "che.org", True, None
)
)
@ -89,7 +89,7 @@ class ServerConfigTestCase(unittest.TestCase):
conf = yaml.safe_load(
ServerConfig().generate_config_section(
"this.one.listens", "/data_dir_path", True, listeners, "CONFDIR"
"CONFDIR", "/data_dir_path", "this.one.listens", True, listeners
)
)
@ -123,7 +123,7 @@ class ServerConfigTestCase(unittest.TestCase):
conf = yaml.safe_load(
ServerConfig().generate_config_section(
"this.one.listens", "/data_dir_path", True, listeners, "CONFDIR"
"CONFDIR", "/data_dir_path", "this.one.listens", True, listeners
)
)