Как правильно на сервере ожидать сообщений в очередях RabbitMQ

Мы получаем данные из очередей RabbitMQ (с помощью компоненты) при этом есть следующие особенности:

  1. Очередей может быть много от 1 до 50
  2. Как правило очереди могут обрабатываться параллельно (независимо) но некоторые очереди должны обрабатываться последовательно (вначале прочитали все сообщения из одной очереди, а только потом можем читать из другой)
  3. Сообщений относительно не много, т.е. система больше времени ожидает сообщения, чем их обрабатывает
  4. Желательно обрабатывать сообщения как можно быстрее, т.е. как только сообщение появилось в очереди его нужно обрабатывать

Вопрос: как лучше организовать ожидание и получение сообщений на сервере так, чтобы с одной стороны обеспечить выполнение требований (см. выше) а с другой не нагружать сервер лишней работой?

Варианты которые, мы пробовали (и которые не очень):

  1. Обычное регл. задание которое выполняется раз в 3 секунды и последовательно получает сообщения по списку очередей.
    Минусы:
  • сильно нагружает сервер (особенно если это ERP и в базе на момент запуска нет соединений, и в память каждый раз грузятся все метаданные)
  • т.к. очереди обрабатываются последовательно, то из каждой конкретной очереди данные забираются редко (ждем все остальные) и данные оперативно не приходят в систему
  • регл. задание не успевает выполниться за 3 сек
  1. Обычное регл. задание которое выполняется раз в 3 секунды и для каждой очереди (группы очередей) запускает отдельное фоновое задание (которые выполняются параллельно).
    Минусы:
  • сильно нагружает сервер (особенно если это ERP и в базе на момент запуска нет соединений, и в память каждый раз грузятся все метаданные)
  • фоновые здания “плодятся” и часто это приводит к нехватке памяти и падению rphostов
  1. Регл. задание работает редко (раз в 5 минут, и то если упало предыдущее) а внутри регл. задания опрашиваются очереди (последовательно или параллельно) а в качестве паузы используется внешняя компонента “sleep” (т.е. интервалы опроса задаются не через расписание, а через вызов “sleep” внутри одного процесса)
    Минусы:
  • приходиться использовать самодельную компоненту
  • баланс между оперативностью получения сообщений (параллельная обработка) и перегрузкой сервера (50 процессов которые постоянно опрашивают и пытаются забрать сообщения из 50 очередей убивают сервер) все равно приходиться искать руками

Наблюдается проблема с непониманием как работает получение сообщений в принципе я так понимаю.

Итак давайте по порядку

Вот код получения сообщений. Вопрос на засыпку - что будет с этим кодом - если в очереди нет сообщений ?

Второй вопрос - что будет если интервал ожидания увеличить скажем до 10 минут ?

Третий вопрос - нагружает ли сервер слушающий сокет ?

Четвертый вопрос - кто является инициатором отправки сигнала о наличии сообщения в очереди ? Сервер RMQ пуляет информацию в слушающий сокет или клиент 1С постоянно отпрашивает очередь нагружая сервер ?

Фактически - ИнтервалОжидания это и есть то что вам нужно, не нужно было делать sleep - он есть. На сервере RMQ если увеличить интервал ожидания вы будете наблюдать открытый Channel - канал доставки.

Гляньте нашу подсистему последних версий - в ней реализовано следующее поведение

  • Регламентное задание стартует раз в N минут
  • Его задача поднять ВСЕ слушающие сокеты и назначить им обработчики сообщений
  • Интервал ожидания задается в пользовательском режиме в справочнике Обработчики сообщений
  • Сокеты живут ровно столько сколько нужно, но не более N минут, чтобы 1С переподняло.

На своих проектах при высоких нагрузках мы используем подсистему Жени Павлюка с его реализацией МенеджераЗаданий https://github.com/wizi4d/TaskManagerFor1C

Я тоже не понял зачем там sleep и что может поедать память. При правильной организации прослушки такого не происходит

У вас 50 рпхостов слушает 50 очередей? Так похоже неправильно… должно быть 50 сеансов заданий, а рпхостов 1-2 или сколько там сервер создаст

вас 50 рпхостов слушает 50 очередей?

У нас именно 50 сеансов заданий (про 50 rphost вроде я нигде не писал)

Сегодня посмотрю, спасибо

про 50 rphost вроде я нигде не писал

Писал, писал, все ходы записаны :slight_smile:

Меня больше пугает постановка задачи: “сначала надо прочитать из одной очереди, а потом из другой”. Кажется имеет место неправильный подход к построению бизнес-архитектуры самих очередей.

Да, я не точно сформулировал, имелось в виду 50 фоновых заданий, а не 50 rphost ов

Какую версию нужно смотреть?

Я посмотрел 1.7 там Для каждой очереди создается элемент справочника ПодпискиНаОчередиСообщений, а для каждого элемента справочника формируется отдельное регл. задание, которое забирает данные из очереди.
Соответственно если нам требуется получать данные из очереди не позже чем через 3 секунды, после их появления, то и расписание регл. задания придется задать 1 раз в 3 секунды.

И соответственно такой подход наследует проблемы которые я описывал выше:

  1. Если на момент запуска регл. заданий в базе нет соединений, то целиком кэшируются метаданные конфигурации (а это частая ситуация ночью либо при автоматическом тестировании, либо в базах разаработчиков)
  2. При большом количестве очередей “плодятся” регл задания

Прям беда… Мы на разных языках Валер разговариваем. Забудь про регламетные задания на время - это 1с-ное понятие.

Ответь мне на 2 вопроса

  • сколько ты хочешь чтобы существовал слушающий канал между 1С и RabbitMQ который будет ждать появление собщений в очереди
  • как часто ты считаешь нужно обеспечить пересоздание слушающего канала.

тогда я скажу тебе где и как настроить эти времена.

Если существование этого канала не расходует ресурсы сервера 1С, то сколько угодно долго.

А зачем его пересоздавать? Т.е. если его пересоздание не влияет на его работоспособность, то пусть пересоздается только при рестарте сервера, либо при изменении параметров очереди.

Т.е. мне надо обеспечить получение сообщений из 50 очередей, с задержкой для каждой очереди не более 3х секунд так, чтобы это минимально нагружало сервер.Т.е. если сообщений в очередях нет, то и ресурсы сервера не тратятся.

Состав и имена очередей меняются достаточно редко

Я прошу дать конкретный ответ ;-). сколько угодно - это неконкретно :wink:

Попробую еще раз

ИмяМетодаВОбщемМодуле = "ОбщийМодуль.ПолучательСообщений.ОбработатьСообщение"
СрокПерезапускаКаналаПолучателя = ???;
ОжидатьБесконечноСообщенийВПустойОчередиДоПерезапускаКанала = Истина;

Сервер.ЗапуститьКаналПодпискиНаСообщение(
ИмяМетодаВОбщемМодуле, 
СрокПерезапускаКаналаПолучателя,
ОжидатьБесконечноСообщенийВПустойОчередиДоПерезапускаКанала,
"api.mdm.allinclusive"
);

меня интересует одно конкретное значение

СрокПерезапускаКаналаПолучателя

И твое согласие что ОжидатьБесконечноСообщенийВПустойОчередиДоПерезапускаКанала = Истина

Вся суть фонового задания в подсистеме, это создать канал взаимодействия фонового задания с сервером RMQ. После того как фоновое стартануло и канал создался, в игру вступает интервал ожидания, в течении которого внешняя компонента “слушает” очередь. Как только в очередь прилетает сообщение, сервер бросает его в открытый канал клиенту.
То есть в большинстве случаев для начала можно ставить интервал ожидания секунд 60 с интервалом перезапуска рег задания в теже 60 секунд.
Как только канал закрывается, он открывается снова, после перезапуска задания.
В момент пока компонента “слушает”, ресурсы не потребляются.
Важно еще понимать, что счетчик интервала ожидания сбрасывается при приеме сообщения. Таким образом если сообщения прилетают чаще чем интервал ожидания, то фоновое работает и держит канал открытым.

1 Симпатия

Не совсем понятно, как в этом случае должен выглядеть код в регл. задании?

т.е. если сделать так, как в конфигурации Очереди сообщений RMQ (1.7.0.0):

Процедура ПолучитьСообщение(
… _
_ Получатель = ОчередьСообщений.НовыйПодписчик(ИмяОчереди, КлючСоединения, ЧастотаПульса);

… _
_ Пока Получатель.ПолучитьСообщение(Данные, ИнтервалОжидания*1000) Цикл

То после получения всех сообщений, регл. задание завершиться, и система будет ждать следующего старта регл. задания до 60 секунд (по расписанию), а нам нужно обеспечить максимальное время получения сообщения из очереди после его появления 3 секунды.

Или нужно внутри регл задания устроить бесконечный цикл, из которого получать сообщения (но как тогда его прервать для перезапуска задания, или этого делать не нужно)?

Оно завершится по истечении интервала ожидания (60 секунд в нашем примере), если не будет новых сообщений.

Не будет. Пауза в 60 секунд - это пауза между моментами старта задания, если между стартом и окончанием прошло больше времени (а его в нашем случае прошло больше) - задание рестартует немедленно.

Разобрались, т.е. для того чтобы обеспечить непрерывность получения сообщений и при этом не нагружать сервер нужно:

  1. На каждую очередь сделать отдельное задание
  2. Установить ИнтервалОжидания в ПолучитьСообщение() большим или равным интервалу перезапуска регл. задания
  3. Интервалы можно сделать достаточно большими, скажем 60 секунд

Всем, кто помог, большое спасибо!!!

1 Симпатия