Multithreading and High Concurrency (3)
Containers in Multithreaded Scenarios
Containers come up constantly in day-to-day Java development. Let's look at how to choose one in a multithreaded scenario.
Choosing a Container
Map Containers
A scenario: 100 threads each put 100,000 entries into a map. Compare the times for Hashtable, Collections.synchronizedMap, and ConcurrentHashMap.
import java.util.Collections;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class Test01 {

    private static final int RUN_TIMES = 100000;
    private static final int THREAD_NUM = 100;

    public static void main(String[] args) {
        UUID[] keyArr = new UUID[THREAD_NUM * RUN_TIMES];
        UUID[] valueArr = new UUID[THREAD_NUM * RUN_TIMES];
        for (int i = 0; i < THREAD_NUM * RUN_TIMES; i++) {
            keyArr[i] = UUID.randomUUID();
            valueArr[i] = UUID.randomUUID();
        }
        testHashTableAdd(keyArr, valueArr);
        testCollectionsAdd(keyArr, valueArr);
        testConcurrentHashMapAdd(keyArr, valueArr);
    }

    public static void testHashTableAdd(UUID[] keyArr, UUID[] valueArr) {
        long startTime = System.currentTimeMillis();
        Hashtable<UUID, UUID> hashtable = new Hashtable<>();
        Thread[] threads = new Thread[THREAD_NUM];
        for (int i = 0; i < THREAD_NUM; i++) {
            // each thread writes its own slice of the key/value arrays
            int startNum = i * RUN_TIMES;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < RUN_TIMES; j++) {
                    hashtable.put(keyArr[startNum + j], valueArr[startNum + j]);
                }
            });
        }
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("Hashtable put finished, time : " + (System.currentTimeMillis() - startTime));
    }

    public static void testCollectionsAdd(UUID[] keyArr, UUID[] valueArr) {
        long startTime = System.currentTimeMillis();
        Map<UUID, UUID> hashMap = new HashMap<>();
        Map<UUID, UUID> hashtable = Collections.synchronizedMap(hashMap);
        Thread[] threads = new Thread[THREAD_NUM];
        for (int i = 0; i < THREAD_NUM; i++) {
            int startNum = i * RUN_TIMES;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < RUN_TIMES; j++) {
                    hashtable.put(keyArr[startNum + j], valueArr[startNum + j]);
                }
            });
        }
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("Collections put finished, time : " + (System.currentTimeMillis() - startTime));
    }

    public static void testConcurrentHashMapAdd(UUID[] keyArr, UUID[] valueArr) {
        long startTime = System.currentTimeMillis();
        Map<UUID, UUID> hashtable = new ConcurrentHashMap<>();
        Thread[] threads = new Thread[THREAD_NUM];
        for (int i = 0; i < THREAD_NUM; i++) {
            int startNum = i * RUN_TIMES;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < RUN_TIMES; j++) {
                    hashtable.put(keyArr[startNum + j], valueArr[startNum + j]);
                }
            });
        }
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("ConcurrentHashMap put finished, time : " + (System.currentTimeMillis() - startTime));
    }
}
Output:
Hashtable put finished, time : 13056
Collections put finished, time : 9884
ConcurrentHashMap put finished, time : 55504
By these numbers, Collections.synchronizedMap is the fastest and ConcurrentHashMap is the slowest, and not by a little.
Why?
We know ConcurrentHashMap is designed precisely for multithreaded use, and it reduces contention with fine-grained locking (segment locks in JDK 7; CAS plus per-bin synchronized in JDK 8), so why does it lose to a map that simply wraps everything in synchronized? Its write path simply does more work per put.
In practice, however, containers are read far more often than they are written, so let's measure read time as well.
import java.util.Collections;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class Test01 {

    private static final int RUN_TIMES = 100000;
    private static final int THREAD_NUM = 100;
    private static final int SEARCH_TIMES = 1000000;

    public static void main(String[] args) {
        UUID[] keyArr = new UUID[THREAD_NUM * RUN_TIMES];
        UUID[] valueArr = new UUID[THREAD_NUM * RUN_TIMES];
        for (int i = 0; i < THREAD_NUM * RUN_TIMES; i++) {
            keyArr[i] = UUID.randomUUID();
            valueArr[i] = UUID.randomUUID();
        }
        testHashTableAdd(keyArr, valueArr);
        testCollectionsAdd(keyArr, valueArr);
        testConcurrentHashMapAdd(keyArr, valueArr);
    }

    public static void testHashTableAdd(UUID[] keyArr, UUID[] valueArr) {
        long startTime = System.currentTimeMillis();
        Hashtable<UUID, UUID> hashtable = new Hashtable<>();
        Thread[] threads = new Thread[THREAD_NUM];
        for (int i = 0; i < THREAD_NUM; i++) {
            int startNum = i * RUN_TIMES;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < RUN_TIMES; j++) {
                    hashtable.put(keyArr[startNum + j], valueArr[startNum + j]);
                }
            });
        }
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("Hashtable put finished, time : " + (System.currentTimeMillis() - startTime));

        // read phase: every thread reads the same key SEARCH_TIMES times
        startTime = System.currentTimeMillis();
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < SEARCH_TIMES; j++) {
                    hashtable.get(keyArr[10]);
                }
            });
        }
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("Hashtable get finished, time : " + (System.currentTimeMillis() - startTime));
    }

    public static void testCollectionsAdd(UUID[] keyArr, UUID[] valueArr) {
        long startTime = System.currentTimeMillis();
        Map<UUID, UUID> hashMap = new HashMap<>();
        Map<UUID, UUID> hashtable = Collections.synchronizedMap(hashMap);
        Thread[] threads = new Thread[THREAD_NUM];
        for (int i = 0; i < THREAD_NUM; i++) {
            int startNum = i * RUN_TIMES;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < RUN_TIMES; j++) {
                    hashtable.put(keyArr[startNum + j], valueArr[startNum + j]);
                }
            });
        }
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("Collections put finished, time : " + (System.currentTimeMillis() - startTime));

        startTime = System.currentTimeMillis();
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < SEARCH_TIMES; j++) {
                    hashtable.get(keyArr[10]);
                }
            });
        }
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("Collections get finished, time : " + (System.currentTimeMillis() - startTime));
    }

    public static void testConcurrentHashMapAdd(UUID[] keyArr, UUID[] valueArr) {
        long startTime = System.currentTimeMillis();
        Map<UUID, UUID> hashtable = new ConcurrentHashMap<>();
        Thread[] threads = new Thread[THREAD_NUM];
        for (int i = 0; i < THREAD_NUM; i++) {
            int startNum = i * RUN_TIMES;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < RUN_TIMES; j++) {
                    hashtable.put(keyArr[startNum + j], valueArr[startNum + j]);
                }
            });
        }
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("ConcurrentHashMap put finished, time : " + (System.currentTimeMillis() - startTime));

        startTime = System.currentTimeMillis();
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < SEARCH_TIMES; j++) {
                    hashtable.get(keyArr[10]);
                }
            });
        }
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("ConcurrentHashMap get finished, time : " + (System.currentTimeMillis() - startTime));
    }
}
After the puts finish, the same 100 threads are started again, each reading the 10th key from keyArr one million (SEARCH_TIMES) times:
Hashtable put finished, time : 14196
Hashtable get finished, time : 50732
Collections put finished, time : 9536
Collections get finished, time : 51183
ConcurrentHashMap put finished, time : 43753
ConcurrentHashMap get finished, time : 1592
ConcurrentHashMap's strength is reads: get() takes no lock at all, while the other two serialize every read through a single monitor.
Collection Containers
A scenario: a container holds 100,000 items, and 100 threads each repeatedly remove the first element.
import java.util.Queue;
import java.util.Vector;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Test02 {

    private static final int LENGTH = 100000;
    private static final int THREAD_NUM = 100;

    public static void main(String[] args) {
        testVector();
        testQueue();
    }

    public static void testVector() {
        Vector<Integer> vector = new Vector<>();
        for (int i = 0; i < LENGTH; i++) {
            vector.add(i);
        }
        for (int i = 0; i < THREAD_NUM; i++) {
            new Thread(() -> {
                while (vector.size() > 0) {
                    System.out.println(vector.remove(0));
                }
            }).start();
        }
    }

    public static void testQueue() {
        Queue<Integer> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < LENGTH; i++) {
            queue.add(i);
        }
        for (int i = 0; i < THREAD_NUM; i++) {
            new Thread(() -> {
                while (true) {
                    Integer poll = queue.poll();
                    if (poll == null) {
                        break;
                    } else {
                        System.out.println(poll);
                    }
                }
            }).start();
        }
    }
}
The testVector method throws at runtime. The check vector.size() > 0 and the call vector.remove(0) are each atomic on their own, but the two together are not a single atomic operation: between one thread's size check and its remove, other threads may drain the remaining elements, so remove(0) can hit an empty container.
The Queue version has no such window: poll() removes and returns the head (or null when the queue is empty) as one CAS-based atomic operation.
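If you do need a Vector here, the fix is to make the check and the removal one atomic step by synchronizing on the vector itself. A minimal sketch (with the sizes shrunk to 10,000 items and 10 threads so it runs quickly; the class name is just for illustration):

```java
import java.util.Vector;

public class VectorFix {
    private static final int LENGTH = 10000;
    private static final int THREAD_NUM = 10;

    public static void main(String[] args) throws InterruptedException {
        Vector<Integer> vector = new Vector<>();
        for (int i = 0; i < LENGTH; i++) {
            vector.add(i);
        }
        Thread[] threads = new Thread[THREAD_NUM];
        for (int i = 0; i < THREAD_NUM; i++) {
            threads[i] = new Thread(() -> {
                while (true) {
                    // Lock the vector so the emptiness check and the
                    // removal happen as one atomic step.
                    synchronized (vector) {
                        if (vector.isEmpty()) {
                            break;
                        }
                        vector.remove(0);
                    }
                }
            });
        }
        for (Thread t : threads) t.start();
        for (Thread t : threads) t.join();
        System.out.println("remaining: " + vector.size()); // prints remaining: 0
    }
}
```

Vector's own methods synchronize on the vector's monitor, so this outer synchronized block composes correctly with them; the cost is that the whole check-and-remove is now fully serialized, which is exactly what ConcurrentLinkedQueue avoids.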
Synchronized Map
As we know, the main Map implementations are HashMap, LinkedHashMap, and TreeMap.
LinkedHashMap is a HashMap plus a doubly linked list that preserves insertion order; TreeMap keeps its entries sorted using a red-black tree.
So why is there no ConcurrentTreeMap among the concurrent containers?
What if you need sorting in a concurrent map?
JUC provides ConcurrentSkipListMap.
It is implemented with a skip list, because maintaining a balanced tree purely with CAS operations would be far too complex.
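A quick sketch of the sorted behavior: regardless of insertion order, ConcurrentSkipListMap iterates its keys in sorted order and exposes ordered operations like firstKey():

```java
import java.util.concurrent.ConcurrentSkipListMap;

public class SkipListDemo {
    public static void main(String[] args) {
        // Keys stay sorted while the map allows non-blocking
        // concurrent access via CAS on the skip-list nodes.
        ConcurrentSkipListMap<Integer, String> map = new ConcurrentSkipListMap<>();
        map.put(3, "c");
        map.put(1, "a");
        map.put(2, "b");
        System.out.println(map.firstKey()); // 1
        System.out.println(map.keySet());   // [1, 2, 3]
    }
}
```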
Synchronized List
CopyOnWriteArrayList is the concurrent list provided by JUC.
CopyOnWrite means copy-on-write.
It is built for the common case where reads far outnumber writes; in such read-mostly scenarios, consider copy-on-write to improve read throughput.
Unlike Vector, which locks on both reads and writes, CopyOnWriteArrayList takes no lock on reads at all. A write instead copies the underlying array, appends the new element at the end of the copy,
and then swaps the container's reference to point at the new array.
The benefit of this scheme is that writes never block readers.
public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}
As a result, adding to a CopyOnWriteArrayList is relatively expensive: every write copies the entire array.
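The snapshot semantics also show up in iteration: an iterator sees the array as it was when the iterator was created, so modifying the list while iterating is safe (where an ArrayList would throw ConcurrentModificationException). A small sketch:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CowDemo {
    public static void main(String[] args) {
        List<Integer> list = new CopyOnWriteArrayList<>();
        list.add(1);
        list.add(2);
        // The for-each iterator holds a snapshot of [1, 2]; each add()
        // copies the array, leaving the snapshot untouched.
        for (Integer i : list) {
            list.add(i + 10); // would throw ConcurrentModificationException with ArrayList
        }
        System.out.println(list); // [1, 2, 11, 12]
    }
}
```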
Blocking Queues
BlockingQueue, the blocking queue.
Its methods can block the calling thread, hence the name. The main implementations:
- LinkedBlockingQueue: an optionally bounded queue backed by a linked list; with no capacity given it is effectively unbounded (capacity Integer.MAX_VALUE)
- ArrayBlockingQueue: a bounded queue backed by an array
- DelayQueue: orders its elements by remaining delay time
- SynchronousQueue: hands elements directly from one thread to another
- TransferQueue: combines the behaviors of the queues above, able both to buffer elements and to hand them off directly
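To make the TransferQueue idea concrete, here is a sketch using its standard implementation, LinkedTransferQueue: transfer() does not just enqueue the element, it blocks until a consumer has actually received it.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

public class TransferDemo {
    public static void main(String[] args) throws InterruptedException {
        TransferQueue<String> queue = new LinkedTransferQueue<>();

        Thread consumer = new Thread(() -> {
            try {
                System.out.println("got " + queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // transfer() hands the element to a waiting consumer and blocks
        // until one has taken it; plain put() would return immediately.
        queue.transfer("msg");
        consumer.join();
        System.out.println("transferred, size=" + queue.size());
    }
}
```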
Methods on Queue
boolean offer(E e);
boolean add(E e);
E poll();
E peek();
offer() and add() both insert an element. The difference: when a bounded queue is full, offer() returns false while add() throws an exception.
peek() returns the head element without removing it.
poll() returns the head element and removes it; both return null when the queue is empty.
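The contrast between the four methods can be sketched on an ArrayBlockingQueue with capacity 1 (class name is illustrative):

```java
import java.util.concurrent.ArrayBlockingQueue;

public class QueueMethodsDemo {
    public static void main(String[] args) {
        ArrayBlockingQueue<String> q = new ArrayBlockingQueue<>(1); // capacity 1
        System.out.println(q.offer("a")); // true
        System.out.println(q.offer("b")); // false: queue full, no exception
        try {
            q.add("b");                   // throws IllegalStateException when full
        } catch (IllegalStateException e) {
            System.out.println("add threw: " + e.getMessage());
        }
        System.out.println(q.peek()); // a (element stays in the queue)
        System.out.println(q.poll()); // a (element is removed)
        System.out.println(q.poll()); // null: queue empty, no exception
    }
}
```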
BlockingQueue
On top of Queue, BlockingQueue adds put() and take(),
the blocking counterparts of offer() and poll().
Where offer() and poll() fail fast (returning false or null), put() and take() block until they can succeed: put() waits for free space, take() waits for an element.
offer() also has a timed overload that lets the thread wait up to a given timeout.
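The blocking behavior is the basis of the classic producer-consumer pattern; a minimal sketch on a capacity-1 queue, where put() and take() pace each other:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PutTakeDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> q = new ArrayBlockingQueue<>(1);

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    // take() blocks until an element is available
                    System.out.println("took " + q.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        for (int i = 0; i < 3; i++) {
            q.put(i); // put() blocks while the queue is full
        }
        consumer.join();
    }
}
```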
DelayQueue
DelayQueue orders its elements by how long they still have to wait.
Every element added to it must implement the Delayed interface,
i.e. getDelay() and compareTo(); the element whose delay expires soonest is taken first, and take() blocks until the head's delay has expired.
It is typically used for time-based task scheduling.
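A minimal Delayed implementation makes the ordering concrete: the task enqueued second but with the shorter delay comes out first (class and field names are illustrative):

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayDemo {
    // A task that becomes available delayMs milliseconds after creation.
    static class Task implements Delayed {
        final String name;
        final long runAt;

        Task(String name, long delayMs) {
            this.name = name;
            this.runAt = System.currentTimeMillis() + delayMs;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(runAt - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<Task> queue = new DelayQueue<>();
        queue.put(new Task("late", 200));
        queue.put(new Task("early", 50));
        // take() blocks until the shortest remaining delay expires.
        System.out.println(queue.take().name); // early
        System.out.println(queue.take().name); // late
    }
}
```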
PriorityQueue
The DelayQueue above is essentially backed by a PriorityQueue,
which keeps its elements ordered internally.
PriorityQueue is implemented as a binary min-heap stored in an array: the smallest element sits at the top.
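The min-heap ordering in two lines: elements come out of poll() smallest first, regardless of insertion order.

```java
import java.util.PriorityQueue;

public class HeapDemo {
    public static void main(String[] args) {
        // A binary min-heap backed by an array: poll() always
        // returns the smallest remaining element.
        PriorityQueue<Integer> pq = new PriorityQueue<>();
        pq.add(5);
        pq.add(1);
        pq.add(3);
        StringBuilder sb = new StringBuilder();
        while (!pq.isEmpty()) {
            sb.append(pq.poll()).append(' ');
        }
        System.out.println(sb.toString().trim()); // 1 3 5
    }
}
```

Note that PriorityQueue itself is not thread-safe; the concurrent variant is PriorityBlockingQueue.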
SynchronousQueue
This queue has zero capacity; it is not for storing elements. Much like Exchanger, it exists to hand data between threads.
public static void main(String[] args) {
    SynchronousQueue<String> queue = new SynchronousQueue<>();
    new Thread(() -> {
        try {
            System.out.println(queue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
    try {
        queue.put("aaa");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("main done, queue.size() = " + queue.size());
}
If you comment out the consumer thread, the program never finishes and prints nothing: put() blocks until another thread takes the element.
Using add() instead fails immediately:
public static void main(String[] args) {
    SynchronousQueue<String> queue = new SynchronousQueue<>();
    queue.add("aaa"); // throws IllegalStateException
    System.out.println("main done, queue.size() = " + queue.size());
}
because this queue is not a container and cannot hold anything: add() can only succeed if a consumer is already waiting to take.