JMS Connector
Overview
Connecting streams to JMS with Reactive Messaging couldn’t be easier.
Maven Coordinates
To enable JMS Connector, add the following dependency to your project’s pom.xml (see Managing Dependencies).
xml
<dependency>
<groupId>io.helidon.messaging.jms</groupId>
<artifactId>helidon-messaging-jms</artifactId>
</dependency>Configuration
Connector name: helidon-jms
Configuration options
| Key | Kind | Type | Default Value | Description |
|---|---|---|---|---|
acknowledge-mode | VALUE | i.h.m.c.j.AcknowledgeMode | AUTO_ACKNOWLEDGE | JMS acknowledgement mode |
destination | VALUE | String | Queue or topic name | |
jndi-initial-context-properties | MAP | String | Environment properties used for creating initial context java.naming.factory.initial, java.naming.provider.url | |
jndi-initial-factory | VALUE | String | JNDI initial factory | |
jndi-jms-factory | VALUE | String | JNDI name of JMS factory | |
jndi-provider-url | VALUE | String | JNDI provider url | |
message-selector | VALUE | String | JMS API message selector expression based on a subset of the SQL92 | |
named-factory | VALUE | String | To select from manually configured jakarta.jms.ConnectionFactory ConnectionFactories over JmsConnector.JmsConnectorBuilder#connectionFactory(String, jakarta.jms.ConnectionFactory) JmsConnectorBuilder#connectionFactory() | |
password | VALUE | String | Password used for creating JMS connection | |
period-executions | VALUE | Long | 100 | Period for executing poll cycles in millis |
poll-timeout | VALUE | Long | 50 | Timeout for polling for next message in every poll cycle in millis |
queue | VALUE | String | Use supplied destination name and Type#QUEUE QUEUE as type | |
session-group-id | VALUE | String | When multiple channels share same session-group-id, they share same JMS session | |
topic | VALUE | String | Use supplied destination name and Type#TOPIC TOPIC as type | |
transacted | VALUE | Boolean | false | Indicates whether the session will use a local transaction |
type | VALUE | i.h.m.c.j.Type | QUEUE | Specify if connection is Type#QUEUE queue or Type#TOPIC topic |
username | VALUE | String | User name used for creating JMS connection |
TIP
Besides the configuration options above, custom attributes can be passed over configuration.
jndi.destination | JNDI destination identifier. |
jndi.env-properties | Environment properties used for creating initial context java.naming.factory.initial, java.naming.provider.url … |
producer.someproperty | property with producer prefix is set to producer instance (for example WLS Unit-of-Order WLMessageProducer.setUnitOfOrder("unit-1") can be configured as producer.unit-of-order=unit-1) |
Custom Attributes Examples
Configured JMS factory
The simplest possible usage is looking up JMS ConnectionFactory in the naming context.
Example of connector config:
yaml
mp.messaging:
incoming.from-jms:
connector: helidon-jms
destination: messaging-test-queue-1
type: queue
outgoing.to-jms:
connector: helidon-jms
destination: messaging-test-queue-1
type: queue
connector:
helidon-jms:
user: Gandalf
password: mellon
jndi:
jms-factory: ConnectionFactory
env-properties:
java.naming:
factory.initial: org.apache.activemq.jndi.ActiveMQInitialContextFactory
provider.url: tcp://localhost:61616Injected JMS factory
In case you need more advanced setup, connector can work with injected factory instance.
Inject:
java
@Produces
@ApplicationScoped
@Named("active-mq-factory")
public ConnectionFactory connectionFactory() {
return new ActiveMQConnectionFactory(config.get("jms.url").asString().get());
}Config:
yaml
jms:
url: tcp://127.0.0.1:61616
mp:
messaging:
connector:
helidon-jms:
named-factory: active-mq-factory
outgoing.to-jms:
connector: helidon-jms
session-group-id: order-connection-1
destination: TESTQUEUE
type: queue
incoming.from-jms:
connector: helidon-jms
session-group-id: order-connection-1
destination: TESTQUEUE
type: queueUsage
Consuming
Consuming one by one unwrapped value:
java
@Incoming("from-jms")
public void consumeJms(String msg) {
System.out.println("JMS says: " + msg);
}Consuming one by one, manual ack:
java
@Incoming("from-jms")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public CompletionStage<Void> consumeJms(JmsMessage<String> msg) {
System.out.println("JMS says: " + msg.getPayload());
return msg.ack();
}Producing
Example of producing to JMS:
java
@Outgoing("to-jms")
public PublisherBuilder<String> produceToJms() {
return ReactiveStreams.of("test1", "test2");
}Example of more advanced producing to JMS:
java
@Outgoing("to-jms")
public PublisherBuilder<Message<String>> produceToJms() {
return ReactiveStreams.of("test1", "test2")
.map(s -> JmsMessage.builder(s)
.correlationId(UUID.randomUUID().toString())
.property("stringProp", "cool property")
.property("byteProp", 4)
.property("intProp", 5)
.onAck(() -> CompletableFuture.completedStage(null)
.thenRun(() -> System.out.println("Acked!")))
.build());
}Example of even more advanced producing to JMS with custom mapper:
java
@Outgoing("to-jms")
public PublisherBuilder<Message<String>> produceToJms() {
return ReactiveStreams.of("test1", "test2")
.map(s -> JmsMessage.builder(s)
.customMapper((p, session) -> {
TextMessage textMessage = session.createTextMessage(p);
textMessage.setStringProperty("custom-mapped-property", "XXX" + p);
return textMessage;
})
.build()
);
}