Merge pull request #3995 from matrix-org/rav/no_deextrem_outliers
Fix bug in forward_extremity update logicpull/4017/head
						commit
						c6dbd216e6
					
				|  | @ -0,0 +1 @@ | |||
| Fix bug in event persistence logic which caused 'NoneType is not iterable' | ||||
|  | @ -38,6 +38,7 @@ from synapse.storage.background_updates import BackgroundUpdateStore | |||
| from synapse.storage.event_federation import EventFederationStore | ||||
| from synapse.storage.events_worker import EventsWorkerStore | ||||
| from synapse.types import RoomStreamToken, get_domain_from_id | ||||
| from synapse.util import batch_iter | ||||
| from synapse.util.async_helpers import ObservableDeferred | ||||
| from synapse.util.caches.descriptors import cached, cachedInlineCallbacks | ||||
| from synapse.util.frozenutils import frozendict_json_encoder | ||||
|  | @ -386,12 +387,10 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore | |||
|                             ) | ||||
| 
 | ||||
|                         for room_id, ev_ctx_rm in iteritems(events_by_room): | ||||
|                             # Work out new extremities by recursively adding and removing | ||||
|                             # the new events. | ||||
|                             latest_event_ids = yield self.get_latest_event_ids_in_room( | ||||
|                                 room_id | ||||
|                             ) | ||||
|                             new_latest_event_ids = yield self._calculate_new_extremeties( | ||||
|                             new_latest_event_ids = yield self._calculate_new_extremities( | ||||
|                                 room_id, ev_ctx_rm, latest_event_ids | ||||
|                             ) | ||||
| 
 | ||||
|  | @ -400,6 +399,12 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore | |||
|                                 # No change in extremities, so no change in state | ||||
|                                 continue | ||||
| 
 | ||||
|                             # there should always be at least one forward extremity. | ||||
|                             # (except during the initial persistence of the send_join | ||||
|                             # results, in which case there will be no existing | ||||
|                             # extremities, so we'll `continue` above and skip this bit.) | ||||
|                             assert new_latest_event_ids, "No forward extremities left!" | ||||
| 
 | ||||
|                             new_forward_extremeties[room_id] = new_latest_event_ids | ||||
| 
 | ||||
|                             len_1 = ( | ||||
|  | @ -517,44 +522,79 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore | |||
|                     ) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids): | ||||
|         """Calculates the new forward extremeties for a room given events to | ||||
|     def _calculate_new_extremities(self, room_id, event_contexts, latest_event_ids): | ||||
|         """Calculates the new forward extremities for a room given events to | ||||
|         persist. | ||||
| 
 | ||||
|         Assumes that we are only persisting events for one room at a time. | ||||
|         """ | ||||
|         new_latest_event_ids = set(latest_event_ids) | ||||
|         # First, add all the new events to the list | ||||
|         new_latest_event_ids.update( | ||||
|             event.event_id for event, ctx in event_contexts | ||||
| 
 | ||||
|         # we're only interested in new events which aren't outliers and which aren't | ||||
|         # being rejected. | ||||
|         new_events = [ | ||||
|             event for event, ctx in event_contexts | ||||
|             if not event.internal_metadata.is_outlier() and not ctx.rejected | ||||
|         ] | ||||
| 
 | ||||
|         # start with the existing forward extremities | ||||
|         result = set(latest_event_ids) | ||||
| 
 | ||||
|         # add all the new events to the list | ||||
|         result.update( | ||||
|             event.event_id for event in new_events | ||||
|         ) | ||||
|         # Now remove all events that are referenced by the to-be-added events | ||||
|         new_latest_event_ids.difference_update( | ||||
| 
 | ||||
|         # Now remove all events which are prev_events of any of the new events | ||||
|         result.difference_update( | ||||
|             e_id | ||||
|             for event, ctx in event_contexts | ||||
|             for event in new_events | ||||
|             for e_id, _ in event.prev_events | ||||
|             if not event.internal_metadata.is_outlier() and not ctx.rejected | ||||
|         ) | ||||
| 
 | ||||
|         # And finally remove any events that are referenced by previously added | ||||
|         # events. | ||||
|         rows = yield self._simple_select_many_batch( | ||||
|             table="event_edges", | ||||
|             column="prev_event_id", | ||||
|             iterable=list(new_latest_event_ids), | ||||
|             retcols=["prev_event_id"], | ||||
|             keyvalues={ | ||||
|                 "is_state": False, | ||||
|             }, | ||||
|             desc="_calculate_new_extremeties", | ||||
|         ) | ||||
|         # Finally, remove any events which are prev_events of any existing events. | ||||
|         existing_prevs = yield self._get_events_which_are_prevs(result) | ||||
|         result.difference_update(existing_prevs) | ||||
| 
 | ||||
|         new_latest_event_ids.difference_update( | ||||
|             row["prev_event_id"] for row in rows | ||||
|         ) | ||||
|         defer.returnValue(result) | ||||
| 
 | ||||
|         defer.returnValue(new_latest_event_ids) | ||||
|     @defer.inlineCallbacks | ||||
|     def _get_events_which_are_prevs(self, event_ids): | ||||
|         """Filter the supplied list of event_ids to get those which are prev_events of | ||||
|         existing (non-outlier/rejected) events. | ||||
| 
 | ||||
|         Args: | ||||
|             event_ids (Iterable[str]): event ids to filter | ||||
| 
 | ||||
|         Returns: | ||||
|             Deferred[List[str]]: filtered event ids | ||||
|         """ | ||||
|         results = [] | ||||
| 
 | ||||
|         def _get_events(txn, batch): | ||||
|             sql = """ | ||||
|             SELECT prev_event_id | ||||
|             FROM event_edges | ||||
|                 INNER JOIN events USING (event_id) | ||||
|                 LEFT JOIN rejections USING (event_id) | ||||
|             WHERE | ||||
|                 prev_event_id IN (%s) | ||||
|                 AND NOT events.outlier | ||||
|                 AND rejections.event_id IS NULL | ||||
|             """ % ( | ||||
|                 ",".join("?" for _ in batch), | ||||
|             ) | ||||
| 
 | ||||
|             txn.execute(sql, batch) | ||||
|             results.extend(r[0] for r in txn) | ||||
| 
 | ||||
|         for chunk in batch_iter(event_ids, 100): | ||||
|             yield self.runInteraction( | ||||
|                 "_get_events_which_are_prevs", | ||||
|                 _get_events, | ||||
|                 chunk, | ||||
|             ) | ||||
| 
 | ||||
|         defer.returnValue(results) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def _get_new_state_after_events(self, room_id, events_context, old_latest_event_ids, | ||||
|  | @ -586,10 +626,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore | |||
|             the new current state is only returned if we've already calculated | ||||
|             it. | ||||
|         """ | ||||
| 
 | ||||
|         if not new_latest_event_ids: | ||||
|             return | ||||
| 
 | ||||
|         # map from state_group to ((type, key) -> event_id) state map | ||||
|         state_groups_map = {} | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	 Richard van der Hoff
						Richard van der Hoff