From 437a99fb99f42e64a0488ffe0a394cbc60921254 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 26 Oct 2020 13:16:32 +0000 Subject: [PATCH 1/3] Fix user_daily_visits to not have duplicate rows for UA. (#8654) * Fix user_daily_visits to not have duplicate rows for UA. Fixes #8641. * Newsfile * Fix typo. Co-authored-by: Patrick Cloke --- changelog.d/8654.bugfix | 1 + synapse/storage/databases/main/metrics.py | 9 +++++---- 2 files changed, 6 insertions(+), 4 deletions(-) create mode 100644 changelog.d/8654.bugfix diff --git a/changelog.d/8654.bugfix b/changelog.d/8654.bugfix new file mode 100644 index 0000000000..91d3265b7f --- /dev/null +++ b/changelog.d/8654.bugfix @@ -0,0 +1 @@ +Fix `user_daily_visits` to not have duplicate rows for UA. Broke in v1.22.0rc1. diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py index 79b01d16f9..ab18cc4d79 100644 --- a/synapse/storage/databases/main/metrics.py +++ b/synapse/storage/databases/main/metrics.py @@ -282,9 +282,10 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): now = self._clock.time_msec() # A note on user_agent. Technically a given device can have multiple - # user agents, so we need to decide which one to pick. We could have handled this - # in number of ways, but given that we don't _that_ much have gone for MAX() - # For more details of the other options considered see + # user agents, so we need to decide which one to pick. We could have + # handled this in number of ways, but given that we don't care + # _that_ much we have gone for MAX(). For more details of the other + # options considered see # https://github.com/matrix-org/synapse/pull/8503#discussion_r502306111 sql = """ INSERT INTO user_daily_visits (user_id, device_id, timestamp, user_agent) @@ -299,7 +300,7 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): WHERE last_seen > ? AND last_seen <= ? AND udv.timestamp IS NULL AND users.is_guest=0 AND users.appservice_id IS NULL - GROUP BY u.user_id, u.device_id, u.user_agent + GROUP BY u.user_id, u.device_id """ # This means that the day has rolled over but there could still From 5eda0185612be2e2c15eaba0a607442febd4a5a8 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 26 Oct 2020 09:19:07 -0400 Subject: [PATCH 2/3] Properly handle presence events for application services. (#8656) --- changelog.d/8656.bugfix | 1 + synapse/handlers/appservice.py | 9 +++++---- 2 files changed, 6 insertions(+), 4 deletions(-) create mode 100644 changelog.d/8656.bugfix diff --git a/changelog.d/8656.bugfix b/changelog.d/8656.bugfix new file mode 100644 index 0000000000..d6415e8282 --- /dev/null +++ b/changelog.d/8656.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in v1.22.0rc1 where presence events were not properly passed to application services. diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 07240d3a14..fe8cfc9b18 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -238,7 +238,7 @@ class ApplicationServicesHandler: async def _handle_presence( self, service: ApplicationService, users: Collection[UserID] - ): + ) -> List[JsonDict]: events = [] # type: List[JsonDict] presence_source = self.event_sources.sources["presence"] from_key = await self.store.get_type_stream_id_for_appservice( @@ -252,7 +252,7 @@ class ApplicationServicesHandler: user=user, service=service, from_key=from_key, ) time_now = self.clock.time_msec() - presence_events = [ + events.extend( { "type": "m.presence", "sender": event.user_id, @@ -261,8 +261,9 @@ class ApplicationServicesHandler: ), } for event in presence_events - ] - events = events + presence_events + ) + + return events async def query_user_exists(self, user_id): """Check if any application service knows this user_id exists. From 9e0f5a0ac44678cd8a1dcb82ba7e116f8b554c4d Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Mon, 26 Oct 2020 14:51:33 +0000 Subject: [PATCH 3/3] Fix get|set_type_stream_id_for_appservice store functions (#8648) --- changelog.d/8648.bugfix | 1 + synapse/handlers/appservice.py | 12 ++--- synapse/storage/databases/main/appservice.py | 29 +++++++--- tests/storage/test_appservice.py | 56 ++++++++++++++++++++ 4 files changed, 85 insertions(+), 13 deletions(-) create mode 100644 changelog.d/8648.bugfix diff --git a/changelog.d/8648.bugfix b/changelog.d/8648.bugfix new file mode 100644 index 0000000000..aa71ad0ff2 --- /dev/null +++ b/changelog.d/8648.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in v1.22.0rc1 which would cause ephemeral events to not be sent to appservices. \ No newline at end of file diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index fe8cfc9b18..64dea23fc5 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -203,16 +203,16 @@ class ApplicationServicesHandler: events = await self._handle_receipts(service) if events: self.scheduler.submit_ephemeral_events_for_as(service, events) - await self.store.set_type_stream_id_for_appservice( - service, "read_receipt", new_token - ) + await self.store.set_type_stream_id_for_appservice( + service, "read_receipt", new_token + ) elif stream_key == "presence_key": events = await self._handle_presence(service, users) if events: self.scheduler.submit_ephemeral_events_for_as(service, events) - await self.store.set_type_stream_id_for_appservice( - service, "presence", new_token - ) + await self.store.set_type_stream_id_for_appservice( + service, "presence", new_token + ) async def _handle_typing(self, service: ApplicationService, new_token: int): typing_source = self.event_sources.sources["typing"] diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 43bf0f649a..637a938bac 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -369,17 +369,25 @@ class ApplicationServiceTransactionWorkerStore( async def get_type_stream_id_for_appservice( self, service: ApplicationService, type: str ) -> int: + if type not in ("read_receipt", "presence"): + raise ValueError( + "Expected type to be a valid application stream id type, got %s" + % (type,) + ) + def get_type_stream_id_for_appservice_txn(txn): stream_id_type = "%s_stream_id" % type txn.execute( - "SELECT ? FROM application_services_state WHERE as_id=?", - (stream_id_type, service.id,), + # We do NOT want to escape `stream_id_type`. + "SELECT %s FROM application_services_state WHERE as_id=?" + % stream_id_type, + (service.id,), ) - last_txn_id = txn.fetchone() - if last_txn_id is None or last_txn_id[0] is None: # no row exists + last_stream_id = txn.fetchone() + if last_stream_id is None or last_stream_id[0] is None: # no row exists return 0 else: - return int(last_txn_id[0]) + return int(last_stream_id[0]) return await self.db_pool.runInteraction( "get_type_stream_id_for_appservice", get_type_stream_id_for_appservice_txn @@ -388,11 +396,18 @@ class ApplicationServiceTransactionWorkerStore( async def set_type_stream_id_for_appservice( self, service: ApplicationService, type: str, pos: int ) -> None: + if type not in ("read_receipt", "presence"): + raise ValueError( + "Expected type to be a valid application stream id type, got %s" + % (type,) + ) + def set_type_stream_id_for_appservice_txn(txn): stream_id_type = "%s_stream_id" % type txn.execute( - "UPDATE ? SET device_list_stream_id = ? WHERE as_id=?", - (stream_id_type, pos, service.id), + "UPDATE application_services_state SET %s = ? WHERE as_id=?" + % stream_id_type, + (pos, service.id), ) await self.db_pool.runInteraction( diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index c5c7987349..1ce29af5fd 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -410,6 +410,62 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): ) +class ApplicationServiceStoreTypeStreamIds(unittest.HomeserverTestCase): + def make_homeserver(self, reactor, clock): + hs = self.setup_test_homeserver() + return hs + + def prepare(self, hs, reactor, clock): + self.service = Mock(id="foo") + self.store = self.hs.get_datastore() + self.get_success(self.store.set_appservice_state(self.service, "up")) + + def test_get_type_stream_id_for_appservice_no_value(self): + value = self.get_success( + self.store.get_type_stream_id_for_appservice(self.service, "read_receipt") + ) + self.assertEquals(value, 0) + + value = self.get_success( + self.store.get_type_stream_id_for_appservice(self.service, "presence") + ) + self.assertEquals(value, 0) + + def test_get_type_stream_id_for_appservice_invalid_type(self): + self.get_failure( + self.store.get_type_stream_id_for_appservice(self.service, "foobar"), + ValueError, + ) + + def test_set_type_stream_id_for_appservice(self): + read_receipt_value = 1024 + self.get_success( + self.store.set_type_stream_id_for_appservice( + self.service, "read_receipt", read_receipt_value + ) + ) + result = self.get_success( + self.store.get_type_stream_id_for_appservice(self.service, "read_receipt") + ) + self.assertEqual(result, read_receipt_value) + + self.get_success( + self.store.set_type_stream_id_for_appservice( + self.service, "presence", read_receipt_value + ) + ) + result = self.get_success( + self.store.get_type_stream_id_for_appservice(self.service, "presence") + ) + self.assertEqual(result, read_receipt_value) + + def test_set_type_stream_id_for_appservice_invalid_type(self): + self.get_failure( + self.store.set_type_stream_id_for_appservice(self.service, "foobar", 1024), + ValueError, + ) + + # required for ApplicationServiceTransactionStoreTestCase tests class TestTransactionStore(ApplicationServiceTransactionStore, ApplicationServiceStore): def __init__(self, database: DatabasePool, db_conn, hs):