Compare commits

...

2 Commits

Author SHA1 Message Date
Erik Johnston 49323eecd2 Actually test that event persistence is sharded 2020-10-01 20:09:36 +01:00
Erik Johnston 8c78cdfc1b Update tests/replication/_base.py
Co-authored-by: Patrick Cloke <clokep@users.noreply.github.com>
2020-10-01 18:59:21 +01:00
2 changed files with 39 additions and 12 deletions

View File

@ -224,7 +224,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
# manually have to go and explicitly set it up each time (plus sometimes
# it is impossible to write the handling explicitly in the tests).
#
# This sets registers the master replication listener:
# Register the master replication listener:
self.reactor.add_tcp_client_callback(
"1.2.3.4",
8765,
@ -647,5 +647,5 @@ class FakeRedisPubSubProtocol(Protocol):
raise Exception("Unrecognized type for encoding redis: %r: %r", type(obj), obj)
def connectionList(self, reason):
def connectionLost(self, reason):
self._server.remove_subscriber(self)

View File

@ -18,6 +18,7 @@ from synapse.rest import admin
from synapse.rest.client.v1 import login, room
from tests.replication._base import BaseMultiWorkerStreamTestCase
from tests.utils import USE_POSTGRES_FOR_TESTS
logger = logging.getLogger(__name__)
@ -26,6 +27,11 @@ class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase):
"""Checks event persisting sharding works
"""
# Event persister sharding requires postgres (due to needing
# `MutliWriterIdGenerator`).
if not USE_POSTGRES_FOR_TESTS:
skip = "Requires Postgres"
servlets = [
admin.register_servlets_for_client_rest_resource,
room.register_servlets,
@ -48,8 +54,8 @@ class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase):
return conf
def test_basic(self):
"""Simple test that rooms can be created and joined when there are
multiple event persisters.
"""Simple test to ensure that multiple rooms can be created and joined,
and that different rooms get handled by different instances.
"""
self.make_worker_hs(
@ -60,16 +66,37 @@ class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase):
"synapse.app.generic_worker", {"worker_name": "worker2"},
)
persisted_on_1 = False
persisted_on_2 = False
store = self.hs.get_datastore()
user_id = self.register_user("user", "pass")
access_token = self.login("user", "pass")
# Create a room
room = self.helper.create_room_as(user_id, tok=access_token)
# Keep making new rooms until we see rooms being persisted on both
# workers.
for _ in range(10):
# Create a room
room = self.helper.create_room_as(user_id, tok=access_token)
# The other user joins
self.helper.join(
room=room, user=self.other_user_id, tok=self.other_access_token
)
# The other user joins
self.helper.join(
room=room, user=self.other_user_id, tok=self.other_access_token
)
# The other user sends some messages
self.helper.send(room, body="Hi!", tok=self.other_access_token)
# The other user sends some messages
rseponse = self.helper.send(room, body="Hi!", tok=self.other_access_token)
event_id = rseponse["event_id"]
# The event position includes which instance persisted the event.
pos = self.get_success(store.get_position_for_event(event_id))
persisted_on_1 |= pos.instance_name == "worker1"
persisted_on_2 |= pos.instance_name == "worker2"
if persisted_on_1 and persisted_on_2:
break
self.assertTrue(persisted_on_1)
self.assertTrue(persisted_on_2)