Один из наиболее сложных вопросов при построении сложной информационной системы состоит в организации взаимодействия между её компонентами. Для «обычного»клиент-серверного приложения этот вопрос практически не актуален. Но, как быть в случае сложных распределённых систем, систем состоящих из нескольких разнородных модулей (каждый из которых даже не просто написан на своём языке программирования, а выполняется в своей среде) или систем реализующих сложный параллельные вычисления? Как заставить составные части таких программ, включающие различные процессы и потоки, работать согласованно?
Один из способов решения данной задачи состоит в том, чтобы организовать между компонентами программной системы нечто наподобие «электронной почты», когда один компонент направляет одному или нескольким другим компонентам сообщение с некоторой информацией. В свою очередь компоненты, которые получают это сообщение, в зависимости от его содержания выполняют те или иные действия.
В настоящее время существует уже целый ряд готовых платформ для подобного обмена сообщениями (брокеров сообщений). Одним из наиболее популярных брокеров сообщений является RabbitMQ.
Подготовка к работе
Перед началом работы необходимо загрузить клиент RabbitMQ для .NET и реализовать подключение к службе брокера сообщений и соответствующей очереди.
Официальный клиент RabbitMQ так и называется RabbitMQ Client. Он может быть легко добавлен к проекту при помощи NuGet. На скриншоте ниже он уже установлен.
В процессе установки NuGet автоматически скачает необходимые сборки, подключит их проекту и создаст требуемые ссылки. Поэтому данный способ загрузки наиболее предпочтителен.
После загрузки клиента можно приступить к подключению.
Подключаем необходимые пространства имён:
1 2 3 |
using RabbitMQ.Client; using RabbitMQ.Client.MessagePatterns; using RabbitMQ.Client.Events; |
Для собственно подключения к службе брокера сообщений нам потребуется:
- Имя пользователя и пароль (по умолчанию «guest»);
- Имя хоста (обычно IP адрес или «localhost» для локального подключения);
- Виртуальный хост (по умолчанию «/»).
Вышеперечисленные сведения передаются классу ConnectionFactory, с помощью которого и создаётся подключение.
1 2 3 4 5 6 7 8 9 10 11 12 |
private IConnection GetRabbitConnection() { ConnectionFactory factory = new ConnectionFactory { UserName = "guest", Password = "guest", VirtualHost = "/", HostName = "localhost" }; IConnection conn = factory.CreateConnection(); return conn; } |
Соединение с RabbitMQ открывается сразу же после создания подключения.
Далее подключаемся к очереди.
Чтобы подключиться к очереди необходимо знать:
- Собственно имя очереди;
- Имя точки обмена;
- Ключ маршрутизации (определяет связь между точкой обмена и очередью);
Подключение к очереди создаётся при помощи метода CreateModel на основе подключения к службе брокера сообщений.
После создания подключения к очереди его необходимо сконфигурировать. С помощью методов ExchangeDeclare, QueueDeclare и QueueBind мы конфигурируем соответственно точку обмена, очередь и связь между ними.
Подключение к очереди также открывается автоматически.
Ниже приведён пример создания и конфигурирования подключения к очереди.
1 2 3 4 5 6 7 8 9 10 11 12 |
string exchangeName = "test"; string queueName = "test"; string routingKey = "test"; . . . private IModel GetRabbitChannel(string exchangeName, string queueName, string routingKey) { IModel model = conn.CreateModel(); model.ExchangeDeclare(exchangeName, ExchangeType.Direct); model.QueueDeclare(queueName, false, false, false, null); model.QueueBind(queueName, exchangeName, routingKey, null); return model; } |
Когда будет создано и сконфигурировано подключение к очереди можно приступить к обмену сообщениями.
Отправка сообщения
В основном практикуется обмен строковыми сообщениями. Но, в любом случае, RabbitMQ работает с сообщениями в виде массива байт.
Поэтому перед отправкой сообщения его необходимо соответсвующим образом преобразовать. После этого полученный массив байт передаётся в качестве параметра методу BasicPublish, который в числе прочего принимает также имя точки обмена и ключ маршрутизации.
Ниже приведён пример отправки сообщения в виде строки, которую хотя бы один раз в жизни писал каждый (или почти каждый) программист.
1 2 3 4 5 6 |
private void SendMessage() { IModel model = GetRabbitChannel(exchangeName, queueName, routingKey); byte[] messageBodyBytes = Encoding.UTF8.GetBytes("Hello, world!"); model.BasicPublish(exchangeName, routingKey, null, messageBodyBytes); } |
Получение одного сообщения
RabbitMQ поддерживает два режима доставки сообщений:
-
Доставка единичного сообщения по запросу;
-
Подписка на очередь (постоянный мониторинг очереди с доставкой всех сообщений).
Рассмотрим пока первый из них.
Для отображения единичного сообщения в RabbitMQ Client предусмотрен класс BasicGetResult. Чтобы получить такое сообщение нужно вызвать метод BasicGet, как показано в примере ниже. Если в очереди есть сообщения метод BasicGet возвратит объект BasicGetResult в противном случае он возвратит null.
Метод BasicGet принимает два параметра. Первый – имя очереди. Второй определяет помечать ли сообщение как не доставленное.
Полученное сообщение хранится в свойстве Body данного объекта в виде массива байт. Поэтому для того чтобы прочитать текст сообщения его требуется преобразовать обратно в строку.
Ниже представлен один из возможных вариантов реализации получения единичного сообщения по запросу.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
private string ReceiveIndividualMessage() { string originalMessage = ""; model = GetRabbitChannel(exchangeName, queueName, routingKey); BasicGetResult result = model.BasicGet(queueName, false); if (result == null) { // В настоящее время нет доступных сообщений. } else { byte[] body = result.Body; originalMessage = Encoding.UTF8.GetString(body); } return originalMessage; } |
Если в очереди есть сообщения метод BasicGet возвратит объект BasicGetResult в противном случае он возвратит null.
Подписка на очередь (получение всех сообщений)
Для реализации подписки предназначен класс Subscription. Чтобы создать подписку на очередь обычно используется конструктор данного класса с тремя параметрами. Первые два, это подключение к очереди и имя очереди. А, третий определяет помечать сообщения как доставленные автоматически (true) или «вручную» (false).
После создания подписки запускается бесконечный цикл, который прослушивает очередь и при появлении в ней нового сообщения автоматически загружает его в программу.
Само сообщение хранится в виде массива байт в свойстве Body объекта BasicDeliverEventArgs. Этот объект возвращает метод Next класса Subscription, который собственно и прослушивает очередь.
Метод Ack класса Subscription принимающий в виде аргумента объект BasicDeliverEventArgs отмечает сообщение как доставленное.
В следствие того, что при подписке мониторинг очереди, в силу особенностей RabbitMQ Client может производиться только в бесконечном цикле, неизбежно возникает проблема «зависания» того потока в котором он выполняется. Поэтому мониторинг очереди настоятельно рекомендуется производить в отдельном потоке.
Далее приведён пример реализации подписки на очередь с выводом полученных сообщений в TextBox (Windows Forms).
Собственно подписка на очередь:
1 2 3 4 5 6 7 8 9 10 11 12 |
private void RabbitListener() { model = GetRabbitChannel(exchangeName, queueName, routingKey); var subscription = new Subscription(model, queueName, false); while (true) { BasicDeliverEventArgs basicDeliveryEventArgs = subscription.Next(); string messageContent = Encoding.UTF8.GetString(basicDeliveryEventArgs.Body); messagesTextBox.Invoke((MethodInvoker)delegate { messagesTextBox.Text += messageContent + "\r\n"; }); subscription.Ack(basicDeliveryEventArgs); } } |
И её запуск по клику на кнопке (как вариант):
1 2 3 4 5 |
private void fetchAllBtn_Click(object sender, EventArgs e) { thread = new Thread(new ThreadStart(RabbitListener)); thread.Start(); } |
Завершение работы
При работе с RabbitMQ важно не только создать требуемые подключения и обмениваться сообщениями, но и корректно эту работу завершить.
Когда RabbitMQ для работы приложения уже не нужен следует:
- Завершить работу всех потоков в которых выполняется мониторинг очередей (см. «Подписка на очередь»);
- Закрыть все подключения к очередям;
- Закрыть все подключения к службе брокера сообщений.
1 2 3 4 5 6 7 8 9 10 11 12 |
if (thread != null) { thread.Abort(); } if (model != null) { model.Close(); } if (conn != null) { conn.Close(); } |
После этого работа с RabbiMQ в программе будет завершена..
Добавить комментарий