Skip to content

Commit

Permalink
Adding support for batch queries
Browse files Browse the repository at this point in the history
Adds support for batched queries, fixes issues with cassandra + ql
drivers.
  • Loading branch information
Kenneth Shaw committed Jun 11, 2018
1 parent a483acd commit 1ecf095
Show file tree
Hide file tree
Showing 11 changed files with 170 additions and 60 deletions.
8 changes: 5 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
/usql
/usql.exe

/build
/vendor
/vendor.orig
/build/
/vendor/
/vendor.orig/
/db/

.usql_history*
.[a-f0-9]*

*.ini
*.csv
Expand Down
3 changes: 3 additions & 0 deletions drivers/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,5 +90,8 @@ func init() {
}
return string(buf), nil
},
BatchQueryPrefixes: map[string]string{
"BEGIN BATCH": "APPLY BATCH",
},
})
}
38 changes: 35 additions & 3 deletions drivers/drivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,20 @@ type Driver struct {
// to a string if defined.
ConvertMap func(map[string]interface{}) (string, error)

// ConvertSlice will be used by ConvertSlice to convert a []interface{} to
// a string if defined.
ConvertSlice func([]interface{}) (string, error)

// ConvertDefault will be used by ConvertDefault to convert a interface{}
// to a string if defined.
ConvertDefault func(interface{}) (string, error)

// ConvertSlice will be used by ConvertSlice to convert a []interface{} to
// a string if defined.
ConvertSlice func([]interface{}) (string, error)
// BatchAsTransaction will cause batched queries to be done in a
// transaction block.
BatchAsTransaction bool

// BatchQueryPrefixes will be used by BatchQueryPrefixes if defined.
BatchQueryPrefixes map[string]string
}

// drivers is the map of drivers funcs.
Expand Down Expand Up @@ -367,6 +374,31 @@ func ConvertDefault(u *dburl.URL) func(interface{}) (string, error) {
}
}

// BatchAsTransaction returns whether or not the the specified URL's driver requires
// batched queries to be done within a transaction block.
func BatchAsTransaction(u *dburl.URL) bool {
if d, ok := drivers[u.Driver]; ok {
return d.BatchAsTransaction
}
return false
}

// IsBatchQueryPrefix returns whether or not the supplied query prefix is a
// batch query prefix, and the closing prefix. Used to direct the handler to
// continue accumulating statements.
func IsBatchQueryPrefix(u *dburl.URL, prefix string) (string, string, bool) {
// normalize
typ, q := QueryExecType(prefix, "")

d, ok := drivers[u.Driver]
if q || !ok || d.BatchQueryPrefixes == nil {
return typ, "", false
}

end, ok := d.BatchQueryPrefixes[typ]
return typ, end, ok
}

// RowsAffected returns the rows affected for the SQL result for a specified
// URL's driver.
func RowsAffected(u *dburl.URL, res sql.Result) (int64, error) {
Expand Down
9 changes: 9 additions & 0 deletions drivers/ql/ql.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package ql

import (
"database/sql"

// DRIVER: ql
_ "github.com/cznic/ql/driver"

Expand All @@ -11,5 +13,12 @@ func init() {
drivers.Register("ql", drivers.Driver{
AllowMultilineComments: true,
AllowCComments: true,
BatchQueryPrefixes: map[string]string{
"BEGIN TRANSACTION": "COMMIT",
},
BatchAsTransaction: true,
RowsAffected: func(res sql.Result) (int64, error) {
return 0, nil
},
})
}
17 changes: 9 additions & 8 deletions drivers/qtype.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,17 @@ var queryMap = map[string]bool{
// documentation for any new queries introduced by PostgreSQL need to be
// manually scrutinized for variations.
var execMap = map[string]bool{
"USE": true, // use a schema / keyspace (mysql, cassandra)
// cassandra
"ALTER KEYSPACE": true, // alter a keyspace
"CREATE KEYSPACE": true, // create a keyspace
"DROP KEYSPACE": true, // drop a keyspace
"BEGIN BATCH": true, // begin batch
"APPLY BATCH": true, // apply batch

"ALTER": true, // alter catch-all
"CREATE": true, // create catch-all
"DROP": true, // drop catch-all

"ALTER KEYSPACE": true, // alter a keyspace (cassandra)
"CREATE KEYSPACE": true, // create a keyspace (cassandra)
"DROP KEYSPACE": true, // drop a keyspace (cassandra)
// ql
"BEGIN TRANSACTION": true, // begin batch

// postgresql
"ABORT": true, // abort the current transaction
"ALTER AGGREGATE": true, // change the definition of an aggregate function
"ALTER COLLATION": true, // change the definition of a collation
Expand Down
83 changes: 72 additions & 11 deletions handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ type Handler struct {
lastPrefix string
lastRaw string

// batch
batch bool
batchEnd string

// connection
u *dburl.URL
db *sql.DB
Expand Down Expand Up @@ -269,15 +273,56 @@ func (h *Handler) Run() error {

// execute buf
if execute || h.buf.Ready() || res.Exec != metacmd.ExecNone {
// intercept batch query
typ, end, batch := drivers.IsBatchQueryPrefix(h.u, h.buf.Prefix)
switch {
case h.batch && batch:
fmt.Fprintf(stderr, "error: cannot perform %s in existing batch", typ)
fmt.Fprintln(stderr)
continue

// cannot use \g* while accumulating statements for batch queries
case h.batch && typ != h.batchEnd && res.Exec != metacmd.ExecNone:
fmt.Fprintf(stderr, "error: cannot force batch execution", typ)
fmt.Fprintln(stderr)
continue

case batch:
h.batch, h.batchEnd = true, end

case h.batch:
var lend string
if len(h.last) != 0 {
lend = "\n"
}

// append to last
h.last += lend + h.buf.String()
h.lastPrefix = h.buf.Prefix
h.lastRaw += lend + h.buf.RawString()
h.buf.Reset(nil)

// break
if h.batchEnd != typ {
continue
}

h.lastPrefix = h.batchEnd
h.batch, h.batchEnd = false, ""
}

if h.buf.Len != 0 {
h.last, h.lastPrefix, h.lastRaw = h.buf.String(), h.buf.Prefix, h.buf.RawString()
h.buf.Reset(nil)
}

// log.Printf(">> PROCESS EXECUTE: (%s) `%s`", h.lastPrefix, h.last)
if h.last != "" && h.last != ";" {
err = h.Execute(stdout, res, h.lastPrefix, h.last)
if err != nil {
if !h.batch && h.last != "" && h.last != ";" {
// force a transaction for batched queries for certain drivers
_, _, forceBatch := drivers.IsBatchQueryPrefix(h.u, stmt.FindPrefix(h.last))

// execute
if err = h.Execute(stdout, res, h.lastPrefix, h.last, forceBatch); err != nil {
fmt.Fprintf(stderr, "error: %v", err)
fmt.Fprintln(stderr)
}
Expand All @@ -287,7 +332,7 @@ func (h *Handler) Run() error {
}

// Execute executes a query against the connected database.
func (h *Handler) Execute(w io.Writer, res metacmd.Res, prefix, qstr string) error {
func (h *Handler) Execute(w io.Writer, res metacmd.Res, prefix, qstr string, forceTrans bool) error {
if h.db == nil {
return text.ErrNotConnected
}
Expand All @@ -298,6 +343,13 @@ func (h *Handler) Execute(w io.Writer, res metacmd.Res, prefix, qstr string) err
return drivers.WrapErr(h.u.Driver, err)
}

// start a transaction if forced
if forceTrans {
if err = h.Begin(); err != nil {
return err
}
}

f := h.execOnly
switch res.Exec {
case metacmd.ExecSet:
Expand All @@ -306,7 +358,18 @@ func (h *Handler) Execute(w io.Writer, res metacmd.Res, prefix, qstr string) err
f = h.execExec
}

return drivers.WrapErr(h.u.Driver, f(w, prefix, qstr, qtyp, res.ExecParam))
if err = drivers.WrapErr(h.u.Driver, f(w, prefix, qstr, qtyp, res.ExecParam)); err != nil {
if forceTrans {
h.Reset(nil)
}
return err
}

if forceTrans {
return h.Commit()
}

return nil
}

// CommandRunner executes a set of commands.
Expand All @@ -315,8 +378,7 @@ func (h *Handler) CommandRunner(cmds []string) func() error {
return func() error {
for _, cmd := range cmds {
h.Reset([]rune(cmd))
err := h.Run()
if err != nil && err != io.EOF {
if err := h.Run(); err != nil && err != io.EOF {
return err
}
}
Expand All @@ -327,7 +389,7 @@ func (h *Handler) CommandRunner(cmds []string) func() error {
// Reset resets the handler's query statement buffer.
func (h *Handler) Reset(r []rune) {
h.buf.Reset(r)
h.last, h.lastPrefix, h.lastRaw = "", "", ""
h.last, h.lastPrefix, h.lastRaw, h.batch, h.batchEnd = "", "", "", false, ""
}

// Prompt creates the prompt text.
Expand All @@ -342,7 +404,7 @@ func (h *Handler) Prompt() string {
}

tx := ">"
if h.tx != nil {
if h.tx != nil || h.batch {
tx = "~"
}

Expand Down Expand Up @@ -835,8 +897,7 @@ func (h *Handler) execRows(w io.Writer, q *sql.Rows) error {

// execute
for _, qstr := range row {
err = h.Execute(w, res, stmt.FindPrefix(qstr, 4), qstr)
if err != nil {
if err = h.Execute(w, res, stmt.FindPrefix(qstr), qstr, false); err != nil {
return err
}
}
Expand Down
32 changes: 13 additions & 19 deletions metacmd/cmds.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,27 +368,21 @@ func init() {
Section: SectionTransaction,
Name: "begin",
Desc: "begin a transaction",
Process: func(p *Params) error {
return p.H.Begin()
},
},

Commit: {
Section: SectionTransaction,
Name: "commit",
Desc: "commit current transaction",
Process: func(p *Params) error {
return p.H.Commit()
Aliases: map[string]string{
"commit": "commit current transaction",
"rollback": "rollback (abort) current transaction",
},
},

Rollback: {
Section: SectionTransaction,
Name: "rollback",
Desc: "rollback (abort) current transaction",
Aliases: map[string]string{"abort": ""},
Process: func(p *Params) error {
return p.H.Rollback()
var f func() error
switch p.N {
case "begin":
f = p.H.Begin
case "commit":
f = p.H.Commit
case "rollback":
f = p.H.Rollback
}
return f()
},
},

Expand Down
14 changes: 4 additions & 10 deletions metacmd/metacmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const (
// Copyright is the copyright meta command (\copyright).
Copyright

// Connect is the connect meta command (\c).
// Connect is the connect meta command (\c, \connect).
Connect

// Disconnect is the disconnect meta command (\Z).
Expand All @@ -64,10 +64,10 @@ const (
// Edit is the edit query buffer meta command (\e).
Edit

// Print is the print query buffer meta command (\p).
// Print is the print query buffer meta command (\p, \print, \raw).
Print

// Reset is the reset query buffer meta command (\r).
// Reset is the reset query buffer meta command (\r, \reset).
Reset

// Echo is the echo meta command (\echo).
Expand All @@ -88,15 +88,9 @@ const (
// Include is the system include file meta command (\i and variants).
Include

// Begin is the transaction begin meta command (\begin).
// Transact is the transaction meta command (\begin, \commit, \rollback).
Begin

// Commit is the transaction commit meta command (\commit).
Commit

// Rollback is the transaction rollback (abort) meta command (\rollback).
Rollback

// Prompt is the variable prompt meta command (\prompt).
Prompt

Expand Down
Loading

0 comments on commit 1ecf095

Please sign in to comment.