Страницы

Поиск по вопросам

вторник, 20 ноября 2018 г.

Работа с большим количеством записей в сессии

Каким образом правильно работать с большим количеством записей с SQLAlchemy?
У меня есть две таблички. В первой 5 миллионов записей вида: question_id, view_count, counted. Во второй таблице находятся сумма view_count для каждого уникального question_id. Если мы учли запись из первой таблицы во второй, counted выставляется в истину.
Сейчас это выглядет так:
def update_most_viewed(): query = QuestionViewHistory.query.filter_by(counted=False).distinct() question_count = query.count() frame_size = 1000 counter = 0
while counter <= question_count: all_questions = query.offset(counter*frame_size).limit(frame_size).all() counter = counter + frame_size
for question in all_questions: most_viewed_question = MostViewedQuestion.query.filter_by(question_id=question.question_id).first()
if most_viewed_question is None: most_viewed_question = MostViewedQuestion(question.question_id, question.view_count) db.session.add(most_viewed_question) else: most_viewed_question.view_count += question.view_count
question.counted = True
db.session.commit()
Вызываю функцию из консоли. Инициализация:
app = Flask(__name__) db = SQLAlchemy(app)
Проблема в том, что с каждым проходом время растет экспоненциально: после пятого прохода все зависает. Если запустить программу повторно, все повторяется один в один.
На сколько я понимаю, проблема в том, что при каждом вызове commit, SQLAlchemy обновляет все атрибуты всех объектов в сессии, но способа как это поправить, к сожалению, не нашел.
Обновление
Классы моделей, которые фигурируют в запросе.
class MostViewedQuestion(db.Model): __tablename__ = 'most_viewed_question'
id = db.Column(db.Integer, primary_key=True) question_id = db.Column(db.Integer) view_count = db.Column(db.Integer) is_associated = db.Column(db.Boolean) can_be_associated = db.Column(db.Boolean) title = db.Column(db.String(500)) body = db.Column(db.String(30000)) tags = db.Column(db.String(500)) last_update_date = db.Column(db.DateTime)
def __init__(self, question_id, view_count, is_associated=False): self.question_id = question_id self.view_count = view_count self.is_associated = is_associated self.can_be_associated = True self.last_update_date = datetime.datetime.now()
def __repr__(self): return '' % str(self.id)
class QuestionViewHistory(db.Model): __tablename__ = 'question_view_history'
id = db.Column(db.Integer, primary_key=True) question_id = db.Column(db.Integer) view_count = db.Column(db.Integer) view_date = db.Column(db.DateTime) counted = db.Column(db.Boolean)
def __init__(self, question_id, view_count, view_date): self.question_id = question_id self.view_count = view_count self.view_date = view_date self.counted = False
def __repr__(self): return '' % str(self.id)
Код всего проекта доступен на GitHub, все модели находятся в файле models.py, функция update_most_viewed в файле database.py. В папке cvs_data_ru данные для тестов.


Ответ

Стоит начать с того, как делать не нужно. Например, не нужно перебирать объекты в базе по-одному:
for question in all_questions: most_viewed_question = MostViewedQuestion.query.filter_by(question_id=question.question_id).first()
этот цикл - совсем нет-нет-нет. Такого поведения нужно избегать любой ценой - НАМНОГО лучше запросить сразу миллион строк, чем миллион раз по одной строке. Если нужно получить все объекты MostViewedQuestion, то лучше это сделать одним запросом:
most_viewed_questions = MostViewedQuestion.query.filter_by(question_id.in_=questions)
В таком случае отпадает нужда и во внешнем цикле while, потому что по-прежнему, лучше запросить один раз миллион, чем тысячу раз по тысяче. После такого запроса БД вернет те most_viewed_questions, для которых есть соответствующая запись. Встает вопрос: что делать с теми, у которых такой записи нет? Такие запросы выполняются в БД очень часто и часто их называют UPSERT (UPDATE + INSERT) - нужно одновременно и обновить какую-то запись, а если ее нет, то создать ее. Все, что нужно - это выполнить вот этот вот upsert средствами sqlalchemy. Состоять запрос будет из двух подзапросов - один обновит существующие записи (update), другой создаст новые (insert).
UPDATE в целом довольно прямолинейный:
from sqlalchemy import not_, select, exists
update_query = MostViewedQuestion.__table__.update().values( view_count=MostViewedQuestion.view_count + QuestionViewHistory.view_count ).where(and_( MostViewedQuestion.question_id == QuestionViewHistory.question_id, QuestionViewHistory.counted == True ))
Оно генерирует вот такой SQL:
UPDATE most_viewed_question SET view_count=(most_viewed_question.view_count + question_view_history.view_count) FROM question_view_history WHERE most_viewed_question.question_id = question_view_history.question_id AND question_view_history.counted = true
Я использовал запись MostViewedQuestion.__table__, потому что мои модели наследуются от declarative_base(), а методы update(), delete(), insert() есть у класса Table(у Base их нет). Для declarative_base сама таблица находится в поле __table__
INSERT немного более запутанный, но самая мякотка - from_select(), который генерирует INSERT ... FROM SELECT
insert_query = MostViewedQuestion.__table__.insert().\ from_select([MostViewedQuestion.question_id, MostViewedQuestion.view_count], select([QuestionViewHistory.question_id, QuestionViewHistory.view_count]). where(and_(not_(exists([MostViewedQuestion.question_id]).where(MostViewedQuestion.question_id == QuestionViewHistory.question_id) ), # WHERE ... AND ... QuestionViewHistory.counted == True)) )
SESSION.execute(update_query) SESSION.execute(insert_query) SESSION.commit()
SQL:
INSERT INTO most_viewed_question (question_id, view_count) SELECT question_view_history.question_id, question_view_history.view_count FROM question_view_history WHERE NOT (EXISTS ( SELECT most_viewed_question.question_id FROM most_viewed_question WHERE most_viewed_question.question_id = question_view_history.question_id)) AND question_view_history.counted = true
Я бы не сказал, что данный запрос - образец скорости, но самое главное в этих запросах - так это то, что работает БД. Питонский код в это время просто ждет ответа от БД и нам вообще не надо думать об оптимизации питоновского кода. Не нужно ломать голову об устройстве SQLAlchemy. Зато стоит подумать об оптимизации SQL, но с этим несколько легче, потому что UPSERT - операция типичная и по ней много всего написано. Но это не повод расслабляться, потому что при обновлении/вставке большого количества записей в БД есть свои нюансы (например, раздувание таблиц (table bloating) или индексы/триггеры, которые тормозят процесс и перед массовой вставкой их выключают).

Комментариев нет:

Отправить комментарий