22-并发集合

nobility 发布于 2021-06-13 1004 次阅读


并发集合

相对于同步容器:锁的粒度较大,多个线程不能同时执行同步集合对象中的方法,性能较差

  • VectorHashtable:方法都是使用synchronized进行修饰
  • Collections.synchronizedXXX()方法可使得非线程安全的集合变成线程安全的:只不过是使用同步代码块儿方式对集合方法进行了封装

ConcurrentHashMap/SkipListMap

JKD1.7和JKD1.8中的HashMap

JDK1.7是数组+链表

JDK1.8是数组+链表+红黑树结构,当某个槽已经有8个节点时就会转为红黑树,之所以该阈值设置为8是从该类源码的注释中可以查到,大致思路是,红黑树的节点占用空间是链表节点的两倍(链表只有指向下一个节点的指针,二叉树则需要左右指针),当数据量并不多时红黑树数据结构性能优势并不大,作者进行了泊松分布的函数计算得知想要哈希冲突达到8个的概率只有千万分之一,所以为了这种极端情况下提高查询效率就选择了8作为阈值

HashMap

HashMap线程不安全的体现

  • 同时put碰撞导致数据丢失:多个线程同时put计算等hash值是一样key会放到同一个位置,所以先放入的线程数据会被覆盖
  • 同时put扩容导致数据丢失:多个线程同时对哈希表进行扩容,扩容之后的哈希表也会只最后内个线程扩容保留下来
  • 死循环造成CPU100%(主要存在JDK1.7中):多个线程在扩容时,可能会造成循环链表,也就是两个节点相互指向对方,造成死循环

JKD1.7和JKD1.8中的ConcurrentHashMap

JDK1.7中最外层是多个段(每个段的底层数据结构与HashMap类似,仍然是数组+链表的拉链法),每个段独立上ReentrantLock锁,段与段之间互不影响,提高了并发效率,默认有16个段(默认值可在初始化时设置,一旦设置就无法更改,因为是段是无法扩容的),也就是说最多可同时支持16个线程并发写,前提是操作分布在不同的段上

JDK1.8中沿用了与它同时期的HashMap版本的数据结构,为了做到并发采用锁住node来实现减小锁粒度提高并发度,利用CAS算法部分的代替了锁;可从源码中putValue()方法得知(put()方法中调用了putValue()),具体如下:

  1. 判断key和value是否为空,若有一个为空就抛出异常
  2. 计算hash值,准备根据槽的类型进行赋值(循环进行下面操作,直到放入成功)
    • 若未进行初始化就进行初始化
    • 若此位置为空,直接使用CAS操作放入,若未放入成功,判断是否为MOVED状态(正在扩容)就帮助进行扩容和转义的工作,调用helpTransfer()方法
    • 若此处位置有内容,就使用同步代码块(锁为当前node)的方式进行增长链表或向红黑树添加节点
  3. 若满足阈值就转为红黑树
  4. 返回旧的值

ConcurrentHashMap

组合操作

使用ConcurrentHashMap中的方法时只能保证单个方法是线程安全的,组合使用多个方法需要使用同步机制才能保证线程安全,但是这就违背了ConcurrentHashMap对并发的性能提升,所以ConcurrentHashMap就提供了组合操作方法

方法名 描述
public boolean replace(K key, V oldValue, V newValue) 若当前key的值的是oldValue旧的值,则以原子方式将该值设置为newValue新值,并返回是否更新成功
public V putIfAbsent(K key, V value) 若当前key的值为空,才将值进行设置成value,则返回null;若key的值存在,不设置,并且返回这个key对应的value

CopyOnWriteArrayList/Set

读操作无需加锁,而且在写操作时也会阻塞读操作,只有写和写之间需要加锁;所以适用于对读操作多且要求快,写操作少且相对慢的场景

实现原理

相对于VectorSynchronizedListArrayList来说,允许在迭代中修改但会读到过期数据

public static void main(String[] args) {
  //ArrayList<String> list = new ArrayList<>(new Integer[]{1, 2, 3});  //不允许迭代修改,会抛出异常
  CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>(new Integer[]{1, 2, 3});  //允许迭代修改,但是读的是过期数据
  Iterator<Integer> iterator = list.iterator();
  while (iterator.hasNext()) {
    System.out.println(list);
    Integer next = iterator.next();
    System.out.println(next);
    if (next.equals(2)) {
      list.remove(2);
    }
    if (next.equals(1)) {
      list.add(100);
    }
  }
}

创建副本,读写分离:在写入操作时,并不是直接写入,而是将容器拷贝一份副本,在副本容器中进行写入操作,写入操作结束后再将容器引用指向这个已经被修改了的副本,这样读操作就可以不加锁,从而提高并发效率

旧容器内容不可变:由于修改的是副本容器,所以读操作可能读到过期的数据,迭代器在迭代时的数据取决于迭代器的诞生时间

public static void main(String[] args) {
  CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>(new Integer[]{1, 2, 3, 4});
  System.out.println(list);  //[1, 2, 3, 4]
  Iterator<Integer> itr1 = list.iterator();  //初始化后获得的迭代器
  list.remove(3);  //删除一个元素
  System.out.println(list);  //[1, 2, 4]
  Iterator<Integer> itr2 = list.iterator();  //删除一个元素后获得的迭代器

  itr1.forEachRemaining(System.out::print);  //1234
  System.out.println();
  itr2.forEachRemaining(System.out::print);  //123
}

缺点

  • 数据不一致问题:CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性,所以若想写入数据,马上就能读到,该容器是无法做到的
  • 内存占用问题:因为CopyOnWrite的写是复制机制,所以进行写操作时,内存中会同时有两个相同对象的内存

并发Queue

BlockingQueue

阻塞队列是具有,调用阻塞方法可让调用方线程进入阻塞状态的队列,通常阻塞队列的一端给生产者生产数据使用,另一端给消费者消费数据用,阻塞队列是线程安全的,所以生产者和消费者也可以是多个线程

操作 阻塞方法 失败抛出异常 失败返回值
队尾增加元素 void put(E e) boolean add(E e) boolean offer(E e)
队头删除元素 E take() E remove() E poll()
查看队头 E element() E peek()
  • ArrayBlockingQueue:有界,创建时需指定容量;而且还可以指定是否公平,默认情况下是不公平的,若设置公平那么等待时间最长的线程就会别优先处理,不过这会同时带来一定性能损耗
  • LinkedBlockingQueue:无界;内部结构使用两把锁对应put()take()方法,所以并发性能比较高
  • PriorityBlockingQueue:无界;当容量不够时会自动扩容;支持优先级,所以队列中的元素需要是可比较的
  • SynchronousQueue:容量为0;线程间的直接传递,不做存储,所以效率很高
  • DelayQueue:无界;根据延迟时间排序,需要元素实现Deleayed接口规定排序接口

ConcurrentLinkedQueue

非阻塞队列,不具备阻塞功能,无界,利用CAS非阻塞算法来实现线程安全,适用于对性能要求较高的并发场景

此作者没有提供个人介绍
最后更新于 2021-06-13