Skip to content

Commit

Permalink
fix: remove error on explain analyze select into clause
Browse files Browse the repository at this point in the history
Signed-off-by: ZelinMa557 <3388706467@qq.com>
  • Loading branch information
ZelinMa557 committed Aug 19, 2023
1 parent 6eecffc commit 99c1eec
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 0 deletions.
18 changes: 18 additions & 0 deletions open_src/influx/coordinator/statement_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"runtime/debug"
"sort"
"strings"
Expand Down Expand Up @@ -817,12 +818,29 @@ func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement
panic("impl me")
}

type ExplainAnalyzeWriter struct {
}

func (w *ExplainAnalyzeWriter) RetryWritePointRows(database string, retentionPolicy string, points []influx.Row) error {
src := make([]byte, 0)
src, err := influx.FastMarshalMultiRows(src, points)
if err != nil {
return err
}
_, err = io.Discard.Write(src)
if err != nil {
return err
}
return nil
}

func (e *StatementExecutor) executeExplainAnalyzeStatement(q *influxql.ExplainStatement, ectx *query2.ExecutionContext) (models.Rows, error) {
stmt := q.Statement
trace, span := tracing.NewTrace("SELECT")
stmt.OmitTime = true
ctx := tracing.NewContextWithTrace(ectx.Context, trace)
ctx = tracing.NewContextWithSpan(ctx, span)
ctx = context.WithValue(ctx, executor.WRITER_CONTEXT, &ExplainAnalyzeWriter{})
span.AppendNameValue("statement", q.String())
span.Finish()

Expand Down
26 changes: 26 additions & 0 deletions tests/server_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -1131,6 +1131,32 @@ func init() {
},
}

tests["explain_analyze_select_into"] = Test{
writes: Writes{
&Write{db: "db0", data: fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano())},
},
queries: []*Query{
&Query{
name: "show measurements after write to db0",
params: url.Values{"db": []string{"db0"}},
command: `SHOW MEASUREMENTS`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"measurements","columns":["name"],"values":[["cpu"]]}]}]}`,
once: true,
},
&Query{
name: "explain analyze select into",
command: `explain analyze select * into db1..cpu from db0..cpu group by *`,
once: true,
},
&Query{
name: "show measurements after explain analyze",
params: url.Values{"db": []string{"db1"}},
command: `SHOW MEASUREMENTS`,
exp: `{"results":[{"statement_id":0}]}`,
once: true,
},
},
}
}

func (tests Tests) load(t *testing.T, key string) Test {
Expand Down
28 changes: 28 additions & 0 deletions tests/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,34 @@ func TestServer_Query_DropDatabaseIsolated(t *testing.T) {
}
}

func TestServer_ExplainAnalyzeSelectInto(t *testing.T) {
t.Parallel()
c := NewConfig()
s := OpenServer(c)
if err := s.CreateDatabaseAndRetentionPolicy("db0", NewRetentionPolicySpec("rp0", 1, 0), true); err != nil {
t.Fatal(err)
}
if err := s.CreateDatabaseAndRetentionPolicy("db1", NewRetentionPolicySpec("rp1", 1, 0), true); err != nil {
t.Fatal(err)
}
test := tests.load(t, "explain_analyze_select_into")
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
for _, query := range test.queries {
t.Run(query.name, func(t *testing.T) {
if query.skip {
t.Skipf("SKIP:: %s", query.name)
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if strings.Contains(query.name, "show measurements") && !query.success() {
t.Error(query.failureMessage())
}
})
}
}

// Ensure retention policy commands work.
func TestServer_RetentionPolicyCommands(t *testing.T) {
t.Parallel()
Expand Down

0 comments on commit 99c1eec

Please sign in to comment.