Merge branch 'release-v0.18.6' into develop

pull/1786/head
Matthew Hodgson 2017-01-05 13:03:02 +00:00
commit b31ed22738
5 changed files with 47 additions and 8 deletions

View File

@ -36,6 +36,15 @@ class _EventInternalMetadata(object):
def is_invite_from_remote(self): def is_invite_from_remote(self):
return getattr(self, "invite_from_remote", False) return getattr(self, "invite_from_remote", False)
def get_send_on_behalf_of(self):
"""Whether this server should send the event on behalf of another server.
This is used by the federation "send_join" API to forward the initial join
event for a server in the room.
returns a str with the name of the server this event is sent on behalf of.
"""
return getattr(self, "get_send_on_behalf_of", None)
def _event_dict_property(key): def _event_dict_property(key):
def getter(self): def getter(self):

View File

@ -19,7 +19,6 @@ from twisted.internet import defer
from .persistence import TransactionActions from .persistence import TransactionActions
from .units import Transaction, Edu from .units import Transaction, Edu
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import HttpResponseException from synapse.api.errors import HttpResponseException
from synapse.util.async import run_on_reactor from synapse.util.async import run_on_reactor
from synapse.util.logcontext import preserve_context_over_fn from synapse.util.logcontext import preserve_context_over_fn
@ -62,6 +61,7 @@ class TransactionQueue(object):
self.transport_layer = hs.get_federation_transport_client() self.transport_layer = hs.get_federation_transport_client()
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.is_mine_id = hs.is_mine_id
# Is a mapping from destinations -> deferreds. Used to keep track # Is a mapping from destinations -> deferreds. Used to keep track
# of which destinations have transactions in flight and when they are # of which destinations have transactions in flight and when they are
@ -153,17 +153,32 @@ class TransactionQueue(object):
break break
for event in events: for event in events:
# Only send events for this server.
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
is_mine = self.is_mine_id(event.event_id)
if not is_mine and send_on_behalf_of is None:
continue
# Get the state from before the event.
# We need to make sure that this is the state from before
# the event and not from after it.
# Otherwise if the last member on a server in a room is
# banned then it won't receive the event because it won't
# be in the room after the ban.
users_in_room = yield self.state.get_current_user_in_room( users_in_room = yield self.state.get_current_user_in_room(
event.room_id, latest_event_ids=[event.event_id], event.room_id, latest_event_ids=[
prev_id for prev_id, _ in event.prev_events
],
) )
destinations = set( destinations = set(
get_domain_from_id(user_id) for user_id in users_in_room get_domain_from_id(user_id) for user_id in users_in_room
) )
if send_on_behalf_of is not None:
if event.type == EventTypes.Member: # If we are sending the event on behalf of another server
if event.content["membership"] == Membership.JOIN: # then it already has the event and there is no reason to
destinations.add(get_domain_from_id(event.state_key)) # send the event to it.
destinations.discard(send_on_behalf_of)
logger.debug("Sending %s to %r", event, destinations) logger.debug("Sending %s to %r", event, destinations)

View File

@ -790,6 +790,10 @@ class FederationHandler(BaseHandler):
) )
event.internal_metadata.outlier = False event.internal_metadata.outlier = False
# Send this event on behalf of the origin server since they may not
# have an up to data view of the state of the room at this event so
# will not know which servers to send the event to.
event.internal_metadata.send_on_behalf_of = origin
context, event_stream_id, max_stream_id = yield self._handle_new_event( context, event_stream_id, max_stream_id = yield self._handle_new_event(
origin, event origin, event

View File

@ -1084,8 +1084,10 @@ class EventsStore(SQLBaseStore):
self._do_fetch self._do_fetch
) )
logger.info("Loading %d events", len(events))
with PreserveLoggingContext(): with PreserveLoggingContext():
rows = yield events_d rows = yield events_d
logger.info("Loaded %d events (%d rows)", len(events), len(rows))
if not allow_rejected: if not allow_rejected:
rows[:] = [r for r in rows if not r["rejects"]] rows[:] = [r for r in rows if not r["rejects"]]

View File

@ -166,7 +166,11 @@ class Linearizer(object):
# do some work. # do some work.
""" """
def __init__(self): def __init__(self, name=None):
if name is None:
self.name = id(self)
else:
self.name = name
self.key_to_defer = {} self.key_to_defer = {}
@defer.inlineCallbacks @defer.inlineCallbacks
@ -185,15 +189,20 @@ class Linearizer(object):
self.key_to_defer[key] = new_defer self.key_to_defer[key] = new_defer
if current_defer: if current_defer:
logger.info("Waiting to acquire linearizer lock for key %r", key) logger.info(
"Waiting to acquire linearizer lock %r for key %r", self.name, key
)
with PreserveLoggingContext(): with PreserveLoggingContext():
yield current_defer yield current_defer
logger.info("Acquired linearizer lock %r for key %r", self.name, key)
@contextmanager @contextmanager
def _ctx_manager(): def _ctx_manager():
try: try:
yield yield
finally: finally:
logger.info("Releasing linearizer lock %r for key %r", self.name, key)
new_defer.callback(None) new_defer.callback(None)
current_d = self.key_to_defer.get(key) current_d = self.key_to_defer.get(key)
if current_d is new_defer: if current_d is new_defer: