#python #postgresql #flask #sqlalchemy
Каким образом правильно работать с большим количеством записей с SQLAlchemy? У меня есть две таблички. В первой 5 миллионов записей вида: question_id, view_count, counted. Во второй таблице находятся сумма view_count для каждого уникального question_id. Если мы учли запись из первой таблицы во второй, counted выставляется в истину. Сейчас это выглядет так: def update_most_viewed(): query = QuestionViewHistory.query.filter_by(counted=False).distinct() question_count = query.count() frame_size = 1000 counter = 0 while counter <= question_count: all_questions = query.offset(counter*frame_size).limit(frame_size).all() counter = counter + frame_size for question in all_questions: most_viewed_question = MostViewedQuestion.query.filter_by(question_id=question.question_id).first() if most_viewed_question is None: most_viewed_question = MostViewedQuestion(question.question_id, question.view_count) db.session.add(most_viewed_question) else: most_viewed_question.view_count += question.view_count question.counted = True db.session.commit() Вызываю функцию из консоли. Инициализация: app = Flask(__name__) db = SQLAlchemy(app) Проблема в том, что с каждым проходом время растет экспоненциально: после пятого прохода все зависает. Если запустить программу повторно, все повторяется один в один. На сколько я понимаю, проблема в том, что при каждом вызове commit, SQLAlchemy обновляет все атрибуты всех объектов в сессии, но способа как это поправить, к сожалению, не нашел. Обновление Классы моделей, которые фигурируют в запросе. class MostViewedQuestion(db.Model): __tablename__ = 'most_viewed_question' id = db.Column(db.Integer, primary_key=True) question_id = db.Column(db.Integer) view_count = db.Column(db.Integer) is_associated = db.Column(db.Boolean) can_be_associated = db.Column(db.Boolean) title = db.Column(db.String(500)) body = db.Column(db.String(30000)) tags = db.Column(db.String(500)) last_update_date = db.Column(db.DateTime) def __init__(self, question_id, view_count, is_associated=False): self.question_id = question_id self.view_count = view_count self.is_associated = is_associated self.can_be_associated = True self.last_update_date = datetime.datetime.now() def __repr__(self): return '' % str(self.id) class QuestionViewHistory(db.Model): __tablename__ = 'question_view_history' id = db.Column(db.Integer, primary_key=True) question_id = db.Column(db.Integer) view_count = db.Column(db.Integer) view_date = db.Column(db.DateTime) counted = db.Column(db.Boolean) def __init__(self, question_id, view_count, view_date): self.question_id = question_id self.view_count = view_count self.view_date = view_date self.counted = False def __repr__(self): return ' ' % str(self.id) Код всего проекта доступен на GitHub, все модели находятся в файле models.py, функция update_most_viewed в файле database.py. В папке cvs_data_ru данные для тестов.
Ответы
Ответ 1
Стоит начать с того, как делать не нужно. Например, не нужно перебирать объекты в базе по-одному: for question in all_questions: most_viewed_question = MostViewedQuestion.query.filter_by(question_id=question.question_id).first() этот цикл - совсем нет-нет-нет. Такого поведения нужно избегать любой ценой - НАМНОГО лучше запросить сразу миллион строк, чем миллион раз по одной строке. Если нужно получить все объекты MostViewedQuestion, то лучше это сделать одним запросом: most_viewed_questions = MostViewedQuestion.query.filter_by(question_id.in_=questions) В таком случае отпадает нужда и во внешнем цикле while, потому что по-прежнему, лучше запросить один раз миллион, чем тысячу раз по тысяче. После такого запроса БД вернет те most_viewed_questions, для которых есть соответствующая запись. Встает вопрос: что делать с теми, у которых такой записи нет? Такие запросы выполняются в БД очень часто и часто их называют UPSERT (UPDATE + INSERT) - нужно одновременно и обновить какую-то запись, а если ее нет, то создать ее. Все, что нужно - это выполнить вот этот вот upsert средствами sqlalchemy. Состоять запрос будет из двух подзапросов - один обновит существующие записи (update), другой создаст новые (insert). UPDATE в целом довольно прямолинейный: from sqlalchemy import not_, select, exists update_query = MostViewedQuestion.__table__.update().values( view_count=MostViewedQuestion.view_count + QuestionViewHistory.view_count ).where(and_( MostViewedQuestion.question_id == QuestionViewHistory.question_id, QuestionViewHistory.counted == True )) Оно генерирует вот такой SQL: UPDATE most_viewed_question SET view_count=(most_viewed_question.view_count + question_view_history.view_count) FROM question_view_history WHERE most_viewed_question.question_id = question_view_history.question_id AND question_view_history.counted = true Я использовал запись MostViewedQuestion.__table__, потому что мои модели наследуются от declarative_base(), а методы update(), delete(), insert() есть у класса Table(у Base их нет). Для declarative_base сама таблица находится в поле __table__. INSERT немного более запутанный, но самая мякотка - from_select(), который генерирует INSERT ... FROM SELECT: insert_query = MostViewedQuestion.__table__.insert().\ from_select([MostViewedQuestion.question_id, MostViewedQuestion.view_count], select([QuestionViewHistory.question_id, QuestionViewHistory.view_count]). where(and_(not_(exists([MostViewedQuestion.question_id]).where(MostViewedQuestion.question_id == QuestionViewHistory.question_id) ), # WHERE ... AND ... QuestionViewHistory.counted == True)) ) SESSION.execute(update_query) SESSION.execute(insert_query) SESSION.commit() SQL: INSERT INTO most_viewed_question (question_id, view_count) SELECT question_view_history.question_id, question_view_history.view_count FROM question_view_history WHERE NOT (EXISTS ( SELECT most_viewed_question.question_id FROM most_viewed_question WHERE most_viewed_question.question_id = question_view_history.question_id)) AND question_view_history.counted = true Я бы не сказал, что данный запрос - образец скорости, но самое главное в этих запросах - так это то, что работает БД. Питонский код в это время просто ждет ответа от БД и нам вообще не надо думать об оптимизации питоновского кода. Не нужно ломать голову об устройстве SQLAlchemy. Зато стоит подумать об оптимизации SQL, но с этим несколько легче, потому что UPSERT - операция типичная и по ней много всего написано. Но это не повод расслабляться, потому что при обновлении/вставке большого количества записей в БД есть свои нюансы (например, раздувание таблиц (table bloating) или индексы/триггеры, которые тормозят процесс и перед массовой вставкой их выключают).Ответ 2
Что получилось в результате: def update_most_viewed(): reader_session = db_session() question_count = reader_session.query(func.count(QuestionViewHistory.id)).filter_by(counted=False).scalar() query = reader_session.query(QuestionViewHistory.id, QuestionViewHistory.question_id, QuestionViewHistory.view_count).filter_by(counted=False) frame_size = 1000 progress_index = 0 counter = 0 print "Questions to update: %s, frame size: %s" % (question_count, frame_size) while counter <= question_count: all_questions = query.offset(0).limit(frame_size).all() counter = counter + frame_size wiriter_session = db_session() for question in all_questions: record_id, question_id, view_count = question most_viewed_question = wiriter_session.query(MostViewedQuestion).filter_by(question_id=question_id).first() if most_viewed_question is None: most_viewed_question = MostViewedQuestion(question_id, view_count) wiriter_session.add(most_viewed_question) else: most_viewed_question.view_count += view_count qh = wiriter_session.query(QuestionViewHistory).filter_by(id=record_id).first() qh.counted = True wiriter_session.add(qh) print_progress_bar(progress_index, question_count, prefix = 'Progress:', suffix = 'Complete') progress_index +=1 wiriter_session.commit() wiriter_session.close() print "All questions were counted" В чем отличие от исходного варианта Если верить документации SQLAlchemy, ORM отслеживает все объекты находящиеся в сессии, для того, чтобы гарантировать их актуальность относительно данных в базе. Это приводит к тому, что после каждого вызова commit, ORM отмечает объекты сессии как требующие обновления, что приводит к большой нагрузке на БД. С ростом количества объектов в сессии растет и время их обновления. Решение сводится к тому, чтобы держать в сессии как можно меньше объектов. Я добавил две сессии: reader_session и wiriter_session, которые, судя по ответу, на самом деле являются одной и той же сессией. Далее (главный трюк) при обращении к базе я запрашиваю не весь объект целиком, а только нужную информацию (id, question_id, view_count), количество объектов считаю через функцию func.count(QuestionViewHistory.id). В результате, как я понимаю, у меня нет объектов в сессии для отслеживания. Все операции на запись добавляю в wiriter_session. Сейчас скорость выполнения функции не зависит от количества данных (от номера прохода), все же остается низкой. Буду думать над улучшением алгоритма. П.С. Как я понимаю, в исходном коде была допущена ошибка в работе с «окном»: в цикле мы меняем таблицу (обновляем counted), а потом сдвигаем окно, чем пропускаем часть записей. Как я понимаю, надо все время начинать с нулевой позиции. То есть: all_questions = query.offset(0).limit(frame_size).all()
Комментариев нет:
Отправить комментарий