groupcache源码解读

Simple usage

gropucache的官方网站是 https://github.com/golang/groupcache

consistenthash模块

一致性hash算法,通常是用在查找一个合适的下载节点时,使负载更平均,但是对于同样的请求始终返回一样的结果。

type Map struct {
    hash     Hash
    replicas int
    keys     []int // Sorted
    hashMap  map[int]string
}

Map结构中replicas的含义是增加虚拟桶,使数据分布更加均匀。

// 创建Map结构
func New(replicas int,fn Hash) *Map
// 添加新的Key
func (m *Map) Add(keys ...string)
// 根据hash(key)获取value
func (m *Map) Get(key string) string
// 判断Map是否为空
func (m *Map) IsEmpty() bool
// 竟然没有提供Remove方法 O_O

用法简单例子

package main

import (
	"fmt"

	"github.com/golang/groupcache/consistenthash"
)

func main() {
	c := consistenthash.New(70,nil)
	c.Add("A","B","C","D","E")
	for _,key := range []string{"what","nice","what","good","yes!"} {
		fmt.Printf("%s -> %s\n",key,c.Get(key))
	}
}

// Expect output
// -------------
// what -> C
// nice -> A
// what -> C
// nice -> A
// good -> D
// yes! -> E

singleflight 模块

type Group struct {
}

// 当多个相同的key请求的时候,函数只被调用一次
func (g *Group) Do(key string,fn func() (interface{},error)) (interface{},error)

使用例子

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/golang/groupcache/singleflight"
)

func NewDelayReturn(dur time.Duration,n int) func() (interface{},error) {
	return func() (interface{},error) {
		time.Sleep(dur)
		return n,nil
	}
}

func main() {
	g := singleflight.Group{}
	wg := sync.WaitGroup{}
	wg.Add(2)
	go func() {
		ret,err := g.Do("key",NewDelayReturn(time.Second*1,1))
		if err != nil {
			panic(err)
		}
		fmt.Printf("key-1 get %v\n",ret)
		wg.Done()
	}()
	go func() {
		time.Sleep(100 * time.Millisecond) // make sure this is call is later
		ret,NewDelayReturn(time.Second*2,2))
		if err != nil {
			panic(err)
		}
		fmt.Printf("key-2 get %v\n",ret)
		wg.Done()
	}()
	wg.Wait()
}

执行结果(耗时: 1.019s)

key-2 get 1
key-1 get 1

lru 模块

最初是用在内存管理上的一个算法,根据历史的请求数,分析出最热门的数据,并保存下来。

type Cache struct {
    // MaxEntries is the maximum number of cache entries before
    // an item is evicted. Zero means no limit.
    MaxEntries int

    // OnEvicted optionally specificies a callback function to be
    // executed when an entry is purged from the cache.
    OnEvicted func(key Key,value interface{})
    // contains filtered or unexported fields
}

func New(maxEntries int) *Cache
func (c *Cache) Add(key Key,value interface{})
func (c *Cache) Get(key Key) (value interface{},ok bool)
func (c *Cache) Len() int
func (c *Cache) Remove(key Key)
func (c *Cache) RemoveOldest()

用法举例

package main

import (
        "fmt"

        "github.com/golang/groupcache/lru"
)

func main() {
        cache := lru.New(2)
        cache.Add("x","x0")
        cache.Add("y","y0")
        yval,ok := cache.Get("y")
        if ok {
                fmt.Printf("y is %v\n",yval)
        }
        cache.Add("z","z0")

        fmt.Printf("cache length is %d\n",cache.Len())
        _,ok = cache.Get("x")
        if !ok {
                fmt.Printf("x key was weeded out\n")
        }
}
// Expect output
//--------------
// y is y0
// cache length is 2
// x key was weeded out

HTTPPool 模块

type HTTPPool struct {
    // 可选,为每次的请求封装的Context参数
    Context func(*http.Request) groupcache.Context

    // 可选,不懂这个干啥的
    // 注释说:请求的时候用的就是这个
    Transport func(groupcache.Context) http.RoundTripper
}

// self 必须是一个合法的URL指向当前的服务器,比如 "http://10.0.0.1:8000"
// 这个函数默会注册一个路由
// http.handle("/_groupcache/",poolInstance) 
// 该路由主要用户节点间获取数据的功能
// 另外该函数不能重复调用,否则会panic
func NewHTTPPool(self string) *HTTPPool

// 更新节点列表
// 用了consistenthash
// 奇怪的时候,只有节点添加函数,并没有删除的
func (p *HTTPPool) Set(peers ...string)

// 用一致性hash算法选择一个节点
func (p *HTTPPool) PickPeer(key string) (groupcache.ProtoGetter,bool)

// 用于处理通过HTTP传递过来的grpc请求
func (p *HTTPPool) ServeHTTP(w http.ResponseWriter,r *http.Request)

其他 (待补充)

本来想在官网上找个例子,可以

package main

// A SizeReaderAt is a ReaderAt with a Size method.
//
// An io.SectionReader implements SizeReaderAt.
type SizeReaderAt interface {
    Size() int64
    io.ReaderAt
}

// NewMultiReaderAt is like io.MultiReader but produces a ReaderAt
// (and Size),instead of just a reader.
func NewMultiReaderAt(parts ...SizeReaderAt) SizeReaderAt {
    m := &multi{
        parts: make([]offsetAndSource,len(parts)),}
    var off int64
    for _,p := range parts {
        m.parts = append(m.parts,offsetAndSource{off,p})
        off += p.Size()
    }
    m.size = off
    return m
}

// NewChunkAlignedReaderAt returns a ReaderAt wrapper that is backed
// by a ReaderAt r of size totalSize where the wrapper guarantees that
// all ReadAt calls are aligned to chunkSize boundaries and of size
// chunkSize (except for the final chunk,which may be shorter).
//
// A chunk-aligned reader is good for caching,letting upper layers have
// any access pattern,but guarantees that the wrapped ReaderAt sees
// only nicely-cacheable access patterns & sizes.
func NewChunkAlignedReaderAt(r SizeReaderAt,chunkSize int) SizeReaderAt {
    // ...
}

func part(s string) SizeReaderAt {
    return io.NewSectionReader(strings.NewReader(s),int64(len(s)))
}

func handler(w http.ResponseWriter,r *http.Request) {
    sra := NewMultiReaderAt(
        part("Hello,"),part(" world! "),part("You requested "+r.URL.Path+"\n"),)
    rs := io.NewSectionReader(sra,sra.Size())
    http.ServeContent(w,r,"foo.txt",modTime,rs)
}


func main(){
    me := "http://10.0.0.1"
    peers := groupcache.NewHTTPPool(me)

    // Whenever peers change:
    peers.Set("http://10.0.0.1","http://10.0.0.2","http://10.0.0.3")
        
    var thumbNails = groupcache.NewGroup("thumbnail",64<<20,groupcache.GetterFunc(
        func(ctx groupcache.Context,key string,dest groupcache.Sink) error {
            fileName := key
            dest.SetBytes(generateThumbnail(fileName))
            return nil
        }))

    var data []byte
    err := thumbNails.Get(ctx,"big-file.jpg",groupcache.AllocatingByteSliceSink(&data))
    // ...
    http.ServeContent(w,"big-file-thumb.jpg",bytes.NewReader(data))


            
}

groupcache 使用例子

package main

import (
        "errors"
        "flag"
        "log"
        "net/http"
        "strconv"
        "strings"

        "github.com/golang/groupcache"
)

var localStorage map[string]string

func init() {
        localStorage = make(map[string]string)
        localStorage["hello"] = "world"
        localStorage["info"] = "This is an example"
}

func main() {
        port := flag.Int("port",4100,"Listen port")
        flag.Parse()

        // Name have to starts with http://
        self := "http://localhost:" + strconv.Itoa(*port)
        pool := groupcache.NewHTTPPool(self)
        pool.Set(self,"http://localhost:4101")

        var helloworld = groupcache.NewGroup("helloworld",10<<20,groupcache.GetterFunc(
                func(ctx groupcache.Context,dest groupcache.Sink) error {
                        log.Printf("groupcache get key: %v",key)
                        value,exists := localStorage[key]
                        if !exists {
                                dest.SetString(key + " NotExist")
                                return errors.New(key + " NotExist")
                        } else {
                                dest.SetString(value)
                                return nil
                        }
                }))

        http.HandleFunc("/",func(w http.ResponseWriter,r *http.Request) {
                key := strings.TrimPrefix(r.RequestURI,"/")
                log.Printf("Request(%v) key(%v)",r.RemoteAddr,key)
                if key == "" {
                        http.Error(w,"Bad Request",http.StatusBadRequest)
                        return
                }
                var data []byte
                err := helloworld.Get(nil,groupcache.AllocatingByteSliceSink(&data))
                if err != nil {
                        http.Error(w,err.Error(),http.StatusBadRequest)
                        return
                }
                log.Printf("cache data: %v",data)
                w.Write(data)
                log.Println("Gets: ",helloworld.Stats.Gets.String())
                log.Println("CacheHits: ",helloworld.Stats.CacheHits.String())
                log.Println("Loads: ",helloworld.Stats.Loads.String())
                log.Println("LocalLoads: ",helloworld.Stats.LocalLoads.String())
                log.Println("PeerErrors: ",helloworld.Stats.PeerErrors.String())
                log.Println("PeerLoads: ",helloworld.Stats.PeerLoads.String())
        })
        http.ListenAndServe(":"+strconv.Itoa(*port),nil)
}

参考文章

  1. http://talks.golang.org/2013/oscon-dl.slide#1
  2. http://xuchongfeng.github.io/2016/02/21/GroupCache%E6%BA%90%E7%A0%81%E9%98%85%E8%AF%BB/

相关文章

程序目录结构 简单实现,用户登录后返回一个jwt的token,下次请求带上token请求用户信息接口并返回信息...
本篇博客的主要内容是用go写一个简单的Proof-of-Work共识机制,不涉及到网络通信环节,只是一个本地的简...
简介 默克尔树(MerkleTree)是一种典型的二叉树结构,其主要特点为: 最下面的叶节点包含存储数据或其...
接下来学习并发编程, 并发编程是go语言最有特色的地方, go对并发编程是原生支持. goroutine是go中最近本...
先普及一下, 什么是广度优先搜索 广度优先搜索类似于树的层次遍历。从图中的某一顶点出发,遍历每一个顶...
第一天: 接口的定义和实现 第二天: 一. go语言是面向接口编程. 在学习继承的时候说过, go语言只有封装,...