Каким образом правильно работать с большим количеством записей с SQLAlchemy?
У меня есть две таблички. В первой 5 миллионов записей вида: question_id, view_count, counted. Во второй таблице находятся сумма view_count для каждого уникального question_id. Если мы учли запись из первой таблицы во второй, counted выставляется в истину.
Сейчас это выглядет так:
def update_most_viewed():
query = QuestionViewHistory.query.filter_by(counted=False).distinct()
question_count = query.count()
frame_size = 1000
counter = 0
while counter <= question_count:
all_questions = query.offset(counter*frame_size).limit(frame_size).all()
counter = counter + frame_size
for question in all_questions:
most_viewed_question = MostViewedQuestion.query.filter_by(question_id=question.question_id).first()
if most_viewed_question is None:
most_viewed_question = MostViewedQuestion(question.question_id, question.view_count)
db.session.add(most_viewed_question)
else:
most_viewed_question.view_count += question.view_count
question.counted = True
db.session.commit()
Вызываю функцию из консоли. Инициализация:
app = Flask(__name__)
db = SQLAlchemy(app)
Проблема в том, что с каждым проходом время растет экспоненциально: после пятого прохода все зависает. Если запустить программу повторно, все повторяется один в один.
На сколько я понимаю, проблема в том, что при каждом вызове commit, SQLAlchemy обновляет все атрибуты всех объектов в сессии, но способа как это поправить, к сожалению, не нашел.
Обновление
Классы моделей, которые фигурируют в запросе.
class MostViewedQuestion(db.Model):
__tablename__ = 'most_viewed_question'
id = db.Column(db.Integer, primary_key=True)
question_id = db.Column(db.Integer)
view_count = db.Column(db.Integer)
is_associated = db.Column(db.Boolean)
can_be_associated = db.Column(db.Boolean)
title = db.Column(db.String(500))
body = db.Column(db.String(30000))
tags = db.Column(db.String(500))
last_update_date = db.Column(db.DateTime)
def __init__(self, question_id, view_count, is_associated=False):
self.question_id = question_id
self.view_count = view_count
self.is_associated = is_associated
self.can_be_associated = True
self.last_update_date = datetime.datetime.now()
def __repr__(self):
return '
class QuestionViewHistory(db.Model):
__tablename__ = 'question_view_history'
id = db.Column(db.Integer, primary_key=True)
question_id = db.Column(db.Integer)
view_count = db.Column(db.Integer)
view_date = db.Column(db.DateTime)
counted = db.Column(db.Boolean)
def __init__(self, question_id, view_count, view_date):
self.question_id = question_id
self.view_count = view_count
self.view_date = view_date
self.counted = False
def __repr__(self):
return '
Код всего проекта доступен на GitHub, все модели находятся в файле models.py, функция update_most_viewed в файле database.py. В папке cvs_data_ru данные для тестов.
Ответ
Стоит начать с того, как делать не нужно. Например, не нужно перебирать объекты в базе по-одному:
for question in all_questions:
most_viewed_question = MostViewedQuestion.query.filter_by(question_id=question.question_id).first()
этот цикл - совсем нет-нет-нет. Такого поведения нужно избегать любой ценой - НАМНОГО лучше запросить сразу миллион строк, чем миллион раз по одной строке. Если нужно получить все объекты MostViewedQuestion, то лучше это сделать одним запросом:
most_viewed_questions = MostViewedQuestion.query.filter_by(question_id.in_=questions)
В таком случае отпадает нужда и во внешнем цикле while, потому что по-прежнему, лучше запросить один раз миллион, чем тысячу раз по тысяче. После такого запроса БД вернет те most_viewed_questions, для которых есть соответствующая запись. Встает вопрос: что делать с теми, у которых такой записи нет? Такие запросы выполняются в БД очень часто и часто их называют UPSERT (UPDATE + INSERT) - нужно одновременно и обновить какую-то запись, а если ее нет, то создать ее. Все, что нужно - это выполнить вот этот вот upsert средствами sqlalchemy. Состоять запрос будет из двух подзапросов - один обновит существующие записи (update), другой создаст новые (insert).
UPDATE в целом довольно прямолинейный:
from sqlalchemy import not_, select, exists
update_query = MostViewedQuestion.__table__.update().values(
view_count=MostViewedQuestion.view_count + QuestionViewHistory.view_count
).where(and_(
MostViewedQuestion.question_id == QuestionViewHistory.question_id,
QuestionViewHistory.counted == True
))
Оно генерирует вот такой SQL:
UPDATE most_viewed_question SET view_count=(most_viewed_question.view_count + question_view_history.view_count)
FROM question_view_history
WHERE most_viewed_question.question_id = question_view_history.question_id
AND question_view_history.counted = true
Я использовал запись MostViewedQuestion.__table__, потому что мои модели наследуются от declarative_base(), а методы update(), delete(), insert() есть у класса Table(у Base их нет). Для declarative_base сама таблица находится в поле __table__
INSERT немного более запутанный, но самая мякотка - from_select(), который генерирует INSERT ... FROM SELECT
insert_query = MostViewedQuestion.__table__.insert().\
from_select([MostViewedQuestion.question_id, MostViewedQuestion.view_count],
select([QuestionViewHistory.question_id, QuestionViewHistory.view_count]).
where(and_(not_(exists([MostViewedQuestion.question_id]).where(MostViewedQuestion.question_id == QuestionViewHistory.question_id)
), # WHERE ... AND ...
QuestionViewHistory.counted == True))
)
SESSION.execute(update_query)
SESSION.execute(insert_query)
SESSION.commit()
SQL:
INSERT INTO most_viewed_question (question_id, view_count)
SELECT question_view_history.question_id, question_view_history.view_count
FROM question_view_history
WHERE NOT (EXISTS (
SELECT most_viewed_question.question_id
FROM most_viewed_question
WHERE most_viewed_question.question_id = question_view_history.question_id))
AND question_view_history.counted = true
Я бы не сказал, что данный запрос - образец скорости, но самое главное в этих запросах - так это то, что работает БД. Питонский код в это время просто ждет ответа от БД и нам вообще не надо думать об оптимизации питоновского кода. Не нужно ломать голову об устройстве SQLAlchemy. Зато стоит подумать об оптимизации SQL, но с этим несколько легче, потому что UPSERT - операция типичная и по ней много всего написано. Но это не повод расслабляться, потому что при обновлении/вставке большого количества записей в БД есть свои нюансы (например, раздувание таблиц (table bloating) или индексы/триггеры, которые тормозят процесс и перед массовой вставкой их выключают).
Комментариев нет:
Отправить комментарий