Camel 和 Active MQ 连接到某个本地代理

Camel and Active MQ connects to some local broker

提问人:Michal Ambrož 提问时间:9/21/2023 更新时间:9/21/2023 访问量:31

问:

我有骆驼弹簧启动应用程序,我想连接本地活动 MQ 代理。我添加了库

    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-activemq</artifactId>
        <version>${cammel.version}</version>
    </dependency>

配置

  @Bean(name = "activemq")
  public ActiveMQComponent createComponent() {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    activeMQConnectionFactory.setBrokerURL("tcp://AP934467:61616");


    ActiveMQComponent activeMQComponent = new ActiveMQComponent();
    activeMQComponent.setConnectionFactory(activeMQConnectionFactory);
    return activeMQComponent;
  }

和两个简单的流程

    from("file:src/data")
            .log(LoggingLevel.ERROR, "Forwarding message...")
            .to("activemq:queue:In.Queue");

    from("activemq:queue:In.Queue")
            .log(LoggingLevel.ERROR, "Forwarding message again...")
            .to("activemq:queue:Out.Queue");

一切正常。它获取文件,将其放在“某处”,读取它并将其放入 Out.Queue。问题是,我在 ActiveMQ 中看不到消息。如果我在“tcp://AP934467:61616”上终止 ActiveMQ,它甚至可以工作。消息由其他设备处理。

在下面登录

2023-09-21 08:11:36.126 DEBUG o.a.c.c.file.GenericFileConsumer.processExchange - About to process file: GenericFile[message1.xml] using exchange: Exchange[]
2023-09-21 08:11:36.126 ERROR route2.log - Forwarding message...
2023-09-21 08:11:36.126 DEBUG o.a.camel.processor.SendProcessor.process - >>>> activemq://queue:Out.Queue Exchange[063A6440B9046E0-0000000000000002]
2023-09-21 08:11:36.126 DEBUG o.a.c.c.j.JmsConfiguration$CamelJmsTemplate.execute - Executing callback on JMS Session: Cached JMS Session: ActiveMQSession {id=ID:AP934467-53582-1695276133517-4:1:2,started=true,closed=false} java.lang.Object@613d705a
2023-09-21 08:11:36.126 DEBUG o.a.camel.component.jms.JmsBinding.appendJmsProperty - Ignoring non primitive header: CamelFileInitialOffset of class: org.apache.camel.support.resume.Resumables$AnonymousResumable with value: org.apache.camel.support.resume.Resumables$AnonymousResumable@63c9e513
2023-09-21 08:11:36.126 DEBUG o.a.c.component.jms.JmsConfiguration.doSend - Sending JMS message to: queue://Out.Queue with message: ActiveMQBytesMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {CamelFileLastModified=1693383287223, CamelFileParent=src\data, CamelMessageTimestamp=1693383287223, CamelFilePath=src\data\message1.xml, CamelFileNameConsumed=message1.xml, CamelFileLength=160, CamelFileRelativePath=message1.xml, CamelFileAbsolute=false, CamelFileAbsolutePath=C:\devenv\IdeaProjects\eai-cammel\web\src\data\message1.xml, CamelFileName=message1.xml, CamelFileNameOnly=message1.xml}, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQBytesMessage{ bytesOut = org.apache.activemq.util.ByteArrayOutputStream@4bf3c3ed, dataOut = java.io.DataOutputStream@7a43afe9, dataIn = null }
2023-09-21 08:11:36.129 DEBUG o.a.activemq.broker.region.Queue.messageSent - localhost Message ID:AP934467-53582-1695276133517-4:1:2:1:2 sent to queue://Out.Queue
2023-09-21 08:11:36.130 DEBUG o.a.activemq.broker.region.Queue.doPageInForDispatch - queue://Out.Queue, subscriptions=1, memory=0%, size=1, pending=0 toPageIn: 1, force:false, Inflight: 0, pagedInMessages.size 0, pagedInPendingDispatch.size 0, enqueueCount: 2, dequeueCount: 1, memUsage:1184, maxPageSize:200
2023-09-21 08:11:36.130 DEBUG o.a.c.c.file.GenericFileOnCompletion.onCompletion - Done processing file: GenericFile[message1.xml] using exchange: Exchange[063A6440B9046E0-0000000000000002]
2023-09-21 08:11:36.131 DEBUG o.a.c.c.j.DefaultJmsMessageListenerContainer.doReceiveAndExecute - Received message of type [class org.apache.activemq.command.ActiveMQBytesMessage] from consumer [ActiveMQMessageConsumer { value=ID:AP934467-53582-1695276133517-4:1:1:1, started=true }] of session [Cached JMS Session: ActiveMQSession {id=ID:AP934467-53582-1695276133517-4:1:1,started=true,closed=false} java.lang.Object@4a1954fb]
2023-09-21 08:11:36.131 DEBUG o.a.activemq.broker.region.Queue.doPageInForDispatch - queue://Out.Queue, subscriptions=1, memory=0%, size=0, pending=0 toPageIn: 0, force:false, Inflight: 0, pagedInMessages.size 0, pagedInPendingDispatch.size 0, enqueueCount: 2, dequeueCount: 2, memUsage:0, maxPageSize:200
2023-09-21 08:11:36.131 DEBUG o.a.c.c.f.s.GenericFileProcessStrategySupport.renameFile - Renaming file: GenericFile[message1.xml] to: GenericFile[.camel\message1.xml]
2023-09-21 08:11:36.131 DEBUG o.a.activemq.broker.region.Queue.doPageInForDispatch - queue://Out.Queue, subscriptions=1, memory=0%, size=0, pending=0 toPageIn: 0, force:false, Inflight: 0, pagedInMessages.size 0, pagedInPendingDispatch.size 0, enqueueCount: 2, dequeueCount: 2, memUsage:0, maxPageSize:200
2023-09-21 08:11:36.131 DEBUG o.a.c.c.jms.EndpointMessageListener.onMessage - activemq://queue:Out.Queue consumer received JMS message: ActiveMQBytesMessage {commandId = 11, responseRequired = true, messageId = ID:AP934467-53582-1695276133517-4:1:2:1:2, originalDestination = null, originalTransactionId = null, producerId = ID:AP934467-53582-1695276133517-4:1:2:1, destination = queue://Out.Queue, transactionId = null, expiration = 0, timestamp = 1695276696126, arrival = 0, brokerInTime = 1695276696126, brokerOutTime = 1695276696130, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@3c088eb, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1184, properties = {CamelFileLastModified=1693383287223, CamelFileParent=src\data, CamelMessageTimestamp=1693383287223, CamelFilePath=src\data\message1.xml, CamelFileNameConsumed=message1.xml, CamelFileLength=160, CamelFileRelativePath=message1.xml, CamelFileAbsolute=false, CamelFileAbsolutePath=C:\devenv\IdeaProjects\eai-cammel\web\src\data\message1.xml, CamelFileName=message1.xml, CamelFileNameOnly=message1.xml}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQBytesMessage{ bytesOut = null, dataOut = null, dataIn = null }
2023-09-21 08:11:36.132 ERROR route3.log - Forwarding message again...
2023-09-21 08:11:36.132 DEBUG o.a.camel.processor.SendProcessor.process - >>>> activemq://queue:In.Queue Exchange[063A6440B9046E0-0000000000000003]
2023-09-21 08:11:36.132 DEBUG o.a.c.c.j.JmsConfiguration$CamelJmsTemplate.execute - Executing callback on JMS Session: Cached JMS Session: ActiveMQSession {id=ID:AP934467-53582-1695276133517-4:1:1,started=true,closed=false} java.lang.Object@4a1954fb
2023-09-21 08:11:36.132 DEBUG org.apache.camel.util.FileUtil.renameFile - Tried 1 to rename file: C:\devenv\IdeaProjects\eai-cammel\web\src\data\message1.xml to: src\data\.camel\message1.xml with result: true
2023-09-21 08:11:36.132 DEBUG o.a.c.component.jms.JmsConfiguration.doSend - Sending JMS message to: queue://In.Queue with message: ActiveMQBytesMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = queue://Out.Queue, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {CamelFileLastModified=1693383287223, CamelFileParent=src\data, CamelMessageTimestamp=1695276696126, CamelFilePath=src\data\message1.xml, CamelFileNameConsumed=message1.xml, CamelFileLength=160, CamelFileRelativePath=message1.xml, CamelJmsDeliveryMode=2, CamelFileAbsolute=false, CamelFileAbsolutePath=C:\devenv\IdeaProjects\eai-cammel\web\src\data\message1.xml, CamelFileName=message1.xml, CamelFileNameOnly=message1.xml}, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQBytesMessage{ bytesOut = org.apache.activemq.util.ByteArrayOutputStream@1af2b730, dataOut = java.io.DataOutputStream@294cf90a, dataIn = null }
2023-09-21 08:11:36.133 DEBUG o.a.activemq.broker.region.Queue.messageSent - localhost Message ID:AP934467-53582-1695276133517-4:1:1:1:2 sent to queue://In.Queue
2023-09-21 08:11:36.134 DEBUG o.a.activemq.broker.region.Queue.doPageInForDispatch - queue://In.Queue, subscriptions=0, memory=0%, size=2, pending=0 toPageIn: 1, force:false, Inflight: 0, pagedInMessages.size 1, pagedInPendingDispatch.size 1, enqueueCount: 2, dequeueCount: 0, memUsage:2368, maxPageSize:200
java apache-camel activemq

评论


答:

0赞 Michal Ambrož 9/21/2023 #1
activeMQConnectionFactory.setAlwaysSyncSend(true);

帮助了我,现在我在 AMQ 中看到消息,如果它关闭,它会失败

评论

0赞 Justin Bertram 9/21/2023
如果这是正确答案,请将其标记为正确答案,以帮助将来遇到相同问题的其他人。谢谢!