Source code for sqlalchemy_continuum.relationship_builder

import sqlalchemy as sa

from .exc import ClassNotVersioned
from .expression_reflector import VersionExpressionReflector
from .operation import Operation
from .table_builder import TableBuilder
from .utils import adapt_columns, version_class, option


[docs]class RelationshipBuilder(object): def __init__(self, versioning_manager, model, property_): self.manager = versioning_manager self.property = property_ self.model = model def one_to_many_subquery(self, obj): tx_column = option(obj, 'transaction_column_name') remote_alias = sa.orm.aliased(self.remote_cls) primary_keys = [ getattr(remote_alias, column.name) for column in sa.inspect(remote_alias).mapper.columns if column.primary_key and column.name != tx_column ] return sa.exists( sa.select( [1] ).where( sa.and_( getattr(remote_alias, tx_column) <= getattr(obj, tx_column), *[ getattr(remote_alias, pk.name) == getattr(self.remote_cls, pk.name) for pk in primary_keys ] ) ).group_by( *primary_keys ).having( sa.func.max(getattr(remote_alias, tx_column)) == getattr(self.remote_cls, tx_column) ).correlate(self.local_cls, self.remote_cls) ) def many_to_one_subquery(self, obj): tx_column = option(obj, 'transaction_column_name') reflector = VersionExpressionReflector(obj, self.property) return getattr(self.remote_cls, tx_column) == ( sa.select( [sa.func.max(getattr(self.remote_cls, tx_column))] ).where( sa.and_( getattr(self.remote_cls, tx_column) <= getattr(obj, tx_column), reflector(self.property.primaryjoin) ) ) ) def query(self, obj): session = sa.orm.object_session(obj) return ( session.query(self.remote_cls) .filter( self.criteria(obj) ) )
[docs] def process_query(self, query): """ Process given SQLAlchemy Query object depending on the associated RelationshipProperty object. :param query: SQLAlchemy Query object """ if self.property.lazy == 'dynamic': return query if self.property.uselist is False: return query.first() return query.all()
def criteria(self, obj): direction = self.property.direction if self.versioned: if direction.name == 'ONETOMANY': return self.one_to_many_criteria(obj) elif direction.name == 'MANYTOMANY': return self.many_to_many_criteria(obj) elif direction.name == 'MANYTOONE': return self.many_to_one_criteria(obj) else: reflector = VersionExpressionReflector(obj, self.property) return reflector(self.property.primaryjoin)
[docs] def many_to_many_criteria(self, obj): """ Returns the many-to-many query. Looks up remote items through associations and for each item returns returns the last version with a transaction less than or equal to the transaction of `obj`. This must hold true for both the association and the remote relation items. Example ------- Select all tags of article with id 3 and transaction 5 .. code-block:: sql SELECT tags_version.* FROM tags_version WHERE EXISTS ( SELECT 1 FROM article_tag_version WHERE article_id = 3 AND tag_id = tags_version.id AND operation_type != 2 AND EXISTS ( SELECT 1 FROM article_tag_version as article_tag_version2 WHERE article_tag_version2.tag_id = article_tag_version.tag_id AND article_tag_version2.tx_id <= 5 GROUP BY article_tag_version2.tag_id HAVING MAX(article_tag_version2.tx_id) = article_tag_version.tx_id ) ) AND EXISTS ( SELECT 1 FROM tags_version as tags_version_2 WHERE tags_version_2.id = tags_version.id AND tags_version_2.tx_id <= 5 GROUP BY tags_version_2.id HAVING MAX(tags_version_2.tx_id) = tags_version.tx_id ) AND operation_type != 2 """ return sa.and_( self.association_subquery(obj), self.one_to_many_subquery(obj), self.remote_cls.operation_type != Operation.DELETE )
[docs] def many_to_one_criteria(self, obj): """Returns the many-to-one query. Returns the item on the 'one' side with the highest transaction id as long as it is less or equal to the transaction id of the `obj`. Example ------- Look up the Article of a Tag with article_id = 4 and transaction_id = 5 .. code-block:: sql SELECT * FROM articles_version WHERE id = 4 AND transaction_id = ( SELECT max(transaction_id) FROM articles_version WHERE transaction_id <= 5 AND id = 4 ) AND operation_type != 2 """ reflector = VersionExpressionReflector(obj, self.property) return sa.and_( reflector(self.property.primaryjoin), self.many_to_one_subquery(obj), self.remote_cls.operation_type != Operation.DELETE )
[docs] def one_to_many_criteria(self, obj): """ Returns the one-to-many query. For each item on the 'many' side, returns its latest version as long as the transaction of that version is less than equal of the transaction of `obj`. Example ------- Using the Article-Tags relationship, where we look for tags of article_version with id = 3 and transaction = 5 the sql produced is .. code-block:: sql SELECT tags_version.* FROM tags_version WHERE tags_version.article_id = 3 AND tags_version.operation_type != 2 AND EXISTS ( SELECT 1 FROM tags_version as tags_version_last WHERE tags_version_last.transaction_id <= 5 AND tags_version_last.id = tags_version.id GROUP BY tags_version_last.id HAVING MAX(tags_version_last.transaction_id) = tags_version.transaction_id ) """ reflector = VersionExpressionReflector(obj, self.property) return sa.and_( reflector(self.property.primaryjoin), self.one_to_many_subquery(obj), self.remote_cls.operation_type != Operation.DELETE )
@property def reflected_relationship(self): """ Builds a reflected one-to-many, one-to-one and many-to-one relationship between two version classes. """ @property def relationship(obj): query = self.query(obj) return self.process_query(query) return relationship
[docs] def association_subquery(self, obj): """ Returns an EXISTS clause that checks if an association exists for given SQLAlchemy declarative object. This query is used by many_to_many_criteria method. Example query: .. code-block:: sql EXISTS ( SELECT 1 FROM article_tag_version WHERE article_id = 3 AND tag_id = tags_version.id AND operation_type != 2 AND EXISTS ( SELECT 1 FROM article_tag_version as article_tag_version2 WHERE article_tag_version2.tag_id = article_tag_version.tag_id AND article_tag_version2.tx_id <=5 GROUP BY article_tag_version2.tag_id HAVING MAX(article_tag_version2.tx_id) = article_tag_version.tx_id ) ) :param obj: SQLAlchemy declarative object """ tx_column = option(obj, 'transaction_column_name') reflector = VersionExpressionReflector(obj, self.property) association_table_alias = self.association_version_table.alias() association_cols = [ association_table_alias.c[association_col.name] for _, association_col in self.remote_to_association_column_pairs ] association_exists = sa.exists( sa.select( [1] ).where( sa.and_( association_table_alias.c[tx_column] <= getattr(obj, tx_column), *[association_col == self.association_version_table.c[association_col.name] for association_col in association_cols] ) ).group_by( *association_cols ).having( sa.func.max(association_table_alias.c[tx_column]) == self.association_version_table.c[tx_column] ).correlate(self.association_version_table) ) return sa.exists( sa.select( [1] ).where( sa.and_( reflector(self.property.primaryjoin), association_exists, self.association_version_table.c.operation_type != Operation.DELETE, adapt_columns(self.property.secondaryjoin), ) ).correlate(self.local_cls, self.remote_cls) )
[docs] def build_association_version_tables(self): """ Builds many-to-many association version table for given property. Association version tables are used for tracking change history of many-to-many associations. """ column = list(self.property.remote_side)[0] self.manager.association_tables.add(column.table) builder = TableBuilder( self.manager, column.table ) metadata = column.table.metadata if builder.parent_table.schema: table_name = builder.parent_table.schema + '.' + builder.table_name elif metadata.schema: table_name = metadata.schema + '.' + builder.table_name else: table_name = builder.table_name if table_name not in metadata.tables: self.association_version_table = table = builder() self.manager.association_version_tables.add(table) else: # may have already been created if we visiting the 'other' side of # a self-referential many-to-many relationship self.association_version_table = metadata.tables[table_name]
def __call__(self): """ Builds reflected relationship between version classes based on given parent object's RelationshipProperty. """ self.local_cls = version_class(self.model) self.versioned = False try: self.remote_cls = version_class(self.property.mapper.class_) self.versioned = True except (AttributeError, KeyError): return except ClassNotVersioned: self.remote_cls = self.property.mapper.class_ if (self.property.secondary is not None and not self.property.viewonly and not self.manager.is_excluded_property( self.model, self.property.key)): self.build_association_version_tables() # store remote cls to association table column pairs self.remote_to_association_column_pairs = [] for column_pair in self.property.local_remote_pairs: if column_pair[0] in self.property.table.c.values(): self.remote_to_association_column_pairs.append(column_pair) setattr( self.local_cls, self.property.key, self.reflected_relationship )