-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathazure.go
98 lines (75 loc) · 2.29 KB
/
azure.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package jetcapture
import (
"context"
"io"
"net/url"
"strings"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
)
const azureUploadBufferSz = 64 * 1024 * 1024
var (
_ BlockStore[string] = &AzureBlobStore[string]{}
)
// BuildURLBase should return a URL that serves as the base for the block
// For example: https://capture.blob.core.windows.net/backup/from-stream-foo/
type BuildURLBase[K DestKey] func(ctx context.Context, destKey K) (string, error)
// OverrideUploadOptions is an optional function to override various upload option (e.g. AccessTier)
type OverrideUploadOptions[K DestKey] func(options *azblob.UploadStreamOptions, destKey K)
type AzureBlobStore[K DestKey] struct {
credz azcore.TokenCredential
buildURLBaseFn BuildURLBase[K]
optionsFn OverrideUploadOptions[K]
}
func NewAzureBlobStore[K DestKey](
credential azcore.TokenCredential,
buildURLBaseFn BuildURLBase[K],
optionsFn OverrideUploadOptions[K],
) (*AzureBlobStore[K], error) {
return &AzureBlobStore[K]{
buildURLBaseFn: buildURLBaseFn,
optionsFn: optionsFn,
credz: credential,
}, nil
}
func (a *AzureBlobStore[K]) Write(ctx context.Context, block io.Reader, destKey K, dir, fileName string) (string, int64, time.Duration, error) {
start := time.Now()
base, err := a.buildURLBaseFn(ctx, destKey)
if err != nil {
return "", 0, 0, err
}
parsed, _ := url.Parse(base)
container, blobName, _ := strings.Cut(parsed.Path, "/")
u, err := url.JoinPath(blobName, dir, fileName)
if err != nil {
return "", 0, 0, err
}
log.Infof("writing block to https://%s%s/%s", parsed.Host, container, u)
bc, err := azblob.NewClient("https://"+parsed.Host, a.credz, nil)
if err != nil {
return "", 0, 0, err
}
var options azblob.UploadStreamOptions
if a.optionsFn != nil {
a.optionsFn(&options, destKey)
}
if options.BlockSize == 0 {
options.BlockSize = azureUploadBufferSz
}
reader := &countingReader{Reader: block}
_, err = bc.UploadStream(ctx, container, u, reader, &options)
if err != nil {
return "", 0, 0, err
}
return u, int64(reader.n), time.Since(start), nil
}
type countingReader struct {
io.Reader
n int
}
func (r *countingReader) Read(p []byte) (n int, err error) {
n, err = r.Reader.Read(p)
r.n += n
return
}