Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

io_uring support #11

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ module github.com/thomasjungblut/go-sstables

require (
github.com/anishathalye/porcupine v0.1.2
github.com/godzie44/go-uring v0.0.0-20220926161041-69611e8b13d5
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db
github.com/libp2p/go-buffer-pool v0.0.2
github.com/ncw/directio v1.0.5
Expand All @@ -13,8 +14,10 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/libp2p/go-sockaddr v0.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3 // indirect
golang.org/x/sys v0.0.0-20210921065528-437939a70204 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)
Expand Down
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/godzie44/go-uring v0.0.0-20220131163417-b7645a723f7b h1:H31NTxP3FAzF0mVbqRCJA04XKxoAQD85Ug+zZ761duo=
github.com/godzie44/go-uring v0.0.0-20220131163417-b7645a723f7b/go.mod h1:ermjEDUoT/fS+3Ona5Vd6t6mZkw1eHp99ILO5jGRBkM=
github.com/godzie44/go-uring v0.0.0-20220926161041-69611e8b13d5 h1:5zELAgnSz0gqmr4Q5DWCoOzNHoeBAxVUXB7LS1eG+sw=
github.com/godzie44/go-uring v0.0.0-20220926161041-69611e8b13d5/go.mod h1:ermjEDUoT/fS+3Ona5Vd6t6mZkw1eHp99ILO5jGRBkM=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs=
github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM=
github.com/libp2p/go-sockaddr v0.1.1 h1:yD80l2ZOdGksnOyHrhxDdTDFrf7Oy+v3FMVArIRgZxQ=
github.com/libp2p/go-sockaddr v0.1.1/go.mod h1:syPvOmNs24S3dFVGJA1/mrqdeijPxLV2Le3BRLKd68k=
github.com/ncw/directio v1.0.5 h1:JSUBhdjEvVaJvOoyPAbcW0fnd0tvRXD76wEfZ1KcQz4=
github.com/ncw/directio v1.0.5/go.mod h1:rX/pKEYkOXBGOggmcyJeJGloCkleSvphPx2eV3t6ROk=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand All @@ -23,6 +29,9 @@ github.com/tjungblu/porcupine v0.0.0-20221116095144-377185aa0569 h1:acDBvgSBtnyB
github.com/tjungblu/porcupine v0.0.0-20221116095144-377185aa0569/go.mod h1:+z336r1WR0gcwl1ALfoNBpDTCW06vO5DzBwunEcSvcs=
golang.org/x/exp v0.0.0-20181210123644-7d6377eee41f h1:wJ3O7VtAmBlW5LFzggOI2U6CBWIkG+/IYf4Q/VGJdaA=
golang.org/x/exp v0.0.0-20181210123644-7d6377eee41f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/sys v0.0.0-20190228124157-a34e9553db1e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20210921065528-437939a70204 h1:JJhkWtBuTQKyz2bd5WG9H8iUsJRU3En/KRfN8B2RnDs=
golang.org/x/sys v0.0.0-20210921065528-437939a70204/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
9 changes: 9 additions & 0 deletions recordio/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,12 @@ available, err := recordio.IsDirectIOAvailable()

In this package the DirectIO support comes through a library called [ncw/directio](https://github.com/ncw/directio), which has good support across Linux, macOS and Windows under a single interface. The caveats of each platform, for example the buffer/block sizes, need to still be taken into account.
Another caveat is that the block alignment causes to write a certain amount of waste. Let's imagine you have blocks of 1024 bytes and only want to write 1025 bytes, with DirectIO enabled you will end up with a file of size 2048 (2 blocks) instead of a file with only 1025 bytes with DirectIO disabled. The DirectIO file will be padded with zeroes towards the end and the in-library readers honor this format and not assume a corrupted file format.


## io_uring (experimental)

Since version 5.x the linux kernel supports a new asynchronous approach to execute syscalls. In a few words, io_uring is a shared ring buffer between the kernel and user space which allows queueing syscalls and later retrieve their results.

You can read more about io_uring at [https://kernel.dk/io_uring.pdf](https://kernel.dk/io_uring.pdf).

In case you have SELinux enabled, you might hit "permission denied" errors when initializing the uring try to test with permissive mode enabled temporarily.
123 changes: 123 additions & 0 deletions recordio/async_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
//go:build linux

package recordio

import (
"github.com/godzie44/go-uring/uring"
"os"
)

// AsyncWriter takes an uring and executes all writes asynchronously. There are only two barriers: flush and close.
// Those barriers will ensure all previous writes have succeeded.
type AsyncWriter struct {
ringSize int32
submittedSQEs int32
ring *uring.Ring

file *os.File
offset uint64
}

// TODO(thomas): not thread-safe (yet)
func (w *AsyncWriter) Write(p []byte) (int, error) {
for w.submittedSQEs >= w.ringSize {
err := w.submitAwaitOne()
if err != nil {
return 0, err
}
}

// TODO(thomas): we would need to make a defensive copy for p, which actually is not optimal
// the reason is the buffer pooling (or the header reuse). It so happens that the original backing array was written
// a couple times before the ring was submitted. That caused some funny offsets to be written and eventually fail reading.
pc := make([]byte, len(p))
copy(pc, p)

err := w.ring.QueueSQE(uring.Write(w.file.Fd(), pc, w.offset), 0, 0)
if err != nil {
return 0, err
}

w.submittedSQEs++
w.offset += uint64(len(p))

return len(p), nil
}

func (w *AsyncWriter) Flush() error {
for w.submittedSQEs > 0 {
// wait for at least one event to free from the queue
err := w.submitAwaitOne()
if err != nil {
return err
}
}

return nil
}

func (w *AsyncWriter) submitAwaitOne() error {
// TODO(thomas): most likely there are more CQ events waiting, we should try to drain them optimistically to avoid overflowing memory buffers
cqe, err := w.ring.SubmitAndWaitCQEvents(1)
if err != nil {
return err
}

w.submittedSQEs--
w.ring.SeenCQE(cqe)

err = cqe.Error()
if err != nil {
return err
}

return nil
}

func (w *AsyncWriter) Size() int {
return 0
}

func (w *AsyncWriter) Close() error {
err := w.Flush()
if err != nil {
return err
}

err = w.ring.UnRegisterFiles()
if err != nil {
return err
}

err = w.ring.Close()
if err != nil {
return err
}

return w.file.Close()
}

func NewAsyncWriter(filePath string, numRingEntries uint32, opts ...uring.SetupOption) (WriteCloserFlusher, *os.File, error) {
ring, err := uring.New(numRingEntries, opts...)
if err != nil {
return nil, nil, err
}

writeFile, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
return nil, nil, err
}

err = ring.RegisterFiles([]int{int(writeFile.Fd())})
if err != nil {
return nil, nil, err
}

writer := &AsyncWriter{
ringSize: int32(numRingEntries),
file: writeFile,
ring: ring,
}

return writer, writeFile, nil
}
89 changes: 89 additions & 0 deletions recordio/async_writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//go:build linux

package recordio

import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"io/ioutil"
"os"
"testing"
)

func TestAsyncWriter_HappyPath(t *testing.T) {
ok, err := IsIOUringAvailable()
require.NoError(t, err)
if !ok {
t.Skip("iouring not available here")
return
}

temp, err := ioutil.TempFile("", "TestAsyncWriter_HappyPath")
require.NoError(t, err)
defer closeCleanFile(t, temp)

writer, file, err := NewAsyncWriter(temp.Name(), 4)
require.NoError(t, err)
require.NotNil(t, file)

var expected []byte
for i := 0; i < 100; i++ {
s := randomRecordOfSize(10)
_, err = writer.Write(s)
require.NoError(t, err)
expected = append(expected, s...)
}

require.NoError(t, writer.Close())
fileContentEquals(t, file, expected)
}

func TestAsyncWriter_GuardAgainstBufferReuse(t *testing.T) {
ok, err := IsIOUringAvailable()
require.NoError(t, err)
if !ok {
t.Skip("iouring not available here")
return
}

temp, err := ioutil.TempFile("", "TestAsyncWriter_GuardAgainstBufferReuse")
require.NoError(t, err)
defer closeCleanFile(t, temp)

writer, file, err := NewAsyncWriter(temp.Name(), 4)
require.NoError(t, err)
require.NotNil(t, file)

reusedSlice := []byte{13, 06, 91}
// we are writing the same slice, three times before a forced flush due to capacity
writeBuf(t, writer, reusedSlice)
writeBuf(t, writer, reusedSlice)
writeBuf(t, writer, reusedSlice)
// fourth time we change the slice in-place
reusedSlice[0] = 29
writeBuf(t, writer, reusedSlice)
writeBuf(t, writer, reusedSlice)
require.NoError(t, writer.Close())

fileContentEquals(t, file, []byte{
13, 06, 91,
13, 06, 91,
13, 06, 91,
29, 06, 91,
29, 06, 91,
})
}

func fileContentEquals(t *testing.T, file *os.File, expectedContent []byte) {
f, err := os.Open(file.Name())
require.NoError(t, err)
all, err := ioutil.ReadAll(f)
require.NoError(t, err)
assert.Equal(t, expectedContent, all)
}

func writeBuf(t *testing.T, writer WriteCloserFlusher, buf []byte) {
o, err := writer.Write(buf)
require.NoError(t, err)
assert.Equal(t, len(buf), o)
}
2 changes: 1 addition & 1 deletion recordio/file_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func fileHeaderAsByteSlice(compressionType uint32) []byte {
}

// for legacy reference still around, main paths unused - mostly for tests writing old versions
//noinspection GoUnusedFunction
// noinspection GoUnusedFunction
func writeRecordHeaderV1(writer *FileWriter, payloadSizeUncompressed uint64, payloadSizeCompressed uint64) (int, error) {
// 4 byte magic number, 8 byte uncompressed size, 8 bytes for compressed size = 20 bytes
bytes := make([]byte, RecordHeaderSizeBytes)
Expand Down
44 changes: 44 additions & 0 deletions recordio/io_uring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package recordio

import (
"github.com/godzie44/go-uring/uring"
"os"
)

type IOUringFactory struct {
numRingEntries uint32
opts []uring.SetupOption
}

func (f *IOUringFactory) CreateNewReader(filePath string, bufSize int) (*os.File, ByteReaderResetCount, error) {
//TODO implement me
panic("implement me")
}

func (f *IOUringFactory) CreateNewWriter(filePath string, _ int) (*os.File, WriteCloserFlusher, error) {
writer, file, err := NewAsyncWriter(filePath, f.numRingEntries, f.opts...)
if err != nil {
return nil, nil, err
}

return file, writer, nil
}

func NewIOUringFactory(numRingEntries uint32, opts ...uring.SetupOption) *IOUringFactory {
return &IOUringFactory{
numRingEntries: numRingEntries,
opts: opts,
}
}

// IsIOUringAvailable tests whether io_uring is supported by the kernel.
// It will return (true, nil) if that's the case, if it's not available it will be (false, nil).
// Any other error will be indicated by the error (either true/false).
func IsIOUringAvailable() (available bool, err error) {
ring, err := uring.New(1)
defer func() {
err = ring.Close()
}()

return err == nil, err
}
15 changes: 15 additions & 0 deletions recordio/io_uring_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package recordio

import (
"github.com/stretchr/testify/require"
"testing"
)

func TestIsIOUringAvailable_HappyPath(t *testing.T) {
ok, err := IsIOUringAvailable()
require.NoError(t, err)
if !ok {
t.Skip("iouring not available here")
return
}
}
2 changes: 1 addition & 1 deletion recordio/recordio_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestReadWriteEndToEndDirectIO(t *testing.T) {
return
}

tmpFile, err := ioutil.TempFile("", "recordio_EndToEnd")
tmpFile, err := ioutil.TempFile("", "recordio_EndToEndDirectIO")
require.NoError(t, err)
defer func() { require.NoError(t, os.Remove(tmpFile.Name())) }()
writer, err := NewFileWriter(File(tmpFile), DirectIO())
Expand Down