Страницы

Поиск по вопросам

Показаны сообщения с ярлыком sqlalchemy. Показать все сообщения
Показаны сообщения с ярлыком sqlalchemy. Показать все сообщения

вторник, 25 февраля 2020 г.

sqlalchemy отношения

#python #flask #sqlalchemy




Товарищи, подскажите, как с помощью sqlalchemy организовать такой функционал. Есть
две таблицы: категории и под категории, во второй есть ключ на первую. Когда создается
компания, она должна указывать направление деятельности, но ее деятельность, может
быть гораздо уже и ограничиваться несколькими элементами из этой категории. Покажите,
пример, как описать классы, и чтобы можно было получить все категории компании, категории
и их под категории, а так же, чтобы по (под категориям и категориям) я мог найти все
компании.
    


Ответы

Ответ 1



А есть ли смысл разделять категории и подкатегории? Сделайте одну таблицу категорий, у которых есть ссылка на родительскую категорию (на себя же). Это позволит произвольную вложенность подкатегорий и уберет лишнюю сущность. Получится что-то вроде такого: from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String from sqlalchemy import ForeignKey from sqlalchemy.orm import relationship Base = declarative_base() class Company(Base): __tablename__ = 'company' id = Column(Integer, primary_key=True) name = Column(String) def __repr__(self): return '>> c1 = Company(name='Google') >>> c2 = Company(name='Yahoo') >>> cat1 = Category(name='IT') >>> cat2 = Category(name='Search engine', parent=cat1) >>> cat1.subcategories [>> cat2.parent >>> c1.categories.extend([Relationship(category=cat1), Relationship(category=cat2)]) >>> c1.categories [>> c1.categories[1].category.name 'Search engine' >>> c2.categories []

четверг, 2 января 2020 г.

Как правильно оформить модели SQLAlchemy?

#python #архитектура #sqlalchemy #model


Попробую сформулировать вопрос.

Сейчас я изучаю Python и SQLAlchemy делая проект для себя. Создал таблицу User:

Model = declarative_base()

class User(Model):
    __tablename__ = "users"

    id = Column(Integer, primary_key = True)
    nickname = Column(String)
    email    = Column(String)
    password = Column(String)


Добавил туда несколько записей.

Допустим теперь я хочу проверить - существует ли какой либо пользователь с указанным
email? Я делаю следующее:

bool(session.query(exists().where(User.email == email)).scalar())


Как по мне - этот код абсолютно не читаем. Поэтому я вынес его (и другие подобные
функции работы с базой) в отдельный класс Users. И вызываю их по мере необходимости
в логике приложения.

class Users:
    def exists(email):
        return bool(session.query(exists().where(User.email == email)).scalar())

    def add(nick, email, password):
        pass # Тут код

    def email_exists(email):
        pass # Тут код

    def update(id, nick, email, password):
        pass # Тут код


Теперь я хочу понять. Насколько я продвинулся в велосипедостроении? Как нужно делать
"по хорошему"? Я открыл несколько проектов с открытым исходным кодом и не встретил
там ничего подобного. Поэтому у меня закрадывается мысль, что это не совсем верный путь.
    


Ответы

Ответ 1



Я когда-то делал подобным образом. Как по мне - нормальная реализация методов моделей. Неплохим решением будет создать базовый класс, от которого можно наследовать другие модели, а в самих методах добавлять нужную логику. Для своих нужд, работая с фреймворком Flask, написал такую такую штуковину: from ._base import db from sqlalchemy.exc import IntegrityError, InterfaceError from flask import flash from sqlalchemy import event from sqlalchemy.event import listen from sqlalchemy.orm.interfaces import MapperExtension from ..utils.redis import redis_store from sqlalchemy import inspect from sqlalchemy.ext.declarative import as_declarative, declared_attr from pickle import dumps, loads @as_declarative() class BaseExtension(MapperExtension): def after_insert(self, mapper, connection, instance): row = instance.query.filter_by(id = instance.id).first() payload = {'model': instance.__class__.__name__, 'data': instance.id, 'type': 'INSERT', 'row': row} redis_store.publish('realtime', dumps(payload)) def after_update(self, mapper, connection, instance): row = instance.query.filter_by(id = instance.id).first() payload = {'model': instance.__class__.__name__, 'data': instance.id, 'type': 'UPDATE', 'row': row} redis_store.publish('realtime', dumps(payload)) def after_delete(self, mapper, connection, instance): row = instance.query.filter_by(id = instance.id).first() payload = {'model': instance.__class__.__name__, 'data': instance.id, 'type': 'DELETE', 'row': row} redis_store.publish('realtime', dumps(payload)) class Base(db.Model): __abstract__ = True __mapper_args__ = { 'extension': BaseExtension() } id = db.Column(db.Integer, primary_key=True) created_at = db.Column(db.DateTime, default=db.func.current_timestamp()) modified_at = db.Column(db.DateTime, default=db.func.current_timestamp(), onupdate=db.func.current_timestamp()) @classmethod def create(cls,**kwargs): c = cls(**kwargs) db.session.add(c) try: db.session.commit() flash((c.__tablename__).capitalize() + u' created successfully!', 'success') except IntegrityError: db.session.rollback() flash((c.__tablename__).capitalize() + u' created failed!' + u' IntegrityError', 'error') except InterfaceError: db.session.rollback() flash((c.__tablename__).capitalize() + u' created failed!' + u' InterfaceError', 'error') return c def __repr__(self): mapper = inspect(self).mapper ent = [] object_data = {col.key: getattr(self, col.key) if not col.key == 'created_at' and not col.key == 'password' and not col.key == 'modified_at' else None for col in mapper.column_attrs} return "{0}".format(object_data) Сейчас я вижу, сколько тут всего ненужного и неправильного, но во всяком случае, для саморазвития, это был хороший опыт. Во-первых, те поля, которые повторяются во всех моделях (id, created_at, modified_al) описываются единожды. Во-вторых, наследуемый create(), с обработкой некоторых исключений. В-третьих, события ORM (мой метод реализации которых считается устаревшим) after_insert, after_update, after_delete обрабатываются, и данные отдаются в redis. Я это делал для "real-time" передачи данных (нотификации, обновления данных в дашборде и пр.) В-четвёртых, __repr__, который так же универсален для всех моделей. Так же, сюда можно добавить так называемый 'soft-delete', который позволит не удалять данные из БД насовсем, а так же методы, чтобы доставать из базы "не удалённые", все и только удалённые. Кучу всего можно придумать при необходимости. В конечном счёте, один раз реализованный базовый класс сэкономит много времени и места в моделях.

Ответ 2



Вообще подход то правильный в духе ОО, но как правило session - не стоит использовать в модели, тем более как у вас в примере глобальную переменную session, лучше тогда ее передавать в метод/свойство. И в вашем примере есть некоторая неправильность на мой взгляд - методы указанные в примере должны быть classmethod - методами класса, ведь они не привязаны к экземпляру сущности Я бы посоветовал вам по возможности вместо обычных свойств/методов делать гибридные - на всякий случай. Вдруг потом понадобится делать запросы используя кусок логики свойства в конечных запросах они порой повышают читаемость.

понедельник, 23 декабря 2019 г.

Работа с большим количеством записей в сессии

#python #postgresql #flask #sqlalchemy


Каким образом правильно работать с большим количеством записей с SQLAlchemy?

У меня есть две таблички. В первой 5 миллионов записей вида: question_id, view_count,
counted. Во второй таблице находятся сумма view_count для каждого уникального question_id.
Если мы учли запись из первой таблицы во второй, counted выставляется в истину. 

Сейчас это выглядет так:

def update_most_viewed():
    query = QuestionViewHistory.query.filter_by(counted=False).distinct()
    question_count = query.count()
    frame_size = 1000
    counter = 0

    while counter <= question_count:
        all_questions = query.offset(counter*frame_size).limit(frame_size).all()
        counter = counter + frame_size

        for question in all_questions:
            most_viewed_question = MostViewedQuestion.query.filter_by(question_id=question.question_id).first()

            if most_viewed_question is None:
                most_viewed_question = MostViewedQuestion(question.question_id, question.view_count)
                db.session.add(most_viewed_question)
            else:
                most_viewed_question.view_count += question.view_count

            question.counted = True

        db.session.commit()


Вызываю функцию из консоли. Инициализация:

app = Flask(__name__)
db = SQLAlchemy(app) 


Проблема в том, что с каждым проходом время растет экспоненциально: после пятого
прохода все зависает. Если запустить программу повторно, все повторяется один в один.

На сколько я понимаю, проблема в том, что при каждом вызове commit, SQLAlchemy обновляет
все атрибуты всех объектов в сессии, но способа как это поправить, к сожалению, не нашел.

Обновление

Классы моделей, которые фигурируют в запросе.

class MostViewedQuestion(db.Model):
    __tablename__ = 'most_viewed_question'

    id = db.Column(db.Integer, primary_key=True)
    question_id = db.Column(db.Integer)
    view_count = db.Column(db.Integer)    
    is_associated = db.Column(db.Boolean)
    can_be_associated = db.Column(db.Boolean)
    title = db.Column(db.String(500))
    body = db.Column(db.String(30000))
    tags = db.Column(db.String(500))   
    last_update_date = db.Column(db.DateTime)  

    def __init__(self, question_id, view_count, is_associated=False):
        self.question_id = question_id
        self.view_count = view_count
        self.is_associated = is_associated
        self.can_be_associated = True
        self.last_update_date = datetime.datetime.now()

    def __repr__(self):
        return '' % str(self.id)        

class QuestionViewHistory(db.Model):
    __tablename__ = 'question_view_history'

    id = db.Column(db.Integer, primary_key=True)
    question_id = db.Column(db.Integer)
    view_count = db.Column(db.Integer)       
    view_date = db.Column(db.DateTime) 
    counted = db.Column(db.Boolean)

    def __init__(self, question_id, view_count, view_date):
        self.question_id = question_id
        self.view_count = view_count
        self.view_date = view_date
        self.counted = False

    def __repr__(self):
        return '' % str(self.id)        


Код всего проекта доступен на GitHub, все модели находятся в файле models.py, функция
update_most_viewed в файле database.py. В папке cvs_data_ru данные для тестов.
    


Ответы

Ответ 1



Стоит начать с того, как делать не нужно. Например, не нужно перебирать объекты в базе по-одному: for question in all_questions: most_viewed_question = MostViewedQuestion.query.filter_by(question_id=question.question_id).first() этот цикл - совсем нет-нет-нет. Такого поведения нужно избегать любой ценой - НАМНОГО лучше запросить сразу миллион строк, чем миллион раз по одной строке. Если нужно получить все объекты MostViewedQuestion, то лучше это сделать одним запросом: most_viewed_questions = MostViewedQuestion.query.filter_by(question_id.in_=questions) В таком случае отпадает нужда и во внешнем цикле while, потому что по-прежнему, лучше запросить один раз миллион, чем тысячу раз по тысяче. После такого запроса БД вернет те most_viewed_questions, для которых есть соответствующая запись. Встает вопрос: что делать с теми, у которых такой записи нет? Такие запросы выполняются в БД очень часто и часто их называют UPSERT (UPDATE + INSERT) - нужно одновременно и обновить какую-то запись, а если ее нет, то создать ее. Все, что нужно - это выполнить вот этот вот upsert средствами sqlalchemy. Состоять запрос будет из двух подзапросов - один обновит существующие записи (update), другой создаст новые (insert). UPDATE в целом довольно прямолинейный: from sqlalchemy import not_, select, exists update_query = MostViewedQuestion.__table__.update().values( view_count=MostViewedQuestion.view_count + QuestionViewHistory.view_count ).where(and_( MostViewedQuestion.question_id == QuestionViewHistory.question_id, QuestionViewHistory.counted == True )) Оно генерирует вот такой SQL: UPDATE most_viewed_question SET view_count=(most_viewed_question.view_count + question_view_history.view_count) FROM question_view_history WHERE most_viewed_question.question_id = question_view_history.question_id AND question_view_history.counted = true Я использовал запись MostViewedQuestion.__table__, потому что мои модели наследуются от declarative_base(), а методы update(), delete(), insert() есть у класса Table(у Base их нет). Для declarative_base сама таблица находится в поле __table__. INSERT немного более запутанный, но самая мякотка - from_select(), который генерирует INSERT ... FROM SELECT: insert_query = MostViewedQuestion.__table__.insert().\ from_select([MostViewedQuestion.question_id, MostViewedQuestion.view_count], select([QuestionViewHistory.question_id, QuestionViewHistory.view_count]). where(and_(not_(exists([MostViewedQuestion.question_id]).where(MostViewedQuestion.question_id == QuestionViewHistory.question_id) ), # WHERE ... AND ... QuestionViewHistory.counted == True)) ) SESSION.execute(update_query) SESSION.execute(insert_query) SESSION.commit() SQL: INSERT INTO most_viewed_question (question_id, view_count) SELECT question_view_history.question_id, question_view_history.view_count FROM question_view_history WHERE NOT (EXISTS ( SELECT most_viewed_question.question_id FROM most_viewed_question WHERE most_viewed_question.question_id = question_view_history.question_id)) AND question_view_history.counted = true Я бы не сказал, что данный запрос - образец скорости, но самое главное в этих запросах - так это то, что работает БД. Питонский код в это время просто ждет ответа от БД и нам вообще не надо думать об оптимизации питоновского кода. Не нужно ломать голову об устройстве SQLAlchemy. Зато стоит подумать об оптимизации SQL, но с этим несколько легче, потому что UPSERT - операция типичная и по ней много всего написано. Но это не повод расслабляться, потому что при обновлении/вставке большого количества записей в БД есть свои нюансы (например, раздувание таблиц (table bloating) или индексы/триггеры, которые тормозят процесс и перед массовой вставкой их выключают).

Ответ 2



Что получилось в результате: def update_most_viewed(): reader_session = db_session() question_count = reader_session.query(func.count(QuestionViewHistory.id)).filter_by(counted=False).scalar() query = reader_session.query(QuestionViewHistory.id, QuestionViewHistory.question_id, QuestionViewHistory.view_count).filter_by(counted=False) frame_size = 1000 progress_index = 0 counter = 0 print "Questions to update: %s, frame size: %s" % (question_count, frame_size) while counter <= question_count: all_questions = query.offset(0).limit(frame_size).all() counter = counter + frame_size wiriter_session = db_session() for question in all_questions: record_id, question_id, view_count = question most_viewed_question = wiriter_session.query(MostViewedQuestion).filter_by(question_id=question_id).first() if most_viewed_question is None: most_viewed_question = MostViewedQuestion(question_id, view_count) wiriter_session.add(most_viewed_question) else: most_viewed_question.view_count += view_count qh = wiriter_session.query(QuestionViewHistory).filter_by(id=record_id).first() qh.counted = True wiriter_session.add(qh) print_progress_bar(progress_index, question_count, prefix = 'Progress:', suffix = 'Complete') progress_index +=1 wiriter_session.commit() wiriter_session.close() print "All questions were counted" В чем отличие от исходного варианта Если верить документации SQLAlchemy, ORM отслеживает все объекты находящиеся в сессии, для того, чтобы гарантировать их актуальность относительно данных в базе. Это приводит к тому, что после каждого вызова commit, ORM отмечает объекты сессии как требующие обновления, что приводит к большой нагрузке на БД. С ростом количества объектов в сессии растет и время их обновления. Решение сводится к тому, чтобы держать в сессии как можно меньше объектов. Я добавил две сессии: reader_session и wiriter_session, которые, судя по ответу, на самом деле являются одной и той же сессией. Далее (главный трюк) при обращении к базе я запрашиваю не весь объект целиком, а только нужную информацию (id, question_id, view_count), количество объектов считаю через функцию func.count(QuestionViewHistory.id). В результате, как я понимаю, у меня нет объектов в сессии для отслеживания. Все операции на запись добавляю в wiriter_session. Сейчас скорость выполнения функции не зависит от количества данных (от номера прохода), все же остается низкой. Буду думать над улучшением алгоритма. П.С. Как я понимаю, в исходном коде была допущена ошибка в работе с «окном»: в цикле мы меняем таблицу (обновляем counted), а потом сдвигаем окно, чем пропускаем часть записей. Как я понимаю, надо все время начинать с нулевой позиции. То есть: all_questions = query.offset(0).limit(frame_size).all()

вторник, 10 декабря 2019 г.

Как правильно обращаться к объекту сессии?

#python #postgresql #flask #sqlalchemy


Есть код:

app = Flask(__name__)
db = SQLAlchemy(app)  


def make_db_session(engine):
    return scoped_session(sessionmaker(autocommit=False,
        autoflush=True,
        bind=engine))    

def make_db_engine():
    return create_engine(app.config['SQLALCHEMY_DATABASE_URI'], convert_unicode=True)

engine = make_db_engine()
db_session = make_db_session(engine)


Далее, я вижу несколько примеров обращения к сессии.


Первый вариант

db.session.some_action()

Второй вариант

db_session.some_action()

Третий вариант

session = db_session()
session.some_action()



В чем их различия? Какой способ является более верным?

П.С. Еще интересно то, что если делать выборку следующим образом

model = SomeModel.query.filter(some_filter).first()
model.field = new_value


То сохранять необходимо как

db.session.commit()


То есть, создается впечатление, что по умолчанию используется сессия db.session.
    


Ответы

Ответ 1



Можно начать с отличий: Функция make_db_session возвращает объект типа ScopedSession (scoped_session - это класс такой, который зачем-то назвали как функцию). scoped_session - обычная такая сессия, но помимо бла-бла-бла про потоки, основное ее отличие, видимое пользователям, в следующем: # Обычные сессии Session = sessionmaker(__config_here__) session_1 = Session() session_2 = Session() my_db_entry = Entry(slug='Yolo') session_1.add(my_db_entry) # На этом моменте my_db_entry привязан в первой сессии session_2.add(my_db_entry) # На этом моменте возникнет исключение То есть добавить один и тот же объект с разных сессий не выйдет. ScopedSession - это на самом деле одна и та же сессия всегда (в данном потоке, пояснения ниже). Т.е. session_factory = sessionmaker(bind=some_engine) print(type(session_factory)) >>> session_registry = ScopedSession(session_factory) print(type(session_registry)) >>> При вызове session_registry() (__call__) без параметров возвращается то, что лежит в регистре. Регистр - экземпляр класса ThreadLocalRegistry. То, что туда однажды положили, изменить нельзя (На самом деле я не уверен - возможно, в методе баг или лишний код - он как-то странно написан - смотри с 64 строки в sqlalchemy/orm/scoping.py). Также, если вызовете session_registry() из другого потока - то в нем уже будет другая сессия, созданная специально для него. То есть "thread-local" в документации означает, что для каждого потока своя сессия, а не то, что сессии как-то хитро синхронизируются, ставят локи и т.д. Это достигается использованием threading.local() в регистре. session_1 = session_registry() session_2 = session_registry() print(type(session1), type(session_2)) >>> session_1 is session_2 >>> True это одна и та же сессия. Это означает, например, что регистр с сессией можно сделать глобальным. Также необязательно создавать объекты класса Session (session_1 и session-2) - так как сессия одна вполне зайдет session_registry.query(MyClass).all() Также в том же разделе есть наглядная диаграмма, показывающая когда нужно создавать регистр, когда нужно создавать и закрывать сессию. Что использовать "правильнее" сказать сложно, но я бы сказал, что scoped_session, а не создавать свою гольную Session(...), потому что сессии не потоко-безопасны и придется создавать на каждый поток новую сессию, либо синхронизироваться как-то. Чтобы использовать свою сессию, а не предоставленную flask-sqlalchemy, Есть несколько путей: вызывать query на нужной сессии: session.query(MyObject.prop, AnotherOne.prop2).filter(blah-blah) Также вы можете на свои модели установить "сессию по умолчанию". У каждой модели может быть поле query, куда можно присунуть "запросник" (на этом моменте все начинает быть настолько запутанно, что не поддается объяснению). Примерно как-то так. Также есть небольшое обсуждение разницы между этими подходами на ENSO. Update: Возвращаясь к тому, какой вариант "верный", скажу, что второй и третий равнозначны, но использовать стоит третий, чтобы точно знать, что используется объект Session, а не какой-то иной. Но тогда зачем использовать flask-SQLAlchemy? Выходит, что многое из того, что эта библиотека прячет от пользователя (настройки сессии, обратные вызовы типа before_rollback, after_commit, after_flush и т.д. навешаны на библиотечную сессию. Также используется свой особенный "запросник" - класс orm.Query со всякими плюшками, вроде автоматической отправки 404, если нет объекта в БД. Логи всякие, встроенная пагинация (постраничные запросы в БД). Также сессия по умолчанию - db.session - во время создания моделей именно она записывается в свойство query. Вы можете убедиться в этом, отыскав класс SQLAlchemy в пакете, в нем метод __init__, а в нем уже строчку, в которой инициализируется поле Model (которое затем используется как базовый класс). Это поле инициализируется результатом функции make_declarative_base и вот как эта функция выглядит (довольно простая): def make_declarative_base(self, metadata=None): """Creates the declarative base.""" base = declarative_base(cls=Model, name='Model', metadata=metadata, metaclass=_BoundDeclarativeMeta) base.query = _QueryProperty(self) return base Как видно, заветный query инициализируется своим классом, который создается именно с сессией flask-SQLAlchemy, а не какой-нибудь иной. Т.о. сессия по умолчанию была явно объявлена в базовом классе db.Model. Можно поменять ее, перезаписав query на тот, что нужен.

понедельник, 13 мая 2019 г.

sqlalchemy отношения


Товарищи, подскажите, как с помощью sqlalchemy организовать такой функционал. Есть две таблицы: категории и под категории, во второй есть ключ на первую. Когда создается компания, она должна указывать направление деятельности, но ее деятельность, может быть гораздо уже и ограничиваться несколькими элементами из этой категории. Покажите, пример, как описать классы, и чтобы можно было получить все категории компании, категории и их под категории, а так же, чтобы по (под категориям и категориям) я мог найти все компании.


Ответ

А есть ли смысл разделять категории и подкатегории? Сделайте одну таблицу категорий, у которых есть ссылка на родительскую категорию (на себя же). Это позволит произвольную вложенность подкатегорий и уберет лишнюю сущность. Получится что-то вроде такого:
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String from sqlalchemy import ForeignKey from sqlalchemy.orm import relationship
Base = declarative_base()
class Company(Base): __tablename__ = 'company'
id = Column(Integer, primary_key=True) name = Column(String)
def __repr__(self): return 'class Category(Base): __tablename__ = 'category'
id = Column(Integer, primary_key=True) name = Column(String) parent_id = Column(Integer, ForeignKey('category.id'))
parent = relationship('Category', remote_side=id, backref='subcategories')
def __repr__(self): return 'class Relationship(Base): __tablename__ = 'relationships'
company_id = Column(Integer, ForeignKey('company.id'), primary_key=True) category_id = Column(Integer, ForeignKey('category.id'), primary_key=True)
company = relationship('Company', backref='categories') category = relationship('Category', backref='companies')
def __repr__(self): return 'Вот пример использования:
>>> c1 = Company(name='Google') >>> c2 = Company(name='Yahoo') >>> cat1 = Category(name='IT') >>> cat2 = Category(name='Search engine', parent=cat1) >>> cat1.subcategories [>> cat2.parent >>> c1.categories.extend([Relationship(category=cat1), Relationship(category=cat2)]) >>> c1.categories [>> c1.categories[1].category.name 'Search engine' >>> c2.categories []

среда, 9 января 2019 г.

Как правильно оформить модели SQLAlchemy?

Попробую сформулировать вопрос.
Сейчас я изучаю Python и SQLAlchemy делая проект для себя. Создал таблицу User
Model = declarative_base()
class User(Model): __tablename__ = "users"
id = Column(Integer, primary_key = True) nickname = Column(String) email = Column(String) password = Column(String)
Добавил туда несколько записей.
Допустим теперь я хочу проверить - существует ли какой либо пользователь с указанным email? Я делаю следующее:
bool(session.query(exists().where(User.email == email)).scalar())
Как по мне - этот код абсолютно не читаем. Поэтому я вынес его (и другие подобные функции работы с базой) в отдельный класс Users. И вызываю их по мере необходимости в логике приложения.
class Users: def exists(email): return bool(session.query(exists().where(User.email == email)).scalar())
def add(nick, email, password): pass # Тут код
def email_exists(email): pass # Тут код
def update(id, nick, email, password): pass # Тут код
Теперь я хочу понять. Насколько я продвинулся в велосипедостроении? Как нужно делать "по хорошему"? Я открыл несколько проектов с открытым исходным кодом и не встретил там ничего подобного. Поэтому у меня закрадывается мысль, что это не совсем верный путь.


Ответ

Я когда-то делал подобным образом. Как по мне - нормальная реализация методов моделей. Неплохим решением будет создать базовый класс, от которого можно наследовать другие модели, а в самих методах добавлять нужную логику.
Для своих нужд, работая с фреймворком Flask, написал такую такую штуковину:
from ._base import db from sqlalchemy.exc import IntegrityError, InterfaceError from flask import flash from sqlalchemy import event from sqlalchemy.event import listen from sqlalchemy.orm.interfaces import MapperExtension from ..utils.redis import redis_store from sqlalchemy import inspect from sqlalchemy.ext.declarative import as_declarative, declared_attr from pickle import dumps, loads
@as_declarative() class BaseExtension(MapperExtension):
def after_insert(self, mapper, connection, instance): row = instance.query.filter_by(id = instance.id).first() payload = {'model': instance.__class__.__name__, 'data': instance.id, 'type': 'INSERT', 'row': row} redis_store.publish('realtime', dumps(payload))
def after_update(self, mapper, connection, instance): row = instance.query.filter_by(id = instance.id).first() payload = {'model': instance.__class__.__name__, 'data': instance.id, 'type': 'UPDATE', 'row': row} redis_store.publish('realtime', dumps(payload))
def after_delete(self, mapper, connection, instance): row = instance.query.filter_by(id = instance.id).first() payload = {'model': instance.__class__.__name__, 'data': instance.id, 'type': 'DELETE', 'row': row} redis_store.publish('realtime', dumps(payload))

class Base(db.Model):
__abstract__ = True __mapper_args__ = { 'extension': BaseExtension() }
id = db.Column(db.Integer, primary_key=True) created_at = db.Column(db.DateTime, default=db.func.current_timestamp()) modified_at = db.Column(db.DateTime, default=db.func.current_timestamp(), onupdate=db.func.current_timestamp())
@classmethod def create(cls,**kwargs): c = cls(**kwargs) db.session.add(c) try: db.session.commit() flash((c.__tablename__).capitalize() + u' created successfully!', 'success') except IntegrityError: db.session.rollback() flash((c.__tablename__).capitalize() + u' created failed!' + u' IntegrityError', 'error') except InterfaceError: db.session.rollback() flash((c.__tablename__).capitalize() + u' created failed!' + u' InterfaceError', 'error') return c
def __repr__(self): mapper = inspect(self).mapper ent = [] object_data = {col.key: getattr(self, col.key) if not col.key == 'created_at' and not col.key == 'password' and not col.key == 'modified_at' else None for col in mapper.column_attrs}
return "{0}".format(object_data)
Сейчас я вижу, сколько тут всего ненужного и неправильного, но во всяком случае, для саморазвития, это был хороший опыт.
Во-первых, те поля, которые повторяются во всех моделях (id, created_at, modified_al) описываются единожды.
Во-вторых, наследуемый create(), с обработкой некоторых исключений.
В-третьих, события ORM (мой метод реализации которых считается устаревшим) after_insert, after_update, after_delete обрабатываются, и данные отдаются в redis. Я это делал для "real-time" передачи данных (нотификации, обновления данных в дашборде и пр.)
В-четвёртых, __repr__, который так же универсален для всех моделей.
Так же, сюда можно добавить так называемый 'soft-delete', который позволит не удалять данные из БД насовсем, а так же методы, чтобы доставать из базы "не удалённые", все и только удалённые.
Кучу всего можно придумать при необходимости.
В конечном счёте, один раз реализованный базовый класс сэкономит много времени и места в моделях.

вторник, 20 ноября 2018 г.

Работа с большим количеством записей в сессии

Каким образом правильно работать с большим количеством записей с SQLAlchemy?
У меня есть две таблички. В первой 5 миллионов записей вида: question_id, view_count, counted. Во второй таблице находятся сумма view_count для каждого уникального question_id. Если мы учли запись из первой таблицы во второй, counted выставляется в истину.
Сейчас это выглядет так:
def update_most_viewed(): query = QuestionViewHistory.query.filter_by(counted=False).distinct() question_count = query.count() frame_size = 1000 counter = 0
while counter <= question_count: all_questions = query.offset(counter*frame_size).limit(frame_size).all() counter = counter + frame_size
for question in all_questions: most_viewed_question = MostViewedQuestion.query.filter_by(question_id=question.question_id).first()
if most_viewed_question is None: most_viewed_question = MostViewedQuestion(question.question_id, question.view_count) db.session.add(most_viewed_question) else: most_viewed_question.view_count += question.view_count
question.counted = True
db.session.commit()
Вызываю функцию из консоли. Инициализация:
app = Flask(__name__) db = SQLAlchemy(app)
Проблема в том, что с каждым проходом время растет экспоненциально: после пятого прохода все зависает. Если запустить программу повторно, все повторяется один в один.
На сколько я понимаю, проблема в том, что при каждом вызове commit, SQLAlchemy обновляет все атрибуты всех объектов в сессии, но способа как это поправить, к сожалению, не нашел.
Обновление
Классы моделей, которые фигурируют в запросе.
class MostViewedQuestion(db.Model): __tablename__ = 'most_viewed_question'
id = db.Column(db.Integer, primary_key=True) question_id = db.Column(db.Integer) view_count = db.Column(db.Integer) is_associated = db.Column(db.Boolean) can_be_associated = db.Column(db.Boolean) title = db.Column(db.String(500)) body = db.Column(db.String(30000)) tags = db.Column(db.String(500)) last_update_date = db.Column(db.DateTime)
def __init__(self, question_id, view_count, is_associated=False): self.question_id = question_id self.view_count = view_count self.is_associated = is_associated self.can_be_associated = True self.last_update_date = datetime.datetime.now()
def __repr__(self): return '' % str(self.id)
class QuestionViewHistory(db.Model): __tablename__ = 'question_view_history'
id = db.Column(db.Integer, primary_key=True) question_id = db.Column(db.Integer) view_count = db.Column(db.Integer) view_date = db.Column(db.DateTime) counted = db.Column(db.Boolean)
def __init__(self, question_id, view_count, view_date): self.question_id = question_id self.view_count = view_count self.view_date = view_date self.counted = False
def __repr__(self): return '' % str(self.id)
Код всего проекта доступен на GitHub, все модели находятся в файле models.py, функция update_most_viewed в файле database.py. В папке cvs_data_ru данные для тестов.


Ответ

Стоит начать с того, как делать не нужно. Например, не нужно перебирать объекты в базе по-одному:
for question in all_questions: most_viewed_question = MostViewedQuestion.query.filter_by(question_id=question.question_id).first()
этот цикл - совсем нет-нет-нет. Такого поведения нужно избегать любой ценой - НАМНОГО лучше запросить сразу миллион строк, чем миллион раз по одной строке. Если нужно получить все объекты MostViewedQuestion, то лучше это сделать одним запросом:
most_viewed_questions = MostViewedQuestion.query.filter_by(question_id.in_=questions)
В таком случае отпадает нужда и во внешнем цикле while, потому что по-прежнему, лучше запросить один раз миллион, чем тысячу раз по тысяче. После такого запроса БД вернет те most_viewed_questions, для которых есть соответствующая запись. Встает вопрос: что делать с теми, у которых такой записи нет? Такие запросы выполняются в БД очень часто и часто их называют UPSERT (UPDATE + INSERT) - нужно одновременно и обновить какую-то запись, а если ее нет, то создать ее. Все, что нужно - это выполнить вот этот вот upsert средствами sqlalchemy. Состоять запрос будет из двух подзапросов - один обновит существующие записи (update), другой создаст новые (insert).
UPDATE в целом довольно прямолинейный:
from sqlalchemy import not_, select, exists
update_query = MostViewedQuestion.__table__.update().values( view_count=MostViewedQuestion.view_count + QuestionViewHistory.view_count ).where(and_( MostViewedQuestion.question_id == QuestionViewHistory.question_id, QuestionViewHistory.counted == True ))
Оно генерирует вот такой SQL:
UPDATE most_viewed_question SET view_count=(most_viewed_question.view_count + question_view_history.view_count) FROM question_view_history WHERE most_viewed_question.question_id = question_view_history.question_id AND question_view_history.counted = true
Я использовал запись MostViewedQuestion.__table__, потому что мои модели наследуются от declarative_base(), а методы update(), delete(), insert() есть у класса Table(у Base их нет). Для declarative_base сама таблица находится в поле __table__
INSERT немного более запутанный, но самая мякотка - from_select(), который генерирует INSERT ... FROM SELECT
insert_query = MostViewedQuestion.__table__.insert().\ from_select([MostViewedQuestion.question_id, MostViewedQuestion.view_count], select([QuestionViewHistory.question_id, QuestionViewHistory.view_count]). where(and_(not_(exists([MostViewedQuestion.question_id]).where(MostViewedQuestion.question_id == QuestionViewHistory.question_id) ), # WHERE ... AND ... QuestionViewHistory.counted == True)) )
SESSION.execute(update_query) SESSION.execute(insert_query) SESSION.commit()
SQL:
INSERT INTO most_viewed_question (question_id, view_count) SELECT question_view_history.question_id, question_view_history.view_count FROM question_view_history WHERE NOT (EXISTS ( SELECT most_viewed_question.question_id FROM most_viewed_question WHERE most_viewed_question.question_id = question_view_history.question_id)) AND question_view_history.counted = true
Я бы не сказал, что данный запрос - образец скорости, но самое главное в этих запросах - так это то, что работает БД. Питонский код в это время просто ждет ответа от БД и нам вообще не надо думать об оптимизации питоновского кода. Не нужно ломать голову об устройстве SQLAlchemy. Зато стоит подумать об оптимизации SQL, но с этим несколько легче, потому что UPSERT - операция типичная и по ней много всего написано. Но это не повод расслабляться, потому что при обновлении/вставке большого количества записей в БД есть свои нюансы (например, раздувание таблиц (table bloating) или индексы/триггеры, которые тормозят процесс и перед массовой вставкой их выключают).