Merge pull request #755 from matrix-org/markjh/right_direction
Fix backfill replication to advance the stream correctlypull/756/head
commit
21d188bf95
|
@ -382,7 +382,7 @@ class _Writer(object):
|
||||||
position = rows[-1][0]
|
position = rows[-1][0]
|
||||||
|
|
||||||
self.streams[name] = {
|
self.streams[name] = {
|
||||||
"position": str(position),
|
"position": position if type(position) is int else str(position),
|
||||||
"field_names": fields,
|
"field_names": fields,
|
||||||
"rows": rows,
|
"rows": rows,
|
||||||
}
|
}
|
||||||
|
|
|
@ -118,7 +118,7 @@ class SlavedEventStore(BaseSlavedStore):
|
||||||
def stream_positions(self):
|
def stream_positions(self):
|
||||||
result = super(SlavedEventStore, self).stream_positions()
|
result = super(SlavedEventStore, self).stream_positions()
|
||||||
result["events"] = self._stream_id_gen.get_current_token()
|
result["events"] = self._stream_id_gen.get_current_token()
|
||||||
result["backfill"] = self._backfill_id_gen.get_current_token()
|
result["backfill"] = -self._backfill_id_gen.get_current_token()
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def process_replication(self, result):
|
def process_replication(self, result):
|
||||||
|
@ -136,7 +136,7 @@ class SlavedEventStore(BaseSlavedStore):
|
||||||
|
|
||||||
stream = result.get("backfill")
|
stream = result.get("backfill")
|
||||||
if stream:
|
if stream:
|
||||||
self._backfill_id_gen.advance(stream["position"])
|
self._backfill_id_gen.advance(-stream["position"])
|
||||||
for row in stream["rows"]:
|
for row in stream["rows"]:
|
||||||
self._process_replication_row(
|
self._process_replication_row(
|
||||||
row, backfilled=True, state_resets=state_resets
|
row, backfilled=True, state_resets=state_resets
|
||||||
|
|
Loading…
Reference in New Issue