博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
3、RabbitMQ-work queues 工作队列
阅读量:5837 次
发布时间:2019-06-18

本文共 12136 字,大约阅读时间需要 40 分钟。

work queues 工作队列

1、模型图:

为什么会出现 work queues?
前提:使用 simple 队列的时候
我们应用程序在是使用消息系统的时候,一般生产者 P 生产消息是毫不费力的(发送消息即可),
而消费者接收完消息后的需要处理,会耗费一定的时间,这时候,就有可能导致很多消息堆积在
队列里面,一个消费者有可能不够用
 
那么怎么让消费者同事处理多个消息呢?
在同一个队列上创建多个消费者,让他们相互竞争,这样消费者就可以同时处理多条消息了
 
使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通
过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易
 

 

2、代码实例(轮询分发)

生产者进行生产消息

import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.util.ConnectionUtils;public class Send {          private static final String  QUEUE_NAME  ="test_work_queue";          public static void main(String[] args) throws IOException,  TimeoutException, InterruptedException {           //获取连接           Connection conn = ConnectionUtils.getConnection();                      //获取Channel           Channel channel= conn.createChannel();           //声明队列           channel.queueDeclare(QUEUE_NAME, false, false,  false,null);                      //连续发送50个消息           for(int i = 1; i<=50;i++){                String msg = "work" + i;                channel.basicPublish("",QUEUE_NAME, null,  msg.getBytes());                Thread.sleep(1000);           }            channel.close();           conn.close();     }}

 

 

消费者1:

import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.util.ConnectionUtils;public class WorkReceive {          private static final String  QUEUE_NAME  ="test_work_queue";          public static void main(String[] args) throws IOException,  TimeoutException {                      Connection conn = ConnectionUtils.getConnection();                      Channel channel = conn.createChannel();           //声明队列           channel.queueDeclareNoWait(QUEUE_NAME, false, false,  false, null);                      //定义一个消费者           Consumer consumer = new DefaultConsumer(channel){                //收到消息就会触发这个方法                @Override                public void handleDelivery(String consumerTag,  Envelope envelope, BasicProperties properties, byte[] body)                           throws IOException {                     String msg = new String(body,"utf-8");                     System.out.println("消费者1接收到的消息" +  msg);                                          try {                           Thread.sleep(1500);                     } catch (InterruptedException e) {                           e.printStackTrace();                     }finally{                           System.out.println("消费者1处理完成!");                     }                }           };           //监听队列           boolean autoAck = true;           channel.basicConsume(QUEUE_NAME, autoAck, consumer);     }}

 

 

消费者2:

import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.util.ConnectionUtils;public class WorkReceive2 {        private static final String  QUEUE_NAME ="test_work_queue";        public static void main(String[] args) throws IOException, TimeoutException {                Connection conn = ConnectionUtils.getConnection();                Channel channel = conn.createChannel();        //声明队列        channel.queueDeclareNoWait(QUEUE_NAME, false, false, false, null);                //定义一个消费者        Consumer consumer = new DefaultConsumer(channel){            //收到消息就会触发这个方法            @Override            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)                    throws IOException {                String msg = new String(body,"utf-8");                System.out.println("消费者2接收到的消息" + msg);                                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }finally{                    System.out.println("消费者2处理完成!");                }            }        };        //监听队列        boolean autoAck = true;        channel.basicConsume(QUEUE_NAME, autoAck, consumer);     }}

 

提前开启消费者进行对消息队列的监听!!!

 

此时开启消息的生产:
消费者2:
消费者2接收到的消息work2消费者2处理完成!消费者2接收到的消息work4消费者2处理完成!消费者2接收到的消息work6消费者2处理完成!消费者2接收到的消息work8消费者2处理完成!消费者2接收到的消息work10消费者2处理完成!消费者2接收到的消息work12消费者2处理完成!消费者2接收到的消息work14消费者2处理完成!消费者2接收到的消息work16消费者2处理完成!消费者2接收到的消息work18消费者2处理完成!消费者2接收到的消息work20消费者2处理完成!消费者2接收到的消息work22消费者2处理完成!消费者2接收到的消息work24消费者2处理完成!消费者2接收到的消息work26消费者2处理完成!消费者2接收到的消息work28消费者2处理完成!消费者2接收到的消息work30消费者2处理完成!消费者2接收到的消息work32消费者2处理完成!消费者2接收到的消息work34消费者2处理完成!消费者2接收到的消息work36消费者2处理完成!消费者2接收到的消息work38消费者2处理完成!消费者2接收到的消息work40消费者2处理完成!消费者2接收到的消息work42消费者2处理完成!消费者2接收到的消息work44消费者2处理完成!消费者2接收到的消息work46消费者2处理完成!消费者2接收到的消息work48消费者2处理完成!消费者2接收到的消息work50消费者2处理完成!

 

消费者1:

消费者2接收到的消息work2消费者2处理完成!消费者2接收到的消息work4消费者2处理完成!消费者2接收到的消息work6消费者2处理完成!消费者2接收到的消息work8消费者2处理完成!消费者2接收到的消息work10消费者2处理完成!消费者2接收到的消息work12消费者2处理完成!消费者2接收到的消息work14消费者2处理完成!消费者2接收到的消息work16消费者2处理完成!消费者2接收到的消息work18消费者2处理完成!消费者2接收到的消息work20消费者2处理完成!消费者2接收到的消息work22消费者2处理完成!消费者2接收到的消息work24消费者2处理完成!消费者2接收到的消息work26消费者2处理完成!消费者2接收到的消息work28消费者2处理完成!消费者2接收到的消息work30消费者2处理完成!消费者2接收到的消息work32消费者2处理完成!消费者2接收到的消息work34消费者2处理完成!消费者2接收到的消息work36消费者2处理完成!消费者2接收到的消息work38消费者2处理完成!消费者2接收到的消息work40消费者2处理完成!消费者2接收到的消息work42消费者2处理完成!消费者2接收到的消息work44消费者2处理完成!消费者2接收到的消息work46消费者2处理完成!消费者2接收到的消息work48消费者2处理完成!消费者2接收到的消息work50消费者2处理完成!

 

测试结果:
1.消费者 1 和消费者 2 获取到的消息内容是不同的,同一个消息只能被一个消费者获取
2.消费者 1 和消费者 2 货到的消息数量是一样的 一个奇数一个偶数
按道理消费者 1 获取的比消费者 2 要多
 
这种方式叫做
轮询分发 结果就是不管谁忙或清闲,都不会给谁多一个任务或少一个任务,
任务总是你一个我一个的分

 

官方文档的解释如下:

 

 

 

3、代码实例(公平派遣/公平分发)Fair dispatch

您可能已经注意到调度仍然无法完全按照我们的意愿运行。例如,在有两个工人的情况下,当所有奇怪的消息都
很重,甚至消息很轻时,一个工人将经常忙,而另一个工作人员几乎不会做任何工作。那么,RabbitMQ对此一无
所知,仍然会均匀地发送消息。
 
发生这种情况是因为RabbitMQ只是在消息进入队列时调度消息。它不会查看消费者未确认消息的数量。它只是盲
目地向第n个消费者发送每个第n个消息。

 

 

我们可以使用basicQos方法和 prefetchCount = 1设置。这告诉RabbitMQ一次不向
一个worker发送一条消息。或者,换句话说,
在处理并确认前一个消息之前,不要
向工作人员发送新消息。相反,它会将它发送给下一个仍然很忙的工人。

 

还有一点需要注意,使用公平分发,必须关闭自动应答,改为手动应答

 

总结:能者多劳

 

生产者:

import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.util.ConnectionUtils;public class Send {        private static final String  QUEUE_NAME ="test_work_queue";        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {        //获取连接        Connection conn = ConnectionUtils.getConnection();                //获取Channel        Channel channel= conn.createChannel();        //声明队列        channel.queueDeclare(QUEUE_NAME, false, false, false,null);                //每个消费者发送确认消息之前,消费队列不发送下一个消息到消费者,一个只处理一个消息        //限制发送给同一个消费者不得超过一个消息        int prefetchCount = 1;        channel.basicQos(prefetchCount );                //连续发送50个消息        for(int i = 1; i<=50;i++){            String msg = "work" + i;            channel.basicPublish("",QUEUE_NAME, null, msg.getBytes());            Thread.sleep(i + 50 );        }        channel.close();        conn.close();    }}

 

 

消费者1:

import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.util.ConnectionUtils;public class WorkReceive {        private static final String  QUEUE_NAME ="test_work_queue";        public static void main(String[] args) throws IOException, TimeoutException {                Connection conn = ConnectionUtils.getConnection();                Channel channel = conn.createChannel();        //声明队列        channel.queueDeclareNoWait(QUEUE_NAME, false, false, false, null);                //保证一次只分发一次        int prefetchCount = 1;        channel.basicQos(prefetchCount );                //定义一个消费者        Consumer consumer = new DefaultConsumer(channel){            //收到消息就会触发这个方法            @Override            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)                    throws IOException {                String msg = new String(body,"utf-8");                System.out.println("消费者1接收到的消息" + msg);                                try {                    Thread.sleep(1500);                } catch (InterruptedException e) {                    e.printStackTrace();                }finally{                    System.out.println("消费者1处理完成!");                    //手动回执                    channel.basicAck(envelope.getDeliveryTag(), false);                }                            }        };        //监听队列        //自动应答false        boolean autoAck = false;        channel.basicConsume(QUEUE_NAME, autoAck, consumer);            }}

 

 

消费者2:

import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.util.ConnectionUtils;public class WorkReceive2 {        private static final String  QUEUE_NAME ="test_work_queue";        public static void main(String[] args) throws IOException, TimeoutException {                Connection conn = ConnectionUtils.getConnection();                Channel channel = conn.createChannel();        //声明队列        channel.queueDeclareNoWait(QUEUE_NAME, false, false, false, null);                int prefetchCount = 1;        channel.basicQos(prefetchCount );                //定义一个消费者        Consumer consumer = new DefaultConsumer(channel){            //收到消息就会触发这个方法            @Override            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)                    throws IOException {                String msg = new String(body,"utf-8");                System.out.println("消费者2接收到的消息" + msg);                                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }finally{                    System.out.println("消费者2处理完成!");                    channel.basicAck(envelope.getDeliveryTag(), false);                }            }        };        //监听队列        boolean autoAck = false;        channel.basicConsume(QUEUE_NAME, autoAck, consumer);            }}

 

消费者1:

消费者1接收到的消息work1消费者1处理完成!消费者1接收到的消息work4消费者1处理完成!消费者1接收到的消息work6消费者1处理完成!消费者1接收到的消息work9消费者1处理完成!消费者1接收到的消息work11消费者1处理完成!消费者1接收到的消息work14消费者1处理完成!消费者1接收到的消息work16消费者1处理完成!消费者1接收到的消息work19消费者1处理完成!消费者1接收到的消息work21消费者1处理完成!消费者1接收到的消息work24消费者1处理完成!消费者1接收到的消息work26消费者1处理完成!消费者1接收到的消息work29消费者1处理完成!消费者1接收到的消息work31消费者1处理完成!消费者1接收到的消息work34消费者1处理完成!消费者1接收到的消息work36消费者1处理完成!消费者1接收到的消息work39消费者1处理完成!消费者1接收到的消息work41消费者1处理完成!消费者1接收到的消息work44消费者1处理完成!消费者1接收到的消息work46消费者1处理完成!消费者1接收到的消息work49消费者1处理完成!

消费者2:

消费者2接收到的消息work2消费者2处理完成!消费者2接收到的消息work3消费者2处理完成!消费者2接收到的消息work5消费者2处理完成!消费者2接收到的消息work7消费者2处理完成!消费者2接收到的消息work8消费者2处理完成!消费者2接收到的消息work10消费者2处理完成!消费者2接收到的消息work12消费者2处理完成!消费者2接收到的消息work13消费者2处理完成!消费者2接收到的消息work15消费者2处理完成!消费者2接收到的消息work17消费者2处理完成!消费者2接收到的消息work18消费者2处理完成!消费者2接收到的消息work20消费者2处理完成!消费者2接收到的消息work22消费者2处理完成!消费者2接收到的消息work23消费者2处理完成!消费者2接收到的消息work25消费者2处理完成!消费者2接收到的消息work27消费者2处理完成!消费者2接收到的消息work28消费者2处理完成!消费者2接收到的消息work30消费者2处理完成!消费者2接收到的消息work32消费者2处理完成!消费者2接收到的消息work33消费者2处理完成!消费者2接收到的消息work35消费者2处理完成!消费者2接收到的消息work37消费者2处理完成!消费者2接收到的消息work38消费者2处理完成!消费者2接收到的消息work40消费者2处理完成!消费者2接收到的消息work42消费者2处理完成!消费者2接收到的消息work43消费者2处理完成!消费者2接收到的消息work45消费者2处理完成!消费者2接收到的消息work47消费者2处理完成!消费者2接收到的消息work48消费者2处理完成!消费者2接收到的消息work50消费者2处理完成!

这时候现象就是消费者 1 速度小于消费者 2

 

体现一句话:能者多劳

转载于:https://www.cnblogs.com/Mrchengs/p/10530839.html

你可能感兴趣的文章
Kconfig的格式
查看>>
关于Cursor的moveToFirst和moveToNext的意义
查看>>
个人--工资划分5份
查看>>
有关文件下载的文件名
查看>>
史上最详细的wamp配置虚拟域名步骤
查看>>
oracle 授权
查看>>
lv扩展磁盘空间
查看>>
java8之stream流的基本操作
查看>>
二维数组计算协方差java
查看>>
SpringBoot下Redis相关配置是如何被初始化的
查看>>
为你的AliOS Things应用增加自定义cli命令
查看>>
MongoDB 创建基础索引、组合索引、唯一索引以及优化
查看>>
百度PaddlePaddle常规赛NLP赛道火热开启
查看>>
稳了!这才是cookie,session与token的真正区别
查看>>
python项目实战:制作一个简易的GUI界面浏览器
查看>>
OSChina 周二乱弹 —— 假期余额已不足!
查看>>
前端那些事之React篇--helloword
查看>>
Oracle11g及PL/SQL Developer的安装和配置
查看>>
ios的google解析XML框架GDataXML的配置及使用
查看>>
netty-当一个客户端连接到来的时候发生了什么
查看>>