Use new helper base class for ReplicationSendEventRestServlet
							parent
							
								
									d81602b75a
								
							
						
					
					
						commit
						729b672823
					
				|  | @ -30,7 +30,7 @@ from synapse.api.urls import ConsentURIBuilder | |||
| from synapse.crypto.event_signing import add_hashes_and_signatures | ||||
| from synapse.events.utils import serialize_event | ||||
| from synapse.events.validator import EventValidator | ||||
| from synapse.replication.http.send_event import send_event_to_master | ||||
| from synapse.replication.http.send_event import ReplicationSendEventRestServlet | ||||
| from synapse.types import RoomAlias, UserID | ||||
| from synapse.util.async import Linearizer | ||||
| from synapse.util.frozenutils import frozendict_json_encoder | ||||
|  | @ -171,7 +171,7 @@ class EventCreationHandler(object): | |||
|         self.notifier = hs.get_notifier() | ||||
|         self.config = hs.config | ||||
| 
 | ||||
|         self.http_client = hs.get_simple_http_client() | ||||
|         self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs) | ||||
| 
 | ||||
|         # This is only used to get at ratelimit function, and maybe_kick_guest_users | ||||
|         self.base_handler = BaseHandler(hs) | ||||
|  | @ -559,12 +559,9 @@ class EventCreationHandler(object): | |||
|         try: | ||||
|             # If we're a worker we need to hit out to the master. | ||||
|             if self.config.worker_app: | ||||
|                 yield send_event_to_master( | ||||
|                     clock=self.hs.get_clock(), | ||||
|                 yield self.send_event_to_master( | ||||
|                     event_id=event.event_id, | ||||
|                     store=self.store, | ||||
|                     client=self.http_client, | ||||
|                     host=self.config.worker_replication_host, | ||||
|                     port=self.config.worker_replication_http_port, | ||||
|                     requester=requester, | ||||
|                     event=event, | ||||
|                     context=context, | ||||
|  |  | |||
|  | @ -14,90 +14,26 @@ | |||
| # limitations under the License. | ||||
| 
 | ||||
| import logging | ||||
| import re | ||||
| 
 | ||||
| from twisted.internet import defer | ||||
| 
 | ||||
| from synapse.api.errors import ( | ||||
|     CodeMessageException, | ||||
|     MatrixCodeMessageException, | ||||
|     SynapseError, | ||||
| ) | ||||
| from synapse.events import FrozenEvent | ||||
| from synapse.events.snapshot import EventContext | ||||
| from synapse.http.servlet import RestServlet, parse_json_object_from_request | ||||
| from synapse.http.servlet import parse_json_object_from_request | ||||
| from synapse.replication.http._base import ReplicationEndpoint | ||||
| from synapse.types import Requester, UserID | ||||
| from synapse.util.caches.response_cache import ResponseCache | ||||
| from synapse.util.metrics import Measure | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| @defer.inlineCallbacks | ||||
| def send_event_to_master(clock, store, client, host, port, requester, event, context, | ||||
|                          ratelimit, extra_users): | ||||
|     """Send event to be handled on the master | ||||
| 
 | ||||
|     Args: | ||||
|         clock (synapse.util.Clock) | ||||
|         store (DataStore) | ||||
|         client (SimpleHttpClient) | ||||
|         host (str): host of master | ||||
|         port (int): port on master listening for HTTP replication | ||||
|         requester (Requester) | ||||
|         event (FrozenEvent) | ||||
|         context (EventContext) | ||||
|         ratelimit (bool) | ||||
|         extra_users (list(UserID)): Any extra users to notify about event | ||||
|     """ | ||||
|     uri = "http://%s:%s/_synapse/replication/send_event/%s" % ( | ||||
|         host, port, event.event_id, | ||||
|     ) | ||||
| 
 | ||||
|     serialized_context = yield context.serialize(event, store) | ||||
| 
 | ||||
|     payload = { | ||||
|         "event": event.get_pdu_json(), | ||||
|         "internal_metadata": event.internal_metadata.get_dict(), | ||||
|         "rejected_reason": event.rejected_reason, | ||||
|         "context": serialized_context, | ||||
|         "requester": requester.serialize(), | ||||
|         "ratelimit": ratelimit, | ||||
|         "extra_users": [u.to_string() for u in extra_users], | ||||
|     } | ||||
| 
 | ||||
|     try: | ||||
|         # We keep retrying the same request for timeouts. This is so that we | ||||
|         # have a good idea that the request has either succeeded or failed on | ||||
|         # the master, and so whether we should clean up or not. | ||||
|         while True: | ||||
|             try: | ||||
|                 result = yield client.put_json(uri, payload) | ||||
|                 break | ||||
|             except CodeMessageException as e: | ||||
|                 if e.code != 504: | ||||
|                     raise | ||||
| 
 | ||||
|             logger.warn("send_event request timed out") | ||||
| 
 | ||||
|             # If we timed out we probably don't need to worry about backing | ||||
|             # off too much, but lets just wait a little anyway. | ||||
|             yield clock.sleep(1) | ||||
|     except MatrixCodeMessageException as e: | ||||
|         # We convert to SynapseError as we know that it was a SynapseError | ||||
|         # on the master process that we should send to the client. (And | ||||
|         # importantly, not stack traces everywhere) | ||||
|         raise SynapseError(e.code, e.msg, e.errcode) | ||||
|     defer.returnValue(result) | ||||
| 
 | ||||
| 
 | ||||
| class ReplicationSendEventRestServlet(RestServlet): | ||||
| class ReplicationSendEventRestServlet(ReplicationEndpoint): | ||||
|     """Handles events newly created on workers, including persisting and | ||||
|     notifying. | ||||
| 
 | ||||
|     The API looks like: | ||||
| 
 | ||||
|         POST /_synapse/replication/send_event/:event_id | ||||
|         POST /_synapse/replication/send_event/:event_id/:txn_id | ||||
| 
 | ||||
|         { | ||||
|             "event": { .. serialized event .. }, | ||||
|  | @ -109,27 +45,48 @@ class ReplicationSendEventRestServlet(RestServlet): | |||
|             "extra_users": [], | ||||
|         } | ||||
|     """ | ||||
|     PATTERNS = [re.compile("^/_synapse/replication/send_event/(?P<event_id>[^/]+)$")] | ||||
|     NAME = "send_event" | ||||
|     PATH_ARGS = ("event_id",) | ||||
|     POST = True | ||||
| 
 | ||||
|     def __init__(self, hs): | ||||
|         super(ReplicationSendEventRestServlet, self).__init__() | ||||
|         super(ReplicationSendEventRestServlet, self).__init__(hs) | ||||
| 
 | ||||
|         self.event_creation_handler = hs.get_event_creation_handler() | ||||
|         self.store = hs.get_datastore() | ||||
|         self.clock = hs.get_clock() | ||||
| 
 | ||||
|         # The responses are tiny, so we may as well cache them for a while | ||||
|         self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000) | ||||
|     @staticmethod | ||||
|     @defer.inlineCallbacks | ||||
|     def _serialize_payload(event_id, store, event, context, requester, | ||||
|                            ratelimit, extra_users): | ||||
|         """ | ||||
|         Args: | ||||
|             event_id (str) | ||||
|             store (DataStore) | ||||
|             requester (Requester) | ||||
|             event (FrozenEvent) | ||||
|             context (EventContext) | ||||
|             ratelimit (bool) | ||||
|             extra_users (list(UserID)): Any extra users to notify about event | ||||
|         """ | ||||
| 
 | ||||
|     def on_PUT(self, request, event_id): | ||||
|         return self.response_cache.wrap( | ||||
|             event_id, | ||||
|             self._handle_request, | ||||
|             request | ||||
|         ) | ||||
|         serialized_context = yield context.serialize(event, store) | ||||
| 
 | ||||
|         payload = { | ||||
|             "event": event.get_pdu_json(), | ||||
|             "internal_metadata": event.internal_metadata.get_dict(), | ||||
|             "rejected_reason": event.rejected_reason, | ||||
|             "context": serialized_context, | ||||
|             "requester": requester.serialize(), | ||||
|             "ratelimit": ratelimit, | ||||
|             "extra_users": [u.to_string() for u in extra_users], | ||||
|         } | ||||
| 
 | ||||
|         defer.returnValue(payload) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def _handle_request(self, request): | ||||
|     def _handle_request(self, request, event_id): | ||||
|         with Measure(self.clock, "repl_send_event_parse"): | ||||
|             content = parse_json_object_from_request(request) | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	 Erik Johnston
						Erik Johnston