多线程与高并发(三)

多线程与高并发(三)

多线程场景下的容器

在Java日常开发中经常使用到容器. 下面就看看多线程场景下容器的选择

容器的选择

Map容器

一个场景: 100个线程, 每个线程向容器中添加10w条数据, 比较Hashtable,Collections和ConcurrentHashMap的时间.

public class Test01 {

    private static final Integer RUN_TIMES = 100000;
    private static final Integer THREAD_NUM = 100;

    public static void main(String[] args) {
        UUID[] keyArr = new UUID[THREAD_NUM * RUN_TIMES];
        UUID[] valueArr = new UUID[THREAD_NUM * RUN_TIMES];
        for (int i = 0; i < THREAD_NUM * RUN_TIMES; i++) {
            keyArr[i] = UUID.randomUUID();
            valueArr[i] = UUID.randomUUID();
        }

        testHashTableAdd(keyArr, valueArr);
        testCollectionsAdd(keyArr, valueArr);
        testConcurrentHashMapAdd(keyArr, valueArr);

    }


    public static void testHashTableAdd(UUID[] keyArr, UUID[] valueArr){
        long startTime = System.currentTimeMillis();
        Hashtable<UUID, UUID> hashtable = new Hashtable<>();
        Thread[] threads = new Thread[THREAD_NUM];
        for (int i = 0; i < THREAD_NUM; i++) {
            int startNum = i * RUN_TIMES;
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < RUN_TIMES; j++) {
                        hashtable.put(keyArr[startNum+j], valueArr[startNum+j]);
                    }
                }
            });
        }
        for (int i = 0; i < threads.length; i++) {
            threads[i].start();
        }
        for (int i = 0; i < threads.length; i++) {
            try {
                threads[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }


        System.out.println("HashTable 添加完毕, 时间 : " + (System.currentTimeMillis() - startTime));
    }

    public static void testCollectionsAdd(UUID[] keyArr, UUID[] valueArr){
        long startTime = System.currentTimeMillis();
        Map<UUID, UUID> hashMap = new HashMap<>();
        Map<UUID, UUID> hashtable = Collections.synchronizedMap(hashMap);
        Thread[] threads = new Thread[THREAD_NUM];
        for (int i = 0; i < THREAD_NUM; i++) {
            int startNum = i * RUN_TIMES;
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < RUN_TIMES; j++) {
                        hashtable.put(keyArr[startNum+j], valueArr[startNum+j]);
                    }
                }
            });
        }
        for (int i = 0; i < threads.length; i++) {
            threads[i].start();
        }
        for (int i = 0; i < threads.length; i++) {
            try {
                threads[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("Collections 添加完毕, 时间 : " + (System.currentTimeMillis() - startTime));
    }

    public static void testConcurrentHashMapAdd(UUID[] keyArr, UUID[] valueArr){
        long startTime = System.currentTimeMillis();
        Map<UUID, UUID> hashtable = new ConcurrentHashMap<>();
        Thread[] threads = new Thread[THREAD_NUM];
        for (int i = 0; i < THREAD_NUM; i++) {
            int startNum = i * RUN_TIMES;
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < RUN_TIMES; j++) {
                        hashtable.put(keyArr[startNum+j], valueArr[startNum+j]);
                    }
                }
            });
        }
        for (int i = 0; i < threads.length; i++) {
            threads[i].start();
        }
        for (int i = 0; i < threads.length; i++) {
            try {
                threads[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("ConcurrentHashMap 添加完毕, 时间 : " + (System.currentTimeMillis() - startTime));
    }
}

输出结果:

HashTable 添加完毕, 时间 : 13056
Collections 添加完毕, 时间 : 9884
ConcurrentHashMap 添加完毕, 时间 : 55504

可以看到, 时间上来看是Collections最快, ConcurrentHashMap最慢, 而且慢的不止是一点.

为什么呢?

我们都知道ConcurrentHashMap是设计来给多线程场景下用的, 而且使用了分段锁机制, 为什么还没有直接Synchronized同步快呢?

一般情况下, 我们使用容器, 读的情况要比写的情况多, 所以我们再来看看读时的时间.


public class Test01 {

    private static final Integer RUN_TIMES = 100000;
    private static final Integer THREAD_NUM = 100;
    private static final Integer SEARCH_TIMES = 1000000;

    public static void main(String[] args) {
        UUID[] keyArr = new UUID[THREAD_NUM * RUN_TIMES];
        UUID[] valueArr = new UUID[THREAD_NUM * RUN_TIMES];
        for (int i = 0; i < THREAD_NUM * RUN_TIMES; i++) {
            keyArr[i] = UUID.randomUUID();
            valueArr[i] = UUID.randomUUID();
        }


        testHashTableAdd(keyArr, valueArr);
        testCollectionsAdd(keyArr, valueArr);
        testConcurrentHashMapAdd(keyArr, valueArr);

    }


    public static void testHashTableAdd(UUID[] keyArr, UUID[] valueArr){
        long startTime = System.currentTimeMillis();
        Hashtable<UUID, UUID> hashtable = new Hashtable<>();
        Thread[] threads = new Thread[THREAD_NUM];
        for (int i = 0; i < THREAD_NUM; i++) {
            int startNum = i * RUN_TIMES;
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < RUN_TIMES; j++) {
                        hashtable.put(keyArr[startNum+j], valueArr[startNum+j]);
                    }
                }
            });
        }
        for (int i = 0; i < threads.length; i++) {
            threads[i].start();
        }
        for (int i = 0; i < threads.length; i++) {
            try {
                threads[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("HashTable 添加完毕, 时间 : " + (System.currentTimeMillis() - startTime));

        startTime = System.currentTimeMillis();
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(()->{
                for (int j = 0; j < SEARCH_TIMES; j++) {
                    hashtable.get(keyArr[10]);
                }
            });
        }
        System.out.println("HashTable 查询完毕, 时间 : " + (System.currentTimeMillis() - startTime));

    }

    public static void testCollectionsAdd(UUID[] keyArr, UUID[] valueArr){
        long startTime = System.currentTimeMillis();
        Map<UUID, UUID> hashMap = new HashMap<>();
        Map<UUID, UUID> hashtable = Collections.synchronizedMap(hashMap);
        Thread[] threads = new Thread[THREAD_NUM];
        for (int i = 0; i < THREAD_NUM; i++) {
            int startNum = i * RUN_TIMES;
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < RUN_TIMES; j++) {
                        hashtable.put(keyArr[startNum+j], valueArr[startNum+j]);
                    }
                }
            });
        }
        for (int i = 0; i < threads.length; i++) {
            threads[i].start();
        }
        for (int i = 0; i < threads.length; i++) {
            try {
                threads[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("Collections 添加完毕, 时间 : " + (System.currentTimeMillis() - startTime));

        startTime = System.currentTimeMillis();
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(()->{
                for (int j = 0; j < SEARCH_TIMES; j++) {
                    hashtable.get(keyArr[10]);
                }
            });
        }
        System.out.println("Collections 查询完毕, 时间 : " + (System.currentTimeMillis() - startTime));

    }

    public static void testConcurrentHashMapAdd(UUID[] keyArr, UUID[] valueArr){
        long startTime = System.currentTimeMillis();
        Map<UUID, UUID> hashtable = new ConcurrentHashMap<>();
        Thread[] threads = new Thread[THREAD_NUM];
        for (int i = 0; i < THREAD_NUM; i++) {
            int startNum = i * RUN_TIMES;
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < RUN_TIMES; j++) {
                        hashtable.put(keyArr[startNum+j], valueArr[startNum+j]);
                    }
                }
            });
        }
        for (int i = 0; i < threads.length; i++) {
            threads[i].start();
        }
        for (int i = 0; i < threads.length; i++) {
            try {
                threads[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("ConcurrentHashMap 添加完毕, 时间 : " + (System.currentTimeMillis() - startTime));

        startTime = System.currentTimeMillis();
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(()->{
                for (int j = 0; j < SEARCH_TIMES; j++) {
                    hashtable.get(keyArr[10]);
                }
            });
        }
        System.out.println("ConcurrentHashMap 查询完毕, 时间 : " + (System.currentTimeMillis() - startTime));

    }
}

在添加完之后, 再开这么多线程每个线程去取keyArr中第10个元素, 取一千万次

HashTable 添加完毕, 时间 : 14196
HashTable 查询完毕, 时间 : 50732
Collections 添加完毕, 时间 : 9536
Collections 查询完毕, 时间 : 51183
ConcurrentHashMap 添加完毕, 时间 : 43753
ConcurrentHashMap 查询完毕, 时间 : 1592

可以看到, ConcurrentHashMap的长处在于查询.

Collection容器

场景: 一个容器中有10w条数据, 起100个线程每次去移除容器中第一个.

public class Test02 {

    private static final int LENGTH = 100000;
    private static final int THREAD_NUM = 100;

    public static void main(String[] args) {
        testVector();
        testQueue();
    }

    public static void testVector(){
        Vector<Integer> vector = new Vector<>();
        for (int i = 0; i < LENGTH; i++) {
            vector.add(i);
        }

        for (int i = 0; i < THREAD_NUM; i++) {
            new Thread(() -> {
                while (vector.size() > 0){
                    System.out.println(vector.remove(0));
                }
            }).start();
        }
    }

    public static void testQueue(){
        Queue<Integer> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < LENGTH; i++) {
            queue.add(i);
        }

        for (int i = 0; i < THREAD_NUM; i++) {
            new Thread(() -> {
                while (true){
                    Integer poll = queue.poll();
                    if (poll == null) {
                        break;
                    }else {
                        System.out.println(poll);
                    }
                }
            }).start();
        }
    }
}

testVector方法执行是会报错的, 因为vector.size() > 0所做的判断和vector.remove(0)虽然每个都是原子操作, 但是两步原子操作并没有合成一步原子操作, 会导致有的线程remove的时候容器中已经没有元素了.

而Queue中使用cas出队,本身就是一步原子操作.

同步Map

我们学过Map都知道, Map的实现有HashMap, LinkedHashMap和TreeMap.

其中, LinkedHashMap是HashMap加了一个双向链表保证插入顺序, TreeMap是使用红黑树使节点排序.

那么同步容器中为什么没有ConcurrentTreeMap呢?

如果同步的时候也想要排序怎么办.

JUC提供了ConcurrentSkipListMap.

他是使用跳表实现的, 因为使用cas实现树结构太复杂了.

同步List

CopyOnWriteList是JUC提供的同步列表.

CopyOnWrite的意思是写时复制.

基于实际情况下, 读的情况远大于写的情况.

当我们向数组中写数据的时候, 考虑用CopyOnWrite来提高效率.

CopyOnWrite在写的时候不加锁.

众所周知, Vector在写和读的时候都加锁,而CopyOnWrite读时候不加锁, 但是在写的时候会在原数组的基础上copy一个出来, 将添加的元素添加到最后一个.

在写完之后将容器的引用指向新的数组.

这样处理的好处是, 写的时候对读没有影响.

public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

因此, CopyOnWriteList添加的效率是比较低的, 因为每次都要拷贝一份数组.

阻塞队列

BlockingQueue 阻塞队列

阻塞队列提供的方法可以使线程阻塞, 所以叫阻塞队列.

  1. LinkedBlockingQueue 无界阻塞队列
  2. ArrayBlockingQueue 有界阻塞队列
  3. DelayQueue 可以实现在时间上的排序
  4. SynchronousQueue 用来给线程之间传递内容的
  5. TransferQueue 是前面队列的组合

Queue中的方法

  • boolean offer(E e)

  • boolean add(E e)

  • E poll();

  • E peek();

offer()和add()都是向队列中添加元素. 区别在于, 当队列满了, offer()会返回false, 而add()会抛异常.

peek()方法是取队头元素, 并且不会remove掉.

poll()方法是取队头元素, 并且remove掉.

BlockingQueue

阻塞队列在队列的基础上加了put()和take()方法.

对应offer和poll.

但是offer和poll失败了就返回false, put和take如果失败了就会在这里等.

当然, offer也提供了等待一个时间的重载方法, 可以使线程等候一段时间.

DelayQueue

DelayQueue可以实现按照等待的时间进行排序.

向这个Queue中添加任务要求必须实现Delayed接口.

实现getDelay和compareTo方法, 在队列中等待越短的任务会优先得到运行.

一般用来按时间进行任务调度.

PriorityQueue

上面的DelayQueue本质上是一个PriorityQueue.

在他内部进行了一个排序.

PriorityQueue内部实现是使用了树.可以理解为堆排序中的小根堆.小的值排在上面.

SynchronousQueue

这个Queue的容量为0, 不是用来装东西的. 跟Exchanger很像, 是用来交换的.


public static void main(String[] args){
    SynchronousQueue<String> queue = new SynchronousQueue();
    new Thread(() -> {
        try {
            System.out.println(queue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();

    try {
        queue.put("aaa");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("线程结束 queue.size()=" + queue.size());
}

当把子线程那部分代码注释掉的时候, 程序无法结束, 并且无任何输出.

如果使用add方法, 会直接报错

    public static void main(String[] args){
        SynchronousQueue<String> queue = new SynchronousQueue();
        queue.add("aaa");
        System.out.println("线程结束 queue.size()=" + queue.size());
    }

因为这个队列不是个容器, 不能装东西.


   转载规则


《多线程与高并发(三)》 echi1995 采用 知识共享署名 4.0 国际许可协议 进行许可。
 上一篇
多线程与高并发(四) 多线程与高并发(四)
多线程与高并发(四)线程池Executor执行者, 有一个方法 execute(). ExecutorService继承自Executor, 也是一个接口. 除了execute()方法之外, 还完善了整个任务执行器的生命周期. shutdo
下一篇 
排序算法的总结 排序算法的总结
排序算法的总结各种排序算法的时间复杂度和空间复杂度的总结. 时间复杂度 额外空间复杂度 稳定性 选择排序 O(N^2) O(1) 无 冒泡排序 O(N^2) O(1) 有 插入排序 O(N^2) O(1) 有 归并
  目录