Merge branch 'develop' of github.com:matrix-org/synapse into develop

pull/349/head
Daniel Wagner-Hall 2015-11-04 17:31:01 +00:00
commit 4e62ffdb21
12 changed files with 7 additions and 231 deletions

View File

@ -111,17 +111,6 @@ class EventStreamHandler(BaseHandler):
if affect_presence:
yield self.started_stream(auth_user)
rm_handler = self.hs.get_handlers().room_member_handler
app_service = yield self.store.get_app_service_by_user_id(
auth_user.to_string()
)
if app_service:
rooms = yield self.store.get_app_service_rooms(app_service)
room_ids = set(r.room_id for r in rooms)
else:
room_ids = yield rm_handler.get_joined_rooms_for_user(auth_user)
if timeout:
# If they've set a timeout set a minimum limit.
timeout = max(timeout, 500)
@ -131,7 +120,7 @@ class EventStreamHandler(BaseHandler):
timeout = random.randint(int(timeout*0.9), int(timeout*1.1))
events, tokens = yield self.notifier.get_events_for(
auth_user, room_ids, pagin_config, timeout,
auth_user, pagin_config, timeout,
only_room_events=only_room_events
)

View File

@ -72,8 +72,6 @@ class FederationHandler(BaseHandler):
self.server_name = hs.hostname
self.keyring = hs.get_keyring()
self.lock_manager = hs.get_room_lock_manager()
self.replication_layer.set_handler(self)
# When joining a room we need to queue any events for that room up

View File

@ -827,7 +827,6 @@ class RoomEventSource(object):
user_id=user.to_string(),
from_key=from_key,
to_key=to_key,
room_id=None,
limit=limit,
)

View File

@ -143,21 +143,8 @@ class SyncHandler(BaseHandler):
def current_sync_callback(before_token, after_token):
return self.current_sync_for_user(sync_config, since_token)
rm_handler = self.hs.get_handlers().room_member_handler
app_service = yield self.store.get_app_service_by_user_id(
sync_config.user.to_string()
)
if app_service:
rooms = yield self.store.get_app_service_rooms(app_service)
room_ids = set(r.room_id for r in rooms)
else:
room_ids = yield rm_handler.get_joined_rooms_for_user(
sync_config.user
)
result = yield self.notifier.wait_for_events(
sync_config.user, room_ids, timeout, current_sync_callback,
sync_config.user, timeout, current_sync_callback,
from_token=since_token
)
defer.returnValue(result)
@ -403,7 +390,6 @@ class SyncHandler(BaseHandler):
sync_config.user.to_string(),
from_key=since_token.room_key,
to_key=now_token.room_key,
room_id=None,
limit=timeline_limit + 1,
)

View File

@ -269,7 +269,7 @@ class Notifier(object):
logger.exception("Failed to notify listener")
@defer.inlineCallbacks
def wait_for_events(self, user, rooms, timeout, callback,
def wait_for_events(self, user, timeout, callback,
from_token=StreamToken("s0", "0", "0", "0", "0")):
"""Wait until the callback returns a non empty response or the
timeout fires.
@ -328,7 +328,7 @@ class Notifier(object):
defer.returnValue(result)
@defer.inlineCallbacks
def get_events_for(self, user, rooms, pagination_config, timeout,
def get_events_for(self, user, pagination_config, timeout,
only_room_events=False):
""" For the given user and rooms, return any new events for them. If
there are no new events wait for up to `timeout` milliseconds for any
@ -369,7 +369,7 @@ class Notifier(object):
defer.returnValue(None)
result = yield self.wait_for_events(
user, rooms, timeout, check_for_updates, from_token=from_token
user, timeout, check_for_updates, from_token=from_token
)
if result is None:

View File

@ -29,7 +29,6 @@ from synapse.state import StateHandler
from synapse.storage import DataStore
from synapse.util import Clock
from synapse.util.distributor import Distributor
from synapse.util.lockutils import LockManager
from synapse.streams.events import EventSources
from synapse.api.ratelimiting import Ratelimiter
from synapse.crypto.keyring import Keyring
@ -70,7 +69,6 @@ class BaseHomeServer(object):
'auth',
'rest_servlet_factory',
'state_handler',
'room_lock_manager',
'notifier',
'distributor',
'resource_for_client',
@ -201,9 +199,6 @@ class HomeServer(BaseHomeServer):
def build_state_handler(self):
return StateHandler(self)
def build_room_lock_manager(self):
return LockManager()
def build_distributor(self):
return Distributor()

View File

@ -158,8 +158,7 @@ class StreamStore(SQLBaseStore):
defer.returnValue(results)
@log_function
def get_room_events_stream(self, user_id, from_key, to_key, room_id,
limit=0):
def get_room_events_stream(self, user_id, from_key, to_key, limit=0):
current_room_membership_sql = (
"SELECT m.room_id FROM room_memberships as m "
" INNER JOIN current_state_events as c"

View File

@ -1,74 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014, 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
import logging
logger = logging.getLogger(__name__)
class Lock(object):
def __init__(self, deferred, key):
self._deferred = deferred
self.released = False
self.key = key
def release(self):
self.released = True
self._deferred.callback(None)
def __del__(self):
if not self.released:
logger.critical("Lock was destructed but never released!")
self.release()
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
logger.debug("Releasing lock for key=%r", self.key)
self.release()
class LockManager(object):
""" Utility class that allows us to lock based on a `key` """
def __init__(self):
self._lock_deferreds = {}
@defer.inlineCallbacks
def lock(self, key):
""" Allows us to block until it is our turn.
Args:
key (str)
Returns:
Lock
"""
new_deferred = defer.Deferred()
old_deferred = self._lock_deferreds.get(key)
self._lock_deferreds[key] = new_deferred
if old_deferred:
logger.debug("Queueing on lock for key=%r", key)
yield old_deferred
logger.debug("Obtained lock for key=%r", key)
else:
logger.debug("Entering uncontended lock for key=%r", key)
defer.returnValue(Lock(new_deferred, key))

View File

@ -120,7 +120,6 @@ class RedactionTestCase(unittest.TestCase):
self.u_alice.to_string(),
start,
end,
None, # Is currently ignored
)
self.assertEqual(1, len(results))
@ -149,7 +148,6 @@ class RedactionTestCase(unittest.TestCase):
self.u_alice.to_string(),
start,
end,
None, # Is currently ignored
)
self.assertEqual(1, len(results))
@ -199,7 +197,6 @@ class RedactionTestCase(unittest.TestCase):
self.u_alice.to_string(),
start,
end,
None, # Is currently ignored
)
self.assertEqual(1, len(results))
@ -228,7 +225,6 @@ class RedactionTestCase(unittest.TestCase):
self.u_alice.to_string(),
start,
end,
None, # Is currently ignored
)
self.assertEqual(1, len(results))

View File

@ -68,7 +68,6 @@ class StreamStoreTestCase(unittest.TestCase):
self.u_bob.to_string(),
start,
end,
None, # Is currently ignored
)
self.assertEqual(1, len(results))
@ -105,7 +104,6 @@ class StreamStoreTestCase(unittest.TestCase):
self.u_alice.to_string(),
start,
end,
None, # Is currently ignored
)
self.assertEqual(1, len(results))
@ -147,7 +145,6 @@ class StreamStoreTestCase(unittest.TestCase):
self.u_bob.to_string(),
start,
end,
None, # Is currently ignored
)
# We should not get the message, as it happened *after* bob left.
@ -175,7 +172,6 @@ class StreamStoreTestCase(unittest.TestCase):
self.u_bob.to_string(),
start,
end,
None, # Is currently ignored
)
# We should not get the message, as it happened *after* bob left.

View File

@ -1,108 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
from tests import unittest
from synapse.util.lockutils import LockManager
class LockManagerTestCase(unittest.TestCase):
def setUp(self):
self.lock_manager = LockManager()
@defer.inlineCallbacks
def test_one_lock(self):
key = "test"
deferred_lock1 = self.lock_manager.lock(key)
self.assertTrue(deferred_lock1.called)
lock1 = yield deferred_lock1
self.assertFalse(lock1.released)
lock1.release()
self.assertTrue(lock1.released)
@defer.inlineCallbacks
def test_concurrent_locks(self):
key = "test"
deferred_lock1 = self.lock_manager.lock(key)
deferred_lock2 = self.lock_manager.lock(key)
self.assertTrue(deferred_lock1.called)
self.assertFalse(deferred_lock2.called)
lock1 = yield deferred_lock1
self.assertFalse(lock1.released)
self.assertFalse(deferred_lock2.called)
lock1.release()
self.assertTrue(lock1.released)
self.assertTrue(deferred_lock2.called)
lock2 = yield deferred_lock2
lock2.release()
@defer.inlineCallbacks
def test_sequential_locks(self):
key = "test"
deferred_lock1 = self.lock_manager.lock(key)
self.assertTrue(deferred_lock1.called)
lock1 = yield deferred_lock1
self.assertFalse(lock1.released)
lock1.release()
self.assertTrue(lock1.released)
deferred_lock2 = self.lock_manager.lock(key)
self.assertTrue(deferred_lock2.called)
lock2 = yield deferred_lock2
self.assertFalse(lock2.released)
lock2.release()
self.assertTrue(lock2.released)
@defer.inlineCallbacks
def test_with_statement(self):
key = "test"
with (yield self.lock_manager.lock(key)) as lock:
self.assertFalse(lock.released)
self.assertTrue(lock.released)
@defer.inlineCallbacks
def test_two_with_statement(self):
key = "test"
with (yield self.lock_manager.lock(key)):
pass
with (yield self.lock_manager.lock(key)):
pass

View File

@ -335,7 +335,7 @@ class MemoryDataStore(object):
]
def get_room_events_stream(self, user_id=None, from_key=None, to_key=None,
room_id=None, limit=0, with_feedback=False):
limit=0, with_feedback=False):
return ([], from_key) # TODO
def get_joined_hosts_for_room(self, room_id):