Add linearizer to protect stream catchup
parent
90bd1708b5
commit
6ac1ecad95
|
@ -35,6 +35,7 @@ from synapse.replication.tcp.commands import (
|
||||||
UserSyncCommand,
|
UserSyncCommand,
|
||||||
)
|
)
|
||||||
from synapse.replication.tcp.streams import STREAMS_MAP, Stream
|
from synapse.replication.tcp.streams import STREAMS_MAP, Stream
|
||||||
|
from synapse.util.async_helpers import Linearizer
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -60,6 +61,8 @@ class ReplicationCommandHandler:
|
||||||
stream.NAME: stream(hs) for stream in STREAMS_MAP.values()
|
stream.NAME: stream(hs) for stream in STREAMS_MAP.values()
|
||||||
} # type: Dict[str, Stream]
|
} # type: Dict[str, Stream]
|
||||||
|
|
||||||
|
self._position_linearizer = Linearizer("replication_position")
|
||||||
|
|
||||||
# Map of stream to batched updates. See RdataCommand for info on how
|
# Map of stream to batched updates. See RdataCommand for info on how
|
||||||
# batching works.
|
# batching works.
|
||||||
self.pending_batches = {} # type: Dict[str, List[Any]]
|
self.pending_batches = {} # type: Dict[str, List[Any]]
|
||||||
|
@ -122,31 +125,35 @@ class ReplicationCommandHandler:
|
||||||
# to stop RDATA being handled at the same time.
|
# to stop RDATA being handled at the same time.
|
||||||
self.streams_connecting.add(cmd.stream_name)
|
self.streams_connecting.add(cmd.stream_name)
|
||||||
|
|
||||||
# Find where we previously streamed up to.
|
# We protect catching up with a linearizer in case the replicaiton
|
||||||
current_token = self.replication_data_handler.get_streams_to_replicate().get(
|
# connection reconnects under us.
|
||||||
cmd.stream_name
|
with await self._position_linearizer.queue(cmd.stream_name):
|
||||||
)
|
# Find where we previously streamed up to.
|
||||||
if current_token is None:
|
current_token = self.replication_data_handler.get_streams_to_replicate().get(
|
||||||
logger.warning(
|
cmd.stream_name
|
||||||
"Got POSITION for stream we're not subscribed to: %s", cmd.stream_name
|
|
||||||
)
|
)
|
||||||
return
|
if current_token is None:
|
||||||
|
logger.warning(
|
||||||
# Fetch all updates between then and now.
|
"Got POSITION for stream we're not subscribed to: %s",
|
||||||
limited = True
|
|
||||||
while limited:
|
|
||||||
updates, current_token, limited = await stream.get_updates_since(
|
|
||||||
current_token, cmd.token
|
|
||||||
)
|
|
||||||
if updates:
|
|
||||||
await self.on_rdata(
|
|
||||||
cmd.stream_name,
|
cmd.stream_name,
|
||||||
current_token,
|
|
||||||
[stream.parse_row(update[1]) for update in updates],
|
|
||||||
)
|
)
|
||||||
|
return
|
||||||
|
|
||||||
# We've now caught up to position sent to us, notify handler.
|
# Fetch all updates between then and now.
|
||||||
await self.replication_data_handler.on_position(cmd.stream_name, cmd.token)
|
limited = True
|
||||||
|
while limited:
|
||||||
|
updates, current_token, limited = await stream.get_updates_since(
|
||||||
|
current_token, cmd.token
|
||||||
|
)
|
||||||
|
if updates:
|
||||||
|
await self.on_rdata(
|
||||||
|
cmd.stream_name,
|
||||||
|
current_token,
|
||||||
|
[stream.parse_row(update[1]) for update in updates],
|
||||||
|
)
|
||||||
|
|
||||||
|
# We've now caught up to position sent to us, notify handler.
|
||||||
|
await self.replication_data_handler.on_position(cmd.stream_name, cmd.token)
|
||||||
|
|
||||||
self.streams_connecting.discard(cmd.stream_name)
|
self.streams_connecting.discard(cmd.stream_name)
|
||||||
|
|
||||||
|
|
|
@ -15,9 +15,11 @@
|
||||||
|
|
||||||
from mock import Mock, NonCallableMock
|
from mock import Mock, NonCallableMock
|
||||||
|
|
||||||
from synapse.replication.tcp.client import ReplicationClientFactory
|
from synapse.replication.tcp.client import (
|
||||||
|
ReplicationClientFactory,
|
||||||
|
ReplicationDataHandler,
|
||||||
|
)
|
||||||
from synapse.replication.tcp.handler import ReplicationCommandHandler
|
from synapse.replication.tcp.handler import ReplicationCommandHandler
|
||||||
from synapse.replication.tcp.client import ReplicationDataHandler
|
|
||||||
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
|
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
|
||||||
from synapse.storage.database import make_conn
|
from synapse.storage.database import make_conn
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue