diff --git a/go.mod b/go.mod index e8b6959..6e47a72 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,7 @@ module github.com/thomasjungblut/go-sstables require ( github.com/anishathalye/porcupine v0.1.2 + github.com/godzie44/go-uring v0.0.0-20220926161041-69611e8b13d5 github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db github.com/libp2p/go-buffer-pool v0.0.2 github.com/ncw/directio v1.0.5 @@ -13,8 +14,10 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/libp2p/go-sockaddr v0.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3 // indirect + golang.org/x/sys v0.0.0-20210921065528-437939a70204 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect ) diff --git a/go.sum b/go.sum index 4550a06..89a42c9 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,10 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/godzie44/go-uring v0.0.0-20220131163417-b7645a723f7b h1:H31NTxP3FAzF0mVbqRCJA04XKxoAQD85Ug+zZ761duo= +github.com/godzie44/go-uring v0.0.0-20220131163417-b7645a723f7b/go.mod h1:ermjEDUoT/fS+3Ona5Vd6t6mZkw1eHp99ILO5jGRBkM= +github.com/godzie44/go-uring v0.0.0-20220926161041-69611e8b13d5 h1:5zELAgnSz0gqmr4Q5DWCoOzNHoeBAxVUXB7LS1eG+sw= +github.com/godzie44/go-uring v0.0.0-20220926161041-69611e8b13d5/go.mod h1:ermjEDUoT/fS+3Ona5Vd6t6mZkw1eHp99ILO5jGRBkM= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -8,6 +12,8 @@ github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs= github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM= +github.com/libp2p/go-sockaddr v0.1.1 h1:yD80l2ZOdGksnOyHrhxDdTDFrf7Oy+v3FMVArIRgZxQ= +github.com/libp2p/go-sockaddr v0.1.1/go.mod h1:syPvOmNs24S3dFVGJA1/mrqdeijPxLV2Le3BRLKd68k= github.com/ncw/directio v1.0.5 h1:JSUBhdjEvVaJvOoyPAbcW0fnd0tvRXD76wEfZ1KcQz4= github.com/ncw/directio v1.0.5/go.mod h1:rX/pKEYkOXBGOggmcyJeJGloCkleSvphPx2eV3t6ROk= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -23,6 +29,9 @@ github.com/tjungblu/porcupine v0.0.0-20221116095144-377185aa0569 h1:acDBvgSBtnyB github.com/tjungblu/porcupine v0.0.0-20221116095144-377185aa0569/go.mod h1:+z336r1WR0gcwl1ALfoNBpDTCW06vO5DzBwunEcSvcs= golang.org/x/exp v0.0.0-20181210123644-7d6377eee41f h1:wJ3O7VtAmBlW5LFzggOI2U6CBWIkG+/IYf4Q/VGJdaA= golang.org/x/exp v0.0.0-20181210123644-7d6377eee41f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/sys v0.0.0-20190228124157-a34e9553db1e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20210921065528-437939a70204 h1:JJhkWtBuTQKyz2bd5WG9H8iUsJRU3En/KRfN8B2RnDs= +golang.org/x/sys v0.0.0-20210921065528-437939a70204/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/recordio/README.md b/recordio/README.md index 6bb26f1..04a87c4 100644 --- a/recordio/README.md +++ b/recordio/README.md @@ -210,3 +210,12 @@ available, err := recordio.IsDirectIOAvailable() In this package the DirectIO support comes through a library called [ncw/directio](https://github.com/ncw/directio), which has good support across Linux, macOS and Windows under a single interface. The caveats of each platform, for example the buffer/block sizes, need to still be taken into account. Another caveat is that the block alignment causes to write a certain amount of waste. Let's imagine you have blocks of 1024 bytes and only want to write 1025 bytes, with DirectIO enabled you will end up with a file of size 2048 (2 blocks) instead of a file with only 1025 bytes with DirectIO disabled. The DirectIO file will be padded with zeroes towards the end and the in-library readers honor this format and not assume a corrupted file format. + + +## io_uring (experimental) + +Since version 5.x the linux kernel supports a new asynchronous approach to execute syscalls. In a few words, io_uring is a shared ring buffer between the kernel and user space which allows queueing syscalls and later retrieve their results. + +You can read more about io_uring at [https://kernel.dk/io_uring.pdf](https://kernel.dk/io_uring.pdf). + +In case you have SELinux enabled, you might hit "permission denied" errors when initializing the uring try to test with permissive mode enabled temporarily. diff --git a/recordio/async_writer.go b/recordio/async_writer.go new file mode 100644 index 0000000..fc576ef --- /dev/null +++ b/recordio/async_writer.go @@ -0,0 +1,123 @@ +//go:build linux + +package recordio + +import ( + "github.com/godzie44/go-uring/uring" + "os" +) + +// AsyncWriter takes an uring and executes all writes asynchronously. There are only two barriers: flush and close. +// Those barriers will ensure all previous writes have succeeded. +type AsyncWriter struct { + ringSize int32 + submittedSQEs int32 + ring *uring.Ring + + file *os.File + offset uint64 +} + +// TODO(thomas): not thread-safe (yet) +func (w *AsyncWriter) Write(p []byte) (int, error) { + for w.submittedSQEs >= w.ringSize { + err := w.submitAwaitOne() + if err != nil { + return 0, err + } + } + + // TODO(thomas): we would need to make a defensive copy for p, which actually is not optimal + // the reason is the buffer pooling (or the header reuse). It so happens that the original backing array was written + // a couple times before the ring was submitted. That caused some funny offsets to be written and eventually fail reading. + pc := make([]byte, len(p)) + copy(pc, p) + + err := w.ring.QueueSQE(uring.Write(w.file.Fd(), pc, w.offset), 0, 0) + if err != nil { + return 0, err + } + + w.submittedSQEs++ + w.offset += uint64(len(p)) + + return len(p), nil +} + +func (w *AsyncWriter) Flush() error { + for w.submittedSQEs > 0 { + // wait for at least one event to free from the queue + err := w.submitAwaitOne() + if err != nil { + return err + } + } + + return nil +} + +func (w *AsyncWriter) submitAwaitOne() error { + // TODO(thomas): most likely there are more CQ events waiting, we should try to drain them optimistically to avoid overflowing memory buffers + cqe, err := w.ring.SubmitAndWaitCQEvents(1) + if err != nil { + return err + } + + w.submittedSQEs-- + w.ring.SeenCQE(cqe) + + err = cqe.Error() + if err != nil { + return err + } + + return nil +} + +func (w *AsyncWriter) Size() int { + return 0 +} + +func (w *AsyncWriter) Close() error { + err := w.Flush() + if err != nil { + return err + } + + err = w.ring.UnRegisterFiles() + if err != nil { + return err + } + + err = w.ring.Close() + if err != nil { + return err + } + + return w.file.Close() +} + +func NewAsyncWriter(filePath string, numRingEntries uint32, opts ...uring.SetupOption) (WriteCloserFlusher, *os.File, error) { + ring, err := uring.New(numRingEntries, opts...) + if err != nil { + return nil, nil, err + } + + writeFile, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0666) + if err != nil { + return nil, nil, err + } + + err = ring.RegisterFiles([]int{int(writeFile.Fd())}) + if err != nil { + return nil, nil, err + } + + writer := &AsyncWriter{ + ringSize: int32(numRingEntries), + file: writeFile, + ring: ring, + } + + return writer, writeFile, nil +} diff --git a/recordio/async_writer_test.go b/recordio/async_writer_test.go new file mode 100644 index 0000000..f42f79b --- /dev/null +++ b/recordio/async_writer_test.go @@ -0,0 +1,89 @@ +//go:build linux + +package recordio + +import ( + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "io/ioutil" + "os" + "testing" +) + +func TestAsyncWriter_HappyPath(t *testing.T) { + ok, err := IsIOUringAvailable() + require.NoError(t, err) + if !ok { + t.Skip("iouring not available here") + return + } + + temp, err := ioutil.TempFile("", "TestAsyncWriter_HappyPath") + require.NoError(t, err) + defer closeCleanFile(t, temp) + + writer, file, err := NewAsyncWriter(temp.Name(), 4) + require.NoError(t, err) + require.NotNil(t, file) + + var expected []byte + for i := 0; i < 100; i++ { + s := randomRecordOfSize(10) + _, err = writer.Write(s) + require.NoError(t, err) + expected = append(expected, s...) + } + + require.NoError(t, writer.Close()) + fileContentEquals(t, file, expected) +} + +func TestAsyncWriter_GuardAgainstBufferReuse(t *testing.T) { + ok, err := IsIOUringAvailable() + require.NoError(t, err) + if !ok { + t.Skip("iouring not available here") + return + } + + temp, err := ioutil.TempFile("", "TestAsyncWriter_GuardAgainstBufferReuse") + require.NoError(t, err) + defer closeCleanFile(t, temp) + + writer, file, err := NewAsyncWriter(temp.Name(), 4) + require.NoError(t, err) + require.NotNil(t, file) + + reusedSlice := []byte{13, 06, 91} + // we are writing the same slice, three times before a forced flush due to capacity + writeBuf(t, writer, reusedSlice) + writeBuf(t, writer, reusedSlice) + writeBuf(t, writer, reusedSlice) + // fourth time we change the slice in-place + reusedSlice[0] = 29 + writeBuf(t, writer, reusedSlice) + writeBuf(t, writer, reusedSlice) + require.NoError(t, writer.Close()) + + fileContentEquals(t, file, []byte{ + 13, 06, 91, + 13, 06, 91, + 13, 06, 91, + 29, 06, 91, + 29, 06, 91, + }) +} + +func fileContentEquals(t *testing.T, file *os.File, expectedContent []byte) { + f, err := os.Open(file.Name()) + require.NoError(t, err) + all, err := ioutil.ReadAll(f) + require.NoError(t, err) + assert.Equal(t, expectedContent, all) +} + +func writeBuf(t *testing.T, writer WriteCloserFlusher, buf []byte) { + o, err := writer.Write(buf) + require.NoError(t, err) + assert.Equal(t, len(buf), o) +} diff --git a/recordio/file_writer.go b/recordio/file_writer.go index 0cba66b..f060ef6 100644 --- a/recordio/file_writer.go +++ b/recordio/file_writer.go @@ -88,7 +88,7 @@ func fileHeaderAsByteSlice(compressionType uint32) []byte { } // for legacy reference still around, main paths unused - mostly for tests writing old versions -//noinspection GoUnusedFunction +// noinspection GoUnusedFunction func writeRecordHeaderV1(writer *FileWriter, payloadSizeUncompressed uint64, payloadSizeCompressed uint64) (int, error) { // 4 byte magic number, 8 byte uncompressed size, 8 bytes for compressed size = 20 bytes bytes := make([]byte, RecordHeaderSizeBytes) diff --git a/recordio/io_uring.go b/recordio/io_uring.go new file mode 100644 index 0000000..77ea4db --- /dev/null +++ b/recordio/io_uring.go @@ -0,0 +1,44 @@ +package recordio + +import ( + "github.com/godzie44/go-uring/uring" + "os" +) + +type IOUringFactory struct { + numRingEntries uint32 + opts []uring.SetupOption +} + +func (f *IOUringFactory) CreateNewReader(filePath string, bufSize int) (*os.File, ByteReaderResetCount, error) { + //TODO implement me + panic("implement me") +} + +func (f *IOUringFactory) CreateNewWriter(filePath string, _ int) (*os.File, WriteCloserFlusher, error) { + writer, file, err := NewAsyncWriter(filePath, f.numRingEntries, f.opts...) + if err != nil { + return nil, nil, err + } + + return file, writer, nil +} + +func NewIOUringFactory(numRingEntries uint32, opts ...uring.SetupOption) *IOUringFactory { + return &IOUringFactory{ + numRingEntries: numRingEntries, + opts: opts, + } +} + +// IsIOUringAvailable tests whether io_uring is supported by the kernel. +// It will return (true, nil) if that's the case, if it's not available it will be (false, nil). +// Any other error will be indicated by the error (either true/false). +func IsIOUringAvailable() (available bool, err error) { + ring, err := uring.New(1) + defer func() { + err = ring.Close() + }() + + return err == nil, err +} diff --git a/recordio/io_uring_test.go b/recordio/io_uring_test.go new file mode 100644 index 0000000..8eb40ab --- /dev/null +++ b/recordio/io_uring_test.go @@ -0,0 +1,15 @@ +package recordio + +import ( + "github.com/stretchr/testify/require" + "testing" +) + +func TestIsIOUringAvailable_HappyPath(t *testing.T) { + ok, err := IsIOUringAvailable() + require.NoError(t, err) + if !ok { + t.Skip("iouring not available here") + return + } +} diff --git a/recordio/recordio_test.go b/recordio/recordio_test.go index 6d51257..9e1c823 100644 --- a/recordio/recordio_test.go +++ b/recordio/recordio_test.go @@ -61,7 +61,7 @@ func TestReadWriteEndToEndDirectIO(t *testing.T) { return } - tmpFile, err := ioutil.TempFile("", "recordio_EndToEnd") + tmpFile, err := ioutil.TempFile("", "recordio_EndToEndDirectIO") require.NoError(t, err) defer func() { require.NoError(t, os.Remove(tmpFile.Name())) }() writer, err := NewFileWriter(File(tmpFile), DirectIO())