Finish appservice txn storage impl and tests.
							parent
							
								
									1ead1caa18
								
							
						
					
					
						commit
						0a60bbf4fa
					
				| 
						 | 
					@ -429,15 +429,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
 | 
				
			||||||
        # The highest id may be the last one sent (in which case it is last_txn)
 | 
					        # The highest id may be the last one sent (in which case it is last_txn)
 | 
				
			||||||
        # or it may be the highest in the txns list (which are waiting to be/are
 | 
					        # or it may be the highest in the txns list (which are waiting to be/are
 | 
				
			||||||
        # being sent)
 | 
					        # being sent)
 | 
				
			||||||
        result = txn.execute(
 | 
					        last_txn_id = self._get_last_txn(txn, service.id)
 | 
				
			||||||
            "SELECT last_txn FROM application_services_state WHERE as_id=?",
 | 
					 | 
				
			||||||
            (service.id,)
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
        last_txn_id = result.fetchone()
 | 
					 | 
				
			||||||
        if last_txn_id is None:  # no row exists
 | 
					 | 
				
			||||||
            last_txn_id = 0
 | 
					 | 
				
			||||||
        else:
 | 
					 | 
				
			||||||
            last_txn_id = int(last_txn_id[0])  # select 'last_txn' col
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        result = txn.execute(
 | 
					        result = txn.execute(
 | 
				
			||||||
            "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?",
 | 
					            "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?",
 | 
				
			||||||
| 
						 | 
					@ -467,12 +459,43 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
 | 
				
			||||||
            service(ApplicationService): The application service which was sent
 | 
					            service(ApplicationService): The application service which was sent
 | 
				
			||||||
            this transaction.
 | 
					            this transaction.
 | 
				
			||||||
        Returns:
 | 
					        Returns:
 | 
				
			||||||
            A Deferred which resolves to True if this transaction was completed
 | 
					            A Deferred which resolves if this transaction was stored
 | 
				
			||||||
            successfully.
 | 
					            successfully.
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
        # TODO: Set current txn_id for AS to 'txn_id'
 | 
					        return self.runInteraction(
 | 
				
			||||||
        # TODO: Delete txn contents
 | 
					            "complete_appservice_txn",
 | 
				
			||||||
        pass
 | 
					            self._complete_appservice_txn,
 | 
				
			||||||
 | 
					            txn_id, service
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _complete_appservice_txn(self, txn, txn_id, service):
 | 
				
			||||||
 | 
					        txn_id = int(txn_id)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Debugging query: Make sure the txn being completed is EXACTLY +1 from
 | 
				
			||||||
 | 
					        # what was there before. If it isn't, we've got problems (e.g. the AS
 | 
				
			||||||
 | 
					        # has probably missed some events), so whine loudly but still continue,
 | 
				
			||||||
 | 
					        # since it shouldn't fail completion of the transaction.
 | 
				
			||||||
 | 
					        last_txn_id = self._get_last_txn(txn, service.id)
 | 
				
			||||||
 | 
					        if (last_txn_id + 1) != txn_id:
 | 
				
			||||||
 | 
					            logger.error(
 | 
				
			||||||
 | 
					                "appservice: Completing a transaction which has an ID > 1 from "
 | 
				
			||||||
 | 
					                "the last ID sent to this AS. We've either dropped events or "
 | 
				
			||||||
 | 
					                "sent it to the AS out of order. FIX ME. last_txn=%s "
 | 
				
			||||||
 | 
					                "completing_txn=%s service_id=%s", last_txn_id, txn_id,
 | 
				
			||||||
 | 
					                service.id
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Set current txn_id for AS to 'txn_id'
 | 
				
			||||||
 | 
					        self._simple_upsert_txn(
 | 
				
			||||||
 | 
					            txn, "application_services_state", dict(as_id=service.id),
 | 
				
			||||||
 | 
					            dict(last_txn=txn_id)
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Delete txn contents
 | 
				
			||||||
 | 
					        self._simple_delete_txn(
 | 
				
			||||||
 | 
					            txn, "application_services_txns",
 | 
				
			||||||
 | 
					            dict(txn_id=txn_id, as_id=service.id)
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def get_oldest_unsent_txn(self, service):
 | 
					    def get_oldest_unsent_txn(self, service):
 | 
				
			||||||
        """Get the oldest transaction which has not been sent for this
 | 
					        """Get the oldest transaction which has not been sent for this
 | 
				
			||||||
| 
						 | 
					@ -484,6 +507,38 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
 | 
				
			||||||
            A Deferred which resolves to an AppServiceTransaction or
 | 
					            A Deferred which resolves to an AppServiceTransaction or
 | 
				
			||||||
            None.
 | 
					            None.
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
        # TODO: Monotonically increasing txn ids, so just select the smallest
 | 
					        return self.runInteraction(
 | 
				
			||||||
 | 
					            "get_oldest_unsent_appservice_txn",
 | 
				
			||||||
 | 
					            self._get_oldest_unsent_txn,
 | 
				
			||||||
 | 
					            service
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _get_oldest_unsent_txn(self, txn, service):
 | 
				
			||||||
 | 
					        # Monotonically increasing txn ids, so just select the smallest
 | 
				
			||||||
        # one in the txns table (we delete them when they are sent)
 | 
					        # one in the txns table (we delete them when they are sent)
 | 
				
			||||||
        pass
 | 
					        result = txn.execute(
 | 
				
			||||||
 | 
					            "SELECT *,MIN(txn_id) FROM application_services_txns WHERE as_id=?",
 | 
				
			||||||
 | 
					            (service.id,)
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        entry = self.cursor_to_dict(result)[0]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if not entry or entry["txn_id"] is None:
 | 
				
			||||||
 | 
					            # the min(txn_id) part will force a row, so entry may not be None
 | 
				
			||||||
 | 
					            return None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        return AppServiceTransaction(
 | 
				
			||||||
 | 
					            service=service, id=entry["txn_id"], events=json.loads(
 | 
				
			||||||
 | 
					                entry["content"]
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _get_last_txn(self, txn, service_id):
 | 
				
			||||||
 | 
					        result = txn.execute(
 | 
				
			||||||
 | 
					            "SELECT last_txn FROM application_services_state WHERE as_id=?",
 | 
				
			||||||
 | 
					            (service_id,)
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        last_txn_id = result.fetchone()
 | 
				
			||||||
 | 
					        if last_txn_id is None:  # no row exists
 | 
				
			||||||
 | 
					            return 0
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            return int(last_txn_id[0])  # select 'last_txn' col
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -15,7 +15,7 @@
 | 
				
			||||||
 | 
					
 | 
				
			||||||
CREATE TABLE IF NOT EXISTS application_services_state(
 | 
					CREATE TABLE IF NOT EXISTS application_services_state(
 | 
				
			||||||
    as_id INTEGER PRIMARY KEY,
 | 
					    as_id INTEGER PRIMARY KEY,
 | 
				
			||||||
    state TEXT NOT NULL,
 | 
					    state TEXT,
 | 
				
			||||||
    last_txn TEXT,
 | 
					    last_txn TEXT,
 | 
				
			||||||
    FOREIGN KEY(as_id) REFERENCES application_services(id)
 | 
					    FOREIGN KEY(as_id) REFERENCES application_services(id)
 | 
				
			||||||
);
 | 
					);
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -304,6 +304,74 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
 | 
				
			||||||
        self.assertEquals(txn.events, events)
 | 
					        self.assertEquals(txn.events, events)
 | 
				
			||||||
        self.assertEquals(txn.service, service)
 | 
					        self.assertEquals(txn.service, service)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @defer.inlineCallbacks
 | 
				
			||||||
 | 
					    def test_complete_appservice_txn_first_txn(self):
 | 
				
			||||||
 | 
					        service = Mock(id=self.as_list[0]["id"])
 | 
				
			||||||
 | 
					        events = [{"foo": "bar"}]
 | 
				
			||||||
 | 
					        txn_id = 1
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        yield self._insert_txn(service.id, txn_id, events)
 | 
				
			||||||
 | 
					        yield self.store.complete_appservice_txn(txn_id=txn_id, service=service)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        res = yield self.db_pool.runQuery(
 | 
				
			||||||
 | 
					            "SELECT last_txn FROM application_services_state WHERE as_id=?",
 | 
				
			||||||
 | 
					            (service.id,)
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        self.assertEquals(1, len(res))
 | 
				
			||||||
 | 
					        self.assertEquals(str(txn_id), res[0][0])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        res = yield self.db_pool.runQuery(
 | 
				
			||||||
 | 
					            "SELECT * FROM application_services_txns WHERE txn_id=?",
 | 
				
			||||||
 | 
					            (txn_id,)
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        self.assertEquals(0, len(res))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @defer.inlineCallbacks
 | 
				
			||||||
 | 
					    def test_complete_appservice_txn_existing_in_state_table(self):
 | 
				
			||||||
 | 
					        service = Mock(id=self.as_list[0]["id"])
 | 
				
			||||||
 | 
					        events = [{"foo": "bar"}]
 | 
				
			||||||
 | 
					        txn_id = 5
 | 
				
			||||||
 | 
					        yield self._set_last_txn(service.id, 4)
 | 
				
			||||||
 | 
					        yield self._insert_txn(service.id, txn_id, events)
 | 
				
			||||||
 | 
					        yield self.store.complete_appservice_txn(txn_id=txn_id, service=service)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        res = yield self.db_pool.runQuery(
 | 
				
			||||||
 | 
					            "SELECT last_txn, state FROM application_services_state WHERE "
 | 
				
			||||||
 | 
					            "as_id=?",
 | 
				
			||||||
 | 
					            (service.id,)
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        self.assertEquals(1, len(res))
 | 
				
			||||||
 | 
					        self.assertEquals(str(txn_id), res[0][0])
 | 
				
			||||||
 | 
					        self.assertEquals(ApplicationServiceState.UP, res[0][1])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        res = yield self.db_pool.runQuery(
 | 
				
			||||||
 | 
					            "SELECT * FROM application_services_txns WHERE txn_id=?",
 | 
				
			||||||
 | 
					            (txn_id,)
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        self.assertEquals(0, len(res))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @defer.inlineCallbacks
 | 
				
			||||||
 | 
					    def test_get_oldest_unsent_txn_none(self):
 | 
				
			||||||
 | 
					        service = Mock(id=self.as_list[0]["id"])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        txn = yield self.store.get_oldest_unsent_txn(service)
 | 
				
			||||||
 | 
					        self.assertEquals(None, txn)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @defer.inlineCallbacks
 | 
				
			||||||
 | 
					    def test_get_oldest_unsent_txn(self):
 | 
				
			||||||
 | 
					        service = Mock(id=self.as_list[0]["id"])
 | 
				
			||||||
 | 
					        events = [{"type": "nothing"}, {"type": "here"}]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        yield self._insert_txn(self.as_list[1]["id"], 9, {"badger": "mushroom"})
 | 
				
			||||||
 | 
					        yield self._insert_txn(service.id, 10, events)
 | 
				
			||||||
 | 
					        yield self._insert_txn(service.id, 11, [{"foo":"bar"}])
 | 
				
			||||||
 | 
					        yield self._insert_txn(service.id, 12, [{"argh":"bargh"}])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        txn = yield self.store.get_oldest_unsent_txn(service)
 | 
				
			||||||
 | 
					        self.assertEquals(service, txn.service)
 | 
				
			||||||
 | 
					        self.assertEquals(10, txn.id)
 | 
				
			||||||
 | 
					        self.assertEquals(events, txn.events)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @defer.inlineCallbacks
 | 
					    @defer.inlineCallbacks
 | 
				
			||||||
    def test_get_appservices_by_state_single(self):
 | 
					    def test_get_appservices_by_state_single(self):
 | 
				
			||||||
        yield self._set_state(
 | 
					        yield self._set_state(
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue