245 lines
		
	
	
		
			9.2 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			245 lines
		
	
	
		
			9.2 KiB
		
	
	
	
		
			Python
		
	
	
# Copyright 2019 New Vector Ltd
 | 
						|
#
 | 
						|
# Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
# you may not use this file except in compliance with the License.
 | 
						|
# You may obtain a copy of the License at
 | 
						|
#
 | 
						|
#     http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
# Unless required by applicable law or agreed to in writing, software
 | 
						|
# distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
# See the License for the specific language governing permissions and
 | 
						|
# limitations under the License.
 | 
						|
from http import HTTPStatus
 | 
						|
 | 
						|
from twisted.test.proto_helpers import MemoryReactor
 | 
						|
 | 
						|
from synapse.appservice import ApplicationService
 | 
						|
from synapse.rest import admin
 | 
						|
from synapse.rest.client import directory, login, room
 | 
						|
from synapse.server import HomeServer
 | 
						|
from synapse.types import RoomAlias
 | 
						|
from synapse.util import Clock
 | 
						|
from synapse.util.stringutils import random_string
 | 
						|
 | 
						|
from tests import unittest
 | 
						|
from tests.unittest import override_config
 | 
						|
 | 
						|
 | 
						|
class DirectoryTestCase(unittest.HomeserverTestCase):
 | 
						|
 | 
						|
    servlets = [
 | 
						|
        admin.register_servlets_for_client_rest_resource,
 | 
						|
        directory.register_servlets,
 | 
						|
        login.register_servlets,
 | 
						|
        room.register_servlets,
 | 
						|
    ]
 | 
						|
 | 
						|
    def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
 | 
						|
        config = self.default_config()
 | 
						|
        config["require_membership_for_aliases"] = True
 | 
						|
 | 
						|
        self.hs = self.setup_test_homeserver(config=config)
 | 
						|
 | 
						|
        return self.hs
 | 
						|
 | 
						|
    def prepare(
 | 
						|
        self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
 | 
						|
    ) -> None:
 | 
						|
        """Create two local users and access tokens for them.
 | 
						|
        One of them creates a room."""
 | 
						|
        self.room_owner = self.register_user("room_owner", "test")
 | 
						|
        self.room_owner_tok = self.login("room_owner", "test")
 | 
						|
 | 
						|
        self.room_id = self.helper.create_room_as(
 | 
						|
            self.room_owner, tok=self.room_owner_tok
 | 
						|
        )
 | 
						|
 | 
						|
        self.user = self.register_user("user", "test")
 | 
						|
        self.user_tok = self.login("user", "test")
 | 
						|
 | 
						|
    def test_state_event_not_in_room(self) -> None:
 | 
						|
        self.ensure_user_left_room()
 | 
						|
        self.set_alias_via_state_event(HTTPStatus.FORBIDDEN)
 | 
						|
 | 
						|
    def test_directory_endpoint_not_in_room(self) -> None:
 | 
						|
        self.ensure_user_left_room()
 | 
						|
        self.set_alias_via_directory(HTTPStatus.FORBIDDEN)
 | 
						|
 | 
						|
    def test_state_event_in_room_too_long(self) -> None:
 | 
						|
        self.ensure_user_joined_room()
 | 
						|
        self.set_alias_via_state_event(HTTPStatus.BAD_REQUEST, alias_length=256)
 | 
						|
 | 
						|
    def test_directory_in_room_too_long(self) -> None:
 | 
						|
        self.ensure_user_joined_room()
 | 
						|
        self.set_alias_via_directory(HTTPStatus.BAD_REQUEST, alias_length=256)
 | 
						|
 | 
						|
    @override_config({"default_room_version": 5})
 | 
						|
    def test_state_event_user_in_v5_room(self) -> None:
 | 
						|
        """Test that a regular user can add alias events before room v6"""
 | 
						|
        self.ensure_user_joined_room()
 | 
						|
        self.set_alias_via_state_event(HTTPStatus.OK)
 | 
						|
 | 
						|
    @override_config({"default_room_version": 6})
 | 
						|
    def test_state_event_v6_room(self) -> None:
 | 
						|
        """Test that a regular user can *not* add alias events from room v6"""
 | 
						|
        self.ensure_user_joined_room()
 | 
						|
        self.set_alias_via_state_event(HTTPStatus.FORBIDDEN)
 | 
						|
 | 
						|
    def test_directory_in_room(self) -> None:
 | 
						|
        self.ensure_user_joined_room()
 | 
						|
        self.set_alias_via_directory(HTTPStatus.OK)
 | 
						|
 | 
						|
    def test_room_creation_too_long(self) -> None:
 | 
						|
        url = "/_matrix/client/r0/createRoom"
 | 
						|
 | 
						|
        # We use deliberately a localpart under the length threshold so
 | 
						|
        # that we can make sure that the check is done on the whole alias.
 | 
						|
        request_data = {"room_alias_name": random_string(256 - len(self.hs.hostname))}
 | 
						|
        channel = self.make_request(
 | 
						|
            "POST", url, request_data, access_token=self.user_tok
 | 
						|
        )
 | 
						|
        self.assertEqual(channel.code, HTTPStatus.BAD_REQUEST, channel.result)
 | 
						|
 | 
						|
    def test_room_creation(self) -> None:
 | 
						|
        url = "/_matrix/client/r0/createRoom"
 | 
						|
 | 
						|
        # Check with an alias of allowed length. There should already be
 | 
						|
        # a test that ensures it works in test_register.py, but let's be
 | 
						|
        # as cautious as possible here.
 | 
						|
        request_data = {"room_alias_name": random_string(5)}
 | 
						|
        channel = self.make_request(
 | 
						|
            "POST", url, request_data, access_token=self.user_tok
 | 
						|
        )
 | 
						|
        self.assertEqual(channel.code, HTTPStatus.OK, channel.result)
 | 
						|
 | 
						|
    def test_deleting_alias_via_directory(self) -> None:
 | 
						|
        # Add an alias for the room. We must be joined to do so.
 | 
						|
        self.ensure_user_joined_room()
 | 
						|
        alias = self.set_alias_via_directory(HTTPStatus.OK)
 | 
						|
 | 
						|
        # Then try to remove the alias
 | 
						|
        channel = self.make_request(
 | 
						|
            "DELETE",
 | 
						|
            f"/_matrix/client/r0/directory/room/{alias}",
 | 
						|
            access_token=self.user_tok,
 | 
						|
        )
 | 
						|
        self.assertEqual(channel.code, HTTPStatus.OK, channel.result)
 | 
						|
 | 
						|
    def test_deleting_alias_via_directory_appservice(self) -> None:
 | 
						|
        user_id = "@as:test"
 | 
						|
        as_token = "i_am_an_app_service"
 | 
						|
 | 
						|
        appservice = ApplicationService(
 | 
						|
            as_token,
 | 
						|
            id="1234",
 | 
						|
            namespaces={"aliases": [{"regex": "#asns-*", "exclusive": True}]},
 | 
						|
            sender=user_id,
 | 
						|
        )
 | 
						|
        self.hs.get_datastores().main.services_cache.append(appservice)
 | 
						|
 | 
						|
        # Add an alias for the room, as the appservice
 | 
						|
        alias = RoomAlias(f"asns-{random_string(5)}", self.hs.hostname).to_string()
 | 
						|
        request_data = {"room_id": self.room_id}
 | 
						|
 | 
						|
        channel = self.make_request(
 | 
						|
            "PUT",
 | 
						|
            f"/_matrix/client/r0/directory/room/{alias}",
 | 
						|
            request_data,
 | 
						|
            access_token=as_token,
 | 
						|
        )
 | 
						|
        self.assertEqual(channel.code, HTTPStatus.OK, channel.result)
 | 
						|
 | 
						|
        # Then try to remove the alias, as the appservice
 | 
						|
        channel = self.make_request(
 | 
						|
            "DELETE",
 | 
						|
            f"/_matrix/client/r0/directory/room/{alias}",
 | 
						|
            access_token=as_token,
 | 
						|
        )
 | 
						|
        self.assertEqual(channel.code, HTTPStatus.OK, channel.result)
 | 
						|
 | 
						|
    def test_deleting_nonexistant_alias(self) -> None:
 | 
						|
        # Check that no alias exists
 | 
						|
        alias = "#potato:test"
 | 
						|
        channel = self.make_request(
 | 
						|
            "GET",
 | 
						|
            f"/_matrix/client/r0/directory/room/{alias}",
 | 
						|
            access_token=self.user_tok,
 | 
						|
        )
 | 
						|
        self.assertEqual(channel.code, HTTPStatus.NOT_FOUND, channel.result)
 | 
						|
        self.assertIn("error", channel.json_body, channel.json_body)
 | 
						|
        self.assertEqual(channel.json_body["errcode"], "M_NOT_FOUND", channel.json_body)
 | 
						|
 | 
						|
        # Then try to remove the alias
 | 
						|
        channel = self.make_request(
 | 
						|
            "DELETE",
 | 
						|
            f"/_matrix/client/r0/directory/room/{alias}",
 | 
						|
            access_token=self.user_tok,
 | 
						|
        )
 | 
						|
        self.assertEqual(channel.code, HTTPStatus.NOT_FOUND, channel.result)
 | 
						|
        self.assertIn("error", channel.json_body, channel.json_body)
 | 
						|
        self.assertEqual(channel.json_body["errcode"], "M_NOT_FOUND", channel.json_body)
 | 
						|
 | 
						|
    def set_alias_via_state_event(
 | 
						|
        self, expected_code: HTTPStatus, alias_length: int = 5
 | 
						|
    ) -> None:
 | 
						|
        url = "/_matrix/client/r0/rooms/%s/state/m.room.aliases/%s" % (
 | 
						|
            self.room_id,
 | 
						|
            self.hs.hostname,
 | 
						|
        )
 | 
						|
 | 
						|
        request_data = {"aliases": [self.random_alias(alias_length)]}
 | 
						|
 | 
						|
        channel = self.make_request(
 | 
						|
            "PUT", url, request_data, access_token=self.user_tok
 | 
						|
        )
 | 
						|
        self.assertEqual(channel.code, expected_code, channel.result)
 | 
						|
 | 
						|
    def set_alias_via_directory(
 | 
						|
        self, expected_code: HTTPStatus, alias_length: int = 5
 | 
						|
    ) -> str:
 | 
						|
        alias = self.random_alias(alias_length)
 | 
						|
        url = "/_matrix/client/r0/directory/room/%s" % alias
 | 
						|
        request_data = {"room_id": self.room_id}
 | 
						|
 | 
						|
        channel = self.make_request(
 | 
						|
            "PUT", url, request_data, access_token=self.user_tok
 | 
						|
        )
 | 
						|
        self.assertEqual(channel.code, expected_code, channel.result)
 | 
						|
        return alias
 | 
						|
 | 
						|
    def test_invalid_alias(self) -> None:
 | 
						|
        alias = "#potato"
 | 
						|
        channel = self.make_request(
 | 
						|
            "GET",
 | 
						|
            f"/_matrix/client/r0/directory/room/{alias}",
 | 
						|
            access_token=self.user_tok,
 | 
						|
        )
 | 
						|
        self.assertEqual(channel.code, HTTPStatus.BAD_REQUEST, channel.result)
 | 
						|
        self.assertIn("error", channel.json_body, channel.json_body)
 | 
						|
        self.assertEqual(
 | 
						|
            channel.json_body["errcode"], "M_INVALID_PARAM", channel.json_body
 | 
						|
        )
 | 
						|
 | 
						|
    def random_alias(self, length: int) -> str:
 | 
						|
        return RoomAlias(random_string(length), self.hs.hostname).to_string()
 | 
						|
 | 
						|
    def ensure_user_left_room(self) -> None:
 | 
						|
        self.ensure_membership("leave")
 | 
						|
 | 
						|
    def ensure_user_joined_room(self) -> None:
 | 
						|
        self.ensure_membership("join")
 | 
						|
 | 
						|
    def ensure_membership(self, membership: str) -> None:
 | 
						|
        try:
 | 
						|
            if membership == "leave":
 | 
						|
                self.helper.leave(room=self.room_id, user=self.user, tok=self.user_tok)
 | 
						|
            if membership == "join":
 | 
						|
                self.helper.join(room=self.room_id, user=self.user, tok=self.user_tok)
 | 
						|
        except AssertionError:
 | 
						|
            # We don't care whether the leave request didn't return a 200 (e.g.
 | 
						|
            # if the user isn't already in the room), because we only want to
 | 
						|
            # make sure the user isn't in the room.
 | 
						|
            pass
 |