Last updated on

Go从零实现 - 分布式缓存 - GeeCache

缓存淘汰策略、LRU 实现

存储是优先的,满了去掉谁

LFU(Least Frequently Used)

访问次数,去掉访问最少的

  • 内存占用大
  • 受已有模式影响多

LRU(Least Recently Used)

LRU 认为,如果数据最近被访问过,那么将来被访问的概率也会更高。

LRU 实现

image

整体逻辑并不复杂,包装 entry 是为了删除队尾的时候方便

delete(c.cache, kv.key)

package lru

import "container/list"

type Value interface {
	Len() int
}

type Cache struct {
	maxBytes int64
	nbytes   int64
	// 使用双向链表
	ll *list.List
	// 使用 map
	cache     map[string]*list.Element
	OnEvicted func(key string, value Value)
}

func New(maxBytes int64, onEvicted func(string2 string, value Value)) *Cache {
	return &Cache{
		maxBytes:  maxBytes,
		ll:        list.New(),
		cache:     make(map[string]*list.Element),
		OnEvicted: onEvicted,
	}
}

type entry struct {
	key   string
	value Value
}

func (c *Cache) Add(key string, value Value) {
	if ele, ok := c.cache[key]; ok {
		// LRU 调整顺序到头部 头部是最近使用的,尾部是最久未使用的
		c.ll.MoveToFront(ele)
		c.nbytes += int64(len(key)) + int64(value.Len())
		kv := ele.Value.(*entry)
		kv.value = value
	} else {
		// 新增
		ele := c.ll.PushFront(&entry{key, value})
		c.cache[key] = ele
		c.nbytes += int64(len(key)) + int64(value.Len())
	}
	for c.maxBytes != 0 && c.maxBytes < c.nbytes {
		c.RemoveOldest()
	}
}

func (c *Cache) RemoveOldest() {
	ele := c.ll.Back()

	if ele == nil {
		return
	}

	c.ll.Remove(ele)
	kv := ele.Value.(*entry)
	delete(c.cache, kv.key)
	c.nbytes -= int64(len(kv.key)) + int64(kv.value.Len())
	if c.OnEvicted != nil {
		c.OnEvicted(kv.key, kv.value)
	}
}

func (c *Cache) Get(key string) (value Value, ok bool) {
	if ele, ok := c.cache[key]; ok {
		c.ll.MoveToFront(ele)
		kv := ele.Value.(*entry)
		return kv.value, true
	}
	return
}

func (c *Cache) Len() int {
	return c.ll.Len()
}

单机并发缓存

为现有的 Cache 增加并发读取的能力

先看看使用层

func TestGet(t *testing.T) {
	loadCounts := make(map[string]int, len(db))
	gee := NewGroup("scores", 2<<10, GetterFunc(
		func(key string) ([]byte, error) {
			log.Println("[SlowDB] search key", key)
			if v, ok := db[key]; ok {
				if _, ok := loadCounts[key]; !ok {
					loadCounts[key] = 0
				}
				loadCounts[key] += 1
				return []byte(v), nil
			}
			return nil, fmt.Errorf("%s not exist", key)
		}))

	for k, v := range db {
		if view, err := gee.Get(k); err != nil || view.String() != v {
			t.Fatal("failed to get value of Tom")
		} // load from callback function
		if _, err := gee.Get(k); err != nil || loadCounts[k] > 1 {
			t.Fatalf("cache %s miss", k)
		} // cache hit
	}

	if view, err := gee.Get("unknown"); err == nil {
		t.Fatalf("the value of unknow should be empty, but %s got", view)
	}
}

NewGroup 创建返回 gee,也就是缓存实例,创建的时候第三个参数是缓存未命中时的获取逻辑,获取到了,拿到的结果会加入缓存

NewGroup 的 核心 Get 实现

// A Getter loads data for a key.
type Getter interface {
	Get(key string) ([]byte, error)
}

// A GetterFunc implements Getter with a function.
type GetterFunc func(key string) ([]byte, error)

// Get implements Getter interface function
func (f GetterFunc) Get(key string) ([]byte, error) {
	return f(key)
}

// A Group is a cache namespace and associated data loaded spread over
type Group struct {
	name      string
	getter    Getter
	mainCache cache
}

// Get value for a key from cache
func (g *Group) Get(key string) (ByteView, error) {
	if key == "" {
		return ByteView{}, fmt.Errorf("key is required")
	}

	if v, ok := g.mainCache.get(key); ok {
		log.Println("[GeeCache] hit")
		return v, nil
	}

	return g.load(key)
}

func (g *Group) load(key string) (value ByteView, err error) {
	return g.getLocally(key)
}

func (g *Group) getLocally(key string) (ByteView, error) {
	bytes, err := g.getter.Get(key)
	if err != nil {
		return ByteView{}, err

	}
	value := ByteView{b: cloneBytes(bytes)}
	g.populateCache(key, value)
	return value, nil
}

func (g *Group) populateCache(key string, value ByteView) {
	g.mainCache.add(key, value)
}

Getter 这里接口型函数的设计可看

cache 的实现

package geecache

import (
	"geecache/lru"
	"sync"
)

type cache struct {
	mu         sync.Mutex
	lru        *lru.Cache
	cacheBytes int64
}

func (c *cache) add(key string, value ByteView) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.lru == nil {
		c.lru = lru.New(c.cacheBytes, nil)
	}
	c.lru.Add(key, value)
}

func (c *cache) get(key string) (value ByteView, ok bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.lru == nil {
		return
	}

	if v, ok := c.lru.Get(key); ok {
		return v.(ByteView), ok
	}

	return
}

这里为 lru 的读取套上了一层锁的机制

Note

多个协程(goroutine)同时读写同一个变量,在并发度较高的情况下,会发生冲突。确保一次只有一个协程(goroutine)可以访问该变量以避免冲突,这称之为互斥,互斥锁可以解决这个问题。

sync.Mutex 是一个互斥锁,可以由不同的协程加锁和解锁。

sync.Mutex  是 Go 语言标准库提供的一个互斥锁,当一个协程(goroutine)获得了这个锁的拥有权后,其它请求锁的协程(goroutine) 就会阻塞在  Lock()  方法的调用上,直到调用  Unlock()  锁被释放。

byte 是很通用的存储结构,抽象一层这个结构,并满足我们 Value 的接口要求

// Value use Len to count how many bytes it takes
type Value interface {
	Len() int
}

package geecache

// A ByteView holds an immutable view of bytes.
type ByteView struct {
	b []byte
}

// Len returns the view's length
func (v ByteView) Len() int {
	return len(v.b)
}

// ByteSlice returns a copy of the data as a byte slice.
func (v ByteView) ByteSlice() []byte {
	return cloneBytes(v.b)
}

// String returns the data as a string, making a copy if necessary.
func (v ByteView) String() string {
	return string(v.b)
}

func cloneBytes(b []byte) []byte {
	c := make([]byte, len(b))
	copy(c, b)
	return c
}

至此整体结构

geecache/
    |--lru/
        |--lru.go  // lru 缓存淘汰策略
    |--byteview.go // 缓存值的抽象与封装
    |--cache.go    // 并发控制
    |--geecache.go // 负责与外部交互,控制缓存存储和获取的主流程

至此,这一章节的单机并发缓存就已经完成了。

HTTP 服务端

Note

分布式缓存需要实现节点间通信,建立基于 HTTP 的通信机制是比较常见和简单的做法。如果一个节点启动了 HTTP 服务,那么这个节点就可以被其他节点访问。今天我们就为单机节点搭建 HTTP Server。

目标是可以通过 HTTP 协议访问到 上面实现的 geecache

使用层为

func main() {
	geecache.NewGroup("scores", 2<<10, geecache.GetterFunc(
		func(key string) ([]byte, error) {
			log.Println("[SlowDB] search key", key)
			if v, ok := db[key]; ok {
				return []byte(v), nil
			}
			return nil, fmt.Errorf("%s not exist", key)
		}))

	addr := "localhost:9999"
	peers := geecache.NewHTTPPool(addr)
	log.Println("geecache is running at", addr)
	log.Fatal(http.ListenAndServe(addr, peers))
}

启动后可以通过 HTTP 进行访问

$ curl http://localhost:9999/_geecache/scores/Tom
630

$ curl http://localhost:9999/_geecache/scores/kkk
kkk not exist

URL 格式为 /<basepath>/<groupname>/<key> 通过 groupname 得到 group 实例,再使用  group.Get(key)  获取缓存数据。

peers := geecache.NewHTTPPool(addr)
http.ListenAndServe(addr, peers)

peers 需要满足 ListenAndServe 的接口要求

func ListenAndServe(addr string, handler Handler) error {
	server := &Server{Addr: addr, Handler: handler}
	return server.ListenAndServe()
}

type Handler interface {
	ServeHTTP(ResponseWriter, *Request)
}

因此造一个 HTTPPool 实现 ServeHTTP 接口就好

const defaultBasePath = "/_geecache/"

// HTTPPool implements PeerPicker for a pool of HTTP peers.
type HTTPPool struct {
	// this peer's base URL, e.g. "https://example.net:8000"
	self     string
	basePath string
}

// NewHTTPPool initializes an HTTP pool of peers.
func NewHTTPPool(self string) *HTTPPool {
	return &HTTPPool{
		self:     self,
		basePath: defaultBasePath,
	}
}

// Log info with server name
func (p *HTTPPool) Log(format string, v ...interface{}) {
	log.Printf("[Server %s] %s", p.self, fmt.Sprintf(format, v...))
}

// ServeHTTP handle all http requests
func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if !strings.HasPrefix(r.URL.Path, p.basePath) {
		panic("HTTPPool serving unexpected path: " + r.URL.Path)
	}
	p.Log("%s %s", r.Method, r.URL.Path)
	// /<basepath>/<groupname>/<key> required
	parts := strings.SplitN(r.URL.Path[len(p.basePath):], "/", 2)
	if len(parts) != 2 {
		http.Error(w, "bad request", http.StatusBadRequest)
		return
	}

	groupName := parts[0]
	key := parts[1]

	group := GetGroup(groupName)
	if group == nil {
		http.Error(w, "no such group: "+groupName, http.StatusNotFound)
		return
	}

	view, err := group.Get(key)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/octet-stream")
	w.Write(view.ByteSlice())
}

接下来,运行 main 函数,使用 curl 做一些简单测试:

$ curl http://localhost:9999/_geecache/scores/Tom
630
$ curl http://localhost:9999/_geecache/scores/kkk
kkk not exist

GeeCache 的日志输出如下:

2020/02/11 23:28:39 geecache is running at localhost:9999
2020/02/11 23:29:08 [Server localhost:9999] GET /_geecache/scores/Tom
2020/02/11 23:29:08 [SlowDB] search key Tom
2020/02/11 23:29:16 [Server localhost:9999] GET /_geecache/scores/kkk
2020/02/11 23:29:16 [SlowDB] search key kkk

一致性哈希(hash)

场景

当我们有多个 Cache Server 的时候,一个 key 的请求过来,去哪个 Cache Server 来请求呢 需要满足以下条件

  1. 缓存命中率高
  2. 容易拓展

image

方案

方案 1 如下图 image 命中率高,算法简单,但是一旦 Cache Server 数量变化,现有缓存都无用了

方案 2 一致性哈希 如上图 image

并且为了解决分布不均匀的问题,增加虚拟节点 image

image

实现

package consistenthash

import (
	"hash/crc32"
	"sort"
	"strconv"
)

// Hash maps bytes to uint32
type Hash func(data []byte) uint32

// Map constains all hashed keys
type Map struct {
	hash     Hash
	replicas int
	keys     []int // Sorted
	hashMap  map[int]string
}

// New creates a Map instance
func New(replicas int, fn Hash) *Map {
	m := &Map{
		replicas: replicas,
		hash:     fn,
		hashMap:  make(map[int]string),
	}
	if m.hash == nil {
		m.hash = crc32.ChecksumIEEE
	}
	return m
}

// Add adds some keys to the hash.
func (m *Map) Add(keys ...string) {
	for _, key := range keys {
		for i := 0; i < m.replicas; i++ {
			hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
			m.keys = append(m.keys, hash)
			m.hashMap[hash] = key
		}
	}
	sort.Ints(m.keys)
}

// Get gets the closest item in the hash to the provided key.
func (m *Map) Get(key string) string {
	if len(m.keys) == 0 {
		return ""
	}

	hash := int(m.hash([]byte(key)))
	// Binary search for appropriate replica.
	idx := sort.Search(len(m.keys), func(i int) bool {
		return m.keys[i] >= hash
	})

	return m.hashMap[m.keys[idx%len(m.keys)]]
}

最后一行 m.keys[idx%len(m.keys)] 有些理解成本

Note

如果  idx == len(m.keys),说明应选择  m.keys[0],因为  m.keys  是一个环状结构,所以用取余数的方式来处理这种情况。

其他参考

小林 coding:什么是一致性哈希?

一致性哈希 - Consistent Hashing 是什么?为什么系统设计面试中经常会提到?10 分钟讲解一致性哈希 | 系统设计 System Design EP1

分布式节点

现有流程

现在的流程为


接收 key --> 检查是否被缓存 -----> 返回缓存值 ⑴
                |  否                         是
                |-----> 是否应当从远程节点获取 -----> 与远程节点交互 --> 返回缓存值 ⑵
                            |  否
                            |-----> 调用`回调函数`,获取值并添加到缓存 --> 返回缓存值 ⑶

1、3 已经在 单机并发缓存那里实现,本节主要来解决 2

使用一致性哈希选择节点        是                                    是
    |-----> 是否是远程节点 -----> HTTP 客户端访问远程节点 --> 成功?-----> 服务端返回返回值
                    |  否                                    ↓  否
                    |----------------------------> 回退到本地节点处理。

先看使用层

import (
	"flag"
	"fmt"
	"geecache"
	"log"
	"net/http"
	"strings"
)

var db = map[string]string{
	"Tom":  "630",
	"Jack": "589",
	"Sam":  "567",
}

func createGroup() *geecache.Group {
	return geecache.NewGroup("scores", 2<<10, geecache.GetterFunc(
		func(key string) ([]byte, error) {
			log.Println("[SlowDB] search key", key)
			if v, ok := db[key]; ok {
				return []byte(v), nil
			}
			return nil, fmt.Errorf("%s not exist", key)
		}))
}

func startCacheServer(addr string, addrs []string, gee *geecache.Group) {
	peers := geecache.NewHTTPPool(addr)
	peers.Set(addrs...)
	gee.RegisterPeers(peers)
	log.Println("geecache is running at", addr)
	log.Fatal(http.ListenAndServe(strings.TrimPrefix(addr, "http://"), peers))
}

func startAPIServer(apiAddr string, gee *geecache.Group) {
	http.Handle("/api", http.HandlerFunc(
		func(w http.ResponseWriter, r *http.Request) {
			key := r.URL.Query().Get("key")
			view, err := gee.Get(key)
			if err != nil {
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}
			w.Header().Set("Content-Type", "application/octet-stream")
			w.Write(view.ByteSlice())

		}))
	log.Println("fontend server is running at", apiAddr)
	log.Fatal(http.ListenAndServe(strings.TrimPrefix(apiAddr, "http://"), nil))

}

func main() {
	var port int
	var api bool
	flag.IntVar(&port, "port", 8001, "Geecache server port")
	flag.BoolVar(&api, "api", false, "Start a api server?")
	flag.Parse()

	apiAddr := "http://0.0.0.0:9999"
	addrMap := map[int]string{
		8001: "http://0.0.0.0:8001",
		8002: "http://0.0.0.0:8002",
		8003: "http://0.0.0.0:8003",
	}

	var addrs []string
	for _, v := range addrMap {
		addrs = append(addrs, v)
	}

	gee := createGroup()
	if api {
		go startAPIServer(apiAddr, gee)
	}
	startCacheServer(addrMap[port], addrs, gee)
}
#!/bin/bash
trap "rm server;kill 0" EXIT

go build -o server
./server -port=8001 &
./server -port=8002 &
./server -port=8003 -api=1 &

sleep 2
echo ">>> start test"
curl "http://127.0.0.1:9999/api?key=Tom" &
curl "http://127.0.0.1:9999/api?key=Tom" &
curl "http://127.0.0.1:9999/api?key=Tom" &

wait

起了三个 Cache Server,端口分别在 8001 8002 8003 其中有个服务同时起了 Cache Server 和 对外暴露的 API 服务,这个 API 服务放在 9999 端口上

执行结果是

2023/11/22 09:29:09 geecache is running at http://0.0.0.0:8001
2023/11/22 09:29:09 geecache is running at http://0.0.0.0:8002
2023/11/22 09:29:09 fontend server is running at http://0.0.0.0:9999
2023/11/22 09:29:09 geecache is running at http://0.0.0.0:8003
>>> start test
2023/11/22 09:29:11 [Server http://0.0.0.0:8003] Pick peer http://0.0.0.0:8002
2023/11/22 09:29:11 [Server http://0.0.0.0:8003] Pick peer http://0.0.0.0:8002
2023/11/22 09:29:11 [Server http://0.0.0.0:8003] Pick peer http://0.0.0.0:8002
2023/11/22 09:29:11 [Server http://0.0.0.0:8002] GET /_geecache/scores/Tom
2023/11/22 09:29:11 [SlowDB] search key Tom
2023/11/22 09:29:11 [Server http://0.0.0.0:8002] GET /_geecache/scores/Tom
2023/11/22 09:29:11 [GeeCache] hit
2023/11/22 09:29:11 [Server http://0.0.0.0:8002] GET /_geecache/scores/Tom
2023/11/22 09:29:11 [GeeCache] hit
630630630

整个流程

如下图 image image image

上面这两个图可以结合着理解 一致性哈希在分布式节点的使用

其他

var _ PeerPicker = (*HTTPPool)(nil)  这行代码是用于验证  *HTTPPool  类型是否实现了  PeerPicker  接口的一种方式。

即利用强制类型转换,确保 struct HTTPPool 实现了接口 PeerPicker。这样 IDE 和编译期间就可以检查,而不是等到使用的时候。

参考这里

防止缓存击穿

背景

缓存雪崩:缓存在同一时刻全部失效,造成瞬时 DB 请求量大、压力骤增,引起雪崩。缓存雪崩通常因为缓存服务器宕机、缓存的 key 设置了相同的过期时间等引起。

缓存击穿:一个存在的 key,在缓存过期的一刻,同时有大量的请求,这些请求都会击穿到 DB ,造成瞬时 DB 请求量大、压力骤增。

缓存穿透:查询一个不存在的数据,因为不存在则不会写到缓存中,所以每次都会去请求 DB,如果瞬间流量过大,穿透到 DB,导致宕机。

在分布式节点中有以下使用结果

#!/bin/bash
trap "rm server;kill 0" EXIT

go build -o server
./server -port=8001 &
./server -port=8002 &
./server -port=8003 -api=1 &

sleep 2
echo ">>> start test"
curl "http://127.0.0.1:9999/api?key=Tom" &
curl "http://127.0.0.1:9999/api?key=Tom" &
curl "http://127.0.0.1:9999/api?key=Tom" &

wait
2023/11/22 09:29:09 geecache is running at http://0.0.0.0:8001
2023/11/22 09:29:09 geecache is running at http://0.0.0.0:8002
2023/11/22 09:29:09 fontend server is running at http://0.0.0.0:9999
2023/11/22 09:29:09 geecache is running at http://0.0.0.0:8003
>>> start test
2023/11/22 09:29:11 [Server http://0.0.0.0:8003] Pick peer http://0.0.0.0:8002
2023/11/22 09:29:11 [Server http://0.0.0.0:8003] Pick peer http://0.0.0.0:8002
2023/11/22 09:29:11 [Server http://0.0.0.0:8003] Pick peer http://0.0.0.0:8002
2023/11/22 09:29:11 [Server http://0.0.0.0:8002] GET /_geecache/scores/Tom
2023/11/22 09:29:11 [SlowDB] search key Tom
2023/11/22 09:29:11 [Server http://0.0.0.0:8002] GET /_geecache/scores/Tom
2023/11/22 09:29:11 [GeeCache] hit
2023/11/22 09:29:11 [Server http://0.0.0.0:8002] GET /_geecache/scores/Tom
2023/11/22 09:29:11 [GeeCache] hit
630630630

对着 api 请求了 三次,8003 向 8002 发了三次请求

本次来实现一个叫作 singleflight 的包来解决这个问题,做到只对远端进行一次请求

方案

从实现上来看,就是对向远端请求的那个流程加锁

func (g *Group) load(key string) (value ByteView, err error) {
	// each key is only fetched once (either locally or remotely)
	// regardless of the number of concurrent callers.
	viewi, err := g.loader.Do(key, func() (interface{}, error) {
		if g.peers != nil {
			if peer, ok := g.peers.PickPeer(key); ok {
				if value, err = g.getFromPeer(peer, key); err == nil {
					return value, nil
				}
				log.Println("[GeeCache] Failed to get from peer", err)
			}
		}

		return g.getLocally(key)
	})

	if err == nil {
		return viewi.(ByteView), nil
	}
	return
}

实现如下

package singleflight

import "sync"

// call is an in-flight or completed Do call
type call struct {
	wg  sync.WaitGroup
	val interface{}
	err error
}

// Group represents a class of work and forms a namespace in which
// units of work can be executed with duplicate suppression.
type Group struct {
	mu sync.Mutex       // protects m
	m  map[string]*call // lazily initialized
}

// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
		g.mu.Unlock()
		c.wg.Wait()
		return c.val, c.err
	}
	c := new(call)
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	c.val, c.err = fn()
	c.wg.Done()

	g.mu.Lock()
	delete(g.m, key)
	g.mu.Unlock()

	return c.val, c.err
}

两个锁

  • mu 来处理对 group m 的操作
  • wg 来实现对一个 key 只执行一次函数,其余的等待这个函数请求的结果

参考

原文