-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathhandle.go
134 lines (113 loc) · 2.64 KB
/
handle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package accord
import (
"context"
"sync"
"time"
"github.com/bsm/accord/rpc"
"github.com/google/uuid"
)
// Handle holds temporary ownership of a resource. It will automatically renew
// its ownership in the background until either Done or Discard is called (first one wins).
// After a call to Done or Discard, all operations on the handle fail with ErrClosed.
type Handle struct {
id uuid.UUID
rpc rpc.V1Client
meta *metadata
opt *ClientOptions
mu sync.Mutex
ctx context.Context
close context.CancelFunc
}
func newHandle(id uuid.UUID, rpc rpc.V1Client, meta map[string]string, opt *ClientOptions) *Handle {
ctx, close := context.WithCancel(context.Background())
h := &Handle{
id: id,
rpc: rpc,
meta: &metadata{kv: meta},
opt: opt,
ctx: ctx,
close: close,
}
go h.renewLoop()
return h
}
// ID returns the handle ID.
func (h *Handle) ID() uuid.UUID {
return h.id
}
// Metadata returns metadata.
func (h *Handle) Metadata() map[string]string {
return h.meta.Snap()
}
// SetMeta sets a metadata key.
// It will only be persisted with the next (background) Renew or Done call.
func (h *Handle) SetMeta(key, value string) {
h.meta.Set(key, value)
}
// Renew manually renews the ownership of the resource with custom metadata.
func (h *Handle) Renew(ctx context.Context, meta map[string]string) error {
if h.isClosed() {
return ErrClosed
}
h.meta.Update(meta)
return h.renew(ctx, h.opt.ttlSeconds())
}
// Done marks the resource as done and invalidates the handle.
func (h *Handle) Done(ctx context.Context, meta map[string]string) error {
if h.isClosed() {
return ErrClosed
}
h.meta.Update(meta)
h.mu.Lock()
defer h.mu.Unlock()
_, err := h.rpc.Done(ctx, &rpc.DoneRequest{
Owner: h.opt.Owner,
HandleId: h.id[:],
Metadata: h.meta.Snap(),
})
if err == nil {
h.close()
}
return err
}
// Discard discards the handle.
func (h *Handle) Discard() error {
if h.isClosed() {
return ErrClosed
}
defer h.close()
return h.renew(h.ctx, 0)
}
func (h *Handle) isClosed() bool {
select {
case <-h.ctx.Done():
return true
default:
return false
}
}
func (h *Handle) renew(ctx context.Context, seconds uint32) error {
h.mu.Lock()
defer h.mu.Unlock()
_, err := h.rpc.Renew(ctx, &rpc.RenewRequest{
Owner: h.opt.Owner,
HandleId: h.id[:],
Ttl: seconds,
Metadata: h.meta.Snap(),
})
return err
}
func (h *Handle) renewLoop() {
ticker := time.NewTicker(h.opt.TTL * 3 / 10)
defer ticker.Stop()
for {
select {
case <-h.ctx.Done():
return
case <-ticker.C:
if err := h.Renew(h.ctx, nil); err != nil && err != context.Canceled && err != ErrClosed {
h.opt.handleError(err)
}
}
}
}