Various cleanups in the federation client code (#4031)

- Improve logging: log things in the right order, include destination and txids
  in all log lines, don't log successful responses twice

- Fix the docstring on TransportLayerClient.send_transaction

- Don't use treq.request, which is overcomplicated for our purposes: just use a
  twisted.web.client.Agent.

- simplify the logic for setting up the bodyProducer

- fix bytes/str confusions
pull/3786/merge
Richard van der Hoff 2018-10-16 10:44:49 +01:00 committed by GitHub
parent 03c11032c3
commit b8a5b0097c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 64 additions and 61 deletions

1
changelog.d/4031.misc Normal file
View File

@ -0,0 +1 @@
Various cleanups in the federation client code

View File

@ -633,14 +633,6 @@ class TransactionQueue(object):
transaction, json_data_cb transaction, json_data_cb
) )
code = 200 code = 200
if response:
for e_id, r in response.get("pdus", {}).items():
if "error" in r:
logger.warn(
"Transaction returned error for %s: %s",
e_id, r,
)
except HttpResponseException as e: except HttpResponseException as e:
code = e.code code = e.code
response = e.response response = e.response
@ -657,19 +649,24 @@ class TransactionQueue(object):
destination, txn_id, code destination, txn_id, code
) )
logger.debug("TX [%s] Sent transaction", destination)
logger.debug("TX [%s] Marking as delivered...", destination)
yield self.transaction_actions.delivered( yield self.transaction_actions.delivered(
transaction, code, response transaction, code, response
) )
logger.debug("TX [%s] Marked as delivered", destination) logger.debug("TX [%s] {%s} Marked as delivered", destination, txn_id)
if code != 200: if code == 200:
for e_id, r in response.get("pdus", {}).items():
if "error" in r:
logger.warn(
"TX [%s] {%s} Remote returned error for %s: %s",
destination, txn_id, e_id, r,
)
else:
for p in pdus: for p in pdus:
logger.info( logger.warn(
"Failed to send event %s to %s", p.event_id, destination "TX [%s] {%s} Failed to send event %s",
destination, txn_id, p.event_id,
) )
success = False success = False

View File

@ -143,9 +143,17 @@ class TransportLayerClient(object):
transaction (Transaction) transaction (Transaction)
Returns: Returns:
Deferred: Results of the deferred is a tuple in the form of Deferred: Succeeds when we get a 2xx HTTP response. The result
(response_code, response_body) where the response_body is a will be the decoded JSON body.
python dict decoded from json
Fails with ``HTTPRequestException`` if we get an HTTP response
code >= 300.
Fails with ``NotRetryingDestination`` if we are not yet ready
to retry this server.
Fails with ``FederationDeniedError`` if this destination
is not on our federation whitelist
""" """
logger.debug( logger.debug(
"send_data dest=%s, txid=%s", "send_data dest=%s, txid=%s",
@ -170,11 +178,6 @@ class TransportLayerClient(object):
backoff_on_404=True, # If we get a 404 the other side has gone backoff_on_404=True, # If we get a 404 the other side has gone
) )
logger.debug(
"send_data dest=%s, txid=%s, got response: 200",
transaction.destination, transaction.transaction_id,
)
defer.returnValue(response) defer.returnValue(response)
@defer.inlineCallbacks @defer.inlineCallbacks

View File

@ -195,7 +195,7 @@ class MatrixFederationHttpClient(object):
) )
self.clock = hs.get_clock() self.clock = hs.get_clock()
self._store = hs.get_datastore() self._store = hs.get_datastore()
self.version_string = hs.version_string.encode('ascii') self.version_string_bytes = hs.version_string.encode('ascii')
self.default_timeout = 60 self.default_timeout = 60
def schedule(x): def schedule(x):
@ -261,8 +261,8 @@ class MatrixFederationHttpClient(object):
ignore_backoff=ignore_backoff, ignore_backoff=ignore_backoff,
) )
method = request.method method_bytes = request.method.encode("ascii")
destination = request.destination destination_bytes = request.destination.encode("ascii")
path_bytes = request.path.encode("ascii") path_bytes = request.path.encode("ascii")
if request.query: if request.query:
query_bytes = encode_query_args(request.query) query_bytes = encode_query_args(request.query)
@ -270,8 +270,8 @@ class MatrixFederationHttpClient(object):
query_bytes = b"" query_bytes = b""
headers_dict = { headers_dict = {
"User-Agent": [self.version_string], b"User-Agent": [self.version_string_bytes],
"Host": [request.destination], b"Host": [destination_bytes],
} }
with limiter: with limiter:
@ -282,50 +282,51 @@ class MatrixFederationHttpClient(object):
else: else:
retries_left = MAX_SHORT_RETRIES retries_left = MAX_SHORT_RETRIES
url = urllib.parse.urlunparse(( url_bytes = urllib.parse.urlunparse((
b"matrix", destination.encode("ascii"), b"matrix", destination_bytes,
path_bytes, None, query_bytes, b"", path_bytes, None, query_bytes, b"",
)).decode('ascii') ))
url_str = url_bytes.decode('ascii')
http_url = urllib.parse.urlunparse(( url_to_sign_bytes = urllib.parse.urlunparse((
b"", b"", b"", b"",
path_bytes, None, query_bytes, b"", path_bytes, None, query_bytes, b"",
)).decode('ascii') ))
while True: while True:
try: try:
json = request.get_json() json = request.get_json()
if json: if json:
data = encode_canonical_json(json) headers_dict[b"Content-Type"] = [b"application/json"]
headers_dict["Content-Type"] = ["application/json"]
self.sign_request( self.sign_request(
destination, method, http_url, headers_dict, json destination_bytes, method_bytes, url_to_sign_bytes,
headers_dict, json,
) )
else: data = encode_canonical_json(json)
data = None
self.sign_request(destination, method, http_url, headers_dict)
logger.info(
"{%s} [%s] Sending request: %s %s",
request.txn_id, destination, method, url
)
if data:
producer = FileBodyProducer( producer = FileBodyProducer(
BytesIO(data), BytesIO(data),
cooperator=self._cooperator cooperator=self._cooperator,
) )
else: else:
producer = None producer = None
self.sign_request(
destination_bytes, method_bytes, url_to_sign_bytes,
headers_dict,
)
request_deferred = treq.request( logger.info(
method, "{%s} [%s] Sending request: %s %s",
url, request.txn_id, request.destination, request.method,
url_str,
)
# we don't want all the fancy cookie and redirect handling that
# treq.request gives: just use the raw Agent.
request_deferred = self.agent.request(
method_bytes,
url_bytes,
headers=Headers(headers_dict), headers=Headers(headers_dict),
data=producer, bodyProducer=producer,
agent=self.agent,
reactor=self.hs.get_reactor(),
unbuffered=True
) )
request_deferred = timeout_deferred( request_deferred = timeout_deferred(
@ -344,9 +345,9 @@ class MatrixFederationHttpClient(object):
logger.warn( logger.warn(
"{%s} [%s] Request failed: %s %s: %s", "{%s} [%s] Request failed: %s %s: %s",
request.txn_id, request.txn_id,
destination, request.destination,
method, request.method,
url, url_str,
_flatten_response_never_received(e), _flatten_response_never_received(e),
) )
@ -366,7 +367,7 @@ class MatrixFederationHttpClient(object):
logger.debug( logger.debug(
"{%s} [%s] Waiting %ss before re-sending...", "{%s} [%s] Waiting %ss before re-sending...",
request.txn_id, request.txn_id,
destination, request.destination,
delay, delay,
) )
@ -378,7 +379,7 @@ class MatrixFederationHttpClient(object):
logger.info( logger.info(
"{%s} [%s] Got response headers: %d %s", "{%s} [%s] Got response headers: %d %s",
request.txn_id, request.txn_id,
destination, request.destination,
response.code, response.code,
response.phrase.decode('ascii', errors='replace'), response.phrase.decode('ascii', errors='replace'),
) )
@ -411,8 +412,9 @@ class MatrixFederationHttpClient(object):
destination_is must be non-None. destination_is must be non-None.
method (bytes): The HTTP method of the request method (bytes): The HTTP method of the request
url_bytes (bytes): The URI path of the request url_bytes (bytes): The URI path of the request
headers_dict (dict): Dictionary of request headers to append to headers_dict (dict[bytes, list[bytes]]): Dictionary of request headers to
content (bytes): The body of the request append to
content (object): The body of the request
destination_is (bytes): As 'destination', but if the destination is an destination_is (bytes): As 'destination', but if the destination is an
identity server identity server