Initial abstraction.

clokep/psycopg3
Patrick Cloke 2022-07-29 13:57:43 -04:00
parent 61fc1cb1e7
commit f5ef7e13d7
11 changed files with 80 additions and 24 deletions

View File

@ -50,7 +50,7 @@ class DatabaseConnectionConfig:
def __init__(self, name: str, db_config: dict): def __init__(self, name: str, db_config: dict):
db_engine = db_config.get("name", "sqlite3") db_engine = db_config.get("name", "sqlite3")
if db_engine not in ("sqlite3", "psycopg2"): if db_engine not in ("sqlite3", "psycopg2", "psycopg"):
raise ConfigError("Unsupported database type %r" % (db_engine,)) raise ConfigError("Unsupported database type %r" % (db_engine,))
if db_engine == "sqlite3": if db_engine == "sqlite3":

View File

@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from typing import Any, Mapping, NoReturn from typing import Any, Mapping, NoReturn, cast
from ._base import BaseDatabaseEngine, IncorrectDatabaseSetup from ._base import BaseDatabaseEngine, IncorrectDatabaseSetup
@ -20,26 +20,30 @@ from ._base import BaseDatabaseEngine, IncorrectDatabaseSetup
# and sqlite. But the database driver modules are both optional: they may not be # and sqlite. But the database driver modules are both optional: they may not be
# installed. To account for this, create dummy classes on import failure so we can # installed. To account for this, create dummy classes on import failure so we can
# still run `isinstance()` checks. # still run `isinstance()` checks.
def dummy_engine(name: str, module: str) -> BaseDatabaseEngine:
class Engine(BaseDatabaseEngine): # type: ignore[no-redef]
def __new__(cls, *args: object, **kwargs: object) -> NoReturn: # type: ignore[misc]
raise RuntimeError(
f"Cannot create {name}Engine -- {module} module is not installed"
)
return cast(BaseDatabaseEngine, Engine)
try: try:
from .postgres import PostgresEngine from .postgres import PostgresEngine
except ImportError: except ImportError:
PostgresEngine = dummy_engine("PostgresEngine", "psycopg2")
class PostgresEngine(BaseDatabaseEngine): # type: ignore[no-redef] try:
def __new__(cls, *args: object, **kwargs: object) -> NoReturn: # type: ignore[misc] from .psycopg import PsycopgEngine
raise RuntimeError( except ImportError:
f"Cannot create {cls.__name__} -- psycopg2 module is not installed" PsycopgEngine = dummy_engine("PsycopgEngine", "psycopg")
)
try: try:
from .sqlite import Sqlite3Engine from .sqlite import Sqlite3Engine
except ImportError: except ImportError:
Sqlite3Engine = dummy_engine("Sqlite3Engine", "sqlite3")
class Sqlite3Engine(BaseDatabaseEngine): # type: ignore[no-redef]
def __new__(cls, *args: object, **kwargs: object) -> NoReturn: # type: ignore[misc]
raise RuntimeError(
f"Cannot create {cls.__name__} -- sqlite3 module is not installed"
)
def create_engine(database_config: Mapping[str, Any]) -> BaseDatabaseEngine: def create_engine(database_config: Mapping[str, Any]) -> BaseDatabaseEngine:
@ -51,6 +55,9 @@ def create_engine(database_config: Mapping[str, Any]) -> BaseDatabaseEngine:
if name == "psycopg2": if name == "psycopg2":
return PostgresEngine(database_config) return PostgresEngine(database_config)
if name == "psycopg":
return PsycopgEngine(database_config)
raise RuntimeError("Unsupported database engine '%s'" % (name,)) raise RuntimeError("Unsupported database engine '%s'" % (name,))

View File

@ -58,6 +58,18 @@ class BaseDatabaseEngine(Generic[ConnectionType, CursorType], metaclass=abc.ABCM
"""Do we support the `RETURNING` clause in insert/update/delete?""" """Do we support the `RETURNING` clause in insert/update/delete?"""
... ...
@property
@abc.abstractmethod
def supports_select_distinct_on(self) -> bool:
"""Do we support the `DISTINCT ON` clause in SELECT?"""
...
@property
@abc.abstractmethod
def supports_sequences(self) -> bool:
"""Do we support the `CREATE SEQUENCE` clause?"""
...
@abc.abstractmethod @abc.abstractmethod
def check_database( def check_database(
self, db_conn: ConnectionType, allow_outdated_version: bool = False self, db_conn: ConnectionType, allow_outdated_version: bool = False

View File

@ -170,6 +170,16 @@ class PostgresEngine(
"""Do we support the `RETURNING` clause in insert/update/delete?""" """Do we support the `RETURNING` clause in insert/update/delete?"""
return True return True
@property
def supports_select_distinct_on(self) -> bool:
"""Do we support the `DISTINCT ON` clause in SELECT?"""
return True
@property
def supports_sequences(self) -> bool:
"""Do we support the `CREATE SEQUENCE` clause?"""
return True
def is_deadlock(self, error: Exception) -> bool: def is_deadlock(self, error: Exception) -> bool:
if isinstance(error, psycopg2.DatabaseError): if isinstance(error, psycopg2.DatabaseError):
# https://www.postgresql.org/docs/current/static/errcodes-appendix.html # https://www.postgresql.org/docs/current/static/errcodes-appendix.html

View File

@ -31,7 +31,7 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class PostgresEngine(BaseDatabaseEngine[psycopg2.extensions.connection]): class PsycopgEngine(BaseDatabaseEngine[psycopg2.extensions.connection]):
def __init__(self, database_config: Mapping[str, Any]): def __init__(self, database_config: Mapping[str, Any]):
super().__init__(psycopg2, database_config) super().__init__(psycopg2, database_config)
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE) psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
@ -175,6 +175,16 @@ class PostgresEngine(BaseDatabaseEngine[psycopg2.extensions.connection]):
"""Do we support the `RETURNING` clause in insert/update/delete?""" """Do we support the `RETURNING` clause in insert/update/delete?"""
return True return True
@property
def supports_select_distinct_on(self) -> bool:
"""Do we support the `DISTINCT ON` clause in SELECT?"""
return True
@property
def supports_sequences(self) -> bool:
"""Do we support the `CREATE SEQUENCE` clause?"""
return True
def is_deadlock(self, error: Exception) -> bool: def is_deadlock(self, error: Exception) -> bool:
if isinstance(error, psycopg2.DatabaseError): if isinstance(error, psycopg2.DatabaseError):
# https://www.postgresql.org/docs/current/static/errcodes-appendix.html # https://www.postgresql.org/docs/current/static/errcodes-appendix.html

View File

@ -58,6 +58,16 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection, sqlite3.Cursor]):
"""Do we support the `RETURNING` clause in insert/update/delete?""" """Do we support the `RETURNING` clause in insert/update/delete?"""
return sqlite3.sqlite_version_info >= (3, 35, 0) return sqlite3.sqlite_version_info >= (3, 35, 0)
@property
def supports_select_distinct_on(self) -> bool:
"""Do we support the `DISTINCT ON` clause in SELECT?"""
return False
@property
def supports_sequences(self) -> bool:
"""Do we support the `CREATE SEQUENCE` clause?"""
return False
def check_database( def check_database(
self, db_conn: sqlite3.Connection, allow_outdated_version: bool = False self, db_conn: sqlite3.Connection, allow_outdated_version: bool = False
) -> None: ) -> None:

View File

@ -23,7 +23,7 @@ from typing_extensions import Counter as CounterType
from synapse.config.homeserver import HomeServerConfig from synapse.config.homeserver import HomeServerConfig
from synapse.storage.database import LoggingDatabaseConnection from synapse.storage.database import LoggingDatabaseConnection
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, PsycopgEngine
from synapse.storage.schema import SCHEMA_COMPAT_VERSION, SCHEMA_VERSION from synapse.storage.schema import SCHEMA_COMPAT_VERSION, SCHEMA_VERSION
from synapse.storage.types import Cursor from synapse.storage.types import Cursor
@ -255,7 +255,7 @@ def _setup_new_database(
for file_name in os.listdir(directory) for file_name in os.listdir(directory)
) )
if isinstance(database_engine, PostgresEngine): if isinstance(database_engine, (PostgresEngine, PsycopgEngine)):
specific = "postgres" specific = "postgres"
else: else:
specific = "sqlite" specific = "sqlite"
@ -399,7 +399,7 @@ def _upgrade_existing_database(
logger.debug("applied_delta_files: %s", current_schema_state.applied_deltas) logger.debug("applied_delta_files: %s", current_schema_state.applied_deltas)
if isinstance(database_engine, PostgresEngine): if isinstance(database_engine, (PostgresEngine, PsycopgEngine)):
specific_engine_extension = ".postgres" specific_engine_extension = ".postgres"
else: else:
specific_engine_extension = ".sqlite" specific_engine_extension = ".sqlite"

View File

@ -1,7 +1,6 @@
import logging import logging
from io import StringIO from io import StringIO
from synapse.storage.engines import PostgresEngine
from synapse.storage.prepare_database import execute_statements_from_stream from synapse.storage.prepare_database import execute_statements_from_stream
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -21,7 +20,7 @@ def run_upgrade(cur, database_engine, *args, **kwargs):
def run_create(cur, database_engine, *args, **kwargs): def run_create(cur, database_engine, *args, **kwargs):
if isinstance(database_engine, PostgresEngine): if database_engine.supports_select_distinct_on:
select_clause = """ select_clause = """
SELECT DISTINCT ON (user_id, filter_id) user_id, filter_id, filter_json SELECT DISTINCT ON (user_id, filter_id) user_id, filter_id, filter_json
FROM user_filters FROM user_filters

View File

@ -18,7 +18,12 @@ This migration adds triggers to the partial_state_events tables to enforce uniqu
Triggers cannot be expressed in .sql files, so we have to use a separate file. Triggers cannot be expressed in .sql files, so we have to use a separate file.
""" """
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine from synapse.storage.engines import (
BaseDatabaseEngine,
PostgresEngine,
PsycopgEngine,
Sqlite3Engine,
)
from synapse.storage.types import Cursor from synapse.storage.types import Cursor
@ -43,7 +48,7 @@ def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs
END; END;
""" """
) )
elif isinstance(database_engine, PostgresEngine): elif isinstance(database_engine, (PostgresEngine, PsycopgEngine)):
cur.execute( cur.execute(
""" """
CREATE OR REPLACE FUNCTION check_partial_state_events() RETURNS trigger AS $BODY$ CREATE OR REPLACE FUNCTION check_partial_state_events() RETURNS trigger AS $BODY$

View File

@ -21,7 +21,7 @@ from synapse.storage.engines import PostgresEngine
def run_create(cur, database_engine, *args, **kwargs): def run_create(cur, database_engine, *args, **kwargs):
if isinstance(database_engine, PostgresEngine): if database_engine.supports_sequences:
# If we already have some AS TXNs we want to start from the current # If we already have some AS TXNs we want to start from the current
# maximum value. There are two potential places this is stored - the # maximum value. There are two potential places this is stored - the
# actual TXNs themselves *and* the AS state table. At time of migration # actual TXNs themselves *and* the AS state table. At time of migration

View File

@ -59,6 +59,9 @@ def setupdb() -> None:
# If we're using PostgreSQL, set up the db once # If we're using PostgreSQL, set up the db once
if USE_POSTGRES_FOR_TESTS: if USE_POSTGRES_FOR_TESTS:
# create a PostgresEngine # create a PostgresEngine
if USE_POSTGRES_FOR_TESTS == "psycopg":
db_engine = create_engine({"name": "psycopg", "args": {}})
else:
db_engine = create_engine({"name": "psycopg2", "args": {}}) db_engine = create_engine({"name": "psycopg2", "args": {}})
# connect to postgres to create the base database. # connect to postgres to create the base database.
db_conn = db_engine.module.connect( db_conn = db_engine.module.connect(