Make _persist_event_txn call _persist_events_txn
parent 4500529c73
commit e2a241af08
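This change makes the per-event helper a thin wrapper: _persist_event_txn keeps only its current_state handling and then hands the single event to the batch helper _persist_events_txn as a one-element list. A rough sketch of the method after this commit, reassembled from the hunks below (the body of the "if current_state:" block is unchanged by this commit and elided here):

    @log_function
    def _persist_event_txn(self, txn, event, context, backfilled,
                           is_new_state=True, current_state=None):
        # We purposefully do this first since if we include a `current_state`
        # key, we *want* to update the `current_state_events` table
        if current_state:
            ...  # replace the room's current_state_events rows (unchanged, not shown)

        # Everything else now funnels through the batch code path, treating
        # the single event as a batch of one.
        return self._persist_events_txn(
            txn,
            [(event, context)],
            backfilled=backfilled,
            is_new_state=is_new_state,
        )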
@@ -112,7 +112,6 @@ class EventsStore(SQLBaseStore):
                 event=event,
                 context=context,
                 backfilled=backfilled,
-                stream_ordering=stream_ordering,
                 is_new_state=is_new_state,
                 current_state=current_state,
             )
@@ -155,12 +154,7 @@ class EventsStore(SQLBaseStore):
     @log_function
     def _persist_event_txn(self, txn, event, context, backfilled,
-                           stream_ordering=None, is_new_state=True,
-                           current_state=None):
-
-        # Remove the any existing cache entries for the event_id
-        txn.call_after(self._invalidate_get_event_cache, event.event_id)
-
+                           is_new_state=True, current_state=None):
         # We purposefully do this first since if we include a `current_state`
         # key, we *want* to update the `current_state_events` table
         if current_state:
@@ -188,236 +182,13 @@ class EventsStore(SQLBaseStore):
                     }
                 )

-        outlier = event.internal_metadata.is_outlier()
-
-        if not outlier:
-            self._update_min_depth_for_room_txn(
-                txn,
-                event.room_id,
-                event.depth
-            )
-
-        have_persisted = self._simple_select_one_txn(
-            txn,
-            table="events",
-            keyvalues={"event_id": event.event_id},
-            retcols=["event_id", "outlier"],
-            allow_none=True,
-        )
-
-        metadata_json = encode_json(
-            event.internal_metadata.get_dict(),
-            using_frozen_dicts=USE_FROZEN_DICTS
-        ).decode("UTF-8")
-
-        # If we have already persisted this event, we don't need to do any
-        # more processing.
-        # The processing above must be done on every call to persist event,
-        # since they might not have happened on previous calls. For example,
-        # if we are persisting an event that we had persisted as an outlier,
-        # but is no longer one.
-        if have_persisted:
-            if not outlier and have_persisted["outlier"]:
-                self._store_state_groups_txn(txn, event, context)
-
-                sql = (
-                    "UPDATE event_json SET internal_metadata = ?"
-                    " WHERE event_id = ?"
-                )
-                txn.execute(
-                    sql,
-                    (metadata_json, event.event_id,)
-                )
-
-                sql = (
-                    "UPDATE events SET outlier = ?"
-                    " WHERE event_id = ?"
-                )
-                txn.execute(
-                    sql,
-                    (False, event.event_id,)
-                )
-            return
-
-        if not outlier:
-            self._store_state_groups_txn(txn, event, context)
-
-        self._handle_mult_prev_events(
-            txn,
-            events=[event]
-        )
-
-        if event.type == EventTypes.Member:
-            self._store_room_members_txn(txn, [event])
-        elif event.type == EventTypes.Name:
-            self._store_room_name_txn(txn, event)
-        elif event.type == EventTypes.Topic:
-            self._store_room_topic_txn(txn, event)
-        elif event.type == EventTypes.Redaction:
-            self._store_redaction(txn, event)
-
-        event_dict = {
-            k: v
-            for k, v in event.get_dict().items()
-            if k not in [
-                "redacted",
-                "redacted_because",
-            ]
-        }
-
-        self._simple_insert_txn(
-            txn,
-            table="event_json",
-            values={
-                "event_id": event.event_id,
-                "room_id": event.room_id,
-                "internal_metadata": metadata_json,
-                "json": encode_json(
-                    event_dict, using_frozen_dicts=USE_FROZEN_DICTS
-                ).decode("UTF-8"),
-            },
-        )
-
-        content = encode_json(
-            event.content, using_frozen_dicts=USE_FROZEN_DICTS
-        ).decode("UTF-8")
-
-        vals = {
-            "topological_ordering": event.depth,
-            "event_id": event.event_id,
-            "type": event.type,
-            "room_id": event.room_id,
-            "content": content,
-            "processed": True,
-            "outlier": outlier,
-            "depth": event.depth,
-        }
-
-        unrec = {
-            k: v
-            for k, v in event.get_dict().items()
-            if k not in vals.keys() and k not in [
-                "redacted",
-                "redacted_because",
-                "signatures",
-                "hashes",
-                "prev_events",
-            ]
-        }
-
-        vals["unrecognized_keys"] = encode_json(
-            unrec, using_frozen_dicts=USE_FROZEN_DICTS
-        ).decode("UTF-8")
-
-        sql = (
-            "INSERT INTO events"
-            " (stream_ordering, topological_ordering, event_id, type,"
-            " room_id, content, processed, outlier, depth)"
-            " VALUES (?,?,?,?,?,?,?,?,?)"
-        )
-
-        txn.execute(
-            sql,
-            (
-                stream_ordering, event.depth, event.event_id, event.type,
-                event.room_id, content, True, outlier, event.depth
-            )
-        )
-
-        if context.rejected:
-            self._store_rejections_txn(
-                txn, event.event_id, context.rejected
-            )
-
-        # for hash_alg, hash_base64 in event.hashes.items():
-        #     hash_bytes = decode_base64(hash_base64)
-        #     self._store_event_content_hash_txn(
-        #         txn, event.event_id, hash_alg, hash_bytes,
-        #     )
-
-        # for prev_event_id, prev_hashes in event.prev_events:
-        #     for alg, hash_base64 in prev_hashes.items():
-        #         hash_bytes = decode_base64(hash_base64)
-        #         self._store_prev_event_hash_txn(
-        #             txn, event.event_id, prev_event_id, alg,
-        #             hash_bytes
-        #         )
-
-        self._simple_insert_many_txn(
-            txn,
-            table="event_auth",
-            values=[
-                {
-                    "event_id": event.event_id,
-                    "room_id": event.room_id,
-                    "auth_id": auth_id,
-                }
-                for auth_id, _ in event.auth_events
-            ],
-        )
-
-        self._store_event_reference_hashes_txn(txn, [event])
-
-        if event.is_state():
-            vals = {
-                "event_id": event.event_id,
-                "room_id": event.room_id,
-                "type": event.type,
-                "state_key": event.state_key,
-            }
-
-            # TODO: How does this work with backfilling?
-            if hasattr(event, "replaces_state"):
-                vals["prev_state"] = event.replaces_state
-
-            self._simple_insert_txn(
-                txn,
-                "state_events",
-                vals,
-            )
-
-            self._simple_insert_many_txn(
-                txn,
-                table="event_edges",
-                values=[
-                    {
-                        "event_id": event.event_id,
-                        "prev_event_id": e_id,
-                        "room_id": event.room_id,
-                        "is_state": True,
-                    }
-                    for e_id, h in event.prev_state
-                ],
-            )
-
-            if is_new_state and not context.rejected:
-                txn.call_after(
-                    self.get_current_state_for_key.invalidate,
-                    event.room_id, event.type, event.state_key
-                )
-
-                if (event.type == EventTypes.Name
-                        or event.type == EventTypes.Aliases):
-                    txn.call_after(
-                        self.get_room_name_and_aliases.invalidate,
-                        event.room_id
-                    )
-
-                self._simple_upsert_txn(
-                    txn,
-                    "current_state_events",
-                    keyvalues={
-                        "room_id": event.room_id,
-                        "type": event.type,
-                        "state_key": event.state_key,
-                    },
-                    values={
-                        "event_id": event.event_id,
-                    }
-                )
-
-        return
+        return self._persist_events_txn(
+            txn,
+            [(event, context)],
+            backfilled=backfilled,
+            is_new_state=is_new_state,
+        )

     @log_function
     def _persist_events_txn(self, txn, events_and_contexts, backfilled,
                             is_new_state=True):
@@ -507,10 +278,13 @@ class EventsStore(SQLBaseStore):
             events_and_contexts
         )

+        if not events_and_contexts:
+            return
+
         self._store_mult_state_groups_txn(txn, [
             (event, context)
             for event, context in events_and_contexts
-            if not event.internal_metadata.is_outlier
+            if not event.internal_metadata.is_outlier()
         ])

         self._handle_mult_prev_events(
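Note on the hunk above: without the parentheses, the old comprehension tested the bound method event.internal_metadata.is_outlier itself. A bound method is always truthy, so "not" it is always False and the comprehension always produced an empty list, meaning _store_mult_state_groups_txn received nothing to store; calling the method makes the filter test the actual outlier flag. A minimal illustration (hypothetical class, not Synapse code):

    class _Meta:
        def is_outlier(self):
            return False

    m = _Meta()
    print(bool(m.is_outlier))    # True  -- the bound method object is truthy
    print(bool(m.is_outlier()))  # False -- the flag the filter actually wants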
@@ -627,9 +401,6 @@ class EventsStore(SQLBaseStore):
             events_and_contexts,
         )

-        for event, context in state_events_and_contexts:
-            pass
-
         state_values = []
         for event, context in state_events_and_contexts:
             vals = {
@@ -668,14 +439,13 @@ class EventsStore(SQLBaseStore):
         if is_new_state:
             for event, _ in state_events_and_contexts:
-                if not context.rejected and not event.internal_metadata.is_outlier():
+                if not context.rejected:
                     txn.call_after(
                         self.get_current_state_for_key.invalidate,
                         event.room_id, event.type, event.state_key
                     )

-                    if (event.type == EventTypes.Name
-                            or event.type == EventTypes.Aliases):
+                    if event.type in [EventTypes.Name, EventTypes.Aliases]:
                         txn.call_after(
                             self.get_room_name_and_aliases.invalidate,
                             event.room_id
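Taken together, the single-event path and the batch path now converge: persisting one event is the same as persisting a batch of one. A sketch of the equivalence, assuming an EventsStore instance named store, a transaction txn obtained via runInteraction, and hypothetical event and context values:

    # The per-event wrapper...
    store._persist_event_txn(txn, event, context, backfilled=False)
    # ...now reduces (after its current_state handling) to the batch form:
    store._persist_events_txn(txn, [(event, context)], backfilled=False)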