Enable moving event persistence off of master
parent
a447c2eb0c
commit
1f6dbc3cd4
|
@ -38,7 +38,9 @@ from synapse.replication.tcp.commands import (
|
||||||
from synapse.replication.tcp.protocol import AbstractConnection
|
from synapse.replication.tcp.protocol import AbstractConnection
|
||||||
from synapse.replication.tcp.streams import (
|
from synapse.replication.tcp.streams import (
|
||||||
STREAMS_MAP,
|
STREAMS_MAP,
|
||||||
|
BackfillStream,
|
||||||
CachesStream,
|
CachesStream,
|
||||||
|
EventsStream,
|
||||||
FederationStream,
|
FederationStream,
|
||||||
Stream,
|
Stream,
|
||||||
)
|
)
|
||||||
|
@ -87,6 +89,12 @@ class ReplicationCommandHandler:
|
||||||
self._streams_to_replicate.append(stream)
|
self._streams_to_replicate.append(stream)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
if (
|
||||||
|
isinstance(stream, (EventsStream, BackfillStream))
|
||||||
|
and hs.config.worker.writers.events == hs.get_instance_name()
|
||||||
|
):
|
||||||
|
self._streams_to_replicate.append(stream)
|
||||||
|
|
||||||
# Only add any other streams if we're on master.
|
# Only add any other streams if we're on master.
|
||||||
if hs.config.worker_app is not None:
|
if hs.config.worker_app is not None:
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -66,9 +66,9 @@ class DataStores(object):
|
||||||
|
|
||||||
self.main = main_store_class(database, db_conn, hs)
|
self.main = main_store_class(database, db_conn, hs)
|
||||||
|
|
||||||
# If we're on a process that can persist events (currently
|
# If we're on a process that can persist events also
|
||||||
# master), also instantiate a `PersistEventsStore`
|
# instansiate a `PersistEventsStore`
|
||||||
if hs.config.worker.worker_app is None:
|
if hs.config.worker.writers.events == hs.get_instance_name():
|
||||||
self.persist_events = PersistEventsStore(
|
self.persist_events = PersistEventsStore(
|
||||||
hs, database, self.main
|
hs, database, self.main
|
||||||
)
|
)
|
||||||
|
|
|
@ -138,10 +138,10 @@ class PersistEventsStore:
|
||||||
self._backfill_id_gen = self.store._backfill_id_gen # type: StreamIdGenerator
|
self._backfill_id_gen = self.store._backfill_id_gen # type: StreamIdGenerator
|
||||||
self._stream_id_gen = self.store._stream_id_gen # type: StreamIdGenerator
|
self._stream_id_gen = self.store._stream_id_gen # type: StreamIdGenerator
|
||||||
|
|
||||||
# This should only exist on master for now
|
# This should only exist on instances that are configured to write
|
||||||
assert (
|
assert (
|
||||||
hs.config.worker.worker_app is None
|
hs.config.worker.writers.events == hs.get_instance_name()
|
||||||
), "Can only instantiate PersistEventsStore on master"
|
), "Can only instantiate EventsStore on master"
|
||||||
|
|
||||||
@_retry_on_integrity_error
|
@_retry_on_integrity_error
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
|
Loading…
Reference in New Issue