Enable passing typing stream writers as a list. (#11237)
This makes the typing stream writer config match the other stream writers that only currently support a single worker.pull/11240/head
parent
2735b3e6f2
commit
af54167516
|
@ -0,0 +1 @@
|
||||||
|
Allow `stream_writers.typing` config to be a list of one worker.
|
|
@ -63,7 +63,8 @@ class WriterLocations:
|
||||||
|
|
||||||
Attributes:
|
Attributes:
|
||||||
events: The instances that write to the event and backfill streams.
|
events: The instances that write to the event and backfill streams.
|
||||||
typing: The instance that writes to the typing stream.
|
typing: The instances that write to the typing stream. Currently
|
||||||
|
can only be a single instance.
|
||||||
to_device: The instances that write to the to_device stream. Currently
|
to_device: The instances that write to the to_device stream. Currently
|
||||||
can only be a single instance.
|
can only be a single instance.
|
||||||
account_data: The instances that write to the account data streams. Currently
|
account_data: The instances that write to the account data streams. Currently
|
||||||
|
@ -75,9 +76,15 @@ class WriterLocations:
|
||||||
"""
|
"""
|
||||||
|
|
||||||
events = attr.ib(
|
events = attr.ib(
|
||||||
default=["master"], type=List[str], converter=_instance_to_list_converter
|
default=["master"],
|
||||||
|
type=List[str],
|
||||||
|
converter=_instance_to_list_converter,
|
||||||
|
)
|
||||||
|
typing = attr.ib(
|
||||||
|
default=["master"],
|
||||||
|
type=List[str],
|
||||||
|
converter=_instance_to_list_converter,
|
||||||
)
|
)
|
||||||
typing = attr.ib(default="master", type=str)
|
|
||||||
to_device = attr.ib(
|
to_device = attr.ib(
|
||||||
default=["master"],
|
default=["master"],
|
||||||
type=List[str],
|
type=List[str],
|
||||||
|
@ -217,6 +224,11 @@ class WorkerConfig(Config):
|
||||||
% (instance, stream)
|
% (instance, stream)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if len(self.writers.typing) != 1:
|
||||||
|
raise ConfigError(
|
||||||
|
"Must only specify one instance to handle `typing` messages."
|
||||||
|
)
|
||||||
|
|
||||||
if len(self.writers.to_device) != 1:
|
if len(self.writers.to_device) != 1:
|
||||||
raise ConfigError(
|
raise ConfigError(
|
||||||
"Must only specify one instance to handle `to_device` messages."
|
"Must only specify one instance to handle `to_device` messages."
|
||||||
|
|
|
@ -1232,10 +1232,6 @@ class FederationHandlerRegistry:
|
||||||
|
|
||||||
self.query_handlers[query_type] = handler
|
self.query_handlers[query_type] = handler
|
||||||
|
|
||||||
def register_instance_for_edu(self, edu_type: str, instance_name: str) -> None:
|
|
||||||
"""Register that the EDU handler is on a different instance than master."""
|
|
||||||
self._edu_type_to_instance[edu_type] = [instance_name]
|
|
||||||
|
|
||||||
def register_instances_for_edu(
|
def register_instances_for_edu(
|
||||||
self, edu_type: str, instance_names: List[str]
|
self, edu_type: str, instance_names: List[str]
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
|
@ -62,8 +62,8 @@ class FollowerTypingHandler:
|
||||||
if hs.should_send_federation():
|
if hs.should_send_federation():
|
||||||
self.federation = hs.get_federation_sender()
|
self.federation = hs.get_federation_sender()
|
||||||
|
|
||||||
if hs.config.worker.writers.typing != hs.get_instance_name():
|
if hs.get_instance_name() not in hs.config.worker.writers.typing:
|
||||||
hs.get_federation_registry().register_instance_for_edu(
|
hs.get_federation_registry().register_instances_for_edu(
|
||||||
"m.typing",
|
"m.typing",
|
||||||
hs.config.worker.writers.typing,
|
hs.config.worker.writers.typing,
|
||||||
)
|
)
|
||||||
|
@ -205,7 +205,7 @@ class TypingWriterHandler(FollowerTypingHandler):
|
||||||
def __init__(self, hs: "HomeServer"):
|
def __init__(self, hs: "HomeServer"):
|
||||||
super().__init__(hs)
|
super().__init__(hs)
|
||||||
|
|
||||||
assert hs.config.worker.writers.typing == hs.get_instance_name()
|
assert hs.get_instance_name() in hs.config.worker.writers.typing
|
||||||
|
|
||||||
self.auth = hs.get_auth()
|
self.auth = hs.get_auth()
|
||||||
self.notifier = hs.get_notifier()
|
self.notifier = hs.get_notifier()
|
||||||
|
|
|
@ -138,7 +138,7 @@ class ReplicationCommandHandler:
|
||||||
if isinstance(stream, TypingStream):
|
if isinstance(stream, TypingStream):
|
||||||
# Only add TypingStream as a source on the instance in charge of
|
# Only add TypingStream as a source on the instance in charge of
|
||||||
# typing.
|
# typing.
|
||||||
if hs.config.worker.writers.typing == hs.get_instance_name():
|
if hs.get_instance_name() in hs.config.worker.writers.typing:
|
||||||
self._streams_to_replicate.append(stream)
|
self._streams_to_replicate.append(stream)
|
||||||
|
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -328,8 +328,7 @@ class TypingStream(Stream):
|
||||||
ROW_TYPE = TypingStreamRow
|
ROW_TYPE = TypingStreamRow
|
||||||
|
|
||||||
def __init__(self, hs: "HomeServer"):
|
def __init__(self, hs: "HomeServer"):
|
||||||
writer_instance = hs.config.worker.writers.typing
|
if hs.get_instance_name() in hs.config.worker.writers.typing:
|
||||||
if writer_instance == hs.get_instance_name():
|
|
||||||
# On the writer, query the typing handler
|
# On the writer, query the typing handler
|
||||||
typing_writer_handler = hs.get_typing_writer_handler()
|
typing_writer_handler = hs.get_typing_writer_handler()
|
||||||
update_function: Callable[
|
update_function: Callable[
|
||||||
|
|
|
@ -914,7 +914,7 @@ class RoomTypingRestServlet(RestServlet):
|
||||||
# If we're not on the typing writer instance we should scream if we get
|
# If we're not on the typing writer instance we should scream if we get
|
||||||
# requests.
|
# requests.
|
||||||
self._is_typing_writer = (
|
self._is_typing_writer = (
|
||||||
hs.config.worker.writers.typing == hs.get_instance_name()
|
hs.get_instance_name() in hs.config.worker.writers.typing
|
||||||
)
|
)
|
||||||
|
|
||||||
async def on_PUT(
|
async def on_PUT(
|
||||||
|
|
|
@ -463,7 +463,7 @@ class HomeServer(metaclass=abc.ABCMeta):
|
||||||
|
|
||||||
@cache_in_self
|
@cache_in_self
|
||||||
def get_typing_writer_handler(self) -> TypingWriterHandler:
|
def get_typing_writer_handler(self) -> TypingWriterHandler:
|
||||||
if self.config.worker.writers.typing == self.get_instance_name():
|
if self.get_instance_name() in self.config.worker.writers.typing:
|
||||||
return TypingWriterHandler(self)
|
return TypingWriterHandler(self)
|
||||||
else:
|
else:
|
||||||
raise Exception("Workers cannot write typing")
|
raise Exception("Workers cannot write typing")
|
||||||
|
@ -474,7 +474,7 @@ class HomeServer(metaclass=abc.ABCMeta):
|
||||||
|
|
||||||
@cache_in_self
|
@cache_in_self
|
||||||
def get_typing_handler(self) -> FollowerTypingHandler:
|
def get_typing_handler(self) -> FollowerTypingHandler:
|
||||||
if self.config.worker.writers.typing == self.get_instance_name():
|
if self.get_instance_name() in self.config.worker.writers.typing:
|
||||||
# Use get_typing_writer_handler to ensure that we use the same
|
# Use get_typing_writer_handler to ensure that we use the same
|
||||||
# cached version.
|
# cached version.
|
||||||
return self.get_typing_writer_handler()
|
return self.get_typing_writer_handler()
|
||||||
|
|
Loading…
Reference in New Issue