package com.gxx.record.utils; import java.util.LinkedList; import java.util.List; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 阻塞队列 * 0.size 上限大小 * 1.add 添加,达到上限大小,hold住 * 2.remove 获取,大小为0,hold住 * @author Gxx */ public class BlockingQueue { /** * 上限大小 */ private int size = 5; /** * 内容 */ private List<Object> list = new LinkedList<Object>(); /** * 锁 */ private Lock lock = new ReentrantLock(); /** * 非满条件 */ private Condition notFull = lock.newCondition(); /** * 非空条件 */ private Condition notEmpty = lock.newCondition(); /** * 构造函数 */ public BlockingQueue () { } /** * 构造函数 * @param size */ public BlockingQueue (int size) { this.size = size; } /** * 添加,达到上限大小,hold住 * @param object */ public void add(Object object) { lock.lock(); try { if(size == list.size()) { System.out.println(" [->添加]挂起,满了:共" + list.size() + "个"); notFull.await(); System.out.println(" [->添加]唤醒,不满了:共" + list.size() + "个"); } list.add(object); System.out.println(" [->添加]了1个,尝试唤醒:共" + list.size() + "个"); notEmpty.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } /** * 获取,大小为0,hold住 */ public Object remove() { Object object = null; lock.lock(); try { if(0 == list.size()) { System.out.println("[<-获取]挂起,空了:共" + list.size() + "个"); notEmpty.await(); System.out.println("[<-获取]唤醒,不空了:共" + list.size() + "个"); } object = list.remove(0); System.out.println("[<-获取]删除了1个,尝试唤醒:共" + list.size() + "个"); notFull.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } return object; } }
package com.gxx.record.utils; import org.apache.commons.lang3.RandomUtils; /** * 阻塞队列测试 * @author Gxx */ public class BlockingQueueTest { /** * 入口 * @param param */ public static void main(String[] param) { /** * 阻塞队列 */ BlockingQueue blockingQueue = new BlockingQueue(3); /** * 并发线程 */ Thread addTread1 = new AddThread(blockingQueue); Thread addTread2 = new AddThread(blockingQueue); Thread addTread3 = new AddThread(blockingQueue); Thread addTread4 = new AddThread(blockingQueue); Thread removeThread1 = new RemoveThread(blockingQueue); Thread removeThread2 = new RemoveThread(blockingQueue); Thread removeThread3 = new RemoveThread(blockingQueue); Thread removeThread4 = new RemoveThread(blockingQueue); /** * 启动线程 */ addTread1.start(); addTread2.start(); addTread3.start(); addTread4.start(); removeThread1.start(); removeThread2.start(); removeThread3.start(); removeThread4.start(); } } /** * 新增线程 * @author Gxx */ class AddThread extends Thread { BlockingQueue blockingQueue; public AddThread(BlockingQueue blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { blockingQueue.add("数字:[" + RandomUtils.nextInt(0, 100) + "]"); } } /** * 获取线程 * @author Gxx */ class RemoveThread extends Thread { BlockingQueue blockingQueue; public RemoveThread(BlockingQueue blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { blockingQueue.remove(); } }
[<-获取]挂起,空了:共0个 [<-获取]挂起,空了:共0个 [<-获取]挂起,空了:共0个 [<-获取]挂起,空了:共0个 [->添加]了1个,尝试唤醒:共1个 [->添加]了1个,尝试唤醒:共2个 [->添加]了1个,尝试唤醒:共3个 [->添加]挂起,满了:共3个 [<-获取]唤醒,不空了:共3个 [<-获取]删除了1个,尝试唤醒:共2个 [<-获取]唤醒,不空了:共2个 [<-获取]删除了1个,尝试唤醒:共1个 [<-获取]唤醒,不空了:共1个 [<-获取]删除了1个,尝试唤醒:共0个 [->添加]唤醒,不满了:共0个 [->添加]了1个,尝试唤醒:共1个 [<-获取]唤醒,不空了:共1个 [<-获取]删除了1个,尝试唤醒:共0个