work queues 工作队列
1、模型图:
为什么会出现 work queues?
前提:使用 simple 队列的时候
我们应用程序在是使用消息系统的时候,一般生产者 P 生产消息是毫不费力的(发送消息即可),
而消费者接收完消息后的需要处理,会耗费一定的时间,这时候,就有可能导致很多消息堆积在
队列里面,一个消费者有可能不够用
那么怎么让消费者同事处理多个消息呢?
在同一个队列上创建多个消费者,让他们相互竞争,这样消费者就可以同时处理多条消息了
使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通
过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易
2、代码实例(轮询分发)
生产者进行生产消息
import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.util.ConnectionUtils;public class Send { private static final String QUEUE_NAME ="test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //获取连接 Connection conn = ConnectionUtils.getConnection(); //获取Channel Channel channel= conn.createChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false,null); //连续发送50个消息 for(int i = 1; i<=50;i++){ String msg = "work" + i; channel.basicPublish("",QUEUE_NAME, null, msg.getBytes()); Thread.sleep(1000); } channel.close(); conn.close(); }}
消费者1:
import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.util.ConnectionUtils;public class WorkReceive { private static final String QUEUE_NAME ="test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection conn = ConnectionUtils.getConnection(); Channel channel = conn.createChannel(); //声明队列 channel.queueDeclareNoWait(QUEUE_NAME, false, false, false, null); //定义一个消费者 Consumer consumer = new DefaultConsumer(channel){ //收到消息就会触发这个方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"utf-8"); System.out.println("消费者1接收到的消息" + msg); try { Thread.sleep(1500); } catch (InterruptedException e) { e.printStackTrace(); }finally{ System.out.println("消费者1处理完成!"); } } }; //监听队列 boolean autoAck = true; channel.basicConsume(QUEUE_NAME, autoAck, consumer); }}
消费者2:
import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.util.ConnectionUtils;public class WorkReceive2 { private static final String QUEUE_NAME ="test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection conn = ConnectionUtils.getConnection(); Channel channel = conn.createChannel(); //声明队列 channel.queueDeclareNoWait(QUEUE_NAME, false, false, false, null); //定义一个消费者 Consumer consumer = new DefaultConsumer(channel){ //收到消息就会触发这个方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"utf-8"); System.out.println("消费者2接收到的消息" + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally{ System.out.println("消费者2处理完成!"); } } }; //监听队列 boolean autoAck = true; channel.basicConsume(QUEUE_NAME, autoAck, consumer); }}
提前开启消费者进行对消息队列的监听!!!
此时开启消息的生产:
消费者2:
消费者2接收到的消息work2消费者2处理完成!消费者2接收到的消息work4消费者2处理完成!消费者2接收到的消息work6消费者2处理完成!消费者2接收到的消息work8消费者2处理完成!消费者2接收到的消息work10消费者2处理完成!消费者2接收到的消息work12消费者2处理完成!消费者2接收到的消息work14消费者2处理完成!消费者2接收到的消息work16消费者2处理完成!消费者2接收到的消息work18消费者2处理完成!消费者2接收到的消息work20消费者2处理完成!消费者2接收到的消息work22消费者2处理完成!消费者2接收到的消息work24消费者2处理完成!消费者2接收到的消息work26消费者2处理完成!消费者2接收到的消息work28消费者2处理完成!消费者2接收到的消息work30消费者2处理完成!消费者2接收到的消息work32消费者2处理完成!消费者2接收到的消息work34消费者2处理完成!消费者2接收到的消息work36消费者2处理完成!消费者2接收到的消息work38消费者2处理完成!消费者2接收到的消息work40消费者2处理完成!消费者2接收到的消息work42消费者2处理完成!消费者2接收到的消息work44消费者2处理完成!消费者2接收到的消息work46消费者2处理完成!消费者2接收到的消息work48消费者2处理完成!消费者2接收到的消息work50消费者2处理完成!
消费者1:
消费者2接收到的消息work2消费者2处理完成!消费者2接收到的消息work4消费者2处理完成!消费者2接收到的消息work6消费者2处理完成!消费者2接收到的消息work8消费者2处理完成!消费者2接收到的消息work10消费者2处理完成!消费者2接收到的消息work12消费者2处理完成!消费者2接收到的消息work14消费者2处理完成!消费者2接收到的消息work16消费者2处理完成!消费者2接收到的消息work18消费者2处理完成!消费者2接收到的消息work20消费者2处理完成!消费者2接收到的消息work22消费者2处理完成!消费者2接收到的消息work24消费者2处理完成!消费者2接收到的消息work26消费者2处理完成!消费者2接收到的消息work28消费者2处理完成!消费者2接收到的消息work30消费者2处理完成!消费者2接收到的消息work32消费者2处理完成!消费者2接收到的消息work34消费者2处理完成!消费者2接收到的消息work36消费者2处理完成!消费者2接收到的消息work38消费者2处理完成!消费者2接收到的消息work40消费者2处理完成!消费者2接收到的消息work42消费者2处理完成!消费者2接收到的消息work44消费者2处理完成!消费者2接收到的消息work46消费者2处理完成!消费者2接收到的消息work48消费者2处理完成!消费者2接收到的消息work50消费者2处理完成!
测试结果:
1.消费者 1 和消费者 2 获取到的消息内容是不同的,同一个消息只能被一个消费者获取
2.消费者 1 和消费者 2 货到的消息数量是一样的 一个奇数一个偶数
按道理消费者 1 获取的比消费者 2 要多
这种方式叫做 轮询分发 结果就是不管谁忙或清闲,都不会给谁多一个任务或少一个任务,
任务总是你一个我一个的分
官方文档的解释如下:
3、代码实例(公平派遣/公平分发)Fair dispatch
您可能已经注意到调度仍然无法完全按照我们的意愿运行。例如,在有两个工人的情况下,当所有奇怪的消息都
很重,甚至消息很轻时,一个工人将经常忙,而另一个工作人员几乎不会做任何工作。那么,RabbitMQ对此一无
所知,仍然会均匀地发送消息。
发生这种情况是因为RabbitMQ只是在消息进入队列时调度消息。它不会查看消费者未确认消息的数量。它只是盲
目地向第n个消费者发送每个第n个消息。
我们可以使用basicQos方法和 prefetchCount = 1设置。这告诉RabbitMQ一次不向
一个worker发送一条消息。或者,换句话说, 在处理并确认前一个消息之前,不要
向工作人员发送新消息。相反,它会将它发送给下一个仍然很忙的工人。
还有一点需要注意,使用公平分发,必须关闭自动应答,改为手动应答
总结:能者多劳
生产者:
import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.util.ConnectionUtils;public class Send { private static final String QUEUE_NAME ="test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //获取连接 Connection conn = ConnectionUtils.getConnection(); //获取Channel Channel channel= conn.createChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false,null); //每个消费者发送确认消息之前,消费队列不发送下一个消息到消费者,一个只处理一个消息 //限制发送给同一个消费者不得超过一个消息 int prefetchCount = 1; channel.basicQos(prefetchCount ); //连续发送50个消息 for(int i = 1; i<=50;i++){ String msg = "work" + i; channel.basicPublish("",QUEUE_NAME, null, msg.getBytes()); Thread.sleep(i + 50 ); } channel.close(); conn.close(); }}
消费者1:
import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.util.ConnectionUtils;public class WorkReceive { private static final String QUEUE_NAME ="test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection conn = ConnectionUtils.getConnection(); Channel channel = conn.createChannel(); //声明队列 channel.queueDeclareNoWait(QUEUE_NAME, false, false, false, null); //保证一次只分发一次 int prefetchCount = 1; channel.basicQos(prefetchCount ); //定义一个消费者 Consumer consumer = new DefaultConsumer(channel){ //收到消息就会触发这个方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"utf-8"); System.out.println("消费者1接收到的消息" + msg); try { Thread.sleep(1500); } catch (InterruptedException e) { e.printStackTrace(); }finally{ System.out.println("消费者1处理完成!"); //手动回执 channel.basicAck(envelope.getDeliveryTag(), false); } } }; //监听队列 //自动应答false boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, consumer); }}
消费者2:
import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.util.ConnectionUtils;public class WorkReceive2 { private static final String QUEUE_NAME ="test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection conn = ConnectionUtils.getConnection(); Channel channel = conn.createChannel(); //声明队列 channel.queueDeclareNoWait(QUEUE_NAME, false, false, false, null); int prefetchCount = 1; channel.basicQos(prefetchCount ); //定义一个消费者 Consumer consumer = new DefaultConsumer(channel){ //收到消息就会触发这个方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"utf-8"); System.out.println("消费者2接收到的消息" + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally{ System.out.println("消费者2处理完成!"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; //监听队列 boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, consumer); }}
消费者1:
消费者1接收到的消息work1消费者1处理完成!消费者1接收到的消息work4消费者1处理完成!消费者1接收到的消息work6消费者1处理完成!消费者1接收到的消息work9消费者1处理完成!消费者1接收到的消息work11消费者1处理完成!消费者1接收到的消息work14消费者1处理完成!消费者1接收到的消息work16消费者1处理完成!消费者1接收到的消息work19消费者1处理完成!消费者1接收到的消息work21消费者1处理完成!消费者1接收到的消息work24消费者1处理完成!消费者1接收到的消息work26消费者1处理完成!消费者1接收到的消息work29消费者1处理完成!消费者1接收到的消息work31消费者1处理完成!消费者1接收到的消息work34消费者1处理完成!消费者1接收到的消息work36消费者1处理完成!消费者1接收到的消息work39消费者1处理完成!消费者1接收到的消息work41消费者1处理完成!消费者1接收到的消息work44消费者1处理完成!消费者1接收到的消息work46消费者1处理完成!消费者1接收到的消息work49消费者1处理完成!
消费者2:
消费者2接收到的消息work2消费者2处理完成!消费者2接收到的消息work3消费者2处理完成!消费者2接收到的消息work5消费者2处理完成!消费者2接收到的消息work7消费者2处理完成!消费者2接收到的消息work8消费者2处理完成!消费者2接收到的消息work10消费者2处理完成!消费者2接收到的消息work12消费者2处理完成!消费者2接收到的消息work13消费者2处理完成!消费者2接收到的消息work15消费者2处理完成!消费者2接收到的消息work17消费者2处理完成!消费者2接收到的消息work18消费者2处理完成!消费者2接收到的消息work20消费者2处理完成!消费者2接收到的消息work22消费者2处理完成!消费者2接收到的消息work23消费者2处理完成!消费者2接收到的消息work25消费者2处理完成!消费者2接收到的消息work27消费者2处理完成!消费者2接收到的消息work28消费者2处理完成!消费者2接收到的消息work30消费者2处理完成!消费者2接收到的消息work32消费者2处理完成!消费者2接收到的消息work33消费者2处理完成!消费者2接收到的消息work35消费者2处理完成!消费者2接收到的消息work37消费者2处理完成!消费者2接收到的消息work38消费者2处理完成!消费者2接收到的消息work40消费者2处理完成!消费者2接收到的消息work42消费者2处理完成!消费者2接收到的消息work43消费者2处理完成!消费者2接收到的消息work45消费者2处理完成!消费者2接收到的消息work47消费者2处理完成!消费者2接收到的消息work48消费者2处理完成!消费者2接收到的消息work50消费者2处理完成!
这时候现象就是消费者 1 速度小于消费者 2
体现一句话:能者多劳