场景:需要基于队列实现消息传递,保证消息不被丢弃和稳定
安装
安装erlang
1
| yum install -y epel-release
|
1
| wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
|
1
| rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
|
安装RabitMq
1
| rpm -Uvh rabbitmq-server-3.9.12-1.el7.noarch.rpm
|
1
| systemctl start rabbitmq-server
|
1
| systemctl status rabbitmq-server
|
1
| rabbitmq-plugins enable rabbitmq_management
|
参考
1 2 3
| rabbitmqctl add_user admin 123456 rabbitmqctl set_user_tags admin administrator rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
|
1
| http://192.168.226.128:15672
|
简单实现
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
| import com.rabbitmq.client.*;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Scanner; import java.util.concurrent.TimeoutException;
public class provider {
private static final String QUEUE_NAME = "myQueue"; private static final String EXCHANGE_NAME = "myDirectExchange"; private static final String ROUTE_KEY = "routeKey"; private static final boolean QUEUE_PERSISTENCE = true; private static final boolean MESSAGE_SHART = false; private static final boolean AUTO_DELETE = false;
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.226.128"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("123456"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,QUEUE_PERSISTENCE,MESSAGE_SHART,AUTO_DELETE,null); channel.confirmSelect();
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTE_KEY,null);
ConfirmCallback ackCallback = (deliveryTag, multiple) -> { System.out.println("["+deliveryTag +"]发送成功"); }; ConfirmCallback nackCallback = (deliveryTag, multiple) -> { System.out.println("["+deliveryTag +"]发送失败"); }; channel.addConfirmListener(ackCallback, nackCallback);
Scanner scanner = new Scanner(System.in); while(scanner.hasNextLine()){ String str = scanner.next();
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().expiration("10000").build(); channel.basicPublish(EXCHANGE_NAME,ROUTE_KEY,basicProperties,str.getBytes(StandardCharsets.UTF_8)); while(true){ if(channel.waitForConfirms()){ System.out.println("消息发送成功:["+str+"]"); break; } } } } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
| import com.rabbitmq.client.*;
import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException;
public class consumer {
private static final String QUEUE_NAME = "myQueue"; private static final String EXCHANGE_NAME = "myDirectExchange"; private static final String CONSUMER_ROUTE_KEY = "routeKey1"; private static final String DEAD_QUEUE_NAME = "myDeadQueue"; private static final String DEAD_EXCHANGE_NAME = "myDeadDirectExchange"; private static final String CONSUMER_DEAD_ROUTE_KEY = "routeKeyDead"; private static final boolean AUTO_ACK = false; private static final boolean BATCH_ACK_FLAG = false;
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.226.128"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("123456"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.basicQos(1); channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,CONSUMER_ROUTE_KEY,null);
Map<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE_NAME); arguments.put("x-dead-letter-routing-key",CONSUMER_DEAD_ROUTE_KEY); arguments.put("x-max-length",20); channel.exchangeDeclare(DEAD_EXCHANGE_NAME,BuiltinExchangeType.DIRECT); channel.queueBind(DEAD_QUEUE_NAME,DEAD_EXCHANGE_NAME,CONSUMER_DEAD_ROUTE_KEY,null); channel.queueDeclare(QUEUE_NAME,false,false,false,arguments); channel.queueDeclare(DEAD_QUEUE_NAME,false,false,false,arguments);
DeliverCallback deliverCallback = (consumerTag,message) ->{ System.out.println("消费者["+consumerTag+"]:" + "消息序号["+message.getEnvelope().getDeliveryTag()+"]:" + "路由键["+message.getEnvelope().getRoutingKey()+"]:"+ "内容["+new String(message.getBody())+"]"); channel.basicAck(message.getEnvelope().getDeliveryTag(),BATCH_ACK_FLAG); }; CancelCallback cancelCallback = (consumerTag) -> { System.out.println("这是返回的回调"); };
channel.basicConsume(QUEUE_NAME,AUTO_ACK,deliverCallback,cancelCallback); } }
|
注:
一个信道可以对应多个交换机,一个交换机可以绑定多个队列
交换机和队列中间有个routing_key(路由键),路由键在交换机是direct的时候起作用
死信队列出现的情况
消息的应答方式有自动应答和手动应答,手动应答时可通过回调函数处理逻辑