Skip to content

Commit

Permalink
kvdb: add postgres
Browse files Browse the repository at this point in the history
  • Loading branch information
joostjager committed Sep 21, 2021
1 parent 9264185 commit 3eb80ca
Show file tree
Hide file tree
Showing 19 changed files with 2,055 additions and 10 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ jobs:
matrix:
unit_type:
- btcd unit-cover
- unit tags=kvdb_etcd
- unit tags="kvdb_etcd kvdb_postgres"
- travis-race
steps:
- name: git checkout
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/btcsuite/btcwallet/wallet/txrules v1.0.0
github.com/btcsuite/btcwallet/walletdb v1.3.6-0.20210803004036-eebed51155ec
github.com/btcsuite/btcwallet/wtxmgr v1.3.1-0.20210822222949-9b5a201c344c
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f
github.com/davecgh/go-spew v1.1.1
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/go-errors/errors v1.0.1
Expand Down Expand Up @@ -55,7 +55,7 @@ require (
github.com/urfave/cli v1.20.0
go.etcd.io/etcd/client/pkg/v3 v3.5.0
go.etcd.io/etcd/client/v3 v3.5.0
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97
golang.org/x/net v0.0.0-20210913180222-943fd674d43e
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20210915083310-ed5796bab164 // indirect
Expand Down
138 changes: 136 additions & 2 deletions go.sum

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions kvdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ const (
// live instance of etcd.
EtcdBackendName = "etcd"

// PostgresBackendName is the name of the backend that should be passed
// into kvdb.Create to initialize a new instance of kvdb.Backend backed
// by a live instance of postgres.
PostgresBackendName = "postgres"

// DefaultBoltAutoCompactMinAge is the default minimum time that must
// have passed since a bolt database file was last compacted for the
// compaction to be considered again.
Expand Down
3 changes: 3 additions & 0 deletions kvdb/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ require (
github.com/btcsuite/btcwallet/walletdb v1.3.6-0.20210803004036-eebed51155ec
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/btree v1.0.1
github.com/fergusstrange/embedded-postgres v1.7.0
github.com/jackc/pgx/v4 v4.13.0
github.com/lightningnetwork/lnd/healthcheck v1.0.0
github.com/stretchr/testify v1.7.0
go.etcd.io/bbolt v1.3.6
go.etcd.io/etcd/api/v3 v3.5.0
go.etcd.io/etcd/client/pkg/v3 v3.5.0
go.etcd.io/etcd/client/v3 v3.5.0
go.etcd.io/etcd/server/v3 v3.5.0
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4
)

// This replace is for https://github.com/advisories/GHSA-w73w-5m7g-f7qc
Expand Down
145 changes: 141 additions & 4 deletions kvdb/go.sum

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion kvdb/log.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package kvdb

import "github.com/btcsuite/btclog"
import (
"github.com/btcsuite/btclog"
"github.com/lightningnetwork/lnd/kvdb/postgres"
)

// log is a logger that is initialized as disabled. This means the package will
// not perform any logging by default until a logger is set.
Expand All @@ -9,4 +12,6 @@ var log = btclog.Disabled
// UseLogger uses a specified Logger to output package logging info.
func UseLogger(logger btclog.Logger) {
log = logger

postgres.UseLogger(log)
}
9 changes: 9 additions & 0 deletions kvdb/postgres/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package postgres

import "time"

// Config holds postgres configuration data.
type Config struct {
Dsn string `long:"dsn" description:"Database connection string."`
Timeout time.Duration `long:"timeout" description:"Database connection timeout. Set to zero to disable."`
}
241 changes: 241 additions & 0 deletions kvdb/postgres/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
// +build kvdb_postgres

package postgres

import (
"context"
"database/sql"
"errors"
"fmt"
"io"
"sync"
"time"

"github.com/btcsuite/btcwallet/walletdb"
_ "github.com/jackc/pgx/v4/stdlib"
)

const (
// kvTableName is the name of the table that will contain all the kv
// pairs.
kvTableName = "kv"
)

// KV stores a key/value pair.
type KV struct {
key string
val string
}

// db holds a reference to the postgres connection connection.
type db struct {
// cfg is the postgres connection config.
cfg *Config

// prefix is the table name prefix that is used to simulate namespaces.
// We don't use schemas because at least sqlite does not support that.
prefix string

// ctx is the overall context for the database driver.
//
// TODO: This is an anti-pattern that is in place until the kvdb
// interface supports a context.
ctx context.Context

// db is the underlying database connection instance.
db *sql.DB

// lock is the global write lock that ensures single writer.
lock sync.RWMutex

// table is the name of the table that contains the data for all
// top-level buckets that have keys that cannot be mapped to a distinct
// sql table.
table string
}

// Enforce db implements the walletdb.DB interface.
var _ walletdb.DB = (*db)(nil)

// newPostgresBackend returns a db object initialized with the passed backend
// config. If postgres connection cannot be estabished, then returns error.
func newPostgresBackend(ctx context.Context, config *Config, prefix string) (
*db, error) {

if prefix == "" {
return nil, errors.New("empty postgres prefix")
}

dbConn, err := sql.Open("pgx", config.Dsn)
if err != nil {
return nil, err
}

// Compose system table names.
table := fmt.Sprintf(
"%s_%s", prefix, kvTableName,
)

// Execute the create statements to set up a kv table in postgres. Every
// row points to the bucket that it is one via its parent_id field. A
// NULL parent_id means that the key belongs to the upper-most bucket in
// this table. A constraint on parent_id is enforcing referential
// integrity.
//
// Furthermore there is a <table>_p index on parent_id that is required
// for the foreign key constraint.
//
// Finally there are unique indices on (parent_id, key) to prevent the
// same key being present in a bucket more than once (<table>_up and
// <table>_unp). In postgres, a single index wouldn't enforce the unique
// constraint on rows with a NULL parent_id. Therefore two indices are
// defined.
_, err = dbConn.ExecContext(ctx, `
CREATE SCHEMA IF NOT EXISTS public;
CREATE TABLE IF NOT EXISTS public.`+table+`
(
key bytea NOT NULL,
value bytea,
parent_id bigint,
id bigserial PRIMARY KEY,
sequence bigint,
CONSTRAINT `+table+`_parent FOREIGN KEY (parent_id)
REFERENCES public.`+table+` (id)
ON UPDATE NO ACTION
ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS `+table+`_p
ON public.`+table+` (parent_id);
CREATE UNIQUE INDEX IF NOT EXISTS `+table+`_up
ON public.`+table+`
(parent_id, key) WHERE parent_id IS NOT NULL;
CREATE UNIQUE INDEX IF NOT EXISTS `+table+`_unp
ON public.`+table+` (key) WHERE parent_id IS NULL;
`)
if err != nil {
_ = dbConn.Close()

return nil, err
}

backend := &db{
cfg: config,
prefix: prefix,
ctx: ctx,
db: dbConn,
table: table,
}

return backend, nil
}

// getTimeoutCtx gets a timeout context for database requests.
func (db *db) getTimeoutCtx() (context.Context, func()) {
if db.cfg.Timeout == time.Duration(0) {
return db.ctx, func() {}
}

return context.WithTimeout(db.ctx, db.cfg.Timeout)
}

// getPrefixedTableName returns a table name for this prefix (namespace).
func (db *db) getPrefixedTableName(table string) string {
return fmt.Sprintf("%s_%s", db.prefix, table)
}

// catchPanic executes the specified function. If a panic occurs, it is returned
// as an error value.
func catchPanic(f func() error) (err error) {
defer func() {
if r := recover(); r != nil {
err = r.(error)
log.Criticalf("Caught unhandled error: %v", err)
}
}()

err = f()

return
}

// View opens a database read transaction and executes the function f with the
// transaction passed as a parameter. After f exits, the transaction is rolled
// back. If f errors, its error is returned, not a rollback error (if any
// occur). The passed reset function is called before the start of the
// transaction and can be used to reset intermediate state. As callers may
// expect retries of the f closure (depending on the database backend used), the
// reset function will be called before each retry respectively.
func (db *db) View(f func(tx walletdb.ReadTx) error, reset func()) error {
return db.executeTransaction(
func(tx walletdb.ReadWriteTx) error {
return f(tx.(walletdb.ReadTx))
},
reset, true,
)
}

// Update opens a database read/write transaction and executes the function f
// with the transaction passed as a parameter. After f exits, if f did not
// error, the transaction is committed. Otherwise, if f did error, the
// transaction is rolled back. If the rollback fails, the original error
// returned by f is still returned. If the commit fails, the commit error is
// returned. As callers may expect retries of the f closure, the reset function
// will be called before each retry respectively.
func (db *db) Update(f func(tx walletdb.ReadWriteTx) error, reset func()) (err error) {
return db.executeTransaction(f, reset, false)
}

// executeTransaction creates a new read-only or read-write transaction and
// executes the given function within it.
func (db *db) executeTransaction(f func(tx walletdb.ReadWriteTx) error,
reset func(), readOnly bool) error {

reset()

tx, err := newReadWriteTx(db, readOnly)
if err != nil {
return err
}

err = catchPanic(func() error { return f(tx) })
if err != nil {
if rollbackErr := tx.Rollback(); rollbackErr != nil {
log.Errorf("Error rolling back tx: %v", rollbackErr)
}

return err
}

return tx.Commit()
}

// PrintStats returns all collected stats pretty printed into a string.
func (db *db) PrintStats() string {
return "stats not supported by Postgres driver"
}

// BeginReadWriteTx opens a database read+write transaction.
func (db *db) BeginReadWriteTx() (walletdb.ReadWriteTx, error) {
return newReadWriteTx(db, false)
}

// BeginReadTx opens a database read transaction.
func (db *db) BeginReadTx() (walletdb.ReadTx, error) {
return newReadWriteTx(db, true)
}

// Copy writes a copy of the database to the provided writer. This call will
// start a read-only transaction to perform all operations.
// This function is part of the walletdb.Db interface implementation.
func (db *db) Copy(w io.Writer) error {
return errors.New("not implemented")
}

// Close cleanly shuts down the database and syncs all data.
// This function is part of the walletdb.Db interface implementation.
func (db *db) Close() error {
return db.db.Close()
}
56 changes: 56 additions & 0 deletions kvdb/postgres/db_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// +build kvdb_postgres

package postgres

import (
"testing"
"time"

"github.com/btcsuite/btcwallet/walletdb"
"github.com/btcsuite/btcwallet/walletdb/walletdbtest"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
)

// TestInterface performs all interfaces tests for this database driver.
func TestInterface(t *testing.T) {
f := NewFixture(t)
defer f.Cleanup()

// dbType is the database type name for this driver.
const dbType = "postgres"

ctx := context.Background()
cfg := &Config{
Dsn: testDsn,
}

walletdbtest.TestInterface(t, dbType, ctx, cfg, prefix)
}

// TestPanic tests recovery from panic conditions.
func TestPanic(t *testing.T) {
f := NewFixture(t)
defer f.Cleanup()

d := f.NewBackend()

err := d.(*db).Update(func(tx walletdb.ReadWriteTx) error {
bucket, err := tx.CreateTopLevelBucket([]byte("test"))
require.NoError(t, err)

// Stop database server.
f.Cleanup()

// Keep trying to get data until Get panics because the
// connection is lost.
for i := 0; i < 50; i++ {
bucket.Get([]byte("key"))
time.Sleep(100 * time.Millisecond)
}

return nil
}, func() {})

require.Contains(t, err.Error(), "terminating connection")
}
Loading

0 comments on commit 3eb80ca

Please sign in to comment.