Start filling out and using new events tables

pull/12/head
Erik Johnston 2014-10-29 16:59:24 +00:00
parent a10c2ec88d
commit e7858b6d7e
8 changed files with 162 additions and 96 deletions

View File

@ -48,8 +48,8 @@ class PduCodec(object):
kwargs["room_id"] = pdu.context
kwargs["etype"] = pdu.pdu_type
kwargs["prev_events"] = [
encode_event_id(i, o)
for i, o in pdu.prev_pdus
(encode_event_id(i, o), s)
for i, o, s in pdu.prev_pdus
]
if hasattr(pdu, "prev_state_id") and hasattr(pdu, "prev_state_origin"):
@ -82,7 +82,13 @@ class PduCodec(object):
d["pdu_type"] = event.type
if hasattr(event, "prev_events"):
d["prev_pdus"] = [decode_event_id(e) for e in event.prev_events]
def f(e, s):
i, o = decode_event_id(e, self.server_name)
return i, o, s
d["prev_pdus"] = [
f(e, s)
for e, s in event.prev_events
]
if hasattr(event, "prev_state"):
d["prev_state_id"], d["prev_state_origin"] = (

View File

@ -16,6 +16,8 @@
from twisted.internet import defer
from synapse.api.errors import LimitExceededError
from synapse.util.async import run_on_reactor
class BaseHandler(object):
def __init__(self, hs):
@ -45,6 +47,8 @@ class BaseHandler(object):
@defer.inlineCallbacks
def _on_new_room_event(self, event, snapshot, extra_destinations=[],
extra_users=[], suppress_auth=False):
yield run_on_reactor()
snapshot.fill_out_prev_events(event)
yield self.state_handler.annotate_state_groups(event)

View File

@ -22,6 +22,7 @@ from synapse.api.constants import Membership
from synapse.util.logutils import log_function
from synapse.federation.pdu_codec import PduCodec, encode_event_id
from synapse.api.errors import SynapseError
from synapse.util.async import run_on_reactor
from twisted.internet import defer, reactor
@ -81,6 +82,8 @@ class FederationHandler(BaseHandler):
processing.
"""
yield run_on_reactor()
pdu = self.pdu_codec.pdu_from_event(event)
if not hasattr(pdu, "destinations") or not pdu.destinations:
@ -102,6 +105,8 @@ class FederationHandler(BaseHandler):
self.room_queues[event.room_id].append(pdu)
return
logger.debug("Processing event: %s", event.event_id)
if state:
state = [self.pdu_codec.event_from_pdu(p) for p in state]
@ -216,58 +221,65 @@ class FederationHandler(BaseHandler):
assert(event.state_key == joinee)
assert(event.room_id == room_id)
event.outlier = False
self.room_queues[room_id] = []
event.event_id = self.event_factory.create_event_id()
event.content = content
state = yield self.replication_layer.send_join(
target_host,
self.pdu_codec.pdu_from_event(event)
)
state = [self.pdu_codec.event_from_pdu(p) for p in state]
logger.debug("do_invite_join state: %s", state)
is_new_state = yield self.state_handler.annotate_state_groups(
event,
state=state
)
try:
yield self.store.store_room(
room_id=room_id,
room_creator_user_id="",
is_public=False
)
except:
# FIXME
pass
event.event_id = self.event_factory.create_event_id()
event.content = content
for e in state:
# FIXME: Auth these.
is_new_state = yield self.state_handler.annotate_state_groups(
e,
state = yield self.replication_layer.send_join(
target_host,
self.pdu_codec.pdu_from_event(event)
)
state = [self.pdu_codec.event_from_pdu(p) for p in state]
logger.debug("do_invite_join state: %s", state)
is_new_state = yield self.state_handler.annotate_state_groups(
event,
state=state
)
logger.debug("do_invite_join event: %s", event)
try:
yield self.store.store_room(
room_id=room_id,
room_creator_user_id="",
is_public=False
)
except:
# FIXME
pass
for e in state:
# FIXME: Auth these.
e.outlier = True
yield self.state_handler.annotate_state_groups(
e,
)
yield self.store.persist_event(
e,
backfilled=False,
is_new_state=False
)
yield self.store.persist_event(
e,
event,
backfilled=False,
is_new_state=False
is_new_state=is_new_state
)
finally:
room_queue = self.room_queues[room_id]
del self.room_queues[room_id]
yield self.store.persist_event(
event,
backfilled=False,
is_new_state=is_new_state
)
room_queue = self.room_queues[room_id]
del self.room_queues[room_id]
for p in room_queue:
yield self.on_receive_pdu(p, backfilled=False)
for p in room_queue:
yield self.on_receive_pdu(p, backfilled=False)
defer.returnValue(True)

View File

@ -143,7 +143,9 @@ class StateHandler(object):
defer.returnValue(False)
return
new_state = yield self.resolve_state_groups(event.prev_events)
new_state = yield self.resolve_state_groups(
[e for e, _ in event.prev_events]
)
event.old_state_events = copy.deepcopy(new_state)
@ -157,12 +159,11 @@ class StateHandler(object):
@defer.inlineCallbacks
def get_current_state(self, room_id, event_type=None, state_key=""):
# FIXME: HACK!
pdus = yield self.store.get_latest_pdus_in_context(room_id)
events = yield self.store.get_latest_events_in_room(room_id)
event_ids = [
encode_event_id(pdu_id, origin)
for pdu_id, origin, _ in pdus
e_id
for e_id, _ in events
]
res = yield self.resolve_state_groups(event_ids)

View File

@ -71,6 +71,7 @@ SCHEMAS = [
"state",
"signatures",
"event_edges",
"event_signatures",
]
@ -134,7 +135,8 @@ class DataStore(RoomMemberStore, RoomStore,
"type",
"room_id",
"content",
"unrecognized_keys"
"unrecognized_keys",
"depth",
],
allow_none=allow_none,
)
@ -263,7 +265,12 @@ class DataStore(RoomMemberStore, RoomStore,
vals["unrecognized_keys"] = json.dumps(unrec)
try:
self._simple_insert_txn(txn, "events", vals)
self._simple_insert_txn(
txn,
"events",
vals,
or_replace=(not outlier),
)
except:
logger.warn(
"Failed to persist, probably duplicate: %s",
@ -307,13 +314,14 @@ class DataStore(RoomMemberStore, RoomStore,
}
)
signatures = event.signatures.get(event.origin, {})
if hasattr(event, "signatures"):
signatures = event.signatures.get(event.origin, {})
for key_id, signature_base64 in signatures.items():
signature_bytes = decode_base64(signature_base64)
self._store_event_origin_signature_txn(
txn, event.event_id, key_id, signature_bytes,
)
for key_id, signature_base64 in signatures.items():
signature_bytes = decode_base64(signature_base64)
self._store_event_origin_signature_txn(
txn, event.event_id, event.origin, key_id, signature_bytes,
)
for prev_event_id, prev_hashes in event.prev_events:
for alg, hash_base64 in prev_hashes.items():
@ -323,10 +331,10 @@ class DataStore(RoomMemberStore, RoomStore,
)
# TODO
(ref_alg, ref_hash_bytes) = compute_pdu_event_reference_hash(pdu)
self._store_event_reference_hash_txn(
txn, event.event_id, ref_alg, ref_hash_bytes
)
# (ref_alg, ref_hash_bytes) = compute_pdu_event_reference_hash(pdu)
# self._store_event_reference_hash_txn(
# txn, event.event_id, ref_alg, ref_hash_bytes
# )
self._update_min_depth_for_room_txn(txn, event.room_id, event.depth)
@ -412,9 +420,7 @@ class DataStore(RoomMemberStore, RoomStore,
"""
def _snapshot(txn):
membership_state = self._get_room_member(txn, user_id, room_id)
prev_events = self._get_latest_events_in_room(
txn, room_id
)
prev_events = self._get_latest_events_in_room(txn, room_id)
if state_type is not None and state_key is not None:
prev_state_pdu = self._get_current_state_pdu(
@ -469,12 +475,12 @@ class Snapshot(object):
return
event.prev_events = [
(p_id, origin, hashes)
for p_id, origin, hashes, _ in self.prev_events
(event_id, hashes)
for event_id, hashes, _ in self.prev_events
]
if self.prev_events:
event.depth = max([int(v) for _, _, _, v in self.prev_events]) + 1
event.depth = max([int(v) for _, _, v in self.prev_events]) + 1
else:
event.depth = 0
@ -533,9 +539,10 @@ def prepare_database(db_conn):
db_conn.commit()
else:
sql_script = "BEGIN TRANSACTION;"
sql_script = "BEGIN TRANSACTION;\n"
for sql_loc in SCHEMAS:
sql_script += read_schema(sql_loc)
sql_script += "\n"
sql_script += "COMMIT TRANSACTION;"
c.executescript(sql_script)
db_conn.commit()

View File

@ -19,10 +19,12 @@ from twisted.internet import defer
from synapse.api.errors import StoreError
from synapse.api.events.utils import prune_event
from synapse.util.logutils import log_function
from syutil.base64util import encode_base64
import collections
import copy
import json
import sys
import time
@ -67,6 +69,9 @@ class LoggingTransaction(object):
return self.txn.execute(
sql, *args, **kwargs
)
except:
logger.exception("[SQL FAIL] {%s}", self.name)
raise
finally:
end = time.clock() * 1000
sql_logger.debug("[SQL time] {%s} %f", self.name, end - start)
@ -85,14 +90,20 @@ class SQLBaseStore(object):
"""Wraps the .runInteraction() method on the underlying db_pool."""
def inner_func(txn, *args, **kwargs):
start = time.clock() * 1000
txn_id = str(SQLBaseStore._TXN_ID)
SQLBaseStore._TXN_ID += 1
txn_id = SQLBaseStore._TXN_ID
name = "%s-%s" % (desc, txn_id, )
# We don't really need these to be unique, so lets stop it from
# growing really large.
self._TXN_ID = (self._TXN_ID + 1) % (sys.maxint - 1)
name = "%s-%x" % (desc, txn_id, )
transaction_logger.debug("[TXN START] {%s}", name)
try:
return func(LoggingTransaction(txn, name), *args, **kwargs)
except:
logger.exception("[TXN FAIL] {%s}", name)
raise
finally:
end = time.clock() * 1000
transaction_logger.debug(
@ -189,7 +200,6 @@ class SQLBaseStore(object):
statement returns no rows
"""
return self._simple_selectupdate_one(
"_simple_select_one",
table, keyvalues, retcols=retcols, allow_none=allow_none
)
@ -215,11 +225,11 @@ class SQLBaseStore(object):
txn,
table=table,
keyvalues=keyvalues,
retcols=retcol,
retcol=retcol,
)
if ret:
return ret[retcol]
return ret[0]
else:
if allow_none:
return None
@ -434,6 +444,17 @@ class SQLBaseStore(object):
sql = "SELECT * FROM events WHERE event_id = ?"
for ev in events:
signatures = self._get_event_origin_signatures_txn(
txn, ev.event_id,
)
ev.signatures = {
k: encode_base64(v) for k, v in signatures.items()
}
prev_events = self._get_latest_events_in_room(txn, ev.room_id)
ev.prev_events = [(e_id, s,) for e_id, s, _ in prev_events]
if hasattr(ev, "prev_state"):
# Load previous state_content.
# TODO: Should we be pulling this out above?

View File

@ -24,6 +24,13 @@ logger = logging.getLogger(__name__)
class EventFederationStore(SQLBaseStore):
def get_latest_events_in_room(self, room_id):
return self.runInteraction(
"get_latest_events_in_room",
self._get_latest_events_in_room,
room_id,
)
def _get_latest_events_in_room(self, txn, room_id):
self._simple_select_onecol_txn(
txn,
@ -34,12 +41,25 @@ class EventFederationStore(SQLBaseStore):
retcol="event_id",
)
sql = (
"SELECT e.event_id, e.depth FROM events as e "
"INNER JOIN event_forward_extremities as f "
"ON e.event_id = f.event_id "
"WHERE f.room_id = ?"
)
txn.execute(sql, (room_id, ))
results = []
for pdu_id, origin, depth in txn.fetchall():
hashes = self._get_prev_event_hashes_txn(txn, pdu_id, origin)
sha256_bytes = hashes["sha256"]
prev_hashes = {"sha256": encode_base64(sha256_bytes)}
results.append((pdu_id, origin, prev_hashes, depth))
for event_id, depth in txn.fetchall():
hashes = self._get_prev_event_hashes_txn(txn, event_id)
prev_hashes = {
k: encode_base64(v) for k, v in hashes.items()
if k == "sha256"
}
results.append((event_id, prev_hashes, depth))
return results
def _get_min_depth_interaction(self, txn, room_id):
min_depth = self._simple_select_one_onecol_txn(
@ -70,21 +90,21 @@ class EventFederationStore(SQLBaseStore):
def _handle_prev_events(self, txn, outlier, event_id, prev_events,
room_id):
for e_id in prev_events:
for e_id, _ in prev_events:
# TODO (erikj): This could be done as a bulk insert
self._simple_insert_txn(
txn,
table="event_edges",
values={
"event_id": event_id,
"prev_event": e_id,
"prev_event_id": e_id,
"room_id": room_id,
}
)
# Update the extremities table if this is not an outlier.
if not outlier:
for e_id in prev_events:
for e_id, _ in prev_events:
# TODO (erikj): This could be done as a bulk insert
self._simple_delete_txn(
txn,
@ -116,7 +136,7 @@ class EventFederationStore(SQLBaseStore):
# Insert all the prev_pdus as a backwards thing, they'll get
# deleted in a second if they're incorrect anyway.
for e_id in prev_events:
for e_id, _ in prev_events:
# TODO (erikj): This could be done as a bulk insert
self._simple_insert_txn(
txn,
@ -130,14 +150,11 @@ class EventFederationStore(SQLBaseStore):
# Also delete from the backwards extremities table all ones that
# reference pdus that we have already seen
query = (
"DELETE FROM %(event_back)s as b WHERE EXISTS ("
"SELECT 1 FROM %(events)s AS events "
"DELETE FROM event_backward_extremities WHERE EXISTS ("
"SELECT 1 FROM events "
"WHERE "
"b.event_id = events.event_id "
"event_backward_extremities.event_id = events.event_id "
"AND not events.outlier "
")"
) % {
"event_back": "event_backward_extremities",
"events": "events",
}
)
txn.execute(query)

View File

@ -7,7 +7,7 @@ CREATE TABLE IF NOT EXISTS event_forward_extremities(
CREATE INDEX IF NOT EXISTS ev_extrem_room ON event_forward_extremities(room_id);
CREATE INDEX IF NOT EXISTS ev_extrem_id ON event_forward_extremities(event_id);
--
CREATE TABLE IF NOT EXISTS event_backward_extremities(
event_id TEXT,
@ -17,7 +17,7 @@ CREATE TABLE IF NOT EXISTS event_backward_extremities(
CREATE INDEX IF NOT EXISTS ev_b_extrem_room ON event_backward_extremities(room_id);
CREATE INDEX IF NOT EXISTS ev_b_extrem_id ON event_backward_extremities(event_id);
--
CREATE TABLE IF NOT EXISTS event_edges(
event_id TEXT,
@ -28,7 +28,6 @@ CREATE TABLE IF NOT EXISTS event_edges(
CREATE INDEX IF NOT EXISTS ev_edges_id ON event_edges(event_id);
CREATE INDEX IF NOT EXISTS ev_edges_prev_id ON event_edges(prev_event_id);
--
CREATE TABLE IF NOT EXISTS room_depth(
@ -38,7 +37,7 @@ CREATE TABLE IF NOT EXISTS room_depth(
);
CREATE INDEX IF NOT EXISTS room_depth_room ON room_depth(room_id);
--
create TABLE IF NOT EXISTS event_destinations(
event_id TEXT,
@ -48,4 +47,3 @@ create TABLE IF NOT EXISTS event_destinations(
);
CREATE INDEX IF NOT EXISTS event_destinations_id ON event_destinations(event_id);
--