Skip to content

Commit

Permalink
feat: ext/ttl
Browse files Browse the repository at this point in the history
ttl kv storage
  • Loading branch information
wdvxdr1123 committed Feb 24, 2021
1 parent 0a4353f commit 436e20b
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 2 deletions.
6 changes: 4 additions & 2 deletions driver/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type wsDriver struct {
accessToken string
}

// Connect ...
// Connect 连接ws服务端
func (ws *wsDriver) Connect(url, accessToken string) {
var err error
ws.url = url
Expand All @@ -56,6 +56,7 @@ RETRY:
log.Infof("连接Websocket服务器: %v 成功", url)
}

// Listen 开始监听事件
func (ws *wsDriver) Listen(handler func([]byte)) {
for {
t, payload, err := ws.conn.ReadMessage()
Expand All @@ -66,7 +67,7 @@ func (ws *wsDriver) Listen(handler func([]byte)) {
}

if t == websocket.TextMessage {
rsp := gjson.ParseBytes(payload)
rsp := gjson.Parse(helper.BytesToString(payload))
if rsp.Get("echo").Exists() { // 存在echo字段,是api调用的返回
log.Debug("接收到API调用返回: ", strings.TrimSpace(helper.BytesToString(payload)))
if c, ok := ws.seqMap.LoadAndDelete(rsp.Get("echo").Uint()); ok {
Expand Down Expand Up @@ -94,6 +95,7 @@ func (ws *wsDriver) nextSeq() uint64 {
return atomic.AddUint64(&ws.seq, 1)
}

// Send 发送ws请求
func (ws *wsDriver) Send(req zero.APIRequest) (zero.APIResponse, error) {
ch := make(chan zero.APIResponse)
req.Echo = ws.nextSeq()
Expand Down
79 changes: 79 additions & 0 deletions extension/ttl/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package ttl

import (
"sync"
"time"
)

// Cache is a synchronised map of items that auto-expire once stale
type Cache struct {
sync.RWMutex
ttl time.Duration
items map[string]*Item
}

// NewCache 创建指定生命周期的 Cache
func NewCache(ttl time.Duration) *Cache {
cache := &Cache{
ttl: ttl,
items: map[string]*Item{},
}
go cache.gc() // async gc
return cache
}

func (c *Cache) gc() {
ticker := time.Tick(time.Minute)
for {
<-ticker
c.Lock()
for key, item := range c.items {
if item.expired() {
delete(c.items, key)
}
}
c.Unlock()
}
}

// Get 通过 key 获取指定的元素
func (c *Cache) Get(key string) interface{} {
c.RLock()
item, ok := c.items[key]
c.RUnlock()
if ok && item.expired() {
c.Delete(key)
return nil
}
if item == nil {
return nil
}
return item.value
}

// Set 设置指定 key 的值
func (c *Cache) Set(key string, val interface{}) {
c.Lock()
defer c.Unlock()
item := &Item{
exp: time.Now().Add(c.ttl),
value: val,
}
c.items[key] = item
}

// Delete 删除指定key
func (c *Cache) Delete(key string) {
c.Lock()
defer c.Unlock()
delete(c.items, key)
}

// Touch 为指定key添加一定生命周期
func (c *Cache) Touch(key string, ttl time.Duration) {
c.Lock()
defer c.Unlock()
if c.items[key] != nil {
c.items[key].exp = c.items[key].exp.Add(ttl)
}
}
40 changes: 40 additions & 0 deletions extension/ttl/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package ttl

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestGet(t *testing.T) {
cache := NewCache(time.Second)

data := cache.Get("hello")
assert.Nil(t, data)

cache.Set("hello", "world")
data = cache.Get("hello")
assert.Equal(t, "world", data)
}

func TestExpiration(t *testing.T) {
cache := NewCache(time.Second)

cache.Set("x", "1")
cache.Set("y", "z")
cache.Set("z", "3")

<-time.After(500 * time.Millisecond)
val := cache.Get("x")
assert.Equal(t, "1", val)

<-time.After(time.Second)
val = cache.Get("x")
assert.Equal(t, nil, val)
val = cache.Get("y")
assert.Equal(t, nil, val)
val = cache.Get("z")
assert.Equal(t, nil, val)
assert.Equal(t, 0, len(cache.items))
}
4 changes: 4 additions & 0 deletions extension/ttl/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Package ttl provides a synchronised map that auto-expire
//
// This package is simply to provide a storage for session.
package ttl
13 changes: 13 additions & 0 deletions extension/ttl/item.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package ttl

import "time"

// Item represents a record in the cache map
type Item struct {
exp time.Time // expired time
value interface{} // value of the item
}

func (it *Item) expired() bool {
return it.exp.Before(time.Now())
}

0 comments on commit 436e20b

Please sign in to comment.