Simple usage
groupcache的官方网站是 https://github.com/golang/groupcache
consistenthash模块
一致性hash算法,通常是用在查找一个合适的下载节点时,使负载更平均,但是对于同样的请求始终返回一样的结果。
type Map struct { hash Hash replicas int keys []int // Sorted hashMap map[int]string } Map结构中replicas的含义是增加虚拟桶,使数据分布更加均匀。 // 创建Map结构 func New(replicas int,fn Hash) *Map // 添加新的Key func (m *Map) Add(keys ...string) // 根据hash(key)获取value func (m *Map) Get(key string) string // 判断Map是否为空 func (m *Map) IsEmpty() bool // 竟然没有提供Remove方法 O_O
用法简单例子
package main

import (
	"fmt"

	"github.com/golang/groupcache/consistenthash"
)

func main() {
	// 70 virtual replicas per key make the hash distribution more even;
	// a nil hash function falls back to crc32.ChecksumIEEE.
	c := consistenthash.New(70, nil)
	c.Add("A", "B", "C", "D", "E")

	// The same key always maps to the same node — note the repeated
	// "what" below returns the same result both times.
	for _, key := range []string{"what", "nice", "what", "good", "yes!"} {
		fmt.Printf("%s -> %s\n", key, c.Get(key))
	}
}

// Expect output
// -------------
// what -> C
// nice -> A
// what -> C
// good -> D
// yes! -> E
singleflight 模块
type Group struct { } // 当多个相同的key请求的时候,函数只被调用一次 func (g *Group) Do(key string,fn func() (interface{},error)) (interface{},error)
使用例子
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/golang/groupcache/singleflight"
)

// NewDelayReturn returns a function that sleeps for dur and then
// returns n. Used to simulate a slow load shared across callers.
func NewDelayReturn(dur time.Duration, n int) func() (interface{}, error) {
	return func() (interface{}, error) {
		time.Sleep(dur)
		return n, nil
	}
}

func main() {
	g := singleflight.Group{}
	wg := sync.WaitGroup{}
	wg.Add(2)

	go func() {
		ret, err := g.Do("key", NewDelayReturn(time.Second*1, 1))
		if err != nil {
			panic(err)
		}
		fmt.Printf("key-1 get %v\n", ret)
		wg.Done()
	}()

	go func() {
		time.Sleep(100 * time.Millisecond) // make sure this call starts later
		// The original text dropped the `ret, err := g.Do("key",` part
		// of this call, which made the example uncompilable. Because the
		// first call for "key" is still in flight, this call shares its
		// result (1) instead of running its own function (which would
		// return 2).
		ret, err := g.Do("key", NewDelayReturn(time.Second*2, 2))
		if err != nil {
			panic(err)
		}
		fmt.Printf("key-2 get %v\n", ret)
		wg.Done()
	}()

	wg.Wait()
}
执行结果(耗时: 1.019s)
key-2 get 1 key-1 get 1
lru 模块
最初是用在内存管理上的一个算法,根据历史的请求数,分析出最热门的数据,并保存下来。
type Cache struct { // MaxEntries is the maximum number of cache entries before // an item is evicted. Zero means no limit. MaxEntries int // OnEvicted optionally specifies a callback function to be // executed when an entry is purged from the cache. OnEvicted func(key Key,value interface{}) // contains filtered or unexported fields } func New(maxEntries int) *Cache func (c *Cache) Add(key Key,value interface{}) func (c *Cache) Get(key Key) (value interface{},ok bool) func (c *Cache) Len() int func (c *Cache) Remove(key Key) func (c *Cache) RemoveOldest()
用法举例
package main

import (
	"fmt"

	"github.com/golang/groupcache/lru"
)

func main() {
	// A cache holding at most two entries.
	cache := lru.New(2)
	cache.Add("x", "x0")
	cache.Add("y", "y0")

	if yval, ok := cache.Get("y"); ok {
		fmt.Printf("y is %v\n", yval)
	}

	// A third Add exceeds the capacity, so the least recently used
	// entry is evicted — that is "x", because Get just touched "y".
	cache.Add("z", "z0")
	fmt.Printf("cache length is %d\n", cache.Len())

	if _, ok := cache.Get("x"); !ok {
		fmt.Printf("x key was weeded out\n")
	}
}

// Expect output
//--------------
// y is y0
// cache length is 2
// x key was weeded out
HTTPPool 模块
type HTTPPool struct { // 可选,为每次的请求封装的Context参数 Context func(*http.Request) groupcache.Context // 可选,不懂这个干啥的 // 注释说:请求的时候用的就是这个 Transport func(groupcache.Context) http.RoundTripper } // self 必须是一个合法的URL指向当前的服务器,比如 "http://10.0.0.1:8000" // 这个函数默认会注册一个路由 // http.handle("/_groupcache/",poolInstance) // 该路由主要用于节点间获取数据 // 另外该函数不能重复调用,否则会panic func NewHTTPPool(self string) *HTTPPool // 更新节点列表 // 用了consistenthash // 奇怪的是,只有添加节点的函数,并没有删除的 func (p *HTTPPool) Set(peers ...string) // 用一致性hash算法选择一个节点 func (p *HTTPPool) PickPeer(key string) (groupcache.ProtoGetter,bool) // 用于处理节点间通过HTTP传递过来的protobuf请求 func (p *HTTPPool) ServeHTTP(w http.ResponseWriter,r *http.Request)
其他 (待补充)
本来想在官网上找个例子,可惜官网的例子并不完整,以下代码仅供参考
package main

// NOTE(review): this snippet comes from the dl.google.com groupcache
// talk and is intentionally incomplete — the multi/offsetAndSource
// types, NewChunkAlignedReaderAt's body, and generateThumbnail are
// elided in the original and remain elided here. The fixes below only
// repair the nameable compile errors in what IS shown.

import (
	"bytes"
	"io"
	"net/http"
	"strings"
	"time"

	"github.com/golang/groupcache"
)

// A SizeReaderAt is a ReaderAt with a Size method.
//
// An io.SectionReader implements SizeReaderAt.
type SizeReaderAt interface {
	Size() int64
	io.ReaderAt
}

// NewMultiReaderAt is like io.MultiReader but produces a ReaderAt
// (and Size), instead of just a reader.
func NewMultiReaderAt(parts ...SizeReaderAt) SizeReaderAt {
	// Capacity len(parts), length 0: the original used
	// make([]offsetAndSource, len(parts)) and then append, which would
	// leave len(parts) zero-value entries at the front of the slice.
	m := &multi{parts: make([]offsetAndSource, 0, len(parts))}
	var off int64
	for _, p := range parts {
		m.parts = append(m.parts, offsetAndSource{off, p})
		off += p.Size()
	}
	m.size = off
	return m
}

// NewChunkAlignedReaderAt returns a ReaderAt wrapper that is backed
// by a ReaderAt r of size totalSize where the wrapper guarantees that
// all ReadAt calls are aligned to chunkSize boundaries and of size
// chunkSize (except for the final chunk, which may be shorter).
//
// A chunk-aligned reader is good for caching, letting upper layers have
// any access pattern, but guarantees that the wrapped ReaderAt sees
// only nicely-cacheable access patterns & sizes.
func NewChunkAlignedReaderAt(r SizeReaderAt, chunkSize int) SizeReaderAt {
	// ...
}

// part wraps a string as a SizeReaderAt.
func part(s string) SizeReaderAt {
	// io.NewSectionReader takes (r, off, n); the original dropped the
	// offset argument and would not compile.
	return io.NewSectionReader(strings.NewReader(s), 0, int64(len(s)))
}

func handler(w http.ResponseWriter, r *http.Request) {
	sra := NewMultiReaderAt(
		part("Hello,"),
		part(" world! "),
		part("You requested "+r.URL.Path+"\n"),
	)
	// Again (r, off, n): read the whole concatenation from offset 0.
	rs := io.NewSectionReader(sra, 0, sra.Size())
	modTime := time.Now() // placeholder; use the real content mod time
	http.ServeContent(w, r, "foo.txt", modTime, rs)
}

func main() {
	me := "http://10.0.0.1"
	peers := groupcache.NewHTTPPool(me)

	// Whenever peers change:
	peers.Set("http://10.0.0.1", "http://10.0.0.2", "http://10.0.0.3")

	var thumbNails = groupcache.NewGroup("thumbnail", 64<<20, groupcache.GetterFunc(
		func(ctx groupcache.Context, key string, dest groupcache.Sink) error {
			fileName := key
			dest.SetBytes(generateThumbnail(fileName))
			return nil
		}))

	// The remainder belongs inside an HTTP handler: w, r and ctx are
	// not defined at this scope in the original either.
	var data []byte
	err := thumbNails.Get(ctx, "big-file.jpg",
		groupcache.AllocatingByteSliceSink(&data))
	// ... handle err ...
	_ = err
	// http.ServeContent needs (w, r, name, modtime, content); the
	// original call was missing the request, name and modtime.
	http.ServeContent(w, r, "big-file-thumb.jpg", time.Now(), bytes.NewReader(data))
}
groupcache 使用例子
package main

import (
	"errors"
	"flag"
	"log"
	"net/http"
	"strconv"
	"strings"

	"github.com/golang/groupcache"
)

// localStorage is the backing store the cache loads from on a miss.
var localStorage map[string]string

func init() {
	localStorage = make(map[string]string)
	localStorage["hello"] = "world"
	localStorage["info"] = "This is an example"
}

func main() {
	port := flag.Int("port", 4100, "Listen port")
	flag.Parse()

	// The pool name has to start with http://
	self := "http://localhost:" + strconv.Itoa(*port)
	pool := groupcache.NewHTTPPool(self)
	pool.Set(self, "http://localhost:4101")

	// 10<<20 = 10 MiB cache budget for this group.
	var helloworld = groupcache.NewGroup("helloworld", 10<<20, groupcache.GetterFunc(
		// The original closure dropped the `key string` parameter that
		// groupcache.Getter requires (and that the body uses), so the
		// example did not compile.
		func(ctx groupcache.Context, key string, dest groupcache.Sink) error {
			log.Printf("groupcache get key: %v", key)
			value, exists := localStorage[key]
			if !exists {
				dest.SetString(key + " NotExist")
				return errors.New(key + " NotExist")
			}
			dest.SetString(value)
			return nil
		}))

	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		key := strings.TrimPrefix(r.RequestURI, "/")
		log.Printf("Request(%v) key(%v)", r.RemoteAddr, key)
		if key == "" {
			http.Error(w, "Bad Request", http.StatusBadRequest)
			return
		}
		var data []byte
		// The original call was missing the key argument:
		// Get(ctx, key, sink).
		err := helloworld.Get(nil, key, groupcache.AllocatingByteSliceSink(&data))
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		log.Printf("cache data: %v", data)
		w.Write(data)
		log.Println("Gets: ", helloworld.Stats.Gets.String())
		log.Println("CacheHits: ", helloworld.Stats.CacheHits.String())
		log.Println("Loads: ", helloworld.Stats.Loads.String())
		log.Println("LocalLoads: ", helloworld.Stats.LocalLoads.String())
		log.Println("PeerErrors: ", helloworld.Stats.PeerErrors.String())
		log.Println("PeerLoads: ", helloworld.Stats.PeerLoads.String())
	})

	// Surface the listen error instead of silently dropping it.
	log.Fatal(http.ListenAndServe(":"+strconv.Itoa(*port), nil))
}