Add an Admin API to temporarily grant the ability to update an existing cross-signing key without UIA (#16634)

clokep/statement-timeout
David Robertson 2023-11-15 17:28:10 +00:00 committed by GitHub
parent 999bd77d3a
commit 43d1aa75e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 613 additions and 14 deletions

1
changelog.d/16634.misc Normal file
View File

@ -0,0 +1 @@
Add an internal [Admin API endpoint](https://matrix-org.github.io/synapse/v1.97/usage/configuration/config_documentation.html#allow-replacing-master-cross-signing-key-without-user-interactive-auth) to temporarily grant the ability to update an existing cross-signing key without UIA.

View File

@ -773,6 +773,43 @@ Note: The token will expire if the *admin* user calls `/logout/all` from any
of their devices, but the token will *not* expire if the target user does the of their devices, but the token will *not* expire if the target user does the
same. same.
## Allow replacing master cross-signing key without User-Interactive Auth
This endpoint is not intended for server administrator usage;
we describe it here for completeness.
This API temporarily permits a user to replace their master cross-signing key
without going through
[user-interactive authentication](https://spec.matrix.org/v1.8/client-server-api/#user-interactive-authentication-api) (UIA).
This is useful when Synapse has delegated its authentication to the
[Matrix Authentication Service](https://github.com/matrix-org/matrix-authentication-service/);
as Synapse cannot perform UIA is not possible in these circumstances.
The API is
```http request
POST /_synapse/admin/v1/users/<user_id>/_allow_cross_signing_replacement_without_uia
{}
```
If the user does not exist, or does exist but has no master cross-signing key,
this will return with status code `404 Not Found`.
Otherwise, a response body like the following is returned, with status `200 OK`:
```json
{
"updatable_without_uia_before_ms": 1234567890
}
```
The response body is a JSON object with a single field:
- `updatable_without_uia_before_ms`: integer. The timestamp in milliseconds
before which the user is permitted to replace their cross-signing key without
going through UIA.
_Added in Synapse 1.97.0._
## User devices ## User devices

View File

@ -1450,19 +1450,25 @@ class E2eKeysHandler:
return desired_key_data return desired_key_data
async def is_cross_signing_set_up_for_user(self, user_id: str) -> bool: async def check_cross_signing_setup(self, user_id: str) -> Tuple[bool, bool]:
"""Checks if the user has cross-signing set up """Checks if the user has cross-signing set up
Args: Args:
user_id: The user to check user_id: The user to check
Returns: Returns: a 2-tuple of booleans
True if the user has cross-signing set up, False otherwise - whether the user has cross-signing set up, and
- whether the user's master cross-signing key may be replaced without UIA.
""" """
existing_master_key = await self.store.get_e2e_cross_signing_key( (
user_id, "master" exists,
) ts_replacable_without_uia_before,
return existing_master_key is not None ) = await self.store.get_master_cross_signing_key_updatable_before(user_id)
if ts_replacable_without_uia_before is None:
return exists, False
else:
return exists, self.clock.time_msec() < ts_replacable_without_uia_before
def _check_cross_signing_key( def _check_cross_signing_key(

View File

@ -88,6 +88,7 @@ from synapse.rest.admin.users import (
UserByThreePid, UserByThreePid,
UserMembershipRestServlet, UserMembershipRestServlet,
UserRegisterServlet, UserRegisterServlet,
UserReplaceMasterCrossSigningKeyRestServlet,
UserRestServletV2, UserRestServletV2,
UsersRestServletV2, UsersRestServletV2,
UserTokenRestServlet, UserTokenRestServlet,
@ -292,6 +293,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ListDestinationsRestServlet(hs).register(http_server) ListDestinationsRestServlet(hs).register(http_server)
RoomMessagesRestServlet(hs).register(http_server) RoomMessagesRestServlet(hs).register(http_server)
RoomTimestampToEventRestServlet(hs).register(http_server) RoomTimestampToEventRestServlet(hs).register(http_server)
UserReplaceMasterCrossSigningKeyRestServlet(hs).register(http_server)
UserByExternalId(hs).register(http_server) UserByExternalId(hs).register(http_server)
UserByThreePid(hs).register(http_server) UserByThreePid(hs).register(http_server)

View File

@ -1270,6 +1270,46 @@ class AccountDataRestServlet(RestServlet):
} }
class UserReplaceMasterCrossSigningKeyRestServlet(RestServlet):
"""Allow a given user to replace their master cross-signing key without UIA.
This replacement is permitted for a limited period (currently 10 minutes).
While this is exposed via the admin API, this is intended for use by the
Matrix Authentication Service rather than server admins.
"""
PATTERNS = admin_patterns(
"/users/(?P<user_id>[^/]*)/_allow_cross_signing_replacement_without_uia"
)
REPLACEMENT_PERIOD_MS = 10 * 60 * 1000 # 10 minutes
def __init__(self, hs: "HomeServer"):
self._auth = hs.get_auth()
self._store = hs.get_datastores().main
async def on_POST(
self,
request: SynapseRequest,
user_id: str,
) -> Tuple[int, JsonDict]:
await assert_requester_is_admin(self._auth, request)
if user_id is None:
raise NotFoundError("User not found")
timestamp = (
await self._store.allow_master_cross_signing_key_replacement_without_uia(
user_id, self.REPLACEMENT_PERIOD_MS
)
)
if timestamp is None:
raise NotFoundError("User has no master cross-signing key")
return HTTPStatus.OK, {"updatable_without_uia_before_ms": timestamp}
class UserByExternalId(RestServlet): class UserByExternalId(RestServlet):
"""Find a user based on an external ID from an auth provider""" """Find a user based on an external ID from an auth provider"""

View File

@ -376,9 +376,10 @@ class SigningKeyUploadServlet(RestServlet):
user_id = requester.user.to_string() user_id = requester.user.to_string()
body = parse_json_object_from_request(request) body = parse_json_object_from_request(request)
is_cross_signing_setup = ( (
await self.e2e_keys_handler.is_cross_signing_set_up_for_user(user_id) is_cross_signing_setup,
) master_key_updatable_without_uia,
) = await self.e2e_keys_handler.check_cross_signing_setup(user_id)
# Before MSC3967 we required UIA both when setting up cross signing for the # Before MSC3967 we required UIA both when setting up cross signing for the
# first time and when resetting the device signing key. With MSC3967 we only # first time and when resetting the device signing key. With MSC3967 we only
@ -386,9 +387,14 @@ class SigningKeyUploadServlet(RestServlet):
# time. Because there is no UIA in MSC3861, for now we throw an error if the # time. Because there is no UIA in MSC3861, for now we throw an error if the
# user tries to reset the device signing key when MSC3861 is enabled, but allow # user tries to reset the device signing key when MSC3861 is enabled, but allow
# first-time setup. # first-time setup.
#
# XXX: We now have a get-out clause by which MAS can temporarily mark the master
# key as replaceable. It should do its own equivalent of user interactive auth
# before doing so.
if self.hs.config.experimental.msc3861.enabled: if self.hs.config.experimental.msc3861.enabled:
# There is no way to reset the device signing key with MSC3861 # The auth service has to explicitly mark the master key as replaceable
if is_cross_signing_setup: # without UIA to reset the device signing key with MSC3861.
if is_cross_signing_setup and not master_key_updatable_without_uia:
raise SynapseError( raise SynapseError(
HTTPStatus.NOT_IMPLEMENTED, HTTPStatus.NOT_IMPLEMENTED,
"Resetting cross signing keys is not yet supported with MSC3861", "Resetting cross signing keys is not yet supported with MSC3861",

View File

@ -1383,6 +1383,51 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
return otk_rows return otk_rows
async def get_master_cross_signing_key_updatable_before(
self, user_id: str
) -> Tuple[bool, Optional[int]]:
"""Get time before which a master cross-signing key may be replaced without UIA.
(UIA means "User-Interactive Auth".)
There are three cases to distinguish:
(1) No master cross-signing key.
(2) The key exists, but there is no replace-without-UI timestamp in the DB.
(3) The key exists, and has such a timestamp recorded.
Returns: a 2-tuple of:
- a boolean: is there a master cross-signing key already?
- an optional timestamp, directly taken from the DB.
In terms of the cases above, these are:
(1) (False, None).
(2) (True, None).
(3) (True, <timestamp in ms>).
"""
def impl(txn: LoggingTransaction) -> Tuple[bool, Optional[int]]:
# We want to distinguish between three cases:
txn.execute(
"""
SELECT updatable_without_uia_before_ms
FROM e2e_cross_signing_keys
WHERE user_id = ? AND keytype = 'master'
ORDER BY stream_id DESC
LIMIT 1
""",
(user_id,),
)
row = cast(Optional[Tuple[Optional[int]]], txn.fetchone())
if row is None:
return False, None
return True, row[0]
return await self.db_pool.runInteraction(
"e2e_cross_signing_keys",
impl,
)
class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
def __init__( def __init__(
@ -1630,3 +1675,42 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
], ],
desc="add_e2e_signing_key", desc="add_e2e_signing_key",
) )
async def allow_master_cross_signing_key_replacement_without_uia(
self, user_id: str, duration_ms: int
) -> Optional[int]:
"""Mark this user's latest master key as being replaceable without UIA.
Said replacement will only be permitted for a short time after calling this
function. That time period is controlled by the duration argument.
Returns:
None, if there is no such key.
Otherwise, the timestamp before which replacement is allowed without UIA.
"""
timestamp = self._clock.time_msec() + duration_ms
def impl(txn: LoggingTransaction) -> Optional[int]:
txn.execute(
"""
UPDATE e2e_cross_signing_keys
SET updatable_without_uia_before_ms = ?
WHERE stream_id = (
SELECT stream_id
FROM e2e_cross_signing_keys
WHERE user_id = ? AND keytype = 'master'
ORDER BY stream_id DESC
LIMIT 1
)
""",
(timestamp, user_id),
)
if txn.rowcount == 0:
return None
return timestamp
return await self.db_pool.runInteraction(
"allow_master_cross_signing_key_replacement_without_uia",
impl,
)

View File

@ -0,0 +1,15 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ALTER TABLE e2e_cross_signing_keys ADD COLUMN updatable_without_uia_before_ms bigint DEFAULT NULL;

View File

@ -1602,3 +1602,50 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
} }
}, },
) )
def test_check_cross_signing_setup(self) -> None:
# First check what happens with no master key.
alice = "@alice:test"
exists, replaceable_without_uia = self.get_success(
self.handler.check_cross_signing_setup(alice)
)
self.assertIs(exists, False)
self.assertIs(replaceable_without_uia, False)
# Upload a master key but don't specify a replacement timestamp.
dummy_key = {"keys": {"a": "b"}}
self.get_success(
self.store.set_e2e_cross_signing_key("@alice:test", "master", dummy_key)
)
# Should now find the key exists.
exists, replaceable_without_uia = self.get_success(
self.handler.check_cross_signing_setup(alice)
)
self.assertIs(exists, True)
self.assertIs(replaceable_without_uia, False)
# Set an expiry timestamp in the future.
self.get_success(
self.store.allow_master_cross_signing_key_replacement_without_uia(
alice,
1000,
)
)
# Should now be allowed to replace the key without UIA.
exists, replaceable_without_uia = self.get_success(
self.handler.check_cross_signing_setup(alice)
)
self.assertIs(exists, True)
self.assertIs(replaceable_without_uia, True)
# Wait 2 seconds, so that the timestamp is in the past.
self.reactor.advance(2.0)
# Should no longer be allowed to replace the key without UIA.
exists, replaceable_without_uia = self.get_success(
self.handler.check_cross_signing_setup(alice)
)
self.assertIs(exists, True)
self.assertIs(replaceable_without_uia, False)

View File

@ -4854,3 +4854,59 @@ class UsersByThreePidTestCase(unittest.HomeserverTestCase):
{"user_id": self.other_user}, {"user_id": self.other_user},
channel.json_body, channel.json_body,
) )
class AllowCrossSigningReplacementTestCase(unittest.HomeserverTestCase):
servlets = [
synapse.rest.admin.register_servlets,
login.register_servlets,
]
@staticmethod
def url(user: str) -> str:
template = (
"/_synapse/admin/v1/users/{}/_allow_cross_signing_replacement_without_uia"
)
return template.format(urllib.parse.quote(user))
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
self.other_user = self.register_user("user", "pass")
def test_error_cases(self) -> None:
fake_user = "@bums:other"
channel = self.make_request(
"POST", self.url(fake_user), access_token=self.admin_user_tok
)
# Fail: user doesn't exist
self.assertEqual(404, channel.code, msg=channel.json_body)
channel = self.make_request(
"POST", self.url(self.other_user), access_token=self.admin_user_tok
)
# Fail: user exists, but has no master cross-signing key
self.assertEqual(404, channel.code, msg=channel.json_body)
def test_success(self) -> None:
# Upload a master key.
dummy_key = {"keys": {"a": "b"}}
self.get_success(
self.store.set_e2e_cross_signing_key(self.other_user, "master", dummy_key)
)
channel = self.make_request(
"POST", self.url(self.other_user), access_token=self.admin_user_tok
)
# Success!
self.assertEqual(200, channel.code, msg=channel.json_body)
# Should now find that the key exists.
_, timestamp = self.get_success(
self.store.get_master_cross_signing_key_updatable_before(self.other_user)
)
assert timestamp is not None
self.assertGreater(timestamp, self.clock.time_msec())

View File

@ -11,8 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License # limitations under the License
import urllib.parse
from http import HTTPStatus from http import HTTPStatus
from unittest.mock import patch
from signedjson.key import ( from signedjson.key import (
encode_verify_key_base64, encode_verify_key_base64,
@ -24,12 +25,19 @@ from signedjson.sign import sign_json
from synapse.api.errors import Codes from synapse.api.errors import Codes
from synapse.rest import admin from synapse.rest import admin
from synapse.rest.client import keys, login from synapse.rest.client import keys, login
from synapse.types import JsonDict from synapse.types import JsonDict, Requester, create_requester
from tests import unittest from tests import unittest
from tests.http.server._base import make_request_with_cancellation_test from tests.http.server._base import make_request_with_cancellation_test
from tests.unittest import override_config from tests.unittest import override_config
try:
import authlib # noqa: F401
HAS_AUTHLIB = True
except ImportError:
HAS_AUTHLIB = False
class KeyQueryTestCase(unittest.HomeserverTestCase): class KeyQueryTestCase(unittest.HomeserverTestCase):
servlets = [ servlets = [
@ -259,3 +267,179 @@ class KeyQueryTestCase(unittest.HomeserverTestCase):
alice_token, alice_token,
) )
self.assertEqual(channel.code, HTTPStatus.OK, channel.result) self.assertEqual(channel.code, HTTPStatus.OK, channel.result)
class SigningKeyUploadServletTestCase(unittest.HomeserverTestCase):
servlets = [
admin.register_servlets,
keys.register_servlets,
]
OIDC_ADMIN_TOKEN = "_oidc_admin_token"
@unittest.skip_unless(HAS_AUTHLIB, "requires authlib")
@override_config(
{
"enable_registration": False,
"experimental_features": {
"msc3861": {
"enabled": True,
"issuer": "https://issuer",
"account_management_url": "https://my-account.issuer",
"client_id": "id",
"client_auth_method": "client_secret_post",
"client_secret": "secret",
"admin_token": OIDC_ADMIN_TOKEN,
},
},
}
)
def test_master_cross_signing_key_replacement_msc3861(self) -> None:
# Provision a user like MAS would, cribbing from
# https://github.com/matrix-org/matrix-authentication-service/blob/08d46a79a4adb22819ac9d55e15f8375dfe2c5c7/crates/matrix-synapse/src/lib.rs#L224-L229
alice = "@alice:test"
channel = self.make_request(
"PUT",
f"/_synapse/admin/v2/users/{urllib.parse.quote(alice)}",
access_token=self.OIDC_ADMIN_TOKEN,
content={},
)
self.assertEqual(channel.code, HTTPStatus.CREATED, channel.json_body)
# Provision a device like MAS would, cribbing from
# https://github.com/matrix-org/matrix-authentication-service/blob/08d46a79a4adb22819ac9d55e15f8375dfe2c5c7/crates/matrix-synapse/src/lib.rs#L260-L262
alice_device = "alice_device"
channel = self.make_request(
"POST",
f"/_synapse/admin/v2/users/{urllib.parse.quote(alice)}/devices",
access_token=self.OIDC_ADMIN_TOKEN,
content={"device_id": alice_device},
)
self.assertEqual(channel.code, HTTPStatus.CREATED, channel.json_body)
# Prepare a mock MAS access token.
alice_token = "alice_token_1234_oidcwhatyoudidthere"
async def mocked_get_user_by_access_token(
token: str, allow_expired: bool = False
) -> Requester:
self.assertEqual(token, alice_token)
return create_requester(
user_id=alice,
device_id=alice_device,
scope=[],
is_guest=False,
)
patch_get_user_by_access_token = patch.object(
self.hs.get_auth(),
"get_user_by_access_token",
wraps=mocked_get_user_by_access_token,
)
# Copied from E2eKeysHandlerTestCase
master_pubkey = "nqOvzeuGWT/sRx3h7+MHoInYj3Uk2LD/unI9kDYcHwk"
master_pubkey2 = "fHZ3NPiKxoLQm5OoZbKa99SYxprOjNs4TwJUKP+twCM"
master_pubkey3 = "85T7JXPFBAySB/jwby4S3lBPTqY3+Zg53nYuGmu1ggY"
master_key: JsonDict = {
"user_id": alice,
"usage": ["master"],
"keys": {"ed25519:" + master_pubkey: master_pubkey},
}
master_key2: JsonDict = {
"user_id": alice,
"usage": ["master"],
"keys": {"ed25519:" + master_pubkey2: master_pubkey2},
}
master_key3: JsonDict = {
"user_id": alice,
"usage": ["master"],
"keys": {"ed25519:" + master_pubkey3: master_pubkey3},
}
with patch_get_user_by_access_token:
# Upload an initial cross-signing key.
channel = self.make_request(
"POST",
"/_matrix/client/v3/keys/device_signing/upload",
access_token=alice_token,
content={
"master_key": master_key,
},
)
self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body)
# Should not be able to upload another master key.
channel = self.make_request(
"POST",
"/_matrix/client/v3/keys/device_signing/upload",
access_token=alice_token,
content={
"master_key": master_key2,
},
)
self.assertEqual(
channel.code, HTTPStatus.NOT_IMPLEMENTED, channel.json_body
)
# Pretend that MAS did UIA and allowed us to replace the master key.
channel = self.make_request(
"POST",
f"/_synapse/admin/v1/users/{urllib.parse.quote(alice)}/_allow_cross_signing_replacement_without_uia",
access_token=self.OIDC_ADMIN_TOKEN,
)
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
with patch_get_user_by_access_token:
# Should now be able to upload master key2.
channel = self.make_request(
"POST",
"/_matrix/client/v3/keys/device_signing/upload",
access_token=alice_token,
content={
"master_key": master_key2,
},
)
self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body)
# Even though we're still in the grace period, we shouldn't be able to
# upload master key 3 immediately after uploading key 2.
channel = self.make_request(
"POST",
"/_matrix/client/v3/keys/device_signing/upload",
access_token=alice_token,
content={
"master_key": master_key3,
},
)
self.assertEqual(
channel.code, HTTPStatus.NOT_IMPLEMENTED, channel.json_body
)
# Pretend that MAS did UIA and allowed us to replace the master key.
channel = self.make_request(
"POST",
f"/_synapse/admin/v1/users/{urllib.parse.quote(alice)}/_allow_cross_signing_replacement_without_uia",
access_token=self.OIDC_ADMIN_TOKEN,
)
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
timestamp_ms = channel.json_body["updatable_without_uia_before_ms"]
# Advance to 1 second after the replacement period ends.
self.reactor.advance(timestamp_ms - self.clock.time_msec() + 1000)
with patch_get_user_by_access_token:
# We should not be able to upload master key3 because the replacement has
# expired.
channel = self.make_request(
"POST",
"/_matrix/client/v3/keys/device_signing/upload",
access_token=alice_token,
content={
"master_key": master_key3,
},
)
self.assertEqual(
channel.code, HTTPStatus.NOT_IMPLEMENTED, channel.json_body
)

View File

@ -0,0 +1,121 @@
# Copyright 2023 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List, Optional, Tuple
from twisted.test.proto_helpers import MemoryReactor
from synapse.server import HomeServer
from synapse.storage._base import db_to_json
from synapse.storage.database import LoggingTransaction
from synapse.types import JsonDict
from synapse.util import Clock
from tests.unittest import HomeserverTestCase
class EndToEndKeyWorkerStoreTestCase(HomeserverTestCase):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
def test_get_master_cross_signing_key_updatable_before(self) -> None:
# Should return False, None when there is no master key.
alice = "@alice:test"
exists, timestamp = self.get_success(
self.store.get_master_cross_signing_key_updatable_before(alice)
)
self.assertIs(exists, False)
self.assertIsNone(timestamp)
# Upload a master key.
dummy_key = {"keys": {"a": "b"}}
self.get_success(
self.store.set_e2e_cross_signing_key(alice, "master", dummy_key)
)
# Should now find that the key exists.
exists, timestamp = self.get_success(
self.store.get_master_cross_signing_key_updatable_before(alice)
)
self.assertIs(exists, True)
self.assertIsNone(timestamp)
# Write an updateable_before timestamp.
written_timestamp = self.get_success(
self.store.allow_master_cross_signing_key_replacement_without_uia(
alice, 1000
)
)
# Should now find that the key exists.
exists, timestamp = self.get_success(
self.store.get_master_cross_signing_key_updatable_before(alice)
)
self.assertIs(exists, True)
self.assertEqual(timestamp, written_timestamp)
def test_master_replacement_only_applies_to_latest_master_key(
self,
) -> None:
"""We shouldn't allow updates w/o UIA to old master keys or other key types."""
alice = "@alice:test"
# Upload two master keys.
key1 = {"keys": {"a": "b"}}
key2 = {"keys": {"c": "d"}}
key3 = {"keys": {"e": "f"}}
self.get_success(self.store.set_e2e_cross_signing_key(alice, "master", key1))
self.get_success(self.store.set_e2e_cross_signing_key(alice, "other", key2))
self.get_success(self.store.set_e2e_cross_signing_key(alice, "master", key3))
# Third key should be the current one.
key = self.get_success(
self.store.get_e2e_cross_signing_key(alice, "master", alice)
)
self.assertEqual(key, key3)
timestamp = self.get_success(
self.store.allow_master_cross_signing_key_replacement_without_uia(
alice, 1000
)
)
assert timestamp is not None
def check_timestamp_column(
txn: LoggingTransaction,
) -> List[Tuple[JsonDict, Optional[int]]]:
"""Fetch all rows for Alice's keys."""
txn.execute(
"""
SELECT keydata, updatable_without_uia_before_ms
FROM e2e_cross_signing_keys
WHERE user_id = ?
ORDER BY stream_id ASC;
""",
(alice,),
)
return [(db_to_json(keydata), ts) for keydata, ts in txn.fetchall()]
values = self.get_success(
self.store.db_pool.runInteraction(
"check_timestamp_column",
check_timestamp_column,
)
)
self.assertEqual(
values,
[
(key1, None),
(key2, None),
(key3, timestamp),
],
)