Я пытаюсь создать статический кластер из двух приложений Spring Boot со встроенными серверами HornetQ. Одно приложение/сервер будет обрабатывать внешние события и генерировать сообщения для отправки в очередь сообщений. Другое приложение/сервер будет прослушивать очередь сообщений и обрабатывать входящие сообщения. Поскольку связь между двумя приложениями ненадежна, каждое из них будет использовать только локальные клиенты/inVM для создания/использования сообщений на своем соответствующем сервере и, полагаясь на функции кластеризации, перенаправлять сообщения в очередь на другом сервере кластера.
Я использую HornetQConfigurationCustomizer
для настройки встроенного сервера HornetQ, потому что по умолчанию он поставляется только с InVMConnectorFactory
.
Я создал несколько примеров приложений, которые иллюстрируют эту настройку, в этом примере «Отправление сервера» относится к серверу, который будет создавать сообщения, а «Получение сервера» относится к серверу, который будет получать сообщения.
pom.xml для обоих приложений содержит:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-hornetq</artifactId>
</dependency>
<dependency>
<groupId>org.hornetq</groupId>
<artifactId>hornetq-jms-server</artifactId>
</dependency>
Демохорнет-серверсендприложение:
@SpringBootApplication
@EnableScheduling
public class DemoHornetqServerSendApplication {
@Autowired
private JmsTemplate jmsTemplate;
private @Value("${spring.hornetq.embedded.queues}") String testQueue;
public static void main(String[] args) {
SpringApplication.run(DemoHornetqServerSendApplication.class, args);
}
@Scheduled(fixedRate = 5000)
private void sendMessage() {
String message = "Timestamp from Server: " + System.currentTimeMillis();
System.out.println("Sending message: " + message);
jmsTemplate.convertAndSend(testQueue, message);
}
@Bean
public HornetQConfigurationCustomizer hornetCustomizer() {
return new HornetQConfigurationCustomizer() {
@Override
public void customize(Configuration configuration) {
String serverSendConnectorName = "server-send-connector";
String serverReceiveConnectorName = "server-receive-connector";
Map<String, TransportConfiguration> connectorConf = configuration.getConnectorConfigurations();
Map<String, Object> params = new HashMap<String, Object>();
params.put(TransportConstants.HOST_PROP_NAME, "localhost");
params.put(TransportConstants.PORT_PROP_NAME, "5445");
TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
connectorConf.put(serverSendConnectorName, tc);
Set<TransportConfiguration> acceptors = configuration.getAcceptorConfigurations();
tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
acceptors.add(tc);
params = new HashMap<String, Object>();
params.put(TransportConstants.HOST_PROP_NAME, "localhost");
params.put(TransportConstants.PORT_PROP_NAME, "5446");
tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
connectorConf.put(serverReceiveConnectorName, tc);
List<String> staticConnectors = new ArrayList<String>();
staticConnectors.add(serverReceiveConnectorName);
ClusterConnectionConfiguration conf = new ClusterConnectionConfiguration(
"my-cluster", // name
"jms", // address
serverSendConnectorName, // connector name
500, // retry interval
true, // duplicate detection
true, // forward when no consumers
1, // max hops
1000000, // confirmation window size
staticConnectors,
true // allow direct connections only
);
configuration.getClusterConfigurations().add(conf);
AddressSettings setting = new AddressSettings();
setting.setRedistributionDelay(0);
configuration.getAddressesSettings().put("#", setting);
}
};
}
}
приложение.свойства (серверная ссылка):
spring.hornetq.mode=embedded
spring.hornetq.embedded.enabled=true
spring.hornetq.embedded.queues=jms.testqueue
spring.hornetq.embedded.cluster-password=password
Demohornetqсерверприменение:
@SpringBootApplication
@EnableJms
public class DemoHornetqServerReceiveApplication {
@Autowired
private JmsTemplate jmsTemplate;
private @Value("${spring.hornetq.embedded.queues}") String testQueue;
public static void main(String[] args) {
SpringApplication.run(DemoHornetqServerReceiveApplication.class, args);
}
@JmsListener(destination="${spring.hornetq.embedded.queues}")
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
@Bean
public HornetQConfigurationCustomizer hornetCustomizer() {
return new HornetQConfigurationCustomizer() {
@Override
public void customize(Configuration configuration) {
String serverSendConnectorName = "server-send-connector";
String serverReceiveConnectorName = "server-receive-connector";
Map<String, TransportConfiguration> connectorConf = configuration.getConnectorConfigurations();
Map<String, Object> params = new HashMap<String, Object>();
params.put(TransportConstants.HOST_PROP_NAME, "localhost");
params.put(TransportConstants.PORT_PROP_NAME, "5446");
TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
connectorConf.put(serverReceiveConnectorName, tc);
Set<TransportConfiguration> acceptors = configuration.getAcceptorConfigurations();
tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
acceptors.add(tc);
params = new HashMap<String, Object>();
params.put(TransportConstants.HOST_PROP_NAME, "localhost");
params.put(TransportConstants.PORT_PROP_NAME, "5445");
tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
connectorConf.put(serverSendConnectorName, tc);
List<String> staticConnectors = new ArrayList<String>();
staticConnectors.add(serverSendConnectorName);
ClusterConnectionConfiguration conf = new ClusterConnectionConfiguration(
"my-cluster", // name
"jms", // address
serverReceiveConnectorName, // connector name
500, // retry interval
true, // duplicate detection
true, // forward when no consumers
1, // max hops
1000000, // confirmation window size
staticConnectors,
true // allow direct connections only
);
configuration.getClusterConfigurations().add(conf);
AddressSettings setting = new AddressSettings();
setting.setRedistributionDelay(0);
configuration.getAddressesSettings().put("#", setting);
}
};
}
}
приложение.свойства (ServerReceive):
spring.hornetq.mode=embedded
spring.hornetq.embedded.enabled=true
spring.hornetq.embedded.queues=jms.testqueue
spring.hornetq.embedded.cluster-password=password
После запуска обоих приложений вывод журнала показывает следующее:
Серверная передача:
2015-04-09 11:11:58.471 INFO 7536 --- [ main] org.hornetq.core.server : HQ221000: live server is starting with configuration HornetQ Configuration (clustered=true,backup=false,sharedStore=true,journalDirectory=C:Users****AppDataLocalTemphornetq-data/journal,bindingsDirectory=data/bindings,largeMessagesDirectory=data/largemessages,pagingDirectory=data/paging)
2015-04-09 11:11:58.501 INFO 7536 --- [ main] org.hornetq.core.server : HQ221045: libaio is not available, switching the configuration into NIO
2015-04-09 11:11:58.595 INFO 7536 --- [ main] org.hornetq.core.server : HQ221043: Adding protocol support CORE
2015-04-09 11:11:58.720 INFO 7536 --- [ main] org.hornetq.core.server : HQ221003: trying to deploy queue jms.queue.jms.testqueue
2015-04-09 11:11:59.568 INFO 7536 --- [ main] org.hornetq.core.server : HQ221020: Started Netty Acceptor version 4.0.13.Final localhost:5445
2015-04-09 11:11:59.593 INFO 7536 --- [ main] org.hornetq.core.server : HQ221007: Server is now live
2015-04-09 11:11:59.593 INFO 7536 --- [ main] org.hornetq.core.server : HQ221001: HornetQ Server version 2.4.5.FINAL (Wild Hornet, 124) [c139929d-d90f-11e4-ba2e-e58abf5d6944]
Получение сервера:
2015-04-09 11:12:04.401 INFO 4528 --- [ main] org.hornetq.core.server : HQ221000: live server is starting with configuration HornetQ Configuration (clustered=true,backup=false,sharedStore=true,journalDirectory=C:Users****AppDataLocalTemphornetq-data/journal,bindingsDirectory=data/bindings,largeMessagesDirectory=data/largemessages,pagingDirectory=data/paging)
2015-04-09 11:12:04.410 INFO 4528 --- [ main] org.hornetq.core.server : HQ221045: libaio is not available, switching the configuration into NIO
2015-04-09 11:12:04.520 INFO 4528 --- [ main] org.hornetq.core.server : HQ221043: Adding protocol support CORE
2015-04-09 11:12:04.629 INFO 4528 --- [ main] org.hornetq.core.server : HQ221003: trying to deploy queue jms.queue.jms.testqueue
2015-04-09 11:12:05.545 INFO 4528 --- [ main] org.hornetq.core.server : HQ221020: Started Netty Acceptor version 4.0.13.Final localhost:5446
2015-04-09 11:12:05.578 INFO 4528 --- [ main] org.hornetq.core.server : HQ221007: Server is now live
2015-04-09 11:12:05.578 INFO 4528 --- [ main] org.hornetq.core.server : HQ221001: HornetQ Server version 2.4.5.FINAL (Wild Hornet, 124) [c139929d-d90f-11e4-ba2e-e58abf5d6944]
Я вижу clustered=true
в обоих выводах, и это покажет false
, если я удалю конфигурацию кластера из HornetQConfigurationCustomizer
, так что это должно иметь какой-то эффект.
Теперь ServerSend показывает это в выводе консоли:
Sending message: Timestamp from Server: 1428574324910
Sending message: Timestamp from Server: 1428574329899
Sending message: Timestamp from Server: 1428574334904
Однако ServerReceive ничего не показывает.
Похоже, что сообщения не пересылаются с сервера отправки на сервер получения.
Я провел еще некоторое тестирование, создав еще два приложения Spring Boot (ClientSend и ClientReceive), в которые не встроен сервер HornetQ, а вместо этого подключается к «родному» серверу.
pom.xml для обоих клиентских приложений содержит:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-hornetq</artifactId>
</dependency>
DemoHornetqClientSendApplication:
@SpringBootApplication
@EnableScheduling
public class DemoHornetqClientSendApplication {
@Autowired
private JmsTemplate jmsTemplate;
private @Value("${queue}") String testQueue;
public static void main(String[] args) {
SpringApplication.run(DemoHornetqClientSendApplication.class, args);
}
@Scheduled(fixedRate = 5000)
private void sendMessage() {
String message = "Timestamp from Client: " + System.currentTimeMillis();
System.out.println("Sending message: " + message);
jmsTemplate.convertAndSend(testQueue, message);
}
}
приложение.свойства (отправка клиенту):
spring.hornetq.mode=native
spring.hornetq.host=localhost
spring.hornetq.port=5446
queue=jms.testqueue
demohornetqclientreceiveприложение:
@SpringBootApplication
@EnableJms
public class DemoHornetqClientReceiveApplication {
@Autowired
private JmsTemplate jmsTemplate;
private @Value("${queue}") String testQueue;
public static void main(String[] args) {
SpringApplication.run(DemoHornetqClientReceiveApplication.class, args);
}
@JmsListener(destination="${queue}")
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
}
приложение.свойства (клиент получает):
spring.hornetq.mode=native
spring.hornetq.host=localhost
spring.hornetq.port=5445
queue=jms.testqueue
Теперь консоль показывает это:
ServerReveive:
Received message: Timestamp from Client: 1428574966630
Received message: Timestamp from Client: 1428574971600
Received message: Timestamp from Client: 1428574976595
Клиент получает:
Received message: Timestamp from Server: 1428574969436
Received message: Timestamp from Server: 1428574974438
Received message: Timestamp from Server: 1428574979446
Если у меня какое-то время работает ServerSend
, а затем запускается ClientReceive
, он также получает все сообщения, поставленные в очередь до этого момента, так что это показывает, что сообщения не просто исчезают где-то или потребляются откуда-то еще.
Для полноты картины я также указал ClientSend
на ServerSend
и ClientReceive
на ServerReceive
, чтобы узнать, есть ли какие-либо проблемы с кластеризацией и клиентами InVM, но опять же не было никаких сообщений, указывающих на то, что какое-либо сообщение было получено либо в ClientReceive
, либо в ServerReceive
.
Таким образом, похоже, что доставка сообщений каждому из встроенных брокеров напрямую подключенным внешним клиентам работает нормально, но между брокерами в кластере сообщения не пересылаются.
Итак, после всего этого возникает большой вопрос: что не так с настройкой, при которой сообщения не пересылаются внутри кластера?
http://docs.jboss.org/hornetq/2.2.5.Final/user-manual/en/html/architecture.html#d0e595
«Ядро HornetQ разработано как набор простых POJO, поэтому, если у вас есть приложение, для которого требуется внутренняя функциональность обмена сообщениями, но вы не хотите использовать его в качестве сервера HornetQ, вы можете напрямую создать экземпляр и встроить серверы HornetQ в свое собственное приложение.»
Если вы внедряете его, вы не предоставляете его в качестве сервера. У каждого из ваших контейнеров есть отдельный экземпляр. Это эквивалентно запуску 2 копий hornet и присвоению им одного и того же имени очереди. Один записывает в эту очередь в первом экземпляре, а другой прослушивает очередь во втором экземпляре.
Если вы хотите таким образом разделить свои приложения, вам нужно иметь одно место, которое действует как сервер. Вероятно, вы хотите сгруппироваться. Кстати, это не относится конкретно к Хорнету. Вы часто будете находить эту закономерность.
ID31495934 — Этот ответ меня не убедил. 1. В приведенных документах конкретно не указано, что то, что я пытаюсь сделать, невозможно сделать. 2. Я уже предоставляю встроенный сервер для внешнего доступа, т. Е. подключаюсь к нему с внешнего клиента, с этим проблем нет. 3. Да, кластеризация-это именно то, что я ищу, и это именно то, о чем идет речь в этом вопросе, и это также именно то, что не работает
ID31495934 — ci на самом деле кластеризация-это НЕ то, что вы ищете. Кластеры используются для обеспечения избыточности, а также для распределения и совместного использования нагрузки. Вы пытаетесь использовать один сервер для отправки непосредственно на другой сервер. Можно ли это сделать? да. Является ли это целью кластеризации? Нет.