Source code for sqlalchemy_continuum.schema

import sqlalchemy as sa


def get_end_tx_column_query(
    table,
    end_tx_column_name='end_transaction_id',
    tx_column_name='transaction_id'
):

    v1 = sa.alias(table, name='v')
    v2 = sa.alias(table, name='v2')
    v3 = sa.alias(table, name='v3')

    primary_keys = [c.name for c in table.c if c.primary_key]

    tx_criterion = sa.select(
        [sa.func.min(getattr(v3.c, tx_column_name))]
    ).where(
        sa.and_(
            getattr(v3.c, tx_column_name) > getattr(v1.c, tx_column_name),
            *[
                getattr(v3.c, pk) == getattr(v1.c, pk)
                for pk in primary_keys
                if pk != tx_column_name
            ]
        )
    )
    return sa.select(
        columns=[
            getattr(v1.c, column)
            for column in primary_keys
        ] + [
            getattr(v2.c, tx_column_name).label(end_tx_column_name)
        ],
        from_obj=v1.outerjoin(
            v2,
            sa.and_(
                getattr(v2.c, tx_column_name) ==
                tx_criterion
            )
        )
    ).order_by(getattr(v1.c, tx_column_name))


[docs]def update_end_tx_column( table, end_tx_column_name='end_transaction_id', tx_column_name='transaction_id', conn=None ): """ Calculates end transaction columns and updates the version table with the calculated values. This function can be used for migrating between subquery versioning strategy and validity versioning strategy. :param table: SQLAlchemy table object :param end_tx_column_name: Name of the end transaction column :param tx_column_name: Transaction column name :param conn: Either SQLAlchemy Connection, Engine, Session or Alembic Operations object. Basically this should be an object that can execute the queries needed to update the end transaction column values. If no object is given then this function tries to use alembic.op for executing the queries. """ if conn is None: from alembic import op conn = op.get_bind() query = get_end_tx_column_query( table, end_tx_column_name=end_tx_column_name, tx_column_name=tx_column_name ) stmt = conn.execute(query) primary_keys = [c.name for c in table.c if c.primary_key] for row in stmt: if row[end_tx_column_name]: criteria = [ getattr(table.c, pk) == row[pk] for pk in primary_keys ] update_stmt = ( table.update() .where(sa.and_(*criteria)) .values({end_tx_column_name: row[end_tx_column_name]}) ) conn.execute(update_stmt)
def get_property_mod_flags_query( table, tracked_columns, mod_suffix='_mod', end_tx_column_name='end_transaction_id', tx_column_name='transaction_id', ): v1 = sa.alias(table, name='v') v2 = sa.alias(table, name='v2') primary_keys = [c.name for c in table.c if c.primary_key] return sa.select( columns=[ getattr(v1.c, column) for column in primary_keys ] + [ (sa.or_( getattr(v1.c, column) != getattr(v2.c, column), getattr(v2.c, tx_column_name).is_(None) )).label(column + mod_suffix) for column in tracked_columns ], from_obj=v1.outerjoin( v2, sa.and_( getattr(v2.c, end_tx_column_name) == getattr(v1.c, tx_column_name), *[ getattr(v2.c, pk) == getattr(v1.c, pk) for pk in primary_keys if pk != tx_column_name ] ) ) ).order_by(getattr(v1.c, tx_column_name))
[docs]def update_property_mod_flags( table, tracked_columns, mod_suffix='_mod', end_tx_column_name='end_transaction_id', tx_column_name='transaction_id', conn=None ): """ Update property modification flags for given table and given columns. This function can be used for migrating an existing schema to use property mod flags (provided by PropertyModTracker plugin). :param table: SQLAlchemy table object :param mod_suffix: Modification tracking columns suffix :param end_tx_column_name: Name of the end transaction column :param tx_column_name: Transaction column name :param conn: Either SQLAlchemy Connection, Engine, Session or Alembic Operations object. Basically this should be an object that can execute the queries needed to update the property modification flags. If no object is given then this function tries to use alembic.op for executing the queries. """ if conn is None: from alembic import op conn = op.get_bind() query = get_property_mod_flags_query( table, tracked_columns, mod_suffix=mod_suffix, end_tx_column_name=end_tx_column_name, tx_column_name=tx_column_name, ) stmt = conn.execute(query) primary_keys = [c.name for c in table.c if c.primary_key] for row in stmt: values = dict([ (column + mod_suffix, row[column + mod_suffix]) for column in tracked_columns if row[column + mod_suffix] ]) if values: criteria = [ getattr(table.c, pk) == row[pk] for pk in primary_keys ] query = table.update().where(sa.and_(*criteria)).values(values) conn.execute(query)