Merge pull request #3079 from matrix-org/erikj/limit_concurrent_sends
Limit concurrent event sends for a roompull/3086/head
commit
eaa2ebf20b
|
@ -454,40 +454,39 @@ class EventCreationHandler(object):
|
||||||
"""
|
"""
|
||||||
builder = self.event_builder_factory.new(event_dict)
|
builder = self.event_builder_factory.new(event_dict)
|
||||||
|
|
||||||
with (yield self.limiter.queue(builder.room_id)):
|
self.validator.validate_new(builder)
|
||||||
self.validator.validate_new(builder)
|
|
||||||
|
|
||||||
if builder.type == EventTypes.Member:
|
if builder.type == EventTypes.Member:
|
||||||
membership = builder.content.get("membership", None)
|
membership = builder.content.get("membership", None)
|
||||||
target = UserID.from_string(builder.state_key)
|
target = UserID.from_string(builder.state_key)
|
||||||
|
|
||||||
if membership in {Membership.JOIN, Membership.INVITE}:
|
if membership in {Membership.JOIN, Membership.INVITE}:
|
||||||
# If event doesn't include a display name, add one.
|
# If event doesn't include a display name, add one.
|
||||||
profile = self.profile_handler
|
profile = self.profile_handler
|
||||||
content = builder.content
|
content = builder.content
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if "displayname" not in content:
|
if "displayname" not in content:
|
||||||
content["displayname"] = yield profile.get_displayname(target)
|
content["displayname"] = yield profile.get_displayname(target)
|
||||||
if "avatar_url" not in content:
|
if "avatar_url" not in content:
|
||||||
content["avatar_url"] = yield profile.get_avatar_url(target)
|
content["avatar_url"] = yield profile.get_avatar_url(target)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.info(
|
logger.info(
|
||||||
"Failed to get profile information for %r: %s",
|
"Failed to get profile information for %r: %s",
|
||||||
target, e
|
target, e
|
||||||
)
|
)
|
||||||
|
|
||||||
if token_id is not None:
|
if token_id is not None:
|
||||||
builder.internal_metadata.token_id = token_id
|
builder.internal_metadata.token_id = token_id
|
||||||
|
|
||||||
if txn_id is not None:
|
if txn_id is not None:
|
||||||
builder.internal_metadata.txn_id = txn_id
|
builder.internal_metadata.txn_id = txn_id
|
||||||
|
|
||||||
event, context = yield self.create_new_client_event(
|
event, context = yield self.create_new_client_event(
|
||||||
builder=builder,
|
builder=builder,
|
||||||
requester=requester,
|
requester=requester,
|
||||||
prev_event_ids=prev_event_ids,
|
prev_event_ids=prev_event_ids,
|
||||||
)
|
)
|
||||||
|
|
||||||
defer.returnValue((event, context))
|
defer.returnValue((event, context))
|
||||||
|
|
||||||
|
@ -557,27 +556,34 @@ class EventCreationHandler(object):
|
||||||
|
|
||||||
See self.create_event and self.send_nonmember_event.
|
See self.create_event and self.send_nonmember_event.
|
||||||
"""
|
"""
|
||||||
event, context = yield self.create_event(
|
|
||||||
requester,
|
|
||||||
event_dict,
|
|
||||||
token_id=requester.access_token_id,
|
|
||||||
txn_id=txn_id
|
|
||||||
)
|
|
||||||
|
|
||||||
spam_error = self.spam_checker.check_event_for_spam(event)
|
# We limit the number of concurrent event sends in a room so that we
|
||||||
if spam_error:
|
# don't fork the DAG too much. If we don't limit then we can end up in
|
||||||
if not isinstance(spam_error, basestring):
|
# a situation where event persistence can't keep up, causing
|
||||||
spam_error = "Spam is not permitted here"
|
# extremities to pile up, which in turn leads to state resolution
|
||||||
raise SynapseError(
|
# taking longer.
|
||||||
403, spam_error, Codes.FORBIDDEN
|
with (yield self.limiter.queue(event_dict["room_id"])):
|
||||||
|
event, context = yield self.create_event(
|
||||||
|
requester,
|
||||||
|
event_dict,
|
||||||
|
token_id=requester.access_token_id,
|
||||||
|
txn_id=txn_id
|
||||||
)
|
)
|
||||||
|
|
||||||
yield self.send_nonmember_event(
|
spam_error = self.spam_checker.check_event_for_spam(event)
|
||||||
requester,
|
if spam_error:
|
||||||
event,
|
if not isinstance(spam_error, basestring):
|
||||||
context,
|
spam_error = "Spam is not permitted here"
|
||||||
ratelimit=ratelimit,
|
raise SynapseError(
|
||||||
)
|
403, spam_error, Codes.FORBIDDEN
|
||||||
|
)
|
||||||
|
|
||||||
|
yield self.send_nonmember_event(
|
||||||
|
requester,
|
||||||
|
event,
|
||||||
|
context,
|
||||||
|
ratelimit=ratelimit,
|
||||||
|
)
|
||||||
defer.returnValue(event)
|
defer.returnValue(event)
|
||||||
|
|
||||||
@measure_func("create_new_client_event")
|
@measure_func("create_new_client_event")
|
||||||
|
|
|
@ -165,17 +165,12 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
|
||||||
content=content,
|
content=content,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
event, context = yield self.event_creation_hander.create_event(
|
event = yield self.event_creation_hander.create_and_send_nonmember_event(
|
||||||
requester,
|
requester,
|
||||||
event_dict,
|
event_dict,
|
||||||
token_id=requester.access_token_id,
|
|
||||||
txn_id=txn_id,
|
txn_id=txn_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
yield self.event_creation_hander.send_nonmember_event(
|
|
||||||
requester, event, context,
|
|
||||||
)
|
|
||||||
|
|
||||||
ret = {}
|
ret = {}
|
||||||
if event:
|
if event:
|
||||||
ret = {"event_id": event.event_id}
|
ret = {"event_id": event.event_id}
|
||||||
|
|
Loading…
Reference in New Issue