#python #postgresql #flask #sqlalchemy
Каким образом правильно работать с большим количеством записей с SQLAlchemy?
У меня есть две таблички. В первой 5 миллионов записей вида: question_id, view_count,
counted. Во второй таблице находятся сумма view_count для каждого уникального question_id.
Если мы учли запись из первой таблицы во второй, counted выставляется в истину.
Сейчас это выглядет так:
def update_most_viewed():
query = QuestionViewHistory.query.filter_by(counted=False).distinct()
question_count = query.count()
frame_size = 1000
counter = 0
while counter <= question_count:
all_questions = query.offset(counter*frame_size).limit(frame_size).all()
counter = counter + frame_size
for question in all_questions:
most_viewed_question = MostViewedQuestion.query.filter_by(question_id=question.question_id).first()
if most_viewed_question is None:
most_viewed_question = MostViewedQuestion(question.question_id, question.view_count)
db.session.add(most_viewed_question)
else:
most_viewed_question.view_count += question.view_count
question.counted = True
db.session.commit()
Вызываю функцию из консоли. Инициализация:
app = Flask(__name__)
db = SQLAlchemy(app)
Проблема в том, что с каждым проходом время растет экспоненциально: после пятого
прохода все зависает. Если запустить программу повторно, все повторяется один в один.
На сколько я понимаю, проблема в том, что при каждом вызове commit, SQLAlchemy обновляет
все атрибуты всех объектов в сессии, но способа как это поправить, к сожалению, не нашел.
Обновление
Классы моделей, которые фигурируют в запросе.
class MostViewedQuestion(db.Model):
__tablename__ = 'most_viewed_question'
id = db.Column(db.Integer, primary_key=True)
question_id = db.Column(db.Integer)
view_count = db.Column(db.Integer)
is_associated = db.Column(db.Boolean)
can_be_associated = db.Column(db.Boolean)
title = db.Column(db.String(500))
body = db.Column(db.String(30000))
tags = db.Column(db.String(500))
last_update_date = db.Column(db.DateTime)
def __init__(self, question_id, view_count, is_associated=False):
self.question_id = question_id
self.view_count = view_count
self.is_associated = is_associated
self.can_be_associated = True
self.last_update_date = datetime.datetime.now()
def __repr__(self):
return '' % str(self.id)
class QuestionViewHistory(db.Model):
__tablename__ = 'question_view_history'
id = db.Column(db.Integer, primary_key=True)
question_id = db.Column(db.Integer)
view_count = db.Column(db.Integer)
view_date = db.Column(db.DateTime)
counted = db.Column(db.Boolean)
def __init__(self, question_id, view_count, view_date):
self.question_id = question_id
self.view_count = view_count
self.view_date = view_date
self.counted = False
def __repr__(self):
return '' % str(self.id)
Код всего проекта доступен на GitHub, все модели находятся в файле models.py, функция
update_most_viewed в файле database.py. В папке cvs_data_ru данные для тестов.
Ответы
Ответ 1
Стоит начать с того, как делать не нужно. Например, не нужно перебирать объекты в
базе по-одному:
for question in all_questions:
most_viewed_question = MostViewedQuestion.query.filter_by(question_id=question.question_id).first()
этот цикл - совсем нет-нет-нет. Такого поведения нужно избегать любой ценой - НАМНОГО
лучше запросить сразу миллион строк, чем миллион раз по одной строке. Если нужно получить
все объекты MostViewedQuestion, то лучше это сделать одним запросом:
most_viewed_questions = MostViewedQuestion.query.filter_by(question_id.in_=questions)
В таком случае отпадает нужда и во внешнем цикле while, потому что по-прежнему, лучше
запросить один раз миллион, чем тысячу раз по тысяче. После такого запроса БД вернет
те most_viewed_questions, для которых есть соответствующая запись. Встает вопрос: что
делать с теми, у которых такой записи нет? Такие запросы выполняются в БД очень часто
и часто их называют UPSERT (UPDATE + INSERT) - нужно одновременно и обновить какую-то
запись, а если ее нет, то создать ее. Все, что нужно - это выполнить вот этот вот upsert
средствами sqlalchemy. Состоять запрос будет из двух подзапросов - один обновит существующие
записи (update), другой создаст новые (insert).
UPDATE в целом довольно прямолинейный:
from sqlalchemy import not_, select, exists
update_query = MostViewedQuestion.__table__.update().values(
view_count=MostViewedQuestion.view_count + QuestionViewHistory.view_count
).where(and_(
MostViewedQuestion.question_id == QuestionViewHistory.question_id,
QuestionViewHistory.counted == True
))
Оно генерирует вот такой SQL:
UPDATE most_viewed_question SET view_count=(most_viewed_question.view_count + question_view_history.view_count)
FROM question_view_history
WHERE most_viewed_question.question_id = question_view_history.question_id
AND question_view_history.counted = true
Я использовал запись MostViewedQuestion.__table__, потому что мои модели наследуются
от declarative_base(), а методы update(), delete(), insert() есть у класса Table(у
Base их нет). Для declarative_base сама таблица находится в поле __table__.
INSERT немного более запутанный, но самая мякотка - from_select(), который генерирует
INSERT ... FROM SELECT:
insert_query = MostViewedQuestion.__table__.insert().\
from_select([MostViewedQuestion.question_id, MostViewedQuestion.view_count],
select([QuestionViewHistory.question_id, QuestionViewHistory.view_count]).
where(and_(not_(exists([MostViewedQuestion.question_id]).where(MostViewedQuestion.question_id
== QuestionViewHistory.question_id)
), # WHERE ... AND ...
QuestionViewHistory.counted == True))
)
SESSION.execute(update_query)
SESSION.execute(insert_query)
SESSION.commit()
SQL:
INSERT INTO most_viewed_question (question_id, view_count)
SELECT question_view_history.question_id, question_view_history.view_count
FROM question_view_history
WHERE NOT (EXISTS (
SELECT most_viewed_question.question_id
FROM most_viewed_question
WHERE most_viewed_question.question_id = question_view_history.question_id))
AND question_view_history.counted = true
Я бы не сказал, что данный запрос - образец скорости, но самое главное в этих запросах
- так это то, что работает БД. Питонский код в это время просто ждет ответа от БД и
нам вообще не надо думать об оптимизации питоновского кода. Не нужно ломать голову
об устройстве SQLAlchemy. Зато стоит подумать об оптимизации SQL, но с этим несколько
легче, потому что UPSERT - операция типичная и по ней много всего написано. Но это
не повод расслабляться, потому что при обновлении/вставке большого количества записей
в БД есть свои нюансы (например, раздувание таблиц (table bloating) или индексы/триггеры,
которые тормозят процесс и перед массовой вставкой их выключают).
Ответ 2
Что получилось в результате:
def update_most_viewed():
reader_session = db_session()
question_count = reader_session.query(func.count(QuestionViewHistory.id)).filter_by(counted=False).scalar()
query = reader_session.query(QuestionViewHistory.id, QuestionViewHistory.question_id,
QuestionViewHistory.view_count).filter_by(counted=False)
frame_size = 1000
progress_index = 0
counter = 0
print "Questions to update: %s, frame size: %s" % (question_count, frame_size)
while counter <= question_count:
all_questions = query.offset(0).limit(frame_size).all()
counter = counter + frame_size
wiriter_session = db_session()
for question in all_questions:
record_id, question_id, view_count = question
most_viewed_question = wiriter_session.query(MostViewedQuestion).filter_by(question_id=question_id).first()
if most_viewed_question is None:
most_viewed_question = MostViewedQuestion(question_id, view_count)
wiriter_session.add(most_viewed_question)
else:
most_viewed_question.view_count += view_count
qh = wiriter_session.query(QuestionViewHistory).filter_by(id=record_id).first()
qh.counted = True
wiriter_session.add(qh)
print_progress_bar(progress_index, question_count, prefix = 'Progress:',
suffix = 'Complete')
progress_index +=1
wiriter_session.commit()
wiriter_session.close()
print "All questions were counted"
В чем отличие от исходного варианта
Если верить документации SQLAlchemy, ORM отслеживает все объекты находящиеся в сессии,
для того, чтобы гарантировать их актуальность относительно данных в базе. Это приводит
к тому, что после каждого вызова commit, ORM отмечает объекты сессии как требующие
обновления, что приводит к большой нагрузке на БД. С ростом количества объектов в сессии
растет и время их обновления. Решение сводится к тому, чтобы держать в сессии как можно
меньше объектов.
Я добавил две сессии: reader_session и wiriter_session, которые, судя по ответу,
на самом деле являются одной и той же сессией.
Далее (главный трюк) при обращении к базе я запрашиваю не весь объект целиком, а
только нужную информацию (id, question_id, view_count), количество объектов считаю
через функцию func.count(QuestionViewHistory.id). В результате, как я понимаю, у меня
нет объектов в сессии для отслеживания.
Все операции на запись добавляю в wiriter_session.
Сейчас скорость выполнения функции не зависит от количества данных (от номера прохода),
все же остается низкой. Буду думать над улучшением алгоритма.
П.С. Как я понимаю, в исходном коде была допущена ошибка в работе с «окном»: в цикле
мы меняем таблицу (обновляем counted), а потом сдвигаем окно, чем пропускаем часть
записей. Как я понимаю, надо все время начинать с нулевой позиции. То есть:
all_questions = query.offset(0).limit(frame_size).all()