多生产者多消费者模拟需求:
1、创建100个订单生产者,每个生产者生产100条订单,总共会生产10000条订单,由3个消费者进行订单消费处理。
2、100个订单生产者全部创建完毕,再一起生产消费订单数据
1、定义事件
package com.ljq.multi;public class Order {private String id;// IDprivate String name;private double price;// 金额public String getId() {return id;}public void setId(String id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public double getPrice() {return price;}public void setPrice(double price) {this.price = price;}}
2、生产者
package com.ljq.multi;import com.lmax.disruptor.RingBuffer;/*** 生产者* * @author Administrator**/
public class Producer {private final RingBuffer<Order> ringBuffer;public Producer(RingBuffer<Order> ringBuffer) {this.ringBuffer = ringBuffer;}/*** onData用来发布事件,每调用一次就发布一次事件 它的参数会用过事件传递给消费者*/public void onData(String data) {// 可以把ringBuffer看做一个事件队列,那么next就是得到下面一个事件槽long sequence = ringBuffer.next();try {// 用上面的索引取出一个空的事件用于填充(获取该序号对应的事件对象)Order order = ringBuffer.get(sequence);// 获取要通过事件传递的业务数据order.setId(data);} finally {// 发布事件// 注意,最后的 ringBuffer.publish 方法必须包含在 finally 中以确保必须得到调用;如果某个请求的// sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。ringBuffer.publish(sequence);}}}
3、消费者
package com.ljq.multi;import java.util.concurrent.atomic.AtomicInteger;import com.lmax.disruptor.WorkHandler;public class Consumer implements WorkHandler<Order>{private String consumerId;private static AtomicInteger count = new AtomicInteger(0);public Consumer(String consumerId){this.consumerId = consumerId;}@Overridepublic void onEvent(Order order) throws Exception {count.incrementAndGet();System.out.println("当前消费者: " + this.consumerId + ",消费信息:" + order.getId() + ",count:" + getCount());}public int getCount(){return count.get();}}
4、生产者消费者启动类
package com.ljq.multi;import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.WorkerPool;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;/*** disrunptor 多生产者多消费者* * @author Administrator**/
public class Main {public static void main(String[] args) throws Exception {//创建ringBufferRingBuffer<Order> ringBuffer = RingBuffer.create(ProducerType.MULTI, new EventFactory<Order>() { @Override public Order newInstance() { return new Order(); } }, 1024 * 1024, new YieldingWaitStrategy());SequenceBarrier barriers = ringBuffer.newBarrier();//创建3个消费者消费订单数据Consumer[] consumers = new Consumer[3];for(int i = 0; i < consumers.length; i++){consumers[i] = new Consumer("c" + i);}WorkerPool<Order> workerPool = new WorkerPool<Order>(ringBuffer, barriers, new IntEventExceptionHandler(),consumers);ringBuffer.addGatingSequences(workerPool.getWorkerSequences()); workerPool.start(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); final AtomicInteger count = new AtomicInteger(0);final CountDownLatch latch = new CountDownLatch(1);//100个生产者for (int i = 0; i < 100; i++) { final Producer p = new Producer(ringBuffer); new Thread(new Runnable() {@Overridepublic void run() {try {System.out.println(count.incrementAndGet());latch.await(); //等待,直到100个生产者创建完成,再一起生产消费订单数据} catch (InterruptedException e) {e.printStackTrace();}//每个生产者生产100条订单for(int j = 0; j < 100; j ++){p.onData(UUID.randomUUID().toString());}}}).start();} System.out.println("---------------开始生产-----------------");latch.countDown(); //生产者全部创建完毕,开始生产订单}static class IntEventExceptionHandler implements ExceptionHandler { public void handleEventException(Throwable ex, long sequence, Object event) {} public void handleOnStartException(Throwable ex) {} public void handleOnShutdownException(Throwable ex) {} }
}