Расширяемость
Асинхронная обработка
16
Авторские права
© Postgres Professional, 2017–2024
Авторы: Егор Рогов, Павел Лузанов, Илья Баштанов, Игорь Гнатюк
Фото: Олег Бартунов (монастырь Пху и пик Бхрикути, Непал)
Использование материалов курса
Некоммерческое использование материалов курса (презентации,
демонстрации) разрешается без ограничений. Коммерческое
использование возможно только с письменного разрешения компании
Postgres Professional. Запрещается внесение изменений в материалы
курса.
Обратная связь
Отзывы, замечания и предложения направляйте по адресу:
Отказ от ответственности
Компания Postgres Professional не несет никакой ответственности за
любые повреждения и убытки, включая потерю дохода, нанесенные
прямым или непрямым, специальным или случайным использованием
материалов курса. Компания Postgres Professional не предоставляет
каких-либо гарантий на материалы курса. Материалы курса
предоставляются на основе принципа «как есть» и компания Postgres
Professional не обязана предоставлять сопровождение, поддержку,
обновления, расширения и изменения.
2
Темы
Зачем нужна асинхронная обработка данных
Доступные решения
Реализация очереди средствами PostgreSQL
3
Асинхронная обработка
Разнесение во времени возникновения события
и его обработки
Соображения производительности
клиенту не требуется ждать ответа
возможность управлять ресурсами для обработки
Реализация
очередь сообщений
более сложный вариант: модель публикация — подписка
Идея асинхронной обработки событий состоит в том, что
возникновение события и его обработка разносятся во времени.
Например, пользователь хочет получить детализацию расходов на
мобильную связь. Детализация формируется несколько минут. Можно
показать пользователю «песочные часы» и заставить его ждать
(синхронная обработка), а можно выслать детализацию по электронной
почте, когда она будет готова (асинхронная обработка).
Другой пример: интеграция двух систем. Первая система обращается
ко второй, передавая пакет сообщений. Обработка одного сообщения
занимает несколько секунд, но в пакете может оказаться тысяча
сообщений. Можно заставить первую систему ожидать получения
результата (синхронная обработка), а можно ответить «работаем»,
обработать сообщения асинхронно и уже затем сообщить результат.
Асинхронная обработка сложнее синхронной, но часто оказывается
очень удобной. Она позволяет работать эффективнее (клиенту не надо
простаивать, дожидаясь ответа) и управлять ресурсами (обрабатывать
события с удобной скоростью и в удобное время, а не немедленно).
(Асинхронная обработка широко применяется и в ядре PostgreSQL.
Вспомните режим асинхронной фиксации; процесс контрольной точки;
процесс автоочистки.)
Обычная реализация состоит в наличии очереди событий (сообщений):
одни процессы создают события, другие — обрабатывают. Возможны
более сложные модели, в которых есть возможность публиковать
события и подписываться на события нужного типа.
4
Внешние системы
RabbitMQ, ActiveMQ, ZeroMQ и т. п.
Плюсы
эти системы работают
следование стандартам (AMQP, JMP, STOMP, MQTT...)
гибкость, масштабирование, производительность
Возможные минусы
отдельная система (включающая отдельную СУБД)
со своими особенностями настройки, администрирования, мониторинга
все сложности построения распределенных систем
(отсутствие глобальных транзакций)
Одним из вариантов реализации очередей событий являются внешние
системы. Названия многих из них традиционно заканчиваются на MQ —
Message Queuing.
Как правило, это большие серьезные системы, обеспечивающие
гибкость, масштабируемость, высокую производительность и прочие
полезные свойства. К тому же они реализуют один или несколько
стандартных протоколов работы с сообщениями, что позволяет
интегрировать их с другими системами, понимающими те же протоколы.
Но надо понимать, что любая большая система потребует серьезных
затрат на ее изучение и внедрение. Потребуется разобраться
с особенностями настройки, администрирования, мониторинга.
Заметим, что в состав систем работы с очередями входит и отдельная
СУБД для надежного хранения очередей.
Кроме того, использование внешней системы приводит ко всем
сложностям построения распределенных систем. При отсутствии
глобальных транзакций, объединяющих разные системы, возможны
случаи потери сообщений в результате сбоев.
5
Очередь внутри базы: PgQ
Плюсы
система давно на рынке и широко используется
Возможные минусы
мало гибкости, например, исключительно пакетная обработка
плохо документирована
внешняя программа-демон
Более простым решением может служить реализация очереди
в самой СУБД. Особенно это имеет смысл, если события возникают
и обрабатываются на сервере баз данных.
Наиболее известна система PgQ, разработанная в свое время
компанией Skype (https://github.com/pgq). Эта система достаточно
широко используется и про нее известно, что она работает. Если
требуется готовое решение, то ей можно и воспользоваться.
Поддерживаются версии PostgreSQL 10+.
Из минусов этого решения отметим:
Недостаточную гибкость. Например, возможна только пакетная
обработка событий. Пока обработчик не пометит пакет, как
полностью обработанный, все события пакета могут быть
доставлены повторно в случае сбоя.
Отсутствие качественной документации (есть описание API:
Необходимость во внешней тносительно СУБД) программе,
обеспечивающей работу очереди.
6
Очередь внутри базы: pgmq
Плюсы
система активно развивается
легковесная реализация
все компоненты внутри базы
Возможные минусы
недостаточная гибкость в управлении на низком уровне
слабо документирована
только исходные коды, необходимость сборки
Решение заявлено компанией Tembo как легковесная реализация
очередей. Расширение pgmq написано на Rust и использует
инфраструктуру сборки расширений pgrx.
Очереди и метаинформация представлены таблицами в базе данных,
а элементы очереди («сообщения») — записями в таблицах.
Сообщения можно помещать в очередь (в том числе по несколько
сразу), читать из очереди, удалять и архивировать. Для выполнения
этих основных действий (и еще ряда дополнительных) в расширении
предусмотрен набор функций. Детали реализации очереди скрыты
от пользователя, что упрощает работу.
Из минусов pgmq отметим следующие:
Недостаточная гибкость в управлении на низком уровне. Связано
с тем, что расширение pgmq базируется на фреймворке pgrx,
в который встроено, в том числе, управление соединениями
с сервером СУБД. Повлиять на него средствами уже собранного
pgmq нельзя.
Документация недостаточно подробна.
В репозитории отсутствуют готовые пакеты, а сборка расширения
может оказаться нетривиальной и ресурсоемкой.
8
Очередь своими руками
Возможные плюсы
не требуются внешние зависимости
простые требования — простая реализация
Минусы
требуется отладка и тестирование
при усложнении требований готовая система может обойтись дешевле
Для решения простой задачи, требующей асинхронной обработки,
использование сторонних систем может оказаться невыгодным.
Возможно, проще написать собственную реализацию, чем
приспосабливаться к особенностям сторонней системы.
Конечно, нужно понимать, что:
реализация должна быть сделана аккуратно, иначе она может
привести к проблемам эксплуатации;
если к системе очередей предъявляются серьезные требования
(или есть шанс, что такие требования появятся в будущем),
то развитие, тестирование и поддержка собственного решения,
наоборот, может оказаться невыгодной.
Далее мы посмотрим, как реализовать очередь сообщений
в PostgreSQL своими руками, и какие подводные камни есть на этом
пути.
10
Вспоминаем про горизонт
Что получилось: одна большая транзакция
take_message();
-- обработка
complete_message();
take_message();
-- обработка
complete_message();
take_message();
-- обработка
complete_message();
COMMIT;
Показанное решение имеет существенный недостаток: вся обработка
выполняется в одной длинной транзакции. Вспоминая темы
«Многоверсионность» и «Очистка» модуля «Архитектура», можно
с уверенностью сказать, что обработка очереди будет мешать
нормальной работе очистки.
11
Вспоминаем про горизонт
Что надо: каждое событие в отдельной транзакции
take_message();
-- обработка
complete_message();
COMMIT;
take_message();
-- обработка
complete_message();
COMMIT;
take_message();
-- обработка
complete_message();
COMMIT;
Чтобы таких проблем не возникало, надо раздробить длинную
транзакцию на несколько более коротких. В нашем случае
обрабатывать каждое событие в собственной транзакции.
12
Вспоминаем про горизонт
Еще лучше: позволить обработке события состоять
из нескольких транзакций
take_message();
COMMIT;
-- обработка
complete_message();
COMMIT;
take_message();
COMMIT;
-- обработка
COMMIT;
-- обработка
COMMIT;
take_message();
COMMIT;
-- обработка
COMMIT;
-- обработка
complete_message();
COMMIT;
-- обработка
complete_message();
COMMIT;
Более того, обработка одного события тоже может (в принципе)
разбиваться на несколько транзакций.
В таком случае мы сначала фиксируем изменение статуса события
в очереди («в работе»), затем выполняем обработку, а в конце
фиксируем факт завершения работы с событием (например, удаляем
его из таблицы).
14
Чего не хватает
Зависшие сообщения
остаются в статусе «в работе» при аварийном завершении обработчика
Решение
проверять существование процесса-обработчика, указанного в таблице
при отсутствии возвращать сообщение в статус «новый» (возможно)
take_message();
COMMIT;
-- обработка
Чего не хватает в нашей реализации?
Во-первых, обработчик может завершиться аварийно. Если мы
фиксируем изменение статуса обработки, то событие «повиснет»
в статусе «в работе» и не будет больше обрабатываться.
В нашей реализации мы уже сделали шаг в нужную сторону: в таблице
сохраняется номер обслуживающего процесса (pid), который взял
событие в работу. Можно написать простую проверку: если pid имеется
в таблице, но процесса с таким номером нет в системе — значит,
произошел сбой.
Что делать в таком случае? Если обработка события выполнялась
в одной транзакции, то она была прервана и, следовательно, можно
безопасно вернуть событие в статус «новое» — оно будет обработано
повторно.
Если же обработка делится на несколько транзакций, надо быть
уверенным в том, что обработку можно запускать повторно.
15
Чего не хватает
Корректная обработка исключительных ситуаций
Сохранение результатов обработки
Решение
не удалять обработанные сообщения, а помечать отдельным статусом
потребуется правильный индекс
потребуется периодическое удаление исторических данных
обращаем внимание на очистку
Во-вторых, наша реализация никак не обрабатывает исключительные
ситуации. Это, конечно, несложно добавить. При возникновении
исключения хотелось бы иметь информацию о том, что случилось.
Да и если событие обработано без ошибок, может быть полезным
сохранять какую-то информацию об обработке. Это, конечно, зависит
от конкретной задачи.
Наша реализация удаляет обработанные события из очереди, но
вместо этого можно оставлять их, помечая специальным статусом
(«завершено», «ошибка» и т. п.). Тогда всю информацию об обработке
можно иметь непосредственно в таблице с событиями. В таком случае
потребуется эффективный доступ к еще не обработанным сообщениям:
частичный индекс с условием pid IS NULL. (Другим решением может
быть перенос обработанных событий в отдельную таблицу.)
За удобство потребуется платить реализацией периодического
удаления «хвоста» очереди — исторических данных. Если период
достаточно большой, то, возможно, удаление надо выполнять
пакетами — чтобы не допускать лишнего разрастания таблицы
и не мешать очистке.
И, поскольку таблица очередей изменяется довольно активно,
надо обеспечить ее своевременную очистку, о чем говорилось
в демонстрации.
16
Итоги
Асинхронная обработка полезна во многих случаях
Внешние системы имеет смысл использовать, если
вписываются в общую архитектуру информационной системы
к реализации предъявляются серьезные требования
Очередь сообщений в базе данных —
простое решение для простых задач
важна правильная реализация:
эффективное получение очередного события (SKIP LOCKED),
избегание долгих транзакций,
своевременная очистка таблицы очереди
чем больше требований, тем сложнее будет реализация
17
Практика
1. В приложении предусмотрен механизм фоновых заданий,
но серверная часть обработки очереди отсутствует.
Напишите недостающие функции:
- take_task — получает очередное задание из очереди;
- complete_task — завершает обработку задания;
- process_tasks — основной цикл обработки заданий.
2. Запустите процедуру обработки очереди заданий в фоновом
режиме. Проверьте, что фоновые задания, поставленные
в очередь в приложении, выполняются, а результаты их
работы доступны для просмотра.
1. Фоновые задания позволяют запустить специально
зарегистрированную функцию из пользовательского интерфейса
и затем просматривать состояние и результат выполнения.
В качестве результата функция может возвращать множество строк,
то есть в простейшем виде тело функции может состоять из одного
SQL-запроса. На вход функция должна принимать один параметр типа
jsonb. Пример задания: public.greeting_program.
Напишите подпрограммы take_task, complete_task и process_tasks
по аналогии с показанными в демонстрации примерами. Учтите:
take_task должна возвращать задачу в статусе «scheduled»
и заполнить подходящие поля таблицы tasks:
started = текущее время, status = «running», pid = номер процесса.
complete_task должна не удалять задание, а заполнить поля tasks:
finished = текущее время,
при нормальном завершении: status = «finished», result = результат,
в случае ошибки: status = «error», result = сообщение об ошибке.
process_tasks не должна завершаться; организуйте бесконечный
цикл с задержкой в 1 сек между задачами. Убедитесь, что в режиме
ожидания не возникает долгой транзакции. Для удобства установите
параметр application_name в значение «process_tasks».
Для фактического выполнения задания процедура должна вызвать
функцию empapi.run(task tasks). В случае успешного выполнения
функция вернет результат, оформленный в виде текстовой строки.
В случае ошибки будет сгенерировано исключение.
18
Практика+
1. Напишите тест, проверяющий, что обработка очереди,
показанная в демонстрации, работает корректно при
выполнении в несколько потоков.
Убедитесь, что тест не проходит, если убрать предложение
FOR UPDATE SKIP LOCKED.
2. Добавьте в реализацию проверку «зависших» сообщений.
Если такая ситуация будет обнаружена, зависшее сообщение
должно быть снова принято в работу.
1. Вставьте в таблицу сообщений большое количество строк
и проверьте, что:
а) было обработано каждое сообщение;
б) каждое сообщение было обработано ровно один раз.
Уберите из реализации секундную задержку (имитацию работы), чтобы
тест выполнялся быстрее и с достаточным уровнем конкурентности
между процессами.