Split setuping up and processing of tables

pull/123/head
Erik Johnston 2015-04-27 17:53:40 +01:00
parent 40cbd6b6ee
commit 5b8b1a43bd
1 changed files with 23 additions and 8 deletions

View File

@ -168,7 +168,7 @@ class Porter(object):
) )
@defer.inlineCallbacks @defer.inlineCallbacks
def handle_table(self, table): def setup_table(self, table):
def delete_all(txn): def delete_all(txn):
txn.execute( txn.execute(
"DELETE FROM port_from_sqlite3 WHERE table_name = %s", "DELETE FROM port_from_sqlite3 WHERE table_name = %s",
@ -287,6 +287,10 @@ class Porter(object):
postgres_size = yield self.postgres_store.execute(get_table_size) postgres_size = yield self.postgres_store.execute(get_table_size)
defer.returnValue((table, postgres_size, table_size, next_chunk))
@defer.inlineCallbacks
def handle_table(self, table, postgres_size, table_size, next_chunk):
if not table_size: if not table_size:
return return
@ -364,14 +368,14 @@ class Porter(object):
self.postgres_store = Store(postgres_db_pool, postgres_engine) self.postgres_store = Store(postgres_db_pool, postgres_engine)
# Step 1. Set up databases. # Step 1. Set up databases.
self.progress.on_prepare_sqlite() self.progress.set_state("Preparing SQLite3")
self.setup_db(sqlite_config, sqlite_engine) self.setup_db(sqlite_config, sqlite_engine)
self.progress.on_prepare_postgres() self.progress.set_state("Preparing PostgreSQL")
self.setup_db(postgres_config, postgres_engine) self.setup_db(postgres_config, postgres_engine)
# Step 2. Get tables. # Step 2. Get tables.
self.progress.fetching_tables() self.progress.set_state("Fetching tables")
sqlite_tables = yield self.sqlite_store._simple_select_onecol( sqlite_tables = yield self.sqlite_store._simple_select_onecol(
table="sqlite_master", table="sqlite_master",
keyvalues={ keyvalues={
@ -390,7 +394,7 @@ class Porter(object):
tables = set(sqlite_tables) & set(postgres_tables) tables = set(sqlite_tables) & set(postgres_tables)
self.progress.preparing_tables() self.progress.set_state("Creating tables")
logger.info("Found %d tables", len(tables)) logger.info("Found %d tables", len(tables))
@ -409,10 +413,12 @@ class Porter(object):
except Exception as e: except Exception as e:
logger.info("Failed to create port table: %s", e) logger.info("Failed to create port table: %s", e)
# Process tables. self.progress.set_state("Preparing tables")
yield defer.gatherResults(
# Set up tables.
setup_res = yield defer.gatherResults(
[ [
self.handle_table(table) self.setup_table(table)
for table in tables for table in tables
if table not in ["schema_version", "applied_schema_deltas"] if table not in ["schema_version", "applied_schema_deltas"]
and not table.startswith("sqlite_") and not table.startswith("sqlite_")
@ -420,6 +426,15 @@ class Porter(object):
consumeErrors=True, consumeErrors=True,
) )
# Process tables.
yield defer.gatherResults(
[
self.handle_table(*res)
for res in setup_res
],
consumeErrors=True,
)
self.progress.done() self.progress.done()
except: except:
global end_error_exec_info global end_error_exec_info