原创

通过Redis缓存删除策略深入理解LRU

关于LRU

关于LRU算法,接触lru应该是在操作系统的设计与实现中,在页式管理中的页面替换算法其中一种方式就是使用lru。在淘汰某一页是,选择离当前时间最近的一段时间内,未被使用过的页先淘汰。该算法的出发点就是,如果某一页被访问了,着他有可能马上还要被访问。反过来说就是,若某一页长时间未被访问,那么他最近一段时间都不会再次被访问。

第二次接触是在阅读redis源码时注意到,redis针对过期键的删除策略中同样使用到了lru,不过redis提供了六种内存淘汰策略,其中两种volatile-lruallkeys-lru都是涉及到了lru算法。第一种是在redis中设置了过期时间的key中进行lru。第二种是在所有的key中进行lru方式的淘汰。以防止系统提供的内存被key沾满。

要完全实现LRU算法是很困难的,因为要找出最近最久未被使用,就要针对每一个数据进行记录,记录和计算时间,次数,而且每一次更新都要对记录进行及时的更改,需要花费巨大的开销。

正因如此实际使用中通常使用的都是近似LRU算法。

  1. 最不经常使用淘汰算法(LFU)
    该算法在需要进行淘汰时,先淘汰到当前为止被访问次数最少的那一项。次方法的实现只需要在数据中添加一项计数即可,被访问则计数加一,淘汰时计数清零。

  2. 最近没有使用淘汰算法(NUR)
    该算法在需要进行淘汰时,从所有页中选定一段时间内未被访问的数据项,在其中任选一项进行删除。该算法需要在数据中添加一个标志位,被访问标志位为1,未访问为0,定时对标志位清零。这样在淘汰时,只需要在所有为0的数据项中随机选取一个进行删除即可。

LRU在redis中的实现方式

我们来看一下redis是如何高效的使用lru的呢。

在redis中提供了六种内存淘汰策略,以下是官方文档中的解释:

# 在设置了过期时间的key中,使用近似的lru淘汰策略
# volatile-lru -> Evict using approximated LRU among the keys with an expire set.
# 所有的key均使用近似的lru淘汰策略
# allkeys-lru -> Evict any key using approximated LRU.


# LFU 最不经常使用

# volatile-lfu -> Evict using approximated LFU among the keys with an expire set.
# 在设置了过期时间的key中,使用lfu淘汰策略

# allkeys-lfu -> Evict any key using approximated LFU.
# 所有的key均使用lfu淘汰策略



# volatile-random -> Remove a random key among the ones with an expire set.
# //在设置了过期时间的key中,使用随机淘汰策略

# allkeys-random -> Remove a random key, any key.
# //所有的key均使用随机淘汰策略

# volatile-ttl -> Remove the key with the nearest expire time (minor TTL)
# //使用ttl淘汰策略

# noeviction -> Don't evict anything, just return an error on write operations .
# //不允许淘汰,在写操作发生,但内存不够时,将会返回错误
#
# LRU means Least Recently Used
# LFU means Least Frequently Used
#
# Both LRU, LFU and volatile-ttl are implemented using approximated randomized algorithms.
# 使用近似随机算法实现了LRU、LFU和可变TTL。

这其中我们主要讨论volatile-lru``volatile-lfu这两种策略。

先看Redis中对过期的key进行淘汰的策略,分为两种:

  1. 主动淘汰(以定时任务的方式定时清理过期的key,也叫做定期删除)
  2. 在redis源码server.cserverCron是一个周期函数,默认没100ms执行一次,主要工作是做一些redis的后台操作,例如持久化过程的文件生成与监测,客户端信息的打印...等等,其中还有两项就是我们本片文章要关注的,LRU全局时钟的更新和调用databasesCron调用。
  3. databasesCron中通过调用activeExpireCycle(相关源码)实现了redis的主动淘汰,逻辑实现:
    1. 遍历至多16个DB 。【由宏CRON_DBS_PER_CALL定义,默认为16】
    2. 对于没有过期key的db直接跳过。
    3. 随机挑选20个带过期时间的key。【由宏ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP定义,默认20】
    4. 如果key过期,则将key相关的内存释放,或者放入失效队列。
    5. 由于redis的单线程模型,所以不能一直阻塞这进行清理,对操作有超时时间限制,如果操作时间超过允许的限定时间,至多25ms。则此次淘汰操作结束返回,否则进入5。
    6. 如果该DB下,有超过5个key (ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4=5) 实际失效,则进入 2,否则选择下一个DB,再次进入2。
    7. 遍历完成,结束。

总结来说,该种策略就是每隔一段时间执行一次删除,并且通过限制删除操作的时长和频率来减少对cpu的占用,同时还能减少过期key的内存占用。难点在于对频率和超时时间的设置,太频繁活着时间太长,同样会引起cpu的浪费,相反,会引起内存的浪费,接下来我们主要分析以下被动淘汰,这也是redis中主要是用LRU的地方

2. 被动淘汰(发生在执行命令时,若处理后的内存仍然不足,将返回oom,也叫做惰性删除,也是redisLRU主要是用的地方)

  • 在redis源码server.cprocessCommand是用于处理客户端发送过来的指令的啊函数,redis在每次执行指令前都会进行内存优化,使用freeMemoryIfNeededAndSafe函数。
      if (server.maxmemory && !server.lua_timedout) {
              int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR;
          }
    
  • freeMemoryIfNeededAndSafe函数中通过调用freeMemoryIfNeeded(相关源码)进行内存的管理,他就好像一个过滤器,在命令执行前过滤掉过期的key,逻辑实现如下如下。
    1.获取redis server当前已经使用的内存mem_reported。
    2.如果mem_reported < server.maxmemory ,则返回ok。
    3.如果内存策略配置为noeviction,返回错误。
    4.如果是LRU策略,如果是VOLATILE的LRU,则每次从可失效的数据集中,每次随机采样maxmemory_samples(默认为5)个key,从中选取idletime最大的key进行淘汰。
    5.如果是ALLKEYS_LRU则从全局数据中进行采样,每次随机采样maxmemory_samples(默认为5)个key,并从中选择idletime最大的key进行淘汰。
    6.如果释放内存之后,还是超过了server.maxmemory,则继续淘汰,只到释放后剩下的内存小于server.maxmemory为止。

总结来说,该种策略是对cpu友好的,因为他只有在执行命令时才会进行过期删除,而且他只会针对命令相关的键进行操作,因此他不会在其他的键上浪费cpu时间。但是缺点就是无法及时的清理内存,造成过期键对内存空间的占用。

由此可见redis中使用的volatile-lru算法可以归结为前文中提到的最近没有使用淘汰算法(NUR),一段时间内未被访问我们可以将其视为key过期,淘汰时从这些过期的key中随机选择。而volatile-lfu则是最不经常使用淘汰算法(LFU),不需要使用全局时钟,只需要记录访问数即可。

Redis会基于server.maxmemory_samples配置选取固定数目的key,然后比较它们的lru访问时间,然后淘汰最近最久没有访问的key,maxmemory_samples的值越大,Redis的近似LRU算法就越接近于严格LRU算法,但是相应消耗也变高,对性能有一定影响,样本值默认为5。

使用双向链表+HashMap实现

lru的典型实现就是双向链表+散列表,双链表用于存储数据结点,并且它是按照结点最近被使用的时间来存储的。如果一个结点被访问了,我们有理由相信它在接下来的一段时间被访问的概率要大于其它结点。于是,我们把它放到双向链表的头部。当我们往双向链表里插入一个结点,我们也有可能很快就会使用到它,同样把它插入到头部。我们使用这种方式不断地调整着双向链表,链表尾部的结点自然也就是最近一段时间,最久没有使用到的结点。

package com.tools.java;
import java.util.HashMap;
import java.util.Objects;
public class LRUCache<K, V> {

    private CacheNode first;
    private CacheNode last;
    private Integer currentCacheSize;
    private Integer cacheCapacity;
    private HashMap<K, CacheNode> cache;

    public LRUCache(Integer cacheCapacity) {
        this.currentCacheSize = 0;
        this.cacheCapacity = cacheCapacity;
        this.cache = new HashMap<>(cacheCapacity);
    }

    /**
     * cache的put操作
     *
     * @param key
     * @param value
     */
    public void put(K key, V value) {
        CacheNode node = cache.get(key);
        if (node == null) {
            //判断容量是否等于当前的size,是的话将会移除最后一个
            if (Objects.equals(currentCacheSize, cacheCapacity)) {
                cache.remove(key);
                removeLast();
            }
            //创建一个新的缓存节点
            node = new CacheNode();
            node.key = key;
        }
        node.value = value;
        moveToFirst(node);
        cache.put(key, node);
        //重新计算cache的大小
        currentCacheSize = cache.size();
    }

    /**
     * 根据key去获取value
     *
     * @param key cache的key值
     * @return 返回cache中key对应的value
     */
    public Object get(K key) {
        CacheNode cacheNode = cache.get(key);
        if (cacheNode == null) {
            return null;
        }
        //将找到的cache节点进行前移操作,并返回value值
        moveToFirst(cacheNode);
        return cacheNode.value;
    }

    /**
     * 清空操作
     */
    public void clear() {
        cache.clear();
        first = null;
        last = null;
        cacheCapacity = 0;
    }

    /**
     * 根据key进行remove操作
     *
     * @param key
     * @return
     */
    public Object remove(K key) {
        CacheNode cacheNode = cache.get(key);
        if (cacheNode != null) {
            if (null != cacheNode.pre) {
                cacheNode.pre.next = cacheNode.next;
            }
            if (null != cacheNode.next) {
                cacheNode.next.pre = cacheNode.pre;
            }
            if (cacheNode == first) {
                first = cacheNode.next;
            }
            if (cacheNode == last) {
                last = cacheNode.pre;
            }
        }
        currentCacheSize--;
        return cache.remove(key);
    }

    /**
     * 将节点移动到最前
     *
     * @param node 被移动的cache节点
     */
    private void moveToFirst(CacheNode node) {
        if (first == node) {
            return;
        }
        if (node.next != null) {
            node.next.pre = node.pre;
        }
        if (node.pre != null) {
            node.pre.next = node.next;
        }
        if (node == last) {
            last = last.pre;
        }
        if (first == null || last == null) {
            first = last = node;
            return;
        }

        node.next = first;
        first.pre = node;
        first = node;
        first.pre = null;
    }

    /**
     * 移除最末尾的操作,在cache达到容量时进行的操作
     */
    private void removeLast() {
        if (last != null) {
            cache.remove(last.key);
            last = last.pre;
            if (last.pre == null) {
                first = null;
            }
            last.next = null;
        }
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        CacheNode node = first;
        while (node != null) {
            sb.append(String.format("%s:%s ", node.key, node.value));
            node = node.next;
        }

        return sb.toString();
    }

    /**
     * cache的节点内部类
     */
    class CacheNode {
        CacheNode next;
        CacheNode pre;
        Object key;
        Object value;

        public CacheNode() {
        }
    }

    public static void main(String[] args) {

        LRUCache<Integer, String> lru = new LRUCache<Integer, String>(3);

        lru.put(1, "a");    // 1:a
        System.out.println(lru.toString());
        lru.put(2, "b");    // 2:b 1:a
        System.out.println(lru.toString());
        lru.put(3, "c");    // 3:c 2:b 1:a
        System.out.println(lru.toString());
        lru.put(4, "d");    // 4:d 3:c 2:b
        System.out.println(lru.toString());
        lru.put(5, "e");    // 5:e 4:d 3:c
        System.out.println(lru.toString());
        lru.put(1, "aa");   // 1:aa 4:d 3:c
        System.out.println(lru.toString());
        lru.put(2, "bb");   // 2:bb 1:aa 4:d
        System.out.println(lru.toString());
        lru.put(5, "e");    // 5:e 2:bb 1:aa
        System.out.println(lru.toString());
        lru.get(1);         // 1:aa 5:e 2:bb
        System.out.println(lru.toString());
        lru.remove(11);     // 1:aa 5:e 2:bb
        System.out.println(lru.toString());
        lru.remove(1);      //5:e 2:bb
        System.out.println(lru.toString());
        lru.put(1, "aaa");  //1:aaa 5:e 2:bb
        System.out.println(lru.toString());
        lru.clear();
        System.out.println("-----"+lru.toString());
    }
}

总结

至此我们已经分析过了redis中lru的使用,在redis中为了均衡cpu时间和内存占用使用了近似lru的算法实现了过期键的删除,该方式可以说为redis这种内存数据库提供了良好的效率。文末我提供了一种lru的java实现。采用双向链表来实现元素(k-v键值对)的存储,同时采用hash表来存储相关的key与item的对应关系。这样,我们既能在O(1)的时间对key进行操作,同时又能利用Double LinkedList的添加和删除节点的便利性。

redis设计与实现
redis lru实现策略-zxszcaijin-ChinaUnix博客
封面 图片来自网络,侵权删除

sev7e0
Write by sev7e0
end
本文目录