Fetch events from events_id in their own transactions
							parent
							
								
									a988361aea
								
							
						
					
					
						commit
						4071f29653
					
				| 
						 | 
				
			
			@ -867,11 +867,26 @@ class SQLBaseStore(object):
 | 
			
		|||
 | 
			
		||||
        return self.runInteraction("_simple_max_id", func)
 | 
			
		||||
 | 
			
		||||
    @defer.inlineCallbacks
 | 
			
		||||
    def _get_events(self, event_ids, check_redacted=True,
 | 
			
		||||
                    get_prev_content=False, desc="_get_events"):
 | 
			
		||||
        return self.runInteraction(
 | 
			
		||||
            desc, self._get_events_txn, event_ids,
 | 
			
		||||
            check_redacted=check_redacted, get_prev_content=get_prev_content,
 | 
			
		||||
        N = 50  # Only fetch 100 events at a time.
 | 
			
		||||
 | 
			
		||||
        ds = [
 | 
			
		||||
            self.runInteraction(
 | 
			
		||||
                desc,
 | 
			
		||||
                self._fetch_events_txn,
 | 
			
		||||
                event_ids[i*N:(i+1)*N],
 | 
			
		||||
                check_redacted=check_redacted,
 | 
			
		||||
                get_prev_content=get_prev_content,
 | 
			
		||||
            )
 | 
			
		||||
            for i in range(1 + len(event_ids) / N)
 | 
			
		||||
        ]
 | 
			
		||||
 | 
			
		||||
        res = yield defer.gatherResults(ds, consumeErrors=True)
 | 
			
		||||
 | 
			
		||||
        defer.returnValue(
 | 
			
		||||
            list(itertools.chain(*res))
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    def _get_events_txn(self, txn, event_ids, check_redacted=True,
 | 
			
		||||
| 
						 | 
				
			
			@ -1007,6 +1022,139 @@ class SQLBaseStore(object):
 | 
			
		|||
            if e_id in event_map and event_map[e_id]
 | 
			
		||||
        ]
 | 
			
		||||
 | 
			
		||||
    @defer.inlineCallbacks
 | 
			
		||||
    def _fetch_events(self, events, check_redacted=True,
 | 
			
		||||
                      get_prev_content=False, allow_rejected=False):
 | 
			
		||||
        if not events:
 | 
			
		||||
            defer.returnValue([])
 | 
			
		||||
 | 
			
		||||
        event_map = {}
 | 
			
		||||
 | 
			
		||||
        for event_id in events:
 | 
			
		||||
            try:
 | 
			
		||||
                ret = self._get_event_cache.get(event_id, check_redacted, get_prev_content)
 | 
			
		||||
 | 
			
		||||
                if allow_rejected or not ret.rejected_reason:
 | 
			
		||||
                    event_map[event_id] = ret
 | 
			
		||||
                else:
 | 
			
		||||
                    event_map[event_id] = None
 | 
			
		||||
            except KeyError:
 | 
			
		||||
                pass
 | 
			
		||||
 | 
			
		||||
        missing_events = [
 | 
			
		||||
            e for e in events
 | 
			
		||||
            if e not in event_map
 | 
			
		||||
        ]
 | 
			
		||||
 | 
			
		||||
        if missing_events:
 | 
			
		||||
            sql = (
 | 
			
		||||
                "SELECT e.internal_metadata, e.json, r.redacts, rej.event_id "
 | 
			
		||||
                " FROM event_json as e"
 | 
			
		||||
                " LEFT JOIN rejections as rej USING (event_id)"
 | 
			
		||||
                " LEFT JOIN redactions as r ON e.event_id = r.redacts"
 | 
			
		||||
                " WHERE e.event_id IN (%s)"
 | 
			
		||||
            ) % (",".join(["?"]*len(missing_events)),)
 | 
			
		||||
 | 
			
		||||
            rows = yield self._execute(
 | 
			
		||||
                "_fetch_events",
 | 
			
		||||
                None,
 | 
			
		||||
                sql,
 | 
			
		||||
                *missing_events
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            res_ds = [
 | 
			
		||||
                self._get_event_from_row(
 | 
			
		||||
                    row[0], row[1], row[2],
 | 
			
		||||
                    check_redacted=check_redacted,
 | 
			
		||||
                    get_prev_content=get_prev_content,
 | 
			
		||||
                    rejected_reason=row[3],
 | 
			
		||||
                )
 | 
			
		||||
                for row in rows
 | 
			
		||||
            ]
 | 
			
		||||
 | 
			
		||||
            res = yield defer.gatherResults(res_ds, consumeErrors=True)
 | 
			
		||||
 | 
			
		||||
            event_map.update({
 | 
			
		||||
                e.event_id: e
 | 
			
		||||
                for e in res if e
 | 
			
		||||
            })
 | 
			
		||||
 | 
			
		||||
            for e in res:
 | 
			
		||||
                self._get_event_cache.prefill(
 | 
			
		||||
                    e.event_id, check_redacted, get_prev_content, e
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
        defer.returnValue([
 | 
			
		||||
            event_map[e_id] for e_id in events
 | 
			
		||||
            if e_id in event_map and event_map[e_id]
 | 
			
		||||
        ])
 | 
			
		||||
 | 
			
		||||
    @defer.inlineCallbacks
 | 
			
		||||
    def _get_event_from_row(self, internal_metadata, js, redacted,
 | 
			
		||||
                            check_redacted=True, get_prev_content=False,
 | 
			
		||||
                            rejected_reason=None):
 | 
			
		||||
 | 
			
		||||
        start_time = time.time() * 1000
 | 
			
		||||
 | 
			
		||||
        def update_counter(desc, last_time):
 | 
			
		||||
            curr_time = self._get_event_counters.update(desc, last_time)
 | 
			
		||||
            sql_getevents_timer.inc_by(curr_time - last_time, desc)
 | 
			
		||||
            return curr_time
 | 
			
		||||
 | 
			
		||||
        d = json.loads(js)
 | 
			
		||||
        start_time = update_counter("decode_json", start_time)
 | 
			
		||||
 | 
			
		||||
        internal_metadata = json.loads(internal_metadata)
 | 
			
		||||
        start_time = update_counter("decode_internal", start_time)
 | 
			
		||||
 | 
			
		||||
        if rejected_reason:
 | 
			
		||||
            rejected_reason = yield self._simple_select_one_onecol(
 | 
			
		||||
                desc="_get_event_from_row",
 | 
			
		||||
                table="rejections",
 | 
			
		||||
                keyvalues={"event_id": rejected_reason},
 | 
			
		||||
                retcol="reason",
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        ev = FrozenEvent(
 | 
			
		||||
            d,
 | 
			
		||||
            internal_metadata_dict=internal_metadata,
 | 
			
		||||
            rejected_reason=rejected_reason,
 | 
			
		||||
        )
 | 
			
		||||
        start_time = update_counter("build_frozen_event", start_time)
 | 
			
		||||
 | 
			
		||||
        if check_redacted and redacted:
 | 
			
		||||
            ev = prune_event(ev)
 | 
			
		||||
 | 
			
		||||
            redaction_id = yield self._simple_select_one_onecol(
 | 
			
		||||
                desc="_get_event_from_row",
 | 
			
		||||
                table="redactions",
 | 
			
		||||
                keyvalues={"redacts": ev.event_id},
 | 
			
		||||
                retcol="event_id",
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            ev.unsigned["redacted_by"] = redaction_id
 | 
			
		||||
            # Get the redaction event.
 | 
			
		||||
 | 
			
		||||
            because = yield self.get_event_txn(
 | 
			
		||||
                redaction_id,
 | 
			
		||||
                check_redacted=False
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            if because:
 | 
			
		||||
                ev.unsigned["redacted_because"] = because
 | 
			
		||||
            start_time = update_counter("redact_event", start_time)
 | 
			
		||||
 | 
			
		||||
        if get_prev_content and "replaces_state" in ev.unsigned:
 | 
			
		||||
            prev = yield self.get_event(
 | 
			
		||||
                ev.unsigned["replaces_state"],
 | 
			
		||||
                get_prev_content=False,
 | 
			
		||||
            )
 | 
			
		||||
            if prev:
 | 
			
		||||
                ev.unsigned["prev_content"] = prev.get_dict()["content"]
 | 
			
		||||
            start_time = update_counter("get_prev_content", start_time)
 | 
			
		||||
 | 
			
		||||
        defer.returnValue(ev)
 | 
			
		||||
 | 
			
		||||
    def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
 | 
			
		||||
                                check_redacted=True, get_prev_content=False,
 | 
			
		||||
                                rejected_reason=None):
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -85,13 +85,13 @@ class StateStore(SQLBaseStore):
 | 
			
		|||
 | 
			
		||||
        @defer.inlineCallbacks
 | 
			
		||||
        def c(vals):
 | 
			
		||||
            vals[:] = yield self.runInteraction(
 | 
			
		||||
                "_get_state_groups_ev",
 | 
			
		||||
                self._fetch_events_txn, vals
 | 
			
		||||
            )
 | 
			
		||||
            vals[:] = yield self._fetch_events(vals, get_prev_content=False)
 | 
			
		||||
 | 
			
		||||
        yield defer.gatherResults(
 | 
			
		||||
            [c(vals) for vals in states.values()],
 | 
			
		||||
            [
 | 
			
		||||
                c(vals)
 | 
			
		||||
                for vals in states.values()
 | 
			
		||||
            ],
 | 
			
		||||
            consumeErrors=True,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -298,6 +298,7 @@ class StreamStore(SQLBaseStore):
 | 
			
		|||
 | 
			
		||||
        return self.runInteraction("paginate_room_events", f)
 | 
			
		||||
 | 
			
		||||
    @defer.inlineCallbacks
 | 
			
		||||
    def get_recent_events_for_room(self, room_id, limit, end_token,
 | 
			
		||||
                                   with_feedback=False, from_token=None):
 | 
			
		||||
        # TODO (erikj): Handle compressed feedback
 | 
			
		||||
| 
						 | 
				
			
			@ -349,20 +350,21 @@ class StreamStore(SQLBaseStore):
 | 
			
		|||
            else:
 | 
			
		||||
                token = (str(end_token), str(end_token))
 | 
			
		||||
 | 
			
		||||
            events = self._get_events_txn(
 | 
			
		||||
                txn,
 | 
			
		||||
                [r["event_id"] for r in rows],
 | 
			
		||||
                get_prev_content=True
 | 
			
		||||
            )
 | 
			
		||||
            return rows, token
 | 
			
		||||
 | 
			
		||||
            self._set_before_and_after(events, rows)
 | 
			
		||||
 | 
			
		||||
            return events, token
 | 
			
		||||
 | 
			
		||||
        return self.runInteraction(
 | 
			
		||||
        rows, token = yield self.runInteraction(
 | 
			
		||||
            "get_recent_events_for_room", get_recent_events_for_room_txn
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        events = yield self._get_events(
 | 
			
		||||
            [r["event_id"] for r in rows],
 | 
			
		||||
            get_prev_content=True
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        self._set_before_and_after(events, rows)
 | 
			
		||||
 | 
			
		||||
        defer.returnValue((events, token))
 | 
			
		||||
 | 
			
		||||
    @defer.inlineCallbacks
 | 
			
		||||
    def get_room_events_max_id(self, direction='f'):
 | 
			
		||||
        token = yield self._stream_id_gen.get_max_token(self)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue