Очереди сообщений
При работе с большими системами, которые состоят из множества компонентов, возникает вопрос: «Как интегрировать несколько приложений для работы друг с другом?». Для этого нужен надёжный, асинхронный и быстрый способ передачи данных, который также позволит передавать информацию в разных форматах.

Все эти свойства есть в системе обмена сообщениями. Они полезны когда:
- Нужно отправить данные из точки А в точку Б.
- Нужно интегрировать несколько систем.
- Нужно масштабирование.
- Нужна возможность мониторить потоки данных.
- Нужна асинхронная обработка.
- Нужна буферизация.
Концепт обмена сообщениями
Обмен сообщениями неновое изобретение. Он активно используется внутри операционной системы для обмена информацией между несколькими процессами (inter-process communication) и для обмена между несколькими потоками внутри процесса (inter-thread communication).
Самый простой способ представить очередь сообщений как длинную трубу в которую можно помещать шарики. Мы можем написать сообщение на шаре и закинуть его в трубу и кто-то получит его на другой стороне. Из такой концепции можно понять несколько вещей.
- Отправитель не имеет представления о том кто именно примет сообщение.
- Отправитель не переживает о том когда его сообщение будет получено.
- Отправитель может засовывать сколько угодно сообщений с удобной ему скорость.
- Это не повлияет на получателя, ведь он сам решает сколько сообщений ему удобно доставать.
- Ни отправитель, ни получатель не знают об устройстве друг друга.
- Ни отправитель, ни получатель не переживают про нагрузку и вместительность друг друга.
- Также они не знаю о том где каждый из них находиться. Они могут быть в одной комнате, в разных комнатах или зданиях.
Все эти свойства позволяют разделить две системы с точки зрения ответственности, времени, пропускной способности, внутреннего устройства, нагрузки и географии. Само по себе разделение является очень важной частью большой распределенной системы. Чем больше независимых компонентов, тем проще их разрабатывать, тестировать и запускать независимо от других.
Также очереди работают по модели «издатель-подписчик». В ней издатель оправляет сообщение в очередь, а подписчик смотрит в очередь и извлекает интересные ему сообщения, после чего занимается их обработкой. Благодаря этой концепции, осуществляется асинхронная коммуникация.

Причины использовать очереди сообщений
Избыточность и поддержка транзакций
Перед тем как сообщение будет удалено может потребоваться подтверждение, что приложение, которое прочитало сообщение, успешно его обработало. В случае возникновения проблем, сообщение не потеряется, что позволит повторно его обработать.
Масштабирование
Очереди распределяют процессы обработки информации. Это дает возможность гибко реагировать на нагрузку и добавлять или убирать дополнительные обработчики.
Эластичность
Очереди сообщений позволяют выровнять нагрузку на приложение за счет буферизации данных. В случае большой нагрузки на систему очередь будет накапливать сообщения, а сами обработчики будут работать в нормальном режиме самым облегчая обработку данных и не допуская отказа системы.
Отказоустойчивость
В случае отказа обработчиков, очередь продолжит сохранять все сообщения, что позволит обработать их когда система поднимется, тем самым не потеряв никаких данных.
Decoupling and coupling
С помощью очередей можно достичь двух противоположных целей.
- Decoupling. Если у нас большой монолит, то будет сложно интегрировать новые фичи. В таком случае очередь сообщений позволяет разъединить одно приложения на несколько независимых компонентов и настроить коммуникацию между ними.
- Coupling. Иногда нужно, чтобы несколько систем работали как одно целое, очередь позволяют создать промежуточный слой для коммуникации между разными элементами системы.
Понимание потоков данных
По сути, очереди накладывают ограничения на передачу данных между компонентами, что позволяет очень легко мониторить как обрабатываются данные. Благодаря этой информации можно понять как работает система и ее компоненты. Например, некоторые очереди могут простаивать, а некоторые всегда забиты, тем самым показывая где у нас есть проблемы с обработкой.
Гарантированная доставка
Гарантированная доставка является одной из ключевых характеристик системы обмена сообщений. Всего есть три типа доставки:
- at least once
- at most once
- exactly once
At least once
Самый простой способ доставки при котором обработчик получает одно и то же сообщение до тех пор, пока не удалит его из очереди или не подтвердит получение. Это значит что возможны ситуации, когда приложение обрабатывает одно сообщение несколько раз.
Такая особенность подразумевает, что обработчик будет корректно работать в случае дублирования сообщений. А такое может случаться часто. Например, упала сеть в момент, когда приложение подтверждало получение — оно получит его ещё раз.
Если в сообщениях хранятся результаты каких-то измерений, у которых есть временная метка, то ничего страшного не произойдёт, если мы получим хоть миллион дубликатов. Но если в сообщениях храниться информация о финансовых транзакциях то будет совсем не круто обработать его дважды. Но такую ситуацию можно решить используя уникальные идентификаторы для сообщений и хранить список сообщений, которые мы уже обработали.
С другой стороны, такой подход гарантирует 100% получение сообщения. Даже если получатель сообщения упадёт, до того как подтвердит обработку, то он просто ещё раз его обработает после перезапуска.
At most once
Очень редкий сценарий, к нему прибегают когда двойная обработка сообщения может привести к серьезным проблемам. В таких очередях мы предпочтем потерять сообщение чем обработать его дважды.
Если приложение упадет во время обработки сообщения, то мы его повторно не получим и оно будет считаться утерянным.
Exactly Once
Идеальный сценарий работы очереди, но проблема в том что его нереально воплотить в жизнь из-за множества проблема, которые сами по себе решить сложно, не говоря уже об их совокупности.
Все они происходят из двух утверждений:
- Отправители и получатели не идеальны
- Сеть не идеальна.
Что порождает такие проблемы как:
- Отправитель может забыть отправить сообщение
- Сеть между отправителем и очередью может упасть
- Сеть между очередью и получателем может упасть
- База данных самой очереди может не сохранить сообщение
- Подтверждение что сообщение обработано может не дойти до очереди и отправителя
Именно поэтому очень сложно гарантировать одноразовую доставку сообщения. Намного проще сделать систему устойчивой к дубликатам сообщений и использовать подход at-least-once.
Компоненты очереди сообщений
Система обмена сообщениями состоит из нескольких компонентов, которые позволяют ей нормально функционировать и выполнять свои задачи.

Сообщения — любые данные, которые нужно передать через очередь конвертируются в сообщение, которые состоят из двух частей:
- Заголовки (headers) — в них расположена служебная информация, которая используется самой очередью для правильной обработки сообщения.
- Тело (body) — информация, которую мы передаем с помощью очереди.
Каналы (channels) — логические соединения между приложениями и системой очередей, которые предоставляют изолированную коммуникацию. Каналы позволяют передавать сообщение в одном из двух режимов:
- point-to-point — протокол, который обеспечивает прямую коммуникацию между двумя приложениями.
- publish-subscribe — протокол, в котором отправитель сообщения не знает конкретного получателя, а просто отправляет сообщение в очередь, на которую могут быть подписаны потребители.
Маршрутизатор (router) — помещает сообщения из каналов по разным очередям используя ключ маршрутизации из заголовков сообщения.
Очередь (queue) — хранилище для наших сообщений, которое может находиться как в оперативной памяти так и на диске.
Виды очередей
Очереди сообщений можно разделить на несколько видов. Обычно несколько видом могут быть реализованы в одном продукте.
broadcast (fanout) exchange
В таком случае отправляется копия сообщения во все доступные очереди, ключ маршрутизации игнорируется.

direct exchange
Все сообщения имеют свой ключ маршрутизации, который определяет в какую очередь нужно положить сообщение. В дальнейшем сообщения будут переданы по принципу Round Robin подписанным обработчикам. Это значит что только один обработчик получит сообщение.

topic exchange (multicast)
Такие очереди подписаны на получение сообщений чей ключ подпадает под определенный паттерн. Если ключ маршрутизации подходит для нескольких очередей, то каждая получит по своей копии.
Обычно ключи маршрутизации стараются делать в иерархическом виде. Это достигается за счет разделения логических частей (слов) точками. Например вот так:
[region].[availability-zone].[service].[instance] eu-east.az1.computer.homepc
Сами же паттерны создаются с использованием специальных символов:
- * — заменитель только для одного слова.
- # — заменитель для нескольких слов
Что позволяет сделать такие шаблоны:
- *.*.computer.* — очередь, которая может обработать сообщения только от компьютеров без разницы где они находятся.
- eu-east.# — очередь, которая может обрабатывать сообщения только из зоны eu-east

Протоколы
Исторически сложилось так, что почти все существующие протоколы для работы с очередями были проприетарными. Это накладывало массу ограничений, потому что не все языки поддерживали тот или иной протокол.
Спустя какое-то время появились три открытых стандарта, которые сейчас повсеместно используются:
- Advanced Message Queuing Protocol (AMQP) — бинарный протокол, который проектировался для взаимодействия между различными вендорами и стал заменой существующих проприетарных протоколов. Основными особенностями AMQP является надежности и совместимость.
- Streaming Text Oriented Messaging Protocol (STOMP) — простой текстовый протокол обмена сообщениями, который очень похож на HTTP и работает поверх TCP.
- MQTT (formerly MQ Telemetry Transport) — очень простой и легковесный протокол, который разрабатывался для минимального использования трафика и работы в нестабильной сети. Все эти качества идеально подошли для использования протокола для общения между устройствами.
Материалы
- Message queues — отличная статья, которая описывает основные концепты работы очередей.
- The Big Little Guide to Message Queues — большой гайд в котором описано про причины создания очередей их свойства и особенности работы, а также кратный разбор самых популярных реализаций.
Очереди сообщений
![]()
Очередь сообщений – это форма асинхронного обмена информацией между сервисами, применяемая в бессерверных и микросервисных архитектурах. Сообщения хранятся в очереди, пока не будут обработаны и удалены. Каждое сообщение обрабатывается только один раз и только одним потребителем. Очереди сообщений могут использоваться для разделения сложных процессов обработки, для буферизации или организации пакетной обработки, а также для сглаживания пиковых нагрузок.
Ниже приведены несколько ресурсов, которые помогут вам лучше понять очереди сообщений в целом. Чтобы узнать об очередях сообщений в AWS, посетите веб-сайт Простого сервиса очередей Amazon (SQS).
![]()
Основные сведения об очередях сообщений
В современной облачной архитектуре приложения разделяют на небольшие независимые элементы, которые проще разрабатывать, развертывать и обслуживать. Очереди сообщений обеспечивают для таких распределенных приложений возможность взаимодействия и координации. Очереди сообщений могут значительно упростить написание кода приложений с разделенными компонентами, а также повысить их производительность, надежность и масштабируемость.
С помощью очередей сообщений различные части системы могут обмениваться информацией и обрабатывать операции асинхронно. Очередь сообщений состоит из простого буфера, в котором временно хранятся сообщения, и адресов, позволяющих программным компонентам подключаться к очереди для отправки и получения сообщений. Сообщения обычно небольшие и могут представлять собой запросы, ответы, сообщения об ошибках или просто информацию. Чтобы отправить сообщение, компонент, называемый источником, добавляет сообщение в очередь. Сообщение хранится в очереди до тех пор, пока другой компонент, называемый получателем, не получит сообщение и не сделает с ним что-то.

Многие источники и получатели могут использовать одну очередь, но каждое сообщение обрабатывается одним получателем только один раз. Поэтому такой шаблон обмена сообщениями часто называют обменом информацией «один к одному» или «точка-точка». Когда сообщение должно обрабатываться несколькими получателями, очереди сообщений можно сочетать с моделью отправки сообщений «издатель-подписчик» в шаблоне проектирования распространения. Дополнительные сведения об обмене сообщениями «издатель-подписчик» на AWS приведены в разделе Что такое обмен сообщениями «издатель-подписчик»? и на веб-сайте Простого сервиса уведомлений Amazon (SNS).
Очереди сообщений в микросервисной архитектуре
Как известно, схемы синхронного и асинхронного взаимодействия на основе REST API имеют свои недостатки. Чтобы эти недостатки нивелировать, существуют очереди сообщений — Message Queues. Поговорим о принципах их работы.
Очереди предоставляют буфер, обеспечивающий временное хранение сообщений, и конечные точки, позволяющие в свою очередь подключаться к очереди в целях отправки/получения сообщений в асинхронном режиме. В таких сообщениях могут быть: — запросы; — ответы; — ошибки; — прочие данные, которые передаются между программными компонентами.
В очередях сообщений существует компонент, который называют производителем — Producer. Он служи для добавления сообщения в очередь, где оно станет храниться до тех пор, пока другой компонент с именем потребитель (Consumer) не извлечет это сообщение и не выполнит с ним нужную операцию.
Вот как можно представить очередь сообщений:

На практике очереди могут поддерживать получение сообщений и посредством метода Push, и посредством метода Pull. При этом: • в случае с Pull подразумевается периодический опрос очереди получателем на предмет наличия новых сообщений; • в случае с Push подразумевается отправка уведомления получателю в случае прихода сообщения. Также здесь реализуется модель «Издатель/Подписчик» (Publisher/Subscriber).
Как известно, очереди могут работать одновременно с несколькими производителями и потребителями. По этой причине очереди, как правило, реализуют посредством дополнительной системы, которую называют брокер.
Message Broker — это брокер сообщений, который занимается как сбором, так и маршрутизацией сообщений, используя для этого предопределенную логику. При этом сообщения можно передавать с некоторым ключом — как раз таки по этому ключу брокер и понимает, в какую именно очередь (в одну либо в несколько) должно попасть нужное сообщение.
Давайте приведем пример, связанный с отправкой рецензии (отзыва) на сайте. Существует часть сервиса, к которой обращается пользователь. Эта часть выступает в роли производителя, который направляет запросы на создание отзывов в очередь сообщений. В момент добавления сообщения в очередь юзеру можно сразу направлять уведомление, что операция прошла успешно. В результате вся последующая логика обработки станет выполняться вне зависимости от пользователя.
После окончания обработки потребитель выполнит отправку подтверждения в очередь, в результате чего исходное сообщение удалится. Однако если в процессе обработки произойдет какой-нибудь сбой и подтверждение вовремя получено не будет, сообщение можно повторно извлечь потребителем из очереди.
Вот как выглядит один из вариантов асинхронного взаимодействия, построенный на основе очереди сообщений:

Выводы
Итак, применение очередей сообщений позволит решить две задачи одновременно: — сокращение времени ожидания пользователя благодаря асинхронной обработке; — предотвращение потери информации при сбоях.
Однако не стоит рассматривать очереди в качестве универсального средства для любого вида приложений. Дело в том, что у такого подхода тоже есть и плюсы, и минусы. Но о них мы поговорим в следующей статье, следите за обновлениями!
Автотесты приложений через AMQP
В статье разбираем протокол AMQP, его частную реализацию — RabbitMQ и пишем автотест с помощью PyTest для тестирования очередей сообщений.
Тестирование — одна из самых горячих тем в разработке программного обеспечения. Все согласны с необходимостью качественных проверок и определённого покрытия кода всевозможными тестами. Но как тестировать приложения, работающие не по привычному HTTP протоколу? В статье мы рассмотрим протокол AMQP, его частную реализацию RabbitMQ и протестируем наше простое приложение, разработав автотесты для него.
Андрей Мальчук
Бэкенд разработчик группы частных облаков КРОК.
Введение
AMQP (Advanced Message Queuing Protocol) — открытый протокол прикладного уровня для передачи сообщений между компонентами системы. Идея в том, что отдельные подсистемы или независимые приложения могут обмениваться произвольным образом сообщениями через AMQP-брокер, который осуществляет маршрутизацию, возможно гарантирует доставку, распределение потоков данных, подписку на нужные типы сообщений.
AMQP основан на трёх понятиях:
Сообщение ( message ) — единица передаваемых данных, основная его часть (содержание) никак не интерпретируется сервером, к сообщению могут быть присоединены структурированные заголовки.
Точка обмена ( exchange ) — в неё отправляются сообщения. Она распределяет сообщения в одну или несколько очередей. При этом в точке обмена сообщения не хранятся.
Очередь ( queue ) — здесь сообщения хранятся до тех пор, пока не будут забраны клиентом. Клиент всегда забирает сообщения из одной или нескольких очередей.
Producer — клиентское приложение, которое публикует сообщения в exchange.
Consumer — клиентское приложение, которое получает сообщения из очереди.
По сравнению с HTTP, у систем, построенных на очередях сообщений AMQP, есть ряд преимуществ и недостатков.
Плюсы HTTP
- Отладка HTTP-запросов проще, чем в AMQP. Подключаться к очереди сообщений в AMQP придётся только через сторонние утилиты, тогда как HTTP можно отлаживать прямо в браузере.
- Это популярный протокол — его используют практически всегда и везде, а значит и понимает его гораздо больше людей.
Плюсы AMQP
- Надёжность доставки сообщений реализуется «из коробки» — значит не нужно об этом волноваться. Сообщение, которое будет отправлено в AMQP-брокер, будет доставлено и обработано одним из обработчиков очередей AMQP.
- Broadcast — функционал, который позволяет уведомить разные компоненты системы в рамках одного сообщения. Таким образом, можно сократить количество отправляемых сообщений в единицу времени.
Коротко о RabbitMQ и RPC
RabbitMQ — это брокер сообщений с открытым исходным кодом. Он маршрутизирует сообщения по всем принципам протокола AMQP. Отправитель передаёт сообщение брокеру, а тот доставляет его получателю. RabbitMQ реализует и дополняет протокол AMQP.
RPC (Remote Procedure Call) — один из шаблонов взаимодействия в распределённых приложениях. Этот протокол позволяет программам вызывать функции и процедуры удалённо таким образом, как будто они представлены локально.
Совмещая RPC и RabbitMQ, мы в итоге получаем отказоустойчивую, распределённую систему для простого вызова функций и последующего агрегирования результатов.
Тестирование AMQP
Существуют разные способы тестирования приложений, основанных на протоколе AMQP. Вот несколько их них:
- Функциональное тестирование;
- Ручное тестирование;
- Автоматизированное тестирование;
- Интеграционное тестирование.
Под функциональным тестированием чаще всего подразумевается непосредственная проверка каждого элемента в тестируемой среде.
Ручное тестирование — проверка всех компонентов или отдельной, в частности, непосредственно человеком, вручную.
Но для автоматизации рутинных действий, проводимых человеком, существует автоматизированное тестирование — когда все действия, проводимые для тестирования системы человеком, описаны процедурно и могут исполняться автоматически.
При тестировании систем важно, чтобы система работала корректно полностью, целиком, в условиях продакшена. Для этого мало провести функциональное тестирование каждой компоненты, необходимо получить результат при тестировании всей системы. Поэтому существует интеграционное тестирование — когда программные модули объединяются и тестируются в группе.
Автотесты на Python
На мой взгляд, самый удобный способ тестировать API и MQ-сервисы — с помощью языка программирования Python, а также нескольких библиотек, о которых расскажу подробнее далее.
Основным инструментом при разработке автотестов будет pytest — библиотека с простым интерфейсом для написания тестов.
Для подключения к RabbitMQ есть множество библиотек, самая распространённая и хорошо документированная — pika.
Устанавливается всё с помощью утилиты pip:
$ pip install pytest pika
Далее, представим, что у нас есть сервис, который работает по протоколу AMQP и непосредственно через RabbitMQ, в интерфейсе которого реализована простая функция, возвращающая последнее число из последовательности Фибоначчи:
import pika def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n - 1) + fib(n - 2) def on_request(ch, method, props, body): n = int(body) response = fib(n) ch.basic_publish( exchange="", routing_key=props.reply_to, properties=pika.BasicProperties( correlation_id=props.correlation_id ), body=str(response) ) ch.basic_ack(delivery_tag=method.delivery_tag) connection = pika.BlockingConnection( pika.ConnectionParameters(host="localhost") ) channel = connection.channel() channel.queue_declare(queue="rpc_queue") channel.basic_qos(prefetch_count=1) channel.basic_consume(queue="rpc_queue", on_message_callback=on_request) channel.start_consuming()
Данный сервис подключается к RabbitMQ, создаёт очередь rpc_queue, из которой принимает сообщения, где тело запроса — это число N, от которого нужно вычислить последнее число из последовательности чисел Фибоначчи.
Основная идея при тестировании сервисов, которые работают на внешнем источнике данных (в данном случае, очередь сообщений RabbitMQ) — реализация клиента к данному источнику данных, эмулируя ввод пользовательских данных.
Напишем клиент, который взаимодействует с нашим сервисом с помощью очереди сообщений через RabbitMQ:
import pika import uuid class FibonacciRpcClient: def __init__(self): self.connection = pika.BlockingConnection( pika.ConnectionParameters(host="localhost") ) self.channel = self.connection.channel() result = self.channel.queue_declare(queue="", exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume( queue=self.callback_queue, on_message_callback=self.on_response, auto_ack=True ) self.response = None self.corr_id = None def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish( exchange="", routing_key="rpc_queue", properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id, ), body=str(n) ) self.connection.process_data_events(time_limit=None) return int(self.response)
В данном примере:
- Устанавливаем соединение к RabbitMQ и подключаемся к очереди rpc_queue;
- Подписываемся на очередь, в которую возвращаются ответы от сервиса при вызове RPC-методов;
- Реализуем функцию on_response, которая проверяет каждый ответ и сверяет correlation_id с тем, который мы отправили. Если идентификатор ответа совпадает — это ответ конкретного запроса, и функция сохраняет ответ в self.response.
Далее мы реализуем автотесты к нашему сервису. Создадим папку tests и в ней файлы conftest.py и test_fibonacci_rpc.py :
. └── tests ├── conftest.py └── test_fibonacci_rpc.py
Файл conftest.py содержит в себе всевозможные надстройки для pytest . В частности, добавим наш клиент туда для того, чтобы его можно было переиспользовать во всех подтестах, не импортируя вручную. Наш клиент будет иметь свойство Test Fixture — это объект, который можно рассматривать как набор условий, необходимых тесту для выполнения. Например, зачастую фикстуры создаются, чтобы генерировать какие-то данные ещё до теста и возвращать их для использования в тесте.
import pytest # Класс FibonacciRpcClient из примера выше: class FibonacciRpcClient: . @pytest.fixture(scope="module") def fibonacci_rpc(): return FibonacciRpcClient()
Таким образом, наша фикстура fibonacci_rpc может переиспользоваться во всех тестах как аргумент к каждой тестовой функции.
Файлы с префиксом test_ — это файлы, в которых непосредственно реализованы сами тесты. Напишем несколько тестов, которые проверяют реализацию RPC-метода fib с нашими пользовательскими значениями.
Содержимое файла test_fibonacci_rpc.py :
def test_fibonacci(fibonacci_rpc): # Последний элемент последовательности чисел Фибоначчи от N=0 это 0: assert fibonacci_rpc.call(0) == 0 # От N=10 это 55: assert fibonacci_rpc.call(10) == 55 # От N=25 это 75025: assert fibonacci_rpc.call(25) == 75025 def test_non_number(fibonacci_rpc): try: fibonacci_rpc.call("non-number") except Exception: pass else: assert False, "fib must consume only ints"
Функции с префиксом test_ означают, что это тестируемые методы, которые запускаются с помощью pytest .
Наш пример в test_fibonacci реализует проверку метода fib на стороне сервиса с положительными аргументами. Директивная assert проверяет условие, выполняющееся в её блоке на True или False . Если условие не выполнилось, тест упадёт с ошибкой, уведомив нас о некорректной работе сервиса.
Пример в test_non_number реализует проверку того же метода fib , только с заведомо некорректными входными параметрами. При вызове такого метода должна произойти ошибка. Если её не произошло, то мы закончим проверку метода с текстом fib must consume only ints.
Таким образом, мы полностью покрыли тестами функцию в нашем сервисе.
Заключение
В статье мы рассмотрели протокол AMQP и его частную реализацию RabbitMQ, реализовали тестовый сервис, реализующий простой удалённый вызов процедур с одним методом, а также протестировали работоспособность нашего сервиса с помощью автотестов, написанных на Python с помощью pytest .
Особенность тестирования сервисов, которые работают на протоколе AMQP, это корректная реализация объекта, который будет эмулировать запросы клиента. Для этого нужно знать некоторые особенности работы самого протокола AMQP, а также нюансы, которые использовали при разработке тестируемого сервиса (название очередей, публичные RPC-методы и прочее).