Fix get|set_type_stream_id_for_appservice store functions (#8648)
parent
5eda018561
commit
9e0f5a0ac4
|
@ -0,0 +1 @@
|
||||||
|
Fix a bug introduced in v1.22.0rc1 which would cause ephemeral events to not be sent to appservices.
|
|
@ -203,16 +203,16 @@ class ApplicationServicesHandler:
|
||||||
events = await self._handle_receipts(service)
|
events = await self._handle_receipts(service)
|
||||||
if events:
|
if events:
|
||||||
self.scheduler.submit_ephemeral_events_for_as(service, events)
|
self.scheduler.submit_ephemeral_events_for_as(service, events)
|
||||||
await self.store.set_type_stream_id_for_appservice(
|
await self.store.set_type_stream_id_for_appservice(
|
||||||
service, "read_receipt", new_token
|
service, "read_receipt", new_token
|
||||||
)
|
)
|
||||||
elif stream_key == "presence_key":
|
elif stream_key == "presence_key":
|
||||||
events = await self._handle_presence(service, users)
|
events = await self._handle_presence(service, users)
|
||||||
if events:
|
if events:
|
||||||
self.scheduler.submit_ephemeral_events_for_as(service, events)
|
self.scheduler.submit_ephemeral_events_for_as(service, events)
|
||||||
await self.store.set_type_stream_id_for_appservice(
|
await self.store.set_type_stream_id_for_appservice(
|
||||||
service, "presence", new_token
|
service, "presence", new_token
|
||||||
)
|
)
|
||||||
|
|
||||||
async def _handle_typing(self, service: ApplicationService, new_token: int):
|
async def _handle_typing(self, service: ApplicationService, new_token: int):
|
||||||
typing_source = self.event_sources.sources["typing"]
|
typing_source = self.event_sources.sources["typing"]
|
||||||
|
|
|
@ -369,17 +369,25 @@ class ApplicationServiceTransactionWorkerStore(
|
||||||
async def get_type_stream_id_for_appservice(
|
async def get_type_stream_id_for_appservice(
|
||||||
self, service: ApplicationService, type: str
|
self, service: ApplicationService, type: str
|
||||||
) -> int:
|
) -> int:
|
||||||
|
if type not in ("read_receipt", "presence"):
|
||||||
|
raise ValueError(
|
||||||
|
"Expected type to be a valid application stream id type, got %s"
|
||||||
|
% (type,)
|
||||||
|
)
|
||||||
|
|
||||||
def get_type_stream_id_for_appservice_txn(txn):
|
def get_type_stream_id_for_appservice_txn(txn):
|
||||||
stream_id_type = "%s_stream_id" % type
|
stream_id_type = "%s_stream_id" % type
|
||||||
txn.execute(
|
txn.execute(
|
||||||
"SELECT ? FROM application_services_state WHERE as_id=?",
|
# We do NOT want to escape `stream_id_type`.
|
||||||
(stream_id_type, service.id,),
|
"SELECT %s FROM application_services_state WHERE as_id=?"
|
||||||
|
% stream_id_type,
|
||||||
|
(service.id,),
|
||||||
)
|
)
|
||||||
last_txn_id = txn.fetchone()
|
last_stream_id = txn.fetchone()
|
||||||
if last_txn_id is None or last_txn_id[0] is None: # no row exists
|
if last_stream_id is None or last_stream_id[0] is None: # no row exists
|
||||||
return 0
|
return 0
|
||||||
else:
|
else:
|
||||||
return int(last_txn_id[0])
|
return int(last_stream_id[0])
|
||||||
|
|
||||||
return await self.db_pool.runInteraction(
|
return await self.db_pool.runInteraction(
|
||||||
"get_type_stream_id_for_appservice", get_type_stream_id_for_appservice_txn
|
"get_type_stream_id_for_appservice", get_type_stream_id_for_appservice_txn
|
||||||
|
@ -388,11 +396,18 @@ class ApplicationServiceTransactionWorkerStore(
|
||||||
async def set_type_stream_id_for_appservice(
|
async def set_type_stream_id_for_appservice(
|
||||||
self, service: ApplicationService, type: str, pos: int
|
self, service: ApplicationService, type: str, pos: int
|
||||||
) -> None:
|
) -> None:
|
||||||
|
if type not in ("read_receipt", "presence"):
|
||||||
|
raise ValueError(
|
||||||
|
"Expected type to be a valid application stream id type, got %s"
|
||||||
|
% (type,)
|
||||||
|
)
|
||||||
|
|
||||||
def set_type_stream_id_for_appservice_txn(txn):
|
def set_type_stream_id_for_appservice_txn(txn):
|
||||||
stream_id_type = "%s_stream_id" % type
|
stream_id_type = "%s_stream_id" % type
|
||||||
txn.execute(
|
txn.execute(
|
||||||
"UPDATE ? SET device_list_stream_id = ? WHERE as_id=?",
|
"UPDATE application_services_state SET %s = ? WHERE as_id=?"
|
||||||
(stream_id_type, pos, service.id),
|
% stream_id_type,
|
||||||
|
(pos, service.id),
|
||||||
)
|
)
|
||||||
|
|
||||||
await self.db_pool.runInteraction(
|
await self.db_pool.runInteraction(
|
||||||
|
|
|
@ -410,6 +410,62 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class ApplicationServiceStoreTypeStreamIds(unittest.HomeserverTestCase):
|
||||||
|
def make_homeserver(self, reactor, clock):
|
||||||
|
hs = self.setup_test_homeserver()
|
||||||
|
return hs
|
||||||
|
|
||||||
|
def prepare(self, hs, reactor, clock):
|
||||||
|
self.service = Mock(id="foo")
|
||||||
|
self.store = self.hs.get_datastore()
|
||||||
|
self.get_success(self.store.set_appservice_state(self.service, "up"))
|
||||||
|
|
||||||
|
def test_get_type_stream_id_for_appservice_no_value(self):
|
||||||
|
value = self.get_success(
|
||||||
|
self.store.get_type_stream_id_for_appservice(self.service, "read_receipt")
|
||||||
|
)
|
||||||
|
self.assertEquals(value, 0)
|
||||||
|
|
||||||
|
value = self.get_success(
|
||||||
|
self.store.get_type_stream_id_for_appservice(self.service, "presence")
|
||||||
|
)
|
||||||
|
self.assertEquals(value, 0)
|
||||||
|
|
||||||
|
def test_get_type_stream_id_for_appservice_invalid_type(self):
|
||||||
|
self.get_failure(
|
||||||
|
self.store.get_type_stream_id_for_appservice(self.service, "foobar"),
|
||||||
|
ValueError,
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_set_type_stream_id_for_appservice(self):
|
||||||
|
read_receipt_value = 1024
|
||||||
|
self.get_success(
|
||||||
|
self.store.set_type_stream_id_for_appservice(
|
||||||
|
self.service, "read_receipt", read_receipt_value
|
||||||
|
)
|
||||||
|
)
|
||||||
|
result = self.get_success(
|
||||||
|
self.store.get_type_stream_id_for_appservice(self.service, "read_receipt")
|
||||||
|
)
|
||||||
|
self.assertEqual(result, read_receipt_value)
|
||||||
|
|
||||||
|
self.get_success(
|
||||||
|
self.store.set_type_stream_id_for_appservice(
|
||||||
|
self.service, "presence", read_receipt_value
|
||||||
|
)
|
||||||
|
)
|
||||||
|
result = self.get_success(
|
||||||
|
self.store.get_type_stream_id_for_appservice(self.service, "presence")
|
||||||
|
)
|
||||||
|
self.assertEqual(result, read_receipt_value)
|
||||||
|
|
||||||
|
def test_set_type_stream_id_for_appservice_invalid_type(self):
|
||||||
|
self.get_failure(
|
||||||
|
self.store.set_type_stream_id_for_appservice(self.service, "foobar", 1024),
|
||||||
|
ValueError,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# required for ApplicationServiceTransactionStoreTestCase tests
|
# required for ApplicationServiceTransactionStoreTestCase tests
|
||||||
class TestTransactionStore(ApplicationServiceTransactionStore, ApplicationServiceStore):
|
class TestTransactionStore(ApplicationServiceTransactionStore, ApplicationServiceStore):
|
||||||
def __init__(self, database: DatabasePool, db_conn, hs):
|
def __init__(self, database: DatabasePool, db_conn, hs):
|
||||||
|
|
Loading…
Reference in New Issue