Merge pull request #6310 from matrix-org/babolivier/msc2326_bg_update
MSC2326: Add background update to take previous events into accountpull/6353/head
						commit
						46e5db9eb2
					
				| 
						 | 
				
			
			@ -0,0 +1 @@
 | 
			
		|||
Implement label-based filtering on `/sync` and `/messages` ([MSC2326](https://github.com/matrix-org/matrix-doc/pull/2326)).
 | 
			
		||||
| 
						 | 
				
			
			@ -21,6 +21,7 @@ from canonicaljson import json
 | 
			
		|||
 | 
			
		||||
from twisted.internet import defer
 | 
			
		||||
 | 
			
		||||
from synapse.api.constants import EventContentFields
 | 
			
		||||
from synapse.storage._base import make_in_list_sql_clause
 | 
			
		||||
from synapse.storage.background_updates import BackgroundUpdateStore
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -85,6 +86,10 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
 | 
			
		|||
            "event_fix_redactions_bytes", self._event_fix_redactions_bytes
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        self.register_background_update_handler(
 | 
			
		||||
            "event_store_labels", self._event_store_labels
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    @defer.inlineCallbacks
 | 
			
		||||
    def _background_reindex_fields_sender(self, progress, batch_size):
 | 
			
		||||
        target_min_stream_id = progress["target_min_stream_id_inclusive"]
 | 
			
		||||
| 
						 | 
				
			
			@ -503,3 +508,61 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
 | 
			
		|||
        yield self._end_background_update("event_fix_redactions_bytes")
 | 
			
		||||
 | 
			
		||||
        return 1
 | 
			
		||||
 | 
			
		||||
    @defer.inlineCallbacks
 | 
			
		||||
    def _event_store_labels(self, progress, batch_size):
 | 
			
		||||
        """Background update handler which will store labels for existing events."""
 | 
			
		||||
        last_event_id = progress.get("last_event_id", "")
 | 
			
		||||
 | 
			
		||||
        def _event_store_labels_txn(txn):
 | 
			
		||||
            txn.execute(
 | 
			
		||||
                """
 | 
			
		||||
                SELECT event_id, json FROM event_json
 | 
			
		||||
                LEFT JOIN event_labels USING (event_id)
 | 
			
		||||
                WHERE event_id > ? AND label IS NULL
 | 
			
		||||
                ORDER BY event_id LIMIT ?
 | 
			
		||||
                """,
 | 
			
		||||
                (last_event_id, batch_size),
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            results = list(txn)
 | 
			
		||||
 | 
			
		||||
            nbrows = 0
 | 
			
		||||
            last_row_event_id = ""
 | 
			
		||||
            for (event_id, event_json_raw) in results:
 | 
			
		||||
                event_json = json.loads(event_json_raw)
 | 
			
		||||
 | 
			
		||||
                self._simple_insert_many_txn(
 | 
			
		||||
                    txn=txn,
 | 
			
		||||
                    table="event_labels",
 | 
			
		||||
                    values=[
 | 
			
		||||
                        {
 | 
			
		||||
                            "event_id": event_id,
 | 
			
		||||
                            "label": label,
 | 
			
		||||
                            "room_id": event_json["room_id"],
 | 
			
		||||
                            "topological_ordering": event_json["depth"],
 | 
			
		||||
                        }
 | 
			
		||||
                        for label in event_json["content"].get(
 | 
			
		||||
                            EventContentFields.LABELS, []
 | 
			
		||||
                        )
 | 
			
		||||
                        if isinstance(label, str)
 | 
			
		||||
                    ],
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
                nbrows += 1
 | 
			
		||||
                last_row_event_id = event_id
 | 
			
		||||
 | 
			
		||||
            self._background_update_progress_txn(
 | 
			
		||||
                txn, "event_store_labels", {"last_event_id": last_row_event_id}
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            return nbrows
 | 
			
		||||
 | 
			
		||||
        num_rows = yield self.runInteraction(
 | 
			
		||||
            desc="event_store_labels", func=_event_store_labels_txn
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        if not num_rows:
 | 
			
		||||
            yield self._end_background_update("event_store_labels")
 | 
			
		||||
 | 
			
		||||
        return num_rows
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,17 @@
 | 
			
		|||
/* Copyright 2019 The Matrix.org Foundation C.I.C.
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
INSERT INTO background_updates (update_name, progress_json) VALUES
 | 
			
		||||
  ('event_store_labels', '{}');
 | 
			
		||||
		Loading…
	
		Reference in New Issue