ConcurrentHashMap高级应用

​ 一直以来想做的一个功能是,对用户调用API的次数做限制,由于调用很频繁,读写次数基本上相等,MySQL这种数据库肯定不能解决。

​ 接触JAVA之前,我的第一个解决方案是对HashMap的一串操作进行加锁,用JAVA表示就是

代码

/** username-> invokeCount */

private static HashMap<StringIntegermap new HashMap<>();

private final static int MAX_COUNT 1000;

/** Inc invokeCount */

public static boolean increment(String username){

    Integer cnt 1;

    synchronized (map){

        // 由于HashMap支持null值,所以get返回的结果不能代表不存在

        if (map.containsKey(username)){

            cnt map.get(username);

            cnt++;

        }

        map.put(username, cnt);

    }

    return cnt MAX_COUNT;

}

上面这种方案是最low的方案,直到接触了JAVA的ConcurrentHashMap类,听说是支持高并发的HashMap

​ 于是乎王者操作一顿分析发现根本问题是保证一串代码的原子锁必须加锁...

​ 最后分析ConcurrentHashMap源码时无意中发现了

  • java.util.concurrent.ConcurrentHashMap#computeIfAbsent
  • java.util.concurrent.ConcurrentHashMap#computeIfPresent
  • java.util.concurrent.ConcurrentHashMap#compute
  • java.util.concurrent.ConcurrentHashMap#merge

看上去就不一样,实际上HashMap也有这系列方法,但是由于如果使用HashMap的话,锁的就是整个HashMap跟之前情况是一样。

不过ConcurrentHashMap不一样,锁的粒度低。下面分析这3个方法对上面代码的优化

computeIfAbsent

代码

/**

 * If the specified key is not already associated with a value,

 * attempts to compute its value using the given mapping function

 * and enters it into this map unless {@code null}.  The entire

 * method invocation is performed atomically, so the function is

 * applied at most once per key.  Some attempted update operations

 * on this map by other threads may be blocked while computation

 * is in progress, so the computation should be short and simple,

 * and must not attempt to update any other mappings of this map.

 *

 * @param key key with which the specified value is to be associated

 * @param mappingFunction the function to compute a value

 * @return the current (existing or computed) value associated with

 *         the specified key, or null if the computed value is null

 * @throws NullPointerException if the specified key or mappingFunction

 *         is null

 * @throws IllegalStateException if the computation detectably

 *         attempts a recursive update to this map that would

 *         otherwise never complete

 * @throws RuntimeException or Error if the mappingFunction does so,

 *         in which case the mapping is left unestablished

 */

public computeIfAbsent(keyFunction<? super Kextends VmappingFunction)

从方法名推断这个函数的意思是:key不存在时的计算,从注释上可以得知:

  • 当map中没有这个key的映射时,尝试通过mappingFunction计算key的val并添加到map。
  • mappingFunctionapply方法执行是原子性的,所以每个key最多执行一次,因为添加到map了就不会重复执行了
  • 其他线程尝试更新map时可能会阻塞,当其他线程的更新和这个bucket冲突就会同步锁阻塞
  • 这个mappingFunctionapply函数运算耗时应该尽可能短和简单
  • 不能在mappingFunctionapply函数里更新map中的其他映射
  • 返回计算结果

computeIfPresent

代码

/**

 * If the value for the specified key is present, attempts to

 * compute a new mapping given the key and its current mapped

 * value.  The entire method invocation is performed atomically.

 * Some attempted update operations on this map by other threads

 * may be blocked while computation is in progress, so the

 * computation should be short and simple, and must not attempt to

 * update any other mappings of this map.

 *

 * @param key key with which a value may be associated

 * @param remappingFunction the function to compute a value

 * @return the new value associated with the specified key, or null if none

 * @throws NullPointerException if the specified key or remappingFunction

 *         is null

 * @throws IllegalStateException if the computation detectably

 *         attempts a recursive update to this map that would

 *         otherwise never complete

 * @throws RuntimeException or Error if the remappingFunction does so,

 *         in which case the mapping is unchanged

 */

public computeIfPresent(keyBiFunction<? super Ksuper Vextends VremappingFunction)

从方法名推断这个函数的意思是:key存在时的计算,从注释上可以得知:

  • 当map中有这个key的映射时,尝试通过remappingFunction计算key的val并更新
  • remappingFunctionapply方法执行是原子性的,所以每个key最多执行一次,因为添加到map了就不会重复执行了
  • 其他线程尝试更新map时可能会阻塞,当其他线程的更新和这个bucket冲突就会同步锁阻塞
  • 这个remappingFunctionapply函数运算耗时应该尽可能短和简单
  • 不能在remappingFunctionapply函数里更新map中的其他映射
  • 返回计算结果

compute

代码

/**

 * Attempts to compute a mapping for the specified key and its

 * current mapped value (or {@code null} if there is no current

 * mapping). The entire method invocation is performed atomically.

 * Some attempted update operations on this map by other threads

 * may be blocked while computation is in progress, so the

 * computation should be short and simple, and must not attempt to

 * update any other mappings of this Map.

 *

 * @param key key with which the specified value is to be associated

 * @param remappingFunction the function to compute a value

 * @return the new value associated with the specified key, or null if none

 * @throws NullPointerException if the specified key or remappingFunction

 *         is null

 * @throws IllegalStateException if the computation detectably

 *         attempts a recursive update to this map that would

 *         otherwise never complete

 * @throws RuntimeException or Error if the remappingFunction does so,

 *         in which case the mapping is unchanged

 */

public compute(key,

                 BiFunction<? super Ksuper Vextends VremappingFunction)

从方法名和以上2和方法推断这个方法的意思是:任何时候的计算,从注释上可以得知:

  • 不管key存在不存在都会执行remappingFunction的apply方法计算新的val
  • remappingFunctionapply方法执行是原子性的,所以每个key最多执行一次,因为添加到map了就不会重复执行了
  • 其他线程尝试更新map时可能会阻塞,当其他线程的更新和这个bucket冲突就会同步锁阻塞
  • 这个remappingFunctionapply函数运算耗时应该尽可能短和简单
  • 不能在remappingFunctionapply函数里更新map中的其他映射
  • 返回计算结果

compute关键源码如下

代码

for (Node<K,Vf, pred null;; ++binCount) {

    ek;

    if (e.hash == &&

        ((ek e.key== key ||

         (ek != null && key.equals(ek)))) {

        val remappingFunction.apply(key, e.val);

        if (val != null)

            e.val val;

        else {

            delta = -1;

            Node<K,Ven e.next;

            if (pred != null)

                pred.next en;

            else

                setTabAt(tab, ien);

        }

        break;

    }

    pred e;

    if ((e e.next) == null) {

        val remappingFunction.apply(keynull);

        if (val != null) {

            if (pred.next != null)

                throw new IllegalStateException("Recursive update");

            delta 1;

            pred.next new Node<K,V>(hkey, val);

        }

        break;

    }

}

merge

代码

/**

 * If the specified key is not already associated with a

 * (non-null) value, associates it with the given value.

 * Otherwise, replaces the value with the results of the given

 * remapping function, or removes if {@code null}. The entire

 * method invocation is performed atomically.  Some attempted

 * update operations on this map by other threads may be blocked

 * while computation is in progress, so the computation should be

 * short and simple, and must not attempt to update any other

 * mappings of this Map.

 *

 * @param key key with which the specified value is to be associated

 * @param value the value to use if absent

 * @param remappingFunction the function to recompute a value if present

 * @return the new value associated with the specified key, or null if none

 * @throws NullPointerException if the specified key or the

 *         remappingFunction is null

 * @throws RuntimeException or Error if the remappingFunction does so,

 *         in which case the mapping is unchanged

 */

public merge(keyvalueBiFunction<? super Vsuper Vextends VremappingFunction)

从方法名和以上2和方法推断这个方法的意思是:合并,从注释上可以得知:

  • 如果key不存在,往链表尾部插入key-value
  • 如果key存在
    • 计算结果val为null则删除
    • 否则key和计算结果插入
  • 其他线程尝试更新map时可能会阻塞,当其他线程的更新和这个bucket冲突就会同步锁阻塞
  • 这个remappingFunctionapply函数运算耗时应该尽可能短和简单
  • 不能在remappingFunctionapply函数里更新map中的其他映射
  • 返回插入的值

merge关键源码如下

代码

for (Node<K,Vf, pred null;; ++binCount) {

    ek;

    if (e.hash == &&

        ((ek e.key== key ||

         (ek != null && key.equals(ek)))) {

        val remappingFunction.apply(e.val, value);

        if (val != null)

            e.val val;

        else {

            delta = -1;

            Node<K,Ven e.next;

            if (pred != null)

                pred.next en;

            else

                setTabAt(tab, ien);

        }

        break;

    }

    pred e;

    if ((e e.next) == null) {

        delta 1;

        val value;

        pred.next new Node<K,V>(hkey, val);

        break;

    }

}

compute实现

代码

/** username-> invokeCount*/

private static ConcurrentHashMap<StringIntegermap new ConcurrentHashMap<>();

private final static int MAX_COUNT 1000;

/** Inc invokeCount */

public static boolean increment(String username){

    Integer cnt map.compute(username, (keyval) -> val == null : ++val);

    return cnt MAX_COUNT;

}

merge实现

代码

/** username-> invokeCount*/

private static ConcurrentHashMap<StringIntegermap new ConcurrentHashMap<>();

private final static int MAX_COUNT 1000;

/** Inc invokeCount */

public static boolean increment(String username){

    Integer cnt map.merge(username1, (mapValthisVal) -> ++mapVal);

    return cnt MAX_COUNT;

}

测试代码

代码

public static void main(String[] argsthrows InterruptedException {

    int cnt 100;

    Thread[] ths new Thread[cnt];

    for (int 0; i 100; i++) {

        ths[inew Thread(()->{

            increment("lyp");

        });

        ths[i].start();

    }

    for (int 0; i cnt; i++) {

        ths[i].join();

    }

    System.out.println(map);

}