More fixes.

clokep/psycopg3
Patrick Cloke 2023-09-22 14:26:40 -04:00
parent be042ce2c6
commit 2d8cbbd53a
4 changed files with 26 additions and 25 deletions

View File

@ -43,7 +43,11 @@ from . import engines
if TYPE_CHECKING: if TYPE_CHECKING:
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -741,10 +745,10 @@ class BackgroundUpdater:
The named index will be dropped upon completion of the new index. The named index will be dropped upon completion of the new index.
""" """
def create_index_psql(conn: Connection) -> None: def create_index_psql(conn: "LoggingDatabaseConnection") -> None:
conn.rollback() conn.rollback()
# postgres insists on autocommit for the index # postgres insists on autocommit for the index
conn.set_session(autocommit=True) # type: ignore conn.engine.attempt_to_set_autocommit(conn.conn, True)
try: try:
c = conn.cursor() c = conn.cursor()
@ -788,9 +792,9 @@ class BackgroundUpdater:
undo_timeout_sql = f"SET statement_timeout = {default_timeout}" undo_timeout_sql = f"SET statement_timeout = {default_timeout}"
conn.cursor().execute(undo_timeout_sql) conn.cursor().execute(undo_timeout_sql)
conn.set_session(autocommit=False) # type: ignore conn.engine.attempt_to_set_autocommit(conn.conn, False)
def create_index_sqlite(conn: Connection) -> None: def create_index_sqlite(conn: "LoggingDatabaseConnection") -> None:
# Sqlite doesn't support concurrent creation of indexes. # Sqlite doesn't support concurrent creation of indexes.
# #
# We assume that sqlite doesn't give us invalid indices; however # We assume that sqlite doesn't give us invalid indices; however

View File

@ -272,7 +272,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
# we have to set autocommit, because postgres refuses to # we have to set autocommit, because postgres refuses to
# CREATE INDEX CONCURRENTLY without it. # CREATE INDEX CONCURRENTLY without it.
conn.set_session(autocommit=True) conn.engine.attempt_to_set_autocommit(conn.conn, True)
try: try:
c = conn.cursor() c = conn.cursor()
@ -298,7 +298,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
# we should now be able to delete the GIST index. # we should now be able to delete the GIST index.
c.execute("DROP INDEX IF EXISTS event_search_fts_idx_gist") c.execute("DROP INDEX IF EXISTS event_search_fts_idx_gist")
finally: finally:
conn.set_session(autocommit=False) conn.engine.attempt_to_set_autocommit(conn.conn, False)
if isinstance(self.database_engine, PostgresEngine): if isinstance(self.database_engine, PostgresEngine):
await self.db_pool.runWithConnection(create_index) await self.db_pool.runWithConnection(create_index)
@ -320,7 +320,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
def create_index(conn: LoggingDatabaseConnection) -> None: def create_index(conn: LoggingDatabaseConnection) -> None:
conn.rollback() conn.rollback()
conn.set_session(autocommit=True) conn.attempt_to_set_autocommit(True)
c = conn.cursor() c = conn.cursor()
# We create with NULLS FIRST so that when we search *backwards* # We create with NULLS FIRST so that when we search *backwards*
@ -337,7 +337,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
ON event_search(origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST) ON event_search(origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)
""" """
) )
conn.set_session(autocommit=False) conn.attempt_to_set_autocommit(False)
await self.db_pool.runWithConnection(create_index) await self.db_pool.runWithConnection(create_index)

View File

@ -479,7 +479,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
conn.rollback() conn.rollback()
if isinstance(self.database_engine, PostgresEngine): if isinstance(self.database_engine, PostgresEngine):
# postgres insists on autocommit for the index # postgres insists on autocommit for the index
conn.set_session(autocommit=True) conn.engine.attempt_to_set_autocommit(conn.conn, True)
try: try:
txn = conn.cursor() txn = conn.cursor()
txn.execute( txn.execute(
@ -488,7 +488,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
) )
txn.execute("DROP INDEX IF EXISTS state_groups_state_id") txn.execute("DROP INDEX IF EXISTS state_groups_state_id")
finally: finally:
conn.set_session(autocommit=False) conn.engine.attempt_to_set_autocommit(conn.conn, False)
else: else:
txn = conn.cursor() txn = conn.cursor()
txn.execute( txn.execute(

View File

@ -959,10 +959,14 @@ def setup_test_homeserver(
if USE_POSTGRES_FOR_TESTS: if USE_POSTGRES_FOR_TESTS:
test_db = "synapse_test_%s" % uuid.uuid4().hex test_db = "synapse_test_%s" % uuid.uuid4().hex
if USE_POSTGRES_FOR_TESTS == "psycopg":
name = "psycopg"
else:
name = "psycopg2"
database_config = { database_config = {
"name": "psycopg2", "name": name,
"args": { "args": {
"database": test_db, "dbname": test_db,
"host": POSTGRES_HOST, "host": POSTGRES_HOST,
"password": POSTGRES_PASSWORD, "password": POSTGRES_PASSWORD,
"user": POSTGRES_USER, "user": POSTGRES_USER,
@ -1018,17 +1022,14 @@ def setup_test_homeserver(
# Create the database before we actually try and connect to it, based off # Create the database before we actually try and connect to it, based off
# the template database we generate in setupdb() # the template database we generate in setupdb()
if isinstance(db_engine, PostgresEngine): if isinstance(db_engine, PostgresEngine):
import psycopg2.extensions
db_conn = db_engine.module.connect( db_conn = db_engine.module.connect(
database=POSTGRES_BASE_DB, dbname=POSTGRES_BASE_DB,
user=POSTGRES_USER, user=POSTGRES_USER,
host=POSTGRES_HOST, host=POSTGRES_HOST,
port=POSTGRES_PORT, port=POSTGRES_PORT,
password=POSTGRES_PASSWORD, password=POSTGRES_PASSWORD,
) )
assert isinstance(db_conn, psycopg2.extensions.connection) db_engine.attempt_to_set_autocommit(db_conn, True)
db_conn.autocommit = True
cur = db_conn.cursor() cur = db_conn.cursor()
cur.execute("DROP DATABASE IF EXISTS %s;" % (test_db,)) cur.execute("DROP DATABASE IF EXISTS %s;" % (test_db,))
cur.execute( cur.execute(
@ -1058,9 +1059,6 @@ def setup_test_homeserver(
# We need to do cleanup on PostgreSQL # We need to do cleanup on PostgreSQL
def cleanup() -> None: def cleanup() -> None:
import psycopg2
import psycopg2.extensions
# Close all the db pools # Close all the db pools
database_pool._db_pool.close() database_pool._db_pool.close()
@ -1068,14 +1066,13 @@ def setup_test_homeserver(
# Drop the test database # Drop the test database
db_conn = db_engine.module.connect( db_conn = db_engine.module.connect(
database=POSTGRES_BASE_DB, dbname=POSTGRES_BASE_DB,
user=POSTGRES_USER, user=POSTGRES_USER,
host=POSTGRES_HOST, host=POSTGRES_HOST,
port=POSTGRES_PORT, port=POSTGRES_PORT,
password=POSTGRES_PASSWORD, password=POSTGRES_PASSWORD,
) )
assert isinstance(db_conn, psycopg2.extensions.connection) db_engine.attempt_to_set_autocommit(db_conn, True)
db_conn.autocommit = True
cur = db_conn.cursor() cur = db_conn.cursor()
# Try a few times to drop the DB. Some things may hold on to the # Try a few times to drop the DB. Some things may hold on to the
@ -1087,7 +1084,7 @@ def setup_test_homeserver(
cur.execute("DROP DATABASE IF EXISTS %s;" % (test_db,)) cur.execute("DROP DATABASE IF EXISTS %s;" % (test_db,))
db_conn.commit() db_conn.commit()
dropped = True dropped = True
except psycopg2.OperationalError as e: except db_engine.OperationalError as e:
warnings.warn( warnings.warn(
"Couldn't drop old db: " + str(e), "Couldn't drop old db: " + str(e),
category=UserWarning, category=UserWarning,