用户工具

站点工具


分享:技术:队列:阻塞队列实现

阻塞队列实现

BlockingQueue.java

BlockingQueue.java
package com.gxx.record.utils;
 
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
 
/** 
 * 阻塞队列
 * 0.size 上限大小
 * 1.add 添加,达到上限大小,hold住
 * 2.remove 获取,大小为0,hold住
 * @author Gxx
 */
public class BlockingQueue {
	/**
	 * 上限大小
	 */
	private int size = 5;
	/**
	 * 内容
	 */
	private List<Object> list = new LinkedList<Object>();
	/**
	 * 锁
	 */
	private Lock lock = new ReentrantLock();
	/**
	 * 非满条件
	 */
	private Condition notFull = lock.newCondition();
	/**
	 * 非空条件
	 */
	private Condition notEmpty = lock.newCondition();
 
	/**
	 * 构造函数
	 */
	public BlockingQueue () {
	}
 
	/**
	 * 构造函数
	 * @param size
	 */
	public BlockingQueue (int size) {
		this.size = size;
	}
 
	/**
	 * 添加,达到上限大小,hold住
	 * @param object
	 */
	public void add(Object object) {
		lock.lock();
		try {
			if(size == list.size()) {
				System.out.println("    [->添加]挂起,满了:共" + list.size() + "个");
				notFull.await();
				System.out.println("    [->添加]唤醒,不满了:共" + list.size() + "个");
			}
			list.add(object);
			System.out.println("    [->添加]了1个,尝试唤醒:共" + list.size() + "个");
			notEmpty.signal();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			lock.unlock();
		}
	}
 
	/**
	 * 获取,大小为0,hold住
	 */
	public Object remove() {
		Object object = null;
		lock.lock();
		try {
			if(0 == list.size()) {
				System.out.println("[<-获取]挂起,空了:共" + list.size() + "个");
				notEmpty.await();
				System.out.println("[<-获取]唤醒,不空了:共" + list.size() + "个");
			}
			object = list.remove(0);
			System.out.println("[<-获取]删除了1个,尝试唤醒:共" + list.size() + "个");
			notFull.signal();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			lock.unlock();
		}
		return object;
	}
}

BlockingQueueTest.java

BlockingQueueTest.java
package com.gxx.record.utils;
 
import org.apache.commons.lang3.RandomUtils;
 
/** 
 * 阻塞队列测试
 * @author Gxx
 */
public class BlockingQueueTest {
 
	/**
	 * 入口
	 * @param param
	 */
	public static void main(String[] param) {
		/**
		 * 阻塞队列
		 */
		BlockingQueue blockingQueue = new BlockingQueue(3);
 
		/**
		 * 并发线程
		 */
		Thread addTread1 = new AddThread(blockingQueue);
		Thread addTread2 = new AddThread(blockingQueue);
		Thread addTread3 = new AddThread(blockingQueue);
		Thread addTread4 = new AddThread(blockingQueue);
		Thread removeThread1 = new RemoveThread(blockingQueue);
		Thread removeThread2 = new RemoveThread(blockingQueue);
		Thread removeThread3 = new RemoveThread(blockingQueue);
		Thread removeThread4 = new RemoveThread(blockingQueue);
 
		/**
		 * 启动线程
		 */
		addTread1.start();
		addTread2.start();
		addTread3.start();
		addTread4.start();
		removeThread1.start();
		removeThread2.start();
		removeThread3.start();
		removeThread4.start();
	}
}
 
/**
 * 新增线程
 * @author Gxx
 */
class AddThread extends Thread {
 
	BlockingQueue blockingQueue;
 
	public AddThread(BlockingQueue blockingQueue) {
		this.blockingQueue = blockingQueue;
	}
 
	@Override
	public void run() {
		blockingQueue.add("数字:[" + RandomUtils.nextInt(0, 100) + "]");
	}
}
 
/**
 * 获取线程
 * @author Gxx
 */
class RemoveThread extends Thread {
 
	BlockingQueue blockingQueue;
 
	public RemoveThread(BlockingQueue blockingQueue) {
		this.blockingQueue = blockingQueue;
	}
 
	@Override
	public void run() {
		blockingQueue.remove();
	}
}

输出

[<-获取]挂起,空了:共0个
[<-获取]挂起,空了:共0个
[<-获取]挂起,空了:共0个
[<-获取]挂起,空了:共0个
    [->添加]了1个,尝试唤醒:共1个
    [->添加]了1个,尝试唤醒:共2个
    [->添加]了1个,尝试唤醒:共3个
    [->添加]挂起,满了:共3个
[<-获取]唤醒,不空了:共3个
[<-获取]删除了1个,尝试唤醒:共2个
[<-获取]唤醒,不空了:共2个
[<-获取]删除了1个,尝试唤醒:共1个
[<-获取]唤醒,不空了:共1个
[<-获取]删除了1个,尝试唤醒:共0个
    [->添加]唤醒,不满了:共0个
    [->添加]了1个,尝试唤醒:共1个
[<-获取]唤醒,不空了:共1个
[<-获取]删除了1个,尝试唤醒:共0个
分享/技术/队列/阻塞队列实现.txt · 最后更改: 2018/04/01 08:49 由 gxx