Регистрация

Уже есть аккаунт? Вход

Уже есть аккаунт? Sign In Now

Вход

Login to our social questions & Answers Engine to ask questions answer people’s questions & connect with other people.

Зарегистрироваться

Забыли пароль?

Вы не вошли, Зарегистрироваться

Forgot Password

Уже есть аккаунт? Sign In Now

Вы должны войти, чтобы задать вопрос.

Забыли пароль?

Необходима учетная запись, Зарегистрироваться
ВходНовый

Вопросы — CODERIDE.RU

Вопросы — CODERIDE.RU Logo Вопросы — CODERIDE.RU Logo

Вопросы — CODERIDE.RU Navigation

  • Главная
  • О нас
  • Контакты
Поиск
Задать вопрос

Mobile menu

Close
Задать вопрос
  • Главная
  • О нас
  • Контакты
  • Вопросы
    • Новые
    • Рейтинговые
    • Просматриваемые
  • Теги
Главная/ Вопросы/Q 348
In Process
  • 99

Встроенный в Spring Boot кластер HornetQ не пересылает сообщения

  • 99

Я пытаюсь создать статический кластер из двух приложений Spring Boot со встроенными серверами HornetQ. Одно приложение/сервер будет обрабатывать внешние события и генерировать сообщения для отправки в очередь сообщений. Другое приложение/сервер будет прослушивать очередь сообщений и обрабатывать входящие сообщения. Поскольку связь между двумя приложениями ненадежна, каждое из них будет использовать только локальные клиенты/inVM для создания/использования сообщений на своем соответствующем сервере и, полагаясь на функции кластеризации, перенаправлять сообщения в очередь на другом сервере кластера.

Я использую HornetQConfigurationCustomizer для настройки встроенного сервера HornetQ, потому что по умолчанию он поставляется только с InVMConnectorFactory.

Я создал несколько примеров приложений, которые иллюстрируют эту настройку, в этом примере «Отправление сервера» относится к серверу, который будет создавать сообщения, а «Получение сервера» относится к серверу, который будет получать сообщения.

pom.xml для обоих приложений содержит:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-hornetq</artifactId>
</dependency>
<dependency>
    <groupId>org.hornetq</groupId>
    <artifactId>hornetq-jms-server</artifactId>
</dependency>

Демохорнет-серверсендприложение:

@SpringBootApplication
@EnableScheduling
public class DemoHornetqServerSendApplication {
    @Autowired
    private JmsTemplate jmsTemplate;
    private @Value("${spring.hornetq.embedded.queues}") String testQueue;

    public static void main(String[] args) {
        SpringApplication.run(DemoHornetqServerSendApplication.class, args);
    }

    @Scheduled(fixedRate = 5000)
    private void sendMessage() {
        String message = "Timestamp from Server: " + System.currentTimeMillis();
        System.out.println("Sending message: " + message);
        jmsTemplate.convertAndSend(testQueue, message);
    }

    @Bean
    public HornetQConfigurationCustomizer hornetCustomizer() {
        return new HornetQConfigurationCustomizer() {

            @Override
            public void customize(Configuration configuration) {
                String serverSendConnectorName = "server-send-connector";
                String serverReceiveConnectorName = "server-receive-connector";

                Map<String, TransportConfiguration> connectorConf = configuration.getConnectorConfigurations();

                Map<String, Object> params = new HashMap<String, Object>();
                params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                params.put(TransportConstants.PORT_PROP_NAME, "5445");
                TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                connectorConf.put(serverSendConnectorName, tc);

                Set<TransportConfiguration> acceptors = configuration.getAcceptorConfigurations();
                tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
                acceptors.add(tc);

                params = new HashMap<String, Object>();
                params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                params.put(TransportConstants.PORT_PROP_NAME, "5446");
                tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                connectorConf.put(serverReceiveConnectorName, tc);

                List<String> staticConnectors = new ArrayList<String>();
                staticConnectors.add(serverReceiveConnectorName);
                ClusterConnectionConfiguration conf = new ClusterConnectionConfiguration(
                        "my-cluster", // name
                        "jms", // address
                        serverSendConnectorName, // connector name
                        500, // retry interval
                        true, // duplicate detection
                        true, // forward when no consumers
                        1, // max hops
                        1000000, // confirmation window size
                        staticConnectors, 
                        true // allow direct connections only
                        );
                configuration.getClusterConfigurations().add(conf);

                AddressSettings setting = new AddressSettings();
                setting.setRedistributionDelay(0);
                configuration.getAddressesSettings().put("#", setting);
            }
        };
    }
}

приложение.свойства (серверная ссылка):

spring.hornetq.mode=embedded
spring.hornetq.embedded.enabled=true
spring.hornetq.embedded.queues=jms.testqueue
spring.hornetq.embedded.cluster-password=password

Demohornetqсерверприменение:

@SpringBootApplication
@EnableJms
public class DemoHornetqServerReceiveApplication {
    @Autowired
    private JmsTemplate jmsTemplate;
    private @Value("${spring.hornetq.embedded.queues}") String testQueue;

    public static void main(String[] args) {
        SpringApplication.run(DemoHornetqServerReceiveApplication.class, args);
    }

    @JmsListener(destination="${spring.hornetq.embedded.queues}")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }

    @Bean
    public HornetQConfigurationCustomizer hornetCustomizer() {
        return new HornetQConfigurationCustomizer() {

            @Override
            public void customize(Configuration configuration) {
                String serverSendConnectorName = "server-send-connector";
                String serverReceiveConnectorName = "server-receive-connector";

                Map<String, TransportConfiguration> connectorConf = configuration.getConnectorConfigurations();

                Map<String, Object> params = new HashMap<String, Object>();
                params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                params.put(TransportConstants.PORT_PROP_NAME, "5446");
                TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                connectorConf.put(serverReceiveConnectorName, tc);

                Set<TransportConfiguration> acceptors = configuration.getAcceptorConfigurations();
                tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
                acceptors.add(tc);

                params = new HashMap<String, Object>();
                params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                params.put(TransportConstants.PORT_PROP_NAME, "5445");
                tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                connectorConf.put(serverSendConnectorName, tc);

                List<String> staticConnectors = new ArrayList<String>();
                staticConnectors.add(serverSendConnectorName);
                ClusterConnectionConfiguration conf = new ClusterConnectionConfiguration(
                        "my-cluster", // name
                        "jms", // address
                        serverReceiveConnectorName, // connector name
                        500, // retry interval
                        true, // duplicate detection
                        true, // forward when no consumers
                        1, // max hops
                        1000000, // confirmation window size
                        staticConnectors, 
                        true // allow direct connections only
                        );
                configuration.getClusterConfigurations().add(conf);

                AddressSettings setting = new AddressSettings();
                setting.setRedistributionDelay(0);
                configuration.getAddressesSettings().put("#", setting);
            }
        };
    }
}

приложение.свойства (ServerReceive):

spring.hornetq.mode=embedded
spring.hornetq.embedded.enabled=true
spring.hornetq.embedded.queues=jms.testqueue
spring.hornetq.embedded.cluster-password=password

После запуска обоих приложений вывод журнала показывает следующее:

Серверная передача:

2015-04-09 11:11:58.471  INFO 7536 --- [           main] org.hornetq.core.server                  : HQ221000: live server is starting with configuration HornetQ Configuration (clustered=true,backup=false,sharedStore=true,journalDirectory=C:Users****AppDataLocalTemphornetq-data/journal,bindingsDirectory=data/bindings,largeMessagesDirectory=data/largemessages,pagingDirectory=data/paging)  
2015-04-09 11:11:58.501  INFO 7536 --- [           main] org.hornetq.core.server                  : HQ221045: libaio is not available, switching the configuration into NIO  
2015-04-09 11:11:58.595  INFO 7536 --- [           main] org.hornetq.core.server                  : HQ221043: Adding protocol support CORE  
2015-04-09 11:11:58.720  INFO 7536 --- [           main] org.hornetq.core.server                  : HQ221003: trying to deploy queue jms.queue.jms.testqueue  
2015-04-09 11:11:59.568  INFO 7536 --- [           main] org.hornetq.core.server                  : HQ221020: Started Netty Acceptor version 4.0.13.Final localhost:5445  
2015-04-09 11:11:59.593  INFO 7536 --- [           main] org.hornetq.core.server                  : HQ221007: Server is now live  
2015-04-09 11:11:59.593  INFO 7536 --- [           main] org.hornetq.core.server                  : HQ221001: HornetQ Server version 2.4.5.FINAL (Wild Hornet, 124)   [c139929d-d90f-11e4-ba2e-e58abf5d6944] 

Получение сервера:

2015-04-09 11:12:04.401  INFO 4528 --- [           main] org.hornetq.core.server                  : HQ221000: live server is starting with configuration HornetQ Configuration (clustered=true,backup=false,sharedStore=true,journalDirectory=C:Users****AppDataLocalTemphornetq-data/journal,bindingsDirectory=data/bindings,largeMessagesDirectory=data/largemessages,pagingDirectory=data/paging)  
2015-04-09 11:12:04.410  INFO 4528 --- [           main] org.hornetq.core.server                  : HQ221045: libaio is not available, switching the configuration into NIO  
2015-04-09 11:12:04.520  INFO 4528 --- [           main] org.hornetq.core.server                  : HQ221043: Adding protocol support CORE  
2015-04-09 11:12:04.629  INFO 4528 --- [           main] org.hornetq.core.server                  : HQ221003: trying to deploy queue jms.queue.jms.testqueue  
2015-04-09 11:12:05.545  INFO 4528 --- [           main] org.hornetq.core.server                  : HQ221020: Started Netty Acceptor version 4.0.13.Final localhost:5446  
2015-04-09 11:12:05.578  INFO 4528 --- [           main] org.hornetq.core.server                  : HQ221007: Server is now live  
2015-04-09 11:12:05.578  INFO 4528 --- [           main] org.hornetq.core.server                  : HQ221001: HornetQ Server version 2.4.5.FINAL (Wild Hornet, 124)   [c139929d-d90f-11e4-ba2e-e58abf5d6944] 

Я вижу clustered=true в обоих выводах, и это покажет false, если я удалю конфигурацию кластера из HornetQConfigurationCustomizer, так что это должно иметь какой-то эффект.

Теперь ServerSend показывает это в выводе консоли:

Sending message: Timestamp from Server: 1428574324910  
Sending message: Timestamp from Server: 1428574329899  
Sending message: Timestamp from Server: 1428574334904  

Однако ServerReceive ничего не показывает.

Похоже, что сообщения не пересылаются с сервера отправки на сервер получения.

Я провел еще некоторое тестирование, создав еще два приложения Spring Boot (ClientSend и ClientReceive), в которые не встроен сервер HornetQ, а вместо этого подключается к «родному» серверу.

pom.xml для обоих клиентских приложений содержит:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-hornetq</artifactId>
</dependency>

DemoHornetqClientSendApplication:

@SpringBootApplication
@EnableScheduling
public class DemoHornetqClientSendApplication {
    @Autowired
    private JmsTemplate jmsTemplate;
    private @Value("${queue}") String testQueue;

    public static void main(String[] args) {
        SpringApplication.run(DemoHornetqClientSendApplication.class, args);
    }

    @Scheduled(fixedRate = 5000)
    private void sendMessage() {
        String message = "Timestamp from Client: " + System.currentTimeMillis();
        System.out.println("Sending message: " + message);
        jmsTemplate.convertAndSend(testQueue, message);
    }
}

приложение.свойства (отправка клиенту):

spring.hornetq.mode=native
spring.hornetq.host=localhost
spring.hornetq.port=5446

queue=jms.testqueue

demohornetqclientreceiveприложение:

@SpringBootApplication
@EnableJms
public class DemoHornetqClientReceiveApplication {
    @Autowired
    private JmsTemplate jmsTemplate;
    private @Value("${queue}") String testQueue;

    public static void main(String[] args) {
        SpringApplication.run(DemoHornetqClientReceiveApplication.class, args);
    }

    @JmsListener(destination="${queue}")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

приложение.свойства (клиент получает):

spring.hornetq.mode=native
spring.hornetq.host=localhost
spring.hornetq.port=5445

queue=jms.testqueue

Теперь консоль показывает это:

ServerReveive:

Received message: Timestamp from Client: 1428574966630  
Received message: Timestamp from Client: 1428574971600  
Received message: Timestamp from Client: 1428574976595  

Клиент получает:

Received message: Timestamp from Server: 1428574969436  
Received message: Timestamp from Server: 1428574974438  
Received message: Timestamp from Server: 1428574979446  

Если у меня какое-то время работает ServerSend, а затем запускается ClientReceive, он также получает все сообщения, поставленные в очередь до этого момента, так что это показывает, что сообщения не просто исчезают где-то или потребляются откуда-то еще.

Для полноты картины я также указал ClientSend на ServerSend и ClientReceive на ServerReceive, чтобы узнать, есть ли какие-либо проблемы с кластеризацией и клиентами InVM, но опять же не было никаких сообщений, указывающих на то, что какое-либо сообщение было получено либо в ClientReceive, либо в ServerReceive.

Таким образом, похоже, что доставка сообщений каждому из встроенных брокеров напрямую подключенным внешним клиентам работает нормально, но между брокерами в кластере сообщения не пересылаются.

Итак, после всего этого возникает большой вопрос: что не так с настройкой, при которой сообщения не пересылаются внутри кластера?

hornetqjavaspringspring-boot
  • Поделиться
ci_ 09.04.2015 10:52
  • 3 3 Ответа
  • 2,702 Просмотра
Ответ

    Похожие вопросы

    • Как я могу подсчитать целочисленное вхождение, когда int равен 0 в java?
    • Springboot с сгенерированной Зуулом войной дает 404 в Tomcat 8.5.68
    • Как нарисовать двойной пустой ромб ASCII?

    3 Ответа

    • Популярные
    • Старые
    • Новые
    1. http://docs.jboss.org/hornetq/2.2.5.Final/user-manual/en/html/architecture.html#d0e595

      «Ядро HornetQ разработано как набор простых POJO, поэтому, если у вас есть приложение, для которого требуется внутренняя функциональность обмена сообщениями, но вы не хотите использовать его в качестве сервера HornetQ, вы можете напрямую создать экземпляр и встроить серверы HornetQ в свое собственное приложение.»

      Если вы внедряете его, вы не предоставляете его в качестве сервера. У каждого из ваших контейнеров есть отдельный экземпляр. Это эквивалентно запуску 2 копий hornet и присвоению им одного и того же имени очереди. Один записывает в эту очередь в первом экземпляре, а другой прослушивает очередь во втором экземпляре.

      Если вы хотите таким образом разделить свои приложения, вам нужно иметь одно место, которое действует как сервер. Вероятно, вы хотите сгруппироваться. Кстати, это не относится конкретно к Хорнету. Вы часто будете находить эту закономерность.

      • Поделиться
      Joseph Spears 18.07.2015 21:49
      • 0
      • Ответить
      • ID31495934 — Этот ответ меня не убедил. 1. В приведенных документах конкретно не указано, что то, что я пытаюсь сделать, невозможно сделать. 2. Я уже предоставляю встроенный сервер для внешнего доступа, т. Е. подключаюсь к нему с внешнего клиента, с этим проблем нет. 3. Да, кластеризация-это именно то, что я ищу, и это именно то, о чем идет речь в этом вопросе, и это также именно то, что не работает

        • Поделиться
        • 0
        • Ответить
      • ID31495934 — ci на самом деле кластеризация-это НЕ то, что вы ищете. Кластеры используются для обеспечения избыточности, а также для распределения и совместного использования нагрузки. Вы пытаетесь использовать один сервер для отправки непосредственно на другой сервер. Можно ли это сделать? да. Является ли это целью кластеризации? Нет.

        • Поделиться
        • 0
        • Ответить

    Ответить
    Отменить ответ

    Вы должны авторизоваться, чтобы добавить ответ.

    Забыли пароль?

    Необходима учетная запись, Зарегистрироваться

    Explore

    • Главная
    • Вопросы
      • Новые
      • Просматриваемые
      • Рейтинговые
    • Теги
    • .net
    • android
    • arrays
    • asp-net
    • c
    • c#
    • c++
    • css
    • html
    • ios
    • java
    • javascript
    • jquery
    • mysql
    • node-js
    • php
    • r
    • ruby-on-rails
    • sql

    2021 CODERIDE.RU

    Вставить/изменить ссылку

    Введите адрес назначения (URL)

    Или сделайте ссылку на существующий материал

      Поисковый запрос не задан. Показаны недавние элементы. Воспользуйтесь поиском или клавишами вверх/вниз, чтобы выбрать элемент.