阻塞队列实现
BlockingQueue.java
- BlockingQueue.java
package com.gxx.record.utils;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 阻塞队列
* 0.size 上限大小
* 1.add 添加,达到上限大小,hold住
* 2.remove 获取,大小为0,hold住
* @author Gxx
*/
public class BlockingQueue {
/**
* 上限大小
*/
private int size = 5;
/**
* 内容
*/
private List<Object> list = new LinkedList<Object>();
/**
* 锁
*/
private Lock lock = new ReentrantLock();
/**
* 非满条件
*/
private Condition notFull = lock.newCondition();
/**
* 非空条件
*/
private Condition notEmpty = lock.newCondition();
/**
* 构造函数
*/
public BlockingQueue () {
}
/**
* 构造函数
* @param size
*/
public BlockingQueue (int size) {
this.size = size;
}
/**
* 添加,达到上限大小,hold住
* @param object
*/
public void add(Object object) {
lock.lock();
try {
if(size == list.size()) {
System.out.println(" [->添加]挂起,满了:共" + list.size() + "个");
notFull.await();
System.out.println(" [->添加]唤醒,不满了:共" + list.size() + "个");
}
list.add(object);
System.out.println(" [->添加]了1个,尝试唤醒:共" + list.size() + "个");
notEmpty.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
/**
* 获取,大小为0,hold住
*/
public Object remove() {
Object object = null;
lock.lock();
try {
if(0 == list.size()) {
System.out.println("[<-获取]挂起,空了:共" + list.size() + "个");
notEmpty.await();
System.out.println("[<-获取]唤醒,不空了:共" + list.size() + "个");
}
object = list.remove(0);
System.out.println("[<-获取]删除了1个,尝试唤醒:共" + list.size() + "个");
notFull.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return object;
}
}
BlockingQueueTest.java
- BlockingQueueTest.java
package com.gxx.record.utils;
import org.apache.commons.lang3.RandomUtils;
/**
* 阻塞队列测试
* @author Gxx
*/
public class BlockingQueueTest {
/**
* 入口
* @param param
*/
public static void main(String[] param) {
/**
* 阻塞队列
*/
BlockingQueue blockingQueue = new BlockingQueue(3);
/**
* 并发线程
*/
Thread addTread1 = new AddThread(blockingQueue);
Thread addTread2 = new AddThread(blockingQueue);
Thread addTread3 = new AddThread(blockingQueue);
Thread addTread4 = new AddThread(blockingQueue);
Thread removeThread1 = new RemoveThread(blockingQueue);
Thread removeThread2 = new RemoveThread(blockingQueue);
Thread removeThread3 = new RemoveThread(blockingQueue);
Thread removeThread4 = new RemoveThread(blockingQueue);
/**
* 启动线程
*/
addTread1.start();
addTread2.start();
addTread3.start();
addTread4.start();
removeThread1.start();
removeThread2.start();
removeThread3.start();
removeThread4.start();
}
}
/**
* 新增线程
* @author Gxx
*/
class AddThread extends Thread {
BlockingQueue blockingQueue;
public AddThread(BlockingQueue blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
blockingQueue.add("数字:[" + RandomUtils.nextInt(0, 100) + "]");
}
}
/**
* 获取线程
* @author Gxx
*/
class RemoveThread extends Thread {
BlockingQueue blockingQueue;
public RemoveThread(BlockingQueue blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
blockingQueue.remove();
}
}
输出
[<-获取]挂起,空了:共0个
[<-获取]挂起,空了:共0个
[<-获取]挂起,空了:共0个
[<-获取]挂起,空了:共0个
[->添加]了1个,尝试唤醒:共1个
[->添加]了1个,尝试唤醒:共2个
[->添加]了1个,尝试唤醒:共3个
[->添加]挂起,满了:共3个
[<-获取]唤醒,不空了:共3个
[<-获取]删除了1个,尝试唤醒:共2个
[<-获取]唤醒,不空了:共2个
[<-获取]删除了1个,尝试唤醒:共1个
[<-获取]唤醒,不空了:共1个
[<-获取]删除了1个,尝试唤醒:共0个
[->添加]唤醒,不满了:共0个
[->添加]了1个,尝试唤醒:共1个
[<-获取]唤醒,不空了:共1个
[<-获取]删除了1个,尝试唤醒:共0个