I am using the following configuration for multiple virtual hosts, with a simple receiver:
@Component
public class MessageReceiver {
    @StreamListener("SampleQueueA")
    public void onQueueAReceived(DemoMessage msg) {
        System.out.println("Message from A: " + msg);
    }

    @StreamListener("SampleQueueB")
    public void onQueueBReceived(DemoMessage msg) {
        System.out.println("Message from B: " + msg);
    }
}

public interface MessageChannels {
    @Input("SampleQueueA")
    SubscribableChannel queueA();

    @Input("SampleQueueB")
    SubscribableChannel queueB();
}
spring:
  rabbitmq:
    host: 127.0.0.1
    virtual-host: /defaultVH
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        SampleQueueA:
          binder: rabbit-A
          contentType: application/x-java-object
          group: groupA
          destination: SampleQueueA
        SampleQueueB:
          binder: rabbit-B
          contentType: application/x-java-object
          group: groupB
          destination: SampleQueueB
      binders:
        rabbit-A:
          defaultCandidate: false
          inheritEnvironment: false
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 127.0.0.1
                virtualHost: /vhA
                username: guest
                password: guest
                port: 5672
                connection-timeout: 10000
        rabbit-B:
          defaultCandidate: false
          inheritEnvironment: false
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 127.0.0.1
                virtualHost: /vhB
                username: guest
                password: guest
                port: 5672
                connection-timeout: 10000
############################################
# default settings
############################################
spring:
  main:
    banner-mode: "off"
  application:
    name: demo-service
  cloud:
    config:
      enabled: true #change this to use config-service
      retry:
        maxAttempts: 3
      discovery:
        enabled: false
      fail-fast: true
      override-system-properties: false
server:
  port: 8080
I have created the virtual hosts.
The trace is below:
2019-03-22 18:46:12.665 INFO 15076 --- [eueA.groupA-214] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@315ce10a: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
2019-03-22 18:46:12.665 INFO 15076 --- [eueA.groupA-215] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2019-03-22 18:46:12.672 WARN 15076 --- [ 127.0.0.1:5672] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occured (Exception message: Socket closed)
2019-03-22 18:46:12.761 INFO 15076 --- [eueB.groupB-215] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@7a76fb88: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
2019-03-22 18:46:12.761 INFO 15076 --- [eueB.groupB-216] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2019-03-22 18:46:12.767 WARN 15076 --- [ 127.0.0.1:5672] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occured (Exception message: Socket closed)
2019-03-22 18:46:17.683 INFO 15076 --- [eueA.groupA-215] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@2c25619d: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
2019-03-22 18:46:17.683 INFO 15076 --- [eueA.groupA-216] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2019-03-22 18:46:17.689 WARN 15076 --- [ 127.0.0.1:5672] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occured (Exception message: Socket closed)
2019-03-22 18:46:17.781 INFO 15076 --- [eueB.groupB-216] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@7849d510: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
2019-03-22 18:46:17.781 INFO 15076 --- [eueB.groupB-217] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2019-03-22 18:46:17.787 WARN 15076 --- [ 127.0.0.1:5672] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occured (Exception message: Socket closed)
2019-03-22 18:46:22.700 INFO 15076 --- [eueA.groupA-216] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@65494b62: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
2019-03-22 18:46:22.700 INFO 15076 --- [eueA.groupA-217] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2019-03-22 18:46:22.706 WARN 15076 --- [ 127.0.0.1:5672] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occured (Exception message: Socket closed)
2019-03-22 18:46:22.799 INFO 15076 --- [eueB.groupB-217] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@32e087c2: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
2019-03-22 18:46:22.799 INFO 15076 --- [eueB.groupB-218] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2019-03-22 18:46:22.806 WARN 15076 --- [ 127.0.0.1:5672] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occured (Exception message: Socket closed)
Am I missing some configuration?
Thanks in advance.
Hi @garyrussell,
I am facing the same issue locally. I have enabled debug logging; could you please have a quick look?
2021-03-17 13:32:20.044 |DEBUG| localhost-startStop-1 | o.s.a.r.l.SimpleMessageListenerContainer || Starting Rabbit listener container.
2021-03-17 13:32:20.044 |INFO| localhost-startStop-1 | o.s.a.r.c.CachingConnectionFactory || Attempting to connect to: localhost:5672
2021-03-17 13:32:20.357 |WARN| AMQP Connection 127.0.0.1:5672 | c.r.c.impl.ForgivingExceptionHandler || An unexpected connection driver error occured (Exception message: Socket closed)
2021-03-17 13:32:20.372 |WARN| AMQP Connection 0:0:0:0:0:0:0:1:5672 | c.r.c.impl.ForgivingExceptionHandler || An unexpected connection driver error occured (Exception message: Socket closed)
2021-03-17 13:32:20.372 |INFO| localhost-startStop-1 | o.s.a.r.l.SimpleMessageListenerContainer || Broker not available; cannot force queue declarations during start: java.io.IOException: No compatible authentication mechanism found - server offered [PLAIN AMQPLAIN]
2021-03-17 13:32:20.403 |DEBUG| listenerContainer-1 | o.s.a.r.l.BlockingQueueConsumer || Starting consumer Consumer@56b6bd25: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
2021-03-17 13:32:20.403 |DEBUG| listenerContainer-10 | o.s.a.r.l.BlockingQueueConsumer || Starting consumer Consumer@2ca85a66: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
2021-03-17 13:32:20.403 |DEBUG| listenerContainer-7 | o.s.a.r.l.BlockingQueueConsumer || Starting consumer Consumer@35ef8624: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
2021-03-17 13:32:20.403 |DEBUG| listenerContainer-9 | o.s.a.r.l.BlockingQueueConsumer || Starting consumer Consumer@5c9ae5c7: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
2021-03-17 13:32:20.403 |DEBUG| listenerContainer-8 | o.s.a.r.l.BlockingQueueConsumer || Starting consumer Consumer@2bb84494: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
2021-03-17 13:32:20.403 |DEBUG| listenerContainer-5 | o.s.a.r.l.BlockingQueueConsumer || Starting consumer Consumer@2da111f8: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
2021-03-17 13:32:20.403 |DEBUG| listenerContainer-6 | o.s.a.r.l.BlockingQueueConsumer || Starting consumer Consumer@881c77b: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
2021-03-17 13:32:20.403 |DEBUG| listenerContainer-4 | o.s.a.r.l.BlockingQueueConsumer || Starting consumer Consumer@20df0058: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
2021-03-17 13:32:20.403 |DEBUG| listenerContainer-3 | o.s.a.r.l.BlockingQueueConsumer || Starting consumer Consumer@73bd5c7b: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
2021-03-17 13:32:20.403 |DEBUG| listenerContainer-2 | o.s.a.r.l.BlockingQueueConsumer || Starting consumer Consumer@2d04e450: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
2021-03-17 13:32:20.419 |INFO| listenerContainer-2 | o.s.a.r.c.CachingConnectionFactory || Attempting to connect to: localhost:5672
2021-03-17 13:32:20.441 |WARN| AMQP Connection 127.0.0.1:5672 | c.r.c.impl.ForgivingExceptionHandler || An unexpected connection driver error occured (Exception message: Socket closed)
2021-03-17 13:32:20.441 |WARN| AMQP Connection 0:0:0:0:0:0:0:1:5672 | c.r.c.impl.ForgivingExceptionHandler || An unexpected connection driver error occured (Exception message: Socket closed)
2021-03-17 13:32:20.441 |DEBUG| listenerContainer-2 | o.s.a.r.l.SimpleMessageListenerContainer || Recovering consumer in 5000 ms.
2021-03-17 13:32:20.441 |INFO| listenerContainer-10 | o.s.a.r.c.CachingConnectionFactory || Attempting to connect to: localhost:5672
2021-03-17 13:32:20.441 |WARN| AMQP Connection 127.0.0.1:5672 | c.r.c.impl.ForgivingExceptionHandler || An unexpected connection driver error occured (Exception message: Socket closed)
2021-03-17 13:32:20.457 |WARN| AMQP Connection 0:0:0:0:0:0:0:1:5672 | c.r.c.impl.ForgivingExceptionHandler || An unexpected connection driver error occured (Exception message: Socket closed)
2021-03-17 13:32:20.457 |DEBUG| listenerContainer-10 | o.s.a.r.l.SimpleMessageListenerContainer || Recovering consumer in 5000 ms.
2021-03-17 13:32:20.457 |INFO| listenerContainer-5 | o.s.a.r.c.CachingConnectionFactory || Attempting to connect to: localhost:5672
2021-03-17 13:32:20.472 |WARN| AMQP Connection 127.0.0.1:5672 | c.r.c.impl.ForgivingExceptionHandler || An unexpected connection driver error occured (Exception message: Socket closed)
I'm trying to catch all unexpected errors, such as a connection error in RabbitMQ. I've tried every exception type I think is related (IOException, SocketException) and even the general Exception itself, but none of them catches the error. I want to catch it so I can take another path, for example submitting a log entry to a third-party system or notifying an admin by email, while retrying the connection 3 more times, instead of showing this error message and stopping all execution.
Here's the uncaught error message:
16:27:51.476 [AMQP Connection 192.168.7.167:5672] ERROR com.rabbitmq.client.impl.ForgivingExceptionHandler - An unexpected connection driver error occured
java.net.SocketException: Socket Closed
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91)
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:184)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:598)
at java.lang.Thread.run(Thread.java:748)
Here's my code:
package TestPackage;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.SocketException;
import java.util.Properties;
import java.util.concurrent.TimeoutException;

public class TestQueueSend {
    static ConnectionFactory rbmqFactory;
    static Connection rbmqConn;
    static Channel rbmqChannel;

    public static void main(String[] args) {
        // Load the broker settings from the properties file.
        Properties rabbitMQConf = new Properties();
        try {
            rabbitMQConf.load(new FileInputStream("./config/rabbitmqconf.properties"));
        } catch (IOException errorLoadRabbitMQConfig) {
            // printStackTrace() reports the failure; getStackTrace() alone discards it.
            errorLoadRabbitMQConfig.printStackTrace();
        }

        rbmqFactory = new ConnectionFactory();
        rbmqFactory.setUsername(rabbitMQConf.getProperty("rabbit_mq_username"));
        rbmqFactory.setPassword(rabbitMQConf.getProperty("rabbit_mq_password"));
        rbmqFactory.setVirtualHost(rabbitMQConf.getProperty("rabbit_mq_virtualHost"));
        rbmqFactory.setHost(rabbitMQConf.getProperty("rabbit_mq_host"));
        rbmqFactory.setPort(Integer.parseInt(rabbitMQConf.getProperty("rabbit_mq_port")));

        System.out.println("Running RabbitMQConnection!");
        try {
            System.out.println("Connection START : " + rabbitMQConf.getProperty("rabbit_mq_host"));
            rbmqConn = rbmqFactory.newConnection();
            System.out.println("Create Channel");
            rbmqChannel = rbmqConn.createChannel();

            String queueName = "queueTest";
            String queueContent = "Kasur ini rusak";
            System.out.println("Queue Declare");
            rbmqChannel.queueDeclare(queueName, false, false, false, null);
            // Publish to the default exchange, routed by queue name.
            rbmqChannel.basicPublish("", queueName, null, queueContent.getBytes());
            System.out.println(" [x] Sent '" + queueContent + "'");

            rbmqChannel.close();
            rbmqConn.close();
        } catch (SocketException errorSocketCatch) {
            System.out.println("ERROR_SOCKET");
            System.out.println(errorSocketCatch.getMessage());
        } catch (IOException errorIOCatch) {
            System.out.println("ERROR_IO");
            System.out.println(errorIOCatch.getMessage());
        } catch (TimeoutException errorTimeoutCatch) {
            System.out.println("ERROR_TIMEOUT");
            System.out.println(errorTimeoutCatch.getMessage());
        } catch (Exception errorGeneric) {
            System.out.println("ERROR_GENERIC");
            System.out.println(errorGeneric.getMessage());
        }
    }
}
For additional information, this is what I use:
Maven 3.3
com.rabbitmq:amqp-client 5.7.2 (5.7.1 does not work either)
org.slf4j:slf4j-api 1.7.26
ch.qos.logback:logback-classic 1.2.3
For the IDE, I'm using:
IntelliJ IDEA 2019.1.3 (Community Edition)
Build #IC-191.7479.19, built on May 28, 2019
JRE: 1.8.0_202-release-1483-b58 amd64
JVM: OpenJDK 64-Bit Server VM by JetBrains s.r.o
Windows 10 10.0
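The thread name in the log above ([AMQP Connection 192.168.7.167:5672]) suggests the message is logged on the client's own connection thread rather than thrown into main, which may be why none of the catch blocks sees it. The "retry 3 times, then notify someone" flow described in the question can still be wrapped around whatever the connect call itself throws. The following is a minimal, library-free sketch of that idea. The class RetryingConnector, the method withRetry and the onGiveUp callback are hypothetical names introduced here, and the lambda in main only stands in for a call such as rbmqFactory.newConnection().

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.function.Consumer;

final class RetryingConnector {

    /**
     * Runs the action up to maxAttempts times, sleeping delayMillis between
     * failed attempts. After the final failure it calls onGiveUp (for example
     * to send an email or a log entry) and rethrows the last exception.
     */
    static <T> T withRetry(Callable<T> action, int maxAttempts, long delayMillis,
                           Consumer<Exception> onGiveUp) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                System.err.println("Attempt " + attempt + " of " + maxAttempts + " failed: " + e);
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMillis);
                }
            }
        }
        onGiveUp.accept(last);
        throw last;
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a connect call: fails twice, then succeeds.
        int[] calls = {0};
        String result = withRetry(() -> {
            if (++calls[0] < 3) {
                throw new IOException("Socket closed");
            }
            return "connected";
        }, 3, 10, e -> System.err.println("Giving up, notify admin: " + e));
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In the posted program, the action would be the connect-and-publish block, and onGiveUp would hold the email or third-party logging call.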
Question
Anupam Hyanki (Anupam_Hyanki), Director, Instasmart Software Solutions
Posted: 25 Oct 2018 5:18 EDT
Last activity: 22 Mar 2019 9:00 EDT
Trying to connect with Rabbit MQ — Listener is unable to connect to the queue
We are trying to connect to RabbitMQ. The JNDI server connects fine, but the listener is not able to connect to the queue. Here is the exception trace.
2018-10-24 15:12:15,583 [ JMS-Thread-16] [ ] [ ] [ ] ( services.jms.JMSListener) ERROR JMS.RabbitMQListener [email protected] - Caught JMSException in listener RabbitMQListener
com.rabbitmq.jms.util.RMQJMSException: Timed out establishing RabbitMQ Connection
at com.rabbitmq.jms.admin.RMQConnectionFactory.instantiateNodeConnection(RMQConnectionFactory.java:344) ~[rabbitmq-jms-1.11.0.jar:1.11.0]
at com.rabbitmq.jms.admin.RMQConnectionFactory.createConnection(RMQConnectionFactory.java:284) ~[rabbitmq-jms-1.11.0.jar:1.11.0]
at com.rabbitmq.jms.admin.RMQConnectionFactory.createConnection(RMQConnectionFactory.java:233) ~[rabbitmq-jms-1.11.0.jar:1.11.0]
at com.pega.pegarules.integration.engine.internal.services.jms.JMSListener.initializeConnection(JMSListener.java:1064) ~[printegrint.jar:?]
at com.pega.pegarules.integration.engine.internal.services.jms.JMSListener.runPoller(JMSListener.java:758) ~[printegrint.jar:?]
at com.pega.pegarules.integration.engine.internal.services.jms.JMSListener.run_(JMSListener.java:753) ~[printegrint.jar:?]
at com.pega.pegarules.integration.engine.internal.services.listener.ServiceListenerBaseImpl.run(ServiceListenerBaseImpl.java:454) ~[printegrint.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
Caused by: java.util.concurrent.TimeoutException
at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:77) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:120) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:315) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:64) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:156) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1095) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1054) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1012) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1170) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.jms.admin.RMQConnectionFactory$4.create(RMQConnectionFactory.java:237) ~[rabbitmq-jms-1.11.0.jar:1.11.0]
at com.rabbitmq.jms.admin.RMQConnectionFactory.instantiateNodeConnection(RMQConnectionFactory.java:328) ~[rabbitmq-jms-1.11.0.jar:1.11.0]
Posted: 25 Oct 2018 5:18 EDT
What is the difference between destinationName and amqpQueueName?
<Resource name="jms/PegaQueue"
type="javax.jms.Queue"
factory="com.rabbitmq.jms.admin.RMQObjectFactory"
destinationName=""
amqpExchangeName="pegasystems"
amqpRoutingKey="pega.app.*"
amqp="true"
amqpQueueName="Pega.Case.Order.CreateOrUpdate.Request" />
Posted: 25 Oct 2018 5:18 EDT
This is the detailed exception:
[AMQP Connection 192.168.101.10:15672] WARN com.rabbitmq.client.impl.ForgivingExceptionHandler - An unexpected connection driver error occured (Exception message: Socket closed)
2018-10-25 10:06:22,341 [ JMS-Thread-16] [ ] [ ] [ ] ( services.jms.JMSListener) ERROR JMS.RabbitMQListener [email protected] - Caught JMSException in listener RabbitMQListener
com.rabbitmq.jms.util.RMQJMSException: Timed out establishing RabbitMQ Connection
at com.rabbitmq.jms.admin.RMQConnectionFactory.instantiateNodeConnection(RMQConnectionFactory.java:344) ~[rabbitmq-jms-1.11.0.jar:1.11.0]
at com.rabbitmq.jms.admin.RMQConnectionFactory.createConnection(RMQConnectionFactory.java:284) ~[rabbitmq-jms-1.11.0.jar:1.11.0]
at com.rabbitmq.jms.admin.RMQConnectionFactory.createConnection(RMQConnectionFactory.java:233) ~[rabbitmq-jms-1.11.0.jar:1.11.0]
at com.pega.pegarules.integration.engine.internal.services.jms.JMSListener.initializeConnection(JMSListener.java:1064) ~[printegrint.jar:?]
at com.pega.pegarules.integration.engine.internal.services.jms.JMSListener.runPoller(JMSListener.java:758) ~[printegrint.jar:?]
at com.pega.pegarules.integration.engine.internal.services.jms.JMSListener.run_(JMSListener.java:753) ~[printegrint.jar:?]
at com.pega.pegarules.integration.engine.internal.services.listener.ServiceListenerBaseImpl.run(ServiceListenerBaseImpl.java:454) ~[printegrint.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
Caused by: java.util.concurrent.TimeoutException
at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:77) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:120) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:315) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:64) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:156) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1095) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1054) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1012) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1170) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.jms.admin.RMQConnectionFactory$4.create(RMQConnectionFactory.java:237) ~[rabbitmq-jms-1.11.0.jar:1.11.0]
at com.rabbitmq.jms.admin.RMQConnectionFactory.instantiateNodeConnection(RMQConnectionFactory.java:328) ~[rabbitmq-jms-1.11.0.jar:1.11.0]
Posted: 21 Mar 2019 15:21 EDT
This looks to be the RabbitMQ client code. One suggestion: write a simple standalone JMS Java client completely outside Pega to verify the same client API calls (run the client on the same host as the Pega server). If you get the same error, you can then focus on why the socket-closed exceptions occur; it could be networking related.
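As a lower-level check than the standalone JMS client suggested above, the sketch below opens a plain TCP socket, sends the 8-byte AMQP 0-9-1 protocol header, and looks at the first bytes that come back. It uses only the JDK. The class and method names (AmqpPortProbe, describeReply, probe) are hypothetical, and the host and port defaults are placeholders. It may be worth trying against the port in the log above: the client there connects to 15672, which is RabbitMQ's default management (HTTP) port, whereas the default AMQP port is 5672.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

final class AmqpPortProbe {

    /** The AMQP 0-9-1 protocol header: "AMQP" followed by the bytes 0, 0, 9, 1. */
    static byte[] protocolHeader() {
        return new byte[] {'A', 'M', 'Q', 'P', 0, 0, 9, 1};
    }

    /** Gives a rough description of the first bytes received after the header. */
    static String describeReply(byte[] reply, int length) {
        if (length <= 0) {
            return "connection closed without a reply";
        }
        if (length >= 4
                && new String(reply, 0, 4, StandardCharsets.US_ASCII).equals("AMQP")) {
            // A server that rejects the header answers with a header of its own.
            return "server rejected the protocol version";
        }
        if (reply[0] == 1) {
            // Frame type 1 is a method frame; the first one should be connection.start.
            return "AMQP method frame received, the port speaks AMQP";
        }
        return "unexpected reply, the port may not be an AMQP listener";
    }

    /** Connects, sends the header, and describes what comes back. */
    static String probe(String host, int port, int timeoutMillis) throws IOException {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            socket.setSoTimeout(timeoutMillis);
            OutputStream out = socket.getOutputStream();
            out.write(protocolHeader());
            out.flush();
            byte[] reply = new byte[8];
            int n = socket.getInputStream().read(reply);
            return describeReply(reply, n);
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 5672;
        try {
            System.out.println(host + ":" + port + " -> " + probe(host, port, 5000));
        } catch (IOException e) {
            // Refused connections and read timeouts end up here.
            System.out.println(host + ":" + port + " -> " + e);
        }
    }
}
```

Running it from the same host as the Pega server, once per candidate port, separates "nothing reachable on that port" from "something answers, but it is not AMQP".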
Let me start by saying that I have only just started working with AMQP.
I want to consume/fetch data from a queue. I am using the Spring libraries (spring-boot-starter-amqp) to make this easier. I have a listener class with a method annotated with @RabbitListener, where I set the queue. Everything else is configured through properties:
rabbitmq:
  username: user
  password: password
  virtual-host: virtual-host
  port: 5672
  host: host
  queue: _316_
  listener:
    simple:
      retry:
        enabled: true
        initial-interval: 1000
        max-attempts: 8
        max-interval: 10000
        multiplier: 2.0
        stateless: true
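To make the retry values above concrete, here is a small sketch that computes the wait times they would produce. It assumes the usual exponential backoff rule: the first wait is initial-interval, each later wait is the previous one times multiplier, every wait is capped at max-interval, and max-attempts of N means N - 1 waits. The class RetryBackoffSchedule is a hypothetical helper, not part of Spring. As far as I understand, these listener retry settings apply to re-invoking the listener method for a failing message, not to re-establishing a lost connection.

```java
import java.util.ArrayList;
import java.util.List;

final class RetryBackoffSchedule {

    /** Returns the wait in milliseconds before each retry (maxAttempts - 1 entries). */
    static List<Long> delays(long initialInterval, double multiplier,
                             long maxInterval, int maxAttempts) {
        List<Long> result = new ArrayList<>();
        double interval = initialInterval;
        for (int retry = 1; retry < maxAttempts; retry++) {
            // Cap each wait at maxInterval, then grow the interval for the next retry.
            result.add((long) Math.min(interval, maxInterval));
            interval *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        // Values taken from the properties above.
        List<Long> waits = delays(1000, 2.0, 10000, 8);
        long total = 0;
        for (long wait : waits) {
            total += wait;
        }
        // prints "[1000, 2000, 4000, 8000, 10000, 10000, 10000] total=45000 ms"
        System.out.println(waits + " total=" + total + " ms");
    }
}
```

Under these assumptions the eight attempts span roughly 45 seconds, so an outage longer than that would outlast this retry policy.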
Everything works fine until I disconnect the host for a while. When that happens, the connection is dropped and attempts are made to restore it. After the connection is re-established, the listener does not start receiving messages. After restarting the application everything is fine, but I am sure it can be configured somehow so that the consumer keeps restarting; at the very least it should try to do so after the connection is restored (or at least that is what I expected).
After the connection is dropped, the following can be found in the logs:
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer WARN Cancel received for amq.ctag-PgBSeymWBfsghwdUYr5asA (_316_); [email protected]: tags=[[amq.ctag-PgBSeymWBfsghwdUYr5asA]], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected],1), conn: [email protected] Shared Rabbit Connection: [email protected] [delegate=amqp://[email protected], localPort= 36678], acknowledgeMode=AUTO local queue size=0
org.springframework.amqp.rabbit.connection.CachingConnectionFactory ERROR Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - user 'user' is deleted, class-id=0, method-id=0)
com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Connection reset)
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer WARN Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.rabbit.support.ConsumerCancelledException
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Restarting [email protected]: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected],1), conn: [email protected] Shared Rabbit Connection: [email protected] [delegate=amqp://[email protected], localPort= 36678], acknowledgeMode=AUTO local queue size=0
org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Attempting to connect to: [host:5672]
org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException: Authentication failure
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:564)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1201)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1046)
at java.base/java.lang.Thread.run(Thread.java:835)
Caused by: org.springframework.amqp.AmqpAuthenticationException: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN.
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer received fatal exception on startup
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer ERROR Stopping container from aborted consumer
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Waiting for workers to finish.
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Successfully waited for workers to finish.
com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Socket closed)
org.springframework.amqp.AmqpAuthenticationException: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:65)
at
Then a connection attempt is made, and we are in a loop:
org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Attempting to connect to: [host]
com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Socket closed)
org.springframework.amqp.AmqpAuthenticationException: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED
Until the connection is restored:
org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Attempting to connect to: [host]
org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Created new connection: rabbitConnectionFactory#69d3cf7e:16/[email protected] [delegate=amqp://[email protected], localPort= 50574]
And nothing else happens: no messages are consumed.
UPDATE: I followed the advice and enabled DEBUG logging.
When the application starts, we are:
- starting the listener container
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Starting Rabbit listener container.
- creating the connection
- starting the consumer
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Starting consumer [email protected]: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
- creating the channel and starting to consume
org.springframework.amqp.rabbit.connection.CachingConnectionFactory DEBUG Creating cached Rabbit Channel from AMQChannel(amqp://[email protected],1)
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG ConsumeOK: [email protected]: tags=[[amq.ctag-uG8_iXcNaknFjBIGM-91Tg]], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected],1), conn: [email protected] Shared Rabbit Connection: [email protected] [delegate=amqp://[email protected], localPort= 37906], acknowledgeMode=AUTO local queue size=0
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Started on queue '_316_' with tag amq.ctag-uG8_iXcNaknFjBIGM-91Tg: [email protected]: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected],1), conn: [email protected] Shared Rabbit Connection: [email protected] [delegate=amqp://[email protected], localPort= 37906], acknowledgeMode=AUTO local queue size=0
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Storing delivery for consumerTag: 'amq.ctag-uG8_iXcNaknFjBIGM-91Tg' with deliveryTag: '1' in [email protected]: tags=[[amq.ctag-uG8_iXcNaknFjBIGM-91Tg]], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected],1), conn: [email protected] Shared Rabbit Connection: [email protected] [delegate=amqp://[email protected], localPort= 37906], acknowledgeMode=AUTO local queue size=0
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Received message: (Body:'[[email protected](byte[117])' MessageProperties [headers={}, contentLength=0, redelivered=true, receivedExchange=, receivedRoutingKey=_316_, deliveryTag=1, consumerTag=amq.ctag-uG8_iXcNaknFjBIGM-91Tg, consumerQueue=_316_])
Это продолжается до тех пор, пока соединение не прервется:
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer WARN Cancel received for amq.ctag-uG8_iXcNaknFjBIGM-91Tg (_316_); [email protected]: tags=[[amq.ctag-uG8_iXcNaknFjBIGM-91Tg]], channel=Cached Rabbit Channel: AMQChannel(amqp:///[email protected],1), conn: [email protected] Shared Rabbit Connection: [email protected] [delegate=amqp:///[email protected], localPort= 37906], acknowledgeMode=AUTO local queue size=0
Канал закрыт, и пользователь каким-то образом удаляется, как в журнале говорится:
org.springframework.amqp.rabbit.connection.CachingConnectionFactory ERROR Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - user 'user' is deleted, class-id=0, method-id=0)
A connection driver problem follows, then an exception is thrown:
com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Connection reset)
org.springframework.amqp.rabbit.support.ConsumerCancelledException: null
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:499)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:859)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1142)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1048)
    at java.base/java.lang.Thread.run(Thread.java:835
The consumer raises an exception and the log says that processing can r-e-s-t-a-r-t:
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Consumer raised exception, processing can restart if the connection factory supports it
The restart happens:
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Restarting Consumer@3daf03d8: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0
The channels are closed:
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906]
org.springframework.amqp.rabbit.connection.CachingConnectionFactory DEBUG Closing cached Channel: AMQChannel(amqp://user@host,1)
A new consumer starts:
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Starting consumer Consumer@2560313a: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
We attempt to connect, which ends in a WARN and an authentication failure (because the earlier log said the user was deleted?):
An unexpected connection driver error occured (Exception message: Socket closed)
org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException: Authentication failure
ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:65)
The consumer that was trying to start:
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer ERROR Consumer received fatal exception on startup
And it (the consumer) is cancelled:
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Cancelling Consumer@2560313a: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
The channel is closed and the container stops:
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer ERROR Stopping container from aborted consumer
And then the container shuts down:
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Shutting down Rabbit listener container
We wait for the workers to finish, which succeeds, then we try to connect again, and the same SOCKET_CLOSED is logged over and over.
Then the host comes back and the connection is restored. A cached Rabbit Channel is created and nothing happens.
My guess is that the container was shut down and never came back to life, so there are no consumers.
WHAT WORKED:
I created a class with a "listen" method that takes a ListenerContainerConsumerFailedEvent. The class has a RabbitListenerEndpointRegistry (a bean that Boot conveniently created for me), and whenever the method is called I check whether the listenerContainer is running; if it is not, I start it (this check is most likely redundant).
@EventListener
public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
    var listenerContainer = rabbitListenerEndpointRegistry.getListenerContainer(MessageListener.RABBIT_LISTENER_ID);
    // Restart the container only if it has actually stopped.
    if (!listenerContainer.isRunning()) {
        listenerContainer.start();
    }
}
Let me start by saying that I have only just started dabbling in AMQP.
I want to receive/retrieve data from a queue. I am using the Spring libraries (spring-boot-starter-amqp) to make this easier. I have a listener class with a method annotated with @RabbitListener, where I set the queue. Everything else is configured through properties:
rabbitmq:
  username: user
  password: password
  virtual-host: virtual-host
  port: 5672
  host: host
  queue: _316_
  listener:
    simple:
      retry:
        enabled: true
        initial-interval: 1000
        max-attempts: 8
        max-interval: 10000
        multiplier: 2.0
        stateless: true
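For orientation, here is a minimal sketch of the wait times these retry settings would produce, assuming a capped exponential backoff (each wait is the current interval limited to max-interval, and the interval is then multiplied by the multiplier). The class and method names are made up for illustration and are not part of Spring. As far as I understand, these settings govern retries of the listener invocation for a message, not reconnection to the broker, so they would not by themselves bring a stopped container back.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative helper (hypothetical name), not a Spring class.
// Assumes capped exponential backoff: wait = min(current, max); current *= multiplier.
class BackoffSchedule {

    /** Returns the wait in ms before each retry; maxAttempts includes the first attempt. */
    static List<Long> delays(long initialInterval, double multiplier, long maxInterval, int maxAttempts) {
        List<Long> waits = new ArrayList<>();
        double current = initialInterval;
        // The first attempt happens immediately, so there are maxAttempts - 1 waits.
        for (int retry = 1; retry < maxAttempts; retry++) {
            waits.add((long) Math.min(current, maxInterval));
            current *= multiplier;
        }
        return waits;
    }

    public static void main(String[] args) {
        // Values taken from the properties above.
        List<Long> waits = delays(1000, 2.0, 10000, 8);
        long total = waits.stream().mapToLong(Long::longValue).sum();
        System.out.println(waits + " total=" + total + " ms");
    }
}
```

Under that assumption the eight attempts are spread over seven waits of 1, 2, 4, 8, 10, 10 and 10 seconds, 45 seconds in total.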
Everything works fine until I make the host unavailable for a while. When that happens, the connection is dropped and attempts are made to restore it. Once the connection is restored, the listener does not start receiving messages again. After restarting the application everything is fine, but I am sure this can be configured so that the consumer keeps restarting, or at least tries to once the connection is back (or at least that is what I expected).
After the connection drops, the following can be found in the logs:
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer WARN Cancel received for amq.ctag-PgBSeymWBfsghwdUYr5asA (_316_); Consumer@22ead351: tags=[[amq.ctag-PgBSeymWBfsghwdUYr5asA]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@39549f33 Shared Rabbit Connection: SimpleConnection@6f731759 [delegate=amqp://user@host, localPort= 36678], acknowledgeMode=AUTO local queue size=0
org.springframework.amqp.rabbit.connection.CachingConnectionFactory ERROR Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - user 'user' is deleted, class-id=0, method-id=0)
com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Connection reset)
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer WARN Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.rabbit.support.ConsumerCancelledException
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Restarting Consumer@22ead351: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@39549f33 Shared Rabbit Connection: SimpleConnection@6f731759 [delegate=amqp://user@host, localPort= 36678], acknowledgeMode=AUTO local queue size=0
org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Attempting to connect to: [host:5672]
org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException: Authentication failure
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:564)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1201)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1046)
    at java.base/java.lang.Thread.run(Thread.java:835)
Caused by: org.springframework.amqp.AmqpAuthenticationException: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN.
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer ERROR Consumer received fatal exception on startup
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer ERROR Stopping container from aborted consumer
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Waiting for workers to finish.
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Successfully waited for workers to finish.
com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Socket closed)
org.springframework.amqp.AmqpAuthenticationException: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:65)
Then a connection attempt is made, and we go into a loop:
org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Attempting to connect to: [host]
com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Socket closed)
org.springframework.amqp.AmqpAuthenticationException: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED
Until the connection is restored:
org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Attempting to connect to: [host]
org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Created new connection: rabbitConnectionFactory#69d3cf7e:16/SimpleConnection@3931e0ad [delegate=amqp://user@host, localPort= 50574]
And then nothing else happens: no messages are consumed.
UPDATE: I followed the advice and enabled DEBUG logging.
When the application starts, we see:
- the listener container starting
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Starting Rabbit listener container.
- the connection being created
- the consumer starting
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Starting consumer Consumer@3daf03d8: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
- the channel being created and consumption starting
org.springframework.amqp.rabbit.connection.CachingConnectionFactory DEBUG Creating cached Rabbit Channel from AMQChannel(amqp://user@host,1)
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG ConsumeOK: Consumer@3daf03d8: tags=[[amq.ctag-uG8_iXcNaknFjBIGM-91Tg]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Started on queue '_316_' with tag amq.ctag-uG8_iXcNaknFjBIGM-91Tg: Consumer@3daf03d8: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Storing delivery for consumerTag: 'amq.ctag-uG8_iXcNaknFjBIGM-91Tg' with deliveryTag: '1' in Consumer@3daf03d8: tags=[[amq.ctag-uG8_iXcNaknFjBIGM-91Tg]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Received message: (Body:'[B@4b817fae(byte[117])' MessageProperties [headers={}, contentLength=0, redelivered=true, receivedExchange=, receivedRoutingKey=_316_, deliveryTag=1, consumerTag=amq.ctag-uG8_iXcNaknFjBIGM-91Tg, consumerQueue=_316_])
This continues until the connection breaks:
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer WARN Cancel received for amq.ctag-uG8_iXcNaknFjBIGM-91Tg (_316_); Consumer@3daf03d8: tags=[[amq.ctag-uG8_iXcNaknFjBIGM-91Tg]], channel=Cached Rabbit Channel: AMQChannel(amqp:///user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp:///user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0
The channel is shut down, and the user is somehow deleted, according to the log:
org.springframework.amqp.rabbit.connection.CachingConnectionFactory ERROR Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - user 'user' is deleted, class-id=0, method-id=0)
A connection driver problem follows, then an exception is thrown:
com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Connection reset)
org.springframework.amqp.rabbit.support.ConsumerCancelledException: null
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:499)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:859)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1142)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1048)
    at java.base/java.lang.Thread.run(Thread.java:835
The consumer raises an exception and the log says that processing can r-e-s-t-a-r-t:
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Consumer raised exception, processing can restart if the connection factory supports it
The restart happens:
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Restarting Consumer@3daf03d8: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0
The channels are closed:
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906]
org.springframework.amqp.rabbit.connection.CachingConnectionFactory DEBUG Closing cached Channel: AMQChannel(amqp://user@host,1)
A new consumer starts:
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Starting consumer Consumer@2560313a: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
We attempt to connect, which ends in a WARN and an authentication failure (because the earlier log said the user was deleted?):
An unexpected connection driver error occured (Exception message: Socket closed)
org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException: Authentication failure
ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:65)
The consumer that was trying to start:
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer ERROR Consumer received fatal exception on startup
And it (the consumer) is cancelled:
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Cancelling Consumer@2560313a: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
The channel is closed and the container stops:
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer ERROR Stopping container from aborted consumer
And then the container shuts down:
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Shutting down Rabbit listener container
We wait for the workers to finish, which succeeds, then we try to connect again, and the same SOCKET_CLOSED is logged over and over.
Then the host comes back and the connection is restored. A cached Rabbit Channel is created and nothing happens.
My guess is that the container was shut down and never came back to life, so there are no consumers.
WHAT WORKED:
I created a class with a "listen" method that takes a ListenerContainerConsumerFailedEvent. The class has a RabbitListenerEndpointRegistry (a bean that Boot conveniently created for me), and whenever the method is called I check whether the listenerContainer is running; if it is not, I start it (this check is most likely redundant).
@EventListener
public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
    var listenerContainer = rabbitListenerEndpointRegistry.getListenerContainer(MessageListener.RABBIT_LISTENER_ID);
    // Restart the container only if it has actually stopped.
    if (!listenerContainer.isRunning()) {
        listenerContainer.start();
    }
}
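The guard in the handler above can be tried out without a broker. The following is only a sketch with stand-in types: Container, FakeContainer and RestartOnFailure are hypothetical names invented here, not Spring AMQP classes. It models the one decision the handler makes, which is to start the container only when it is no longer running. As far as I can tell, the failure event can also be published while the container is still running (for example when the consumer is merely being restarted), in which case the isRunning() check is what avoids a second start.

```java
// Hypothetical stand-in for a listener container; not the Spring AMQP type.
interface Container {
    boolean isRunning();
    void start();
}

// In-memory fake used only to exercise the guard.
class FakeContainer implements Container {
    private boolean running;
    int starts; // how many times start() was invoked

    public boolean isRunning() { return running; }

    public void start() { running = true; starts++; }

    // Simulates the container stopping after a fatal consumer failure.
    void abort() { running = false; }
}

class RestartOnFailure {

    /** Same decision as the handler above: start the container only if it has stopped. */
    static void onConsumerFailed(Container container) {
        if (!container.isRunning()) {
            container.start();
        }
    }

    public static void main(String[] args) {
        FakeContainer container = new FakeContainer();
        container.start();
        onConsumerFailed(container); // still running: nothing happens
        container.abort();
        onConsumerFailed(container); // stopped: started again
        System.out.println("running=" + container.isRunning() + " starts=" + container.starts);
    }
}
```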
Rod
Jul 11, 2019, 11:52:12 PM
to rabbitmq-users
Hi, I’m facing the following issue when using perfTest tool:
#20:38:32.753 [AMQP Connection some_ip:5672] WARN c.r.c.impl.ForgivingExceptionHandler - An unexpected connection driver error occured (Exception message: Socket closed)
20:40:41.648 [AMQP Connection some_ip:5672] WARN c.r.c.impl.ForgivingExceptionHandler - An unexpected connection driver error occured (Exception message: Socket closed)
#20:40:45.082 [AMQP Connection 0:0:0:0:0:0:0:1:5672] WARN c.r.c.impl.ForgivingExceptionHandler - An unexpected connection driver error occured (Exception message: Socket closed)
20:43:23.786 [AMQP Connection some_ip:5672] WARN c.r.c.impl.ForgivingExceptionHandler - An unexpected connection driver error occured (Exception message: Socket closed)
#20:43:26.797 [AMQP Connection some_ip:5672] WARN c.r.c.impl.ForgivingExceptionHandler - An unexpected connection driver error occured (Exception message: Socket closed)
In my publish-consume-spec.js I have this content:
[
  {'name': 'message-sizes-and-producers',
   'type': 'varying',
   'params': [{'time-limit': 30,
               'consumer-count': 90}],
   'variables': [{'name': 'min-msg-size',
                  'values': [0, 1000, 10000, 100000]},
                 {'name': 'producer-count',
                  'values': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]}]}
]
I’m not sure why it is failing. For example for another scenario, it sometimes is passing and sometimes not (with the same error). This is the scenario:
{'name': 'no-ack',
 'type': 'simple',
 'params': [{'time-limit': 30}]
}
Also, If I run perfTest command just plain (bin/runjava com.rabbitmq.perf.PerfTest) I don’t see any error.
Arnaud Cogoluègnes
Jul 12, 2019, 10:44:30 AM
to rabbitm…@googlegroups.com
Rod
Jul 12, 2019, 11:00:12 PM
to rabbitmq-users
No results at all. Now I randomly get a successful run, but eventually, if I change some variables (more publishers, more consumers, etc.), I start getting the error again. Even now I randomly get this error with a minimal configuration (1 publisher; 1, 2, 3…8 consumers).
On Friday, July 12, 2019 at 3:44:30 (UTC-4), Arnaud Cogoluègnes wrote:
Does the run crash completely (no results)? There can be some spurious
warnings when a run stops and PerfTestMulti moves on to the next one.
Arnaud Cogoluègnes
Jul 15, 2019, 4:19:29 PM
to rabbitm…@googlegroups.com
I changed your configuration file a bit to make the run shorter, but I
could not reproduce:
[
  {'name': 'message-sizes-and-producers',
   'type': 'varying',
   'params': [{'time-limit': 15,
               'consumer-count': 10}],
   'variables': [{'name': 'min-msg-size',
                  'values': [1000, 10000]},
                 {'name': 'producer-count',
                  'values': [1, 2, 3, 4, 5]}]}
]
The test runs and results for the different scenarios are dumped in
the result file.
Can you please make sure that you’re using the latest version of
PerfTest (2.8.1 currently).
You can also add more log [1]: create a perf-test-log-config.xml file
with the following content:
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>
    <logger name="com.rabbitmq.perf" level="debug" />
    <root level="warn">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>
And run PerfTestMulti like this:
bin/runjava -Dlogback.configurationFile=./perf-test-log-config.xml com.rabbitmq.perf.PerfTestMulti publish-consume-spec.js publish-consume-result.js
Then please post the console output in a file in this thread.
[1] https://github.com/rabbitmq/rabbitmq-perf-test#logging
Rod
Jul 15, 2019, 8:07:34 PM
to rabbitmq-users
Question:
what should be the content of the html file for the test you gave me?
This is my content but somehow, I’m able to just see the title of my scenario:
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "https://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<title>RabbitMQ Performance</title>
<link href="../perf.css" rel="stylesheet" type="text/css">
<!--[if lte IE 8]><script language="javascript" type="text/javascript" src="../lib/excanvas.min.js"></script><![endif]-->
<script language="javascript" type="text/javascript" src="../lib/jquery.min.js"></script>
<script language="javascript" type="text/javascript" src="../lib/jquery.flot.min.js"></script>
<script language="javascript" type="text/javascript" src="../perf.js"></script>
<script language="javascript" type="text/javascript">
$(document).ready(function() {
var main_results;
$.ajax({
url: 'publish-consume-results-rod.js',
success: function(data) {
render_graphs(JSON.parse(data));
},
fail: function() { alert('error loading publish-consume-results-rod.js'); }
});
});
</script>
</head>
<body>
<h1>RabbitMQ Performance Example</h1>
<h3>message-sizes-and-producers</h3>
<div class="chart"
data-type="time"
data-latency="true"
data-x-axis="time (s)"
data-y-axis="rate (msg/s)"
data-y-axis2="latency (μs)"
data-scenario="message-sizes-and-producers"></div>
</body>
</html>
[root@ip-10-14-19-204 examples]#
Arnaud Cogoluègnes
Jul 16, 2019, 9:51:56 AM
to rabbitm…@googlegroups.com