From 99c1eecc45cde07d237c271413dab6f3a8ddcdd1 Mon Sep 17 00:00:00 2001 From: ZelinMa557 <3388706467@qq.com> Date: Thu, 22 Jun 2023 18:23:20 +0800 Subject: [PATCH] fix: remove error on explain analyze select into clause Signed-off-by: ZelinMa557 <3388706467@qq.com> --- .../influx/coordinator/statement_executor.go | 18 ++++++++++++ tests/server_suite.go | 26 +++++++++++++++++ tests/server_test.go | 28 +++++++++++++++++++ 3 files changed, 72 insertions(+) diff --git a/open_src/influx/coordinator/statement_executor.go b/open_src/influx/coordinator/statement_executor.go index 2615e072c..950a8f0a3 100644 --- a/open_src/influx/coordinator/statement_executor.go +++ b/open_src/influx/coordinator/statement_executor.go @@ -23,6 +23,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "runtime/debug" "sort" "strings" @@ -817,12 +818,29 @@ func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement panic("impl me") } +type ExplainAnalyzeWriter struct { +} + +func (w *ExplainAnalyzeWriter) RetryWritePointRows(database string, retentionPolicy string, points []influx.Row) error { + src := make([]byte, 0) + src, err := influx.FastMarshalMultiRows(src, points) + if err != nil { + return err + } + _, err = io.Discard.Write(src) + if err != nil { + return err + } + return nil +} + func (e *StatementExecutor) executeExplainAnalyzeStatement(q *influxql.ExplainStatement, ectx *query2.ExecutionContext) (models.Rows, error) { stmt := q.Statement trace, span := tracing.NewTrace("SELECT") stmt.OmitTime = true ctx := tracing.NewContextWithTrace(ectx.Context, trace) ctx = tracing.NewContextWithSpan(ctx, span) + ctx = context.WithValue(ctx, executor.WRITER_CONTEXT, &ExplainAnalyzeWriter{}) span.AppendNameValue("statement", q.String()) span.Finish() diff --git a/tests/server_suite.go b/tests/server_suite.go index 4954f3375..9e17ccd81 100644 --- a/tests/server_suite.go +++ b/tests/server_suite.go @@ -1131,6 +1131,32 @@ func init() { }, } + tests["explain_analyze_select_into"] = Test{ + writes: Writes{ + &Write{db: "db0", data: fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano())}, + }, + queries: []*Query{ + &Query{ + name: "show measurements after write to db0", + params: url.Values{"db": []string{"db0"}}, + command: `SHOW MEASUREMENTS`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"measurements","columns":["name"],"values":[["cpu"]]}]}]}`, + once: true, + }, + &Query{ + name: "explain analyze select into", + command: `explain analyze select * into db1..cpu from db0..cpu group by *`, + once: true, + }, + &Query{ + name: "show measurements after explain analyze", + params: url.Values{"db": []string{"db1"}}, + command: `SHOW MEASUREMENTS`, + exp: `{"results":[{"statement_id":0}]}`, + once: true, + }, + }, + } } func (tests Tests) load(t *testing.T, key string) Test { diff --git a/tests/server_test.go b/tests/server_test.go index 69900d4a3..6c8c3fe10 100644 --- a/tests/server_test.go +++ b/tests/server_test.go @@ -331,6 +331,34 @@ func TestServer_Query_DropDatabaseIsolated(t *testing.T) { } } +func TestServer_ExplainAnalyzeSelectInto(t *testing.T) { + t.Parallel() + c := NewConfig() + s := OpenServer(c) + if err := s.CreateDatabaseAndRetentionPolicy("db0", NewRetentionPolicySpec("rp0", 1, 0), true); err != nil { + t.Fatal(err) + } + if err := s.CreateDatabaseAndRetentionPolicy("db1", NewRetentionPolicySpec("rp1", 1, 0), true); err != nil { + t.Fatal(err) + } + test := tests.load(t, "explain_analyze_select_into") + if err := test.init(s); err != nil { + t.Fatalf("test init failed: %s", err) + } + for _, query := range test.queries { + t.Run(query.name, func(t *testing.T) { + if query.skip { + t.Skipf("SKIP:: %s", query.name) + } + if err := query.Execute(s); err != nil { + t.Error(query.Error(err)) + } else if strings.Contains(query.name, "show measurements") && !query.success() { + t.Error(query.failureMessage()) + } + }) + } +} + // Ensure retention policy commands work. func TestServer_RetentionPolicyCommands(t *testing.T) { t.Parallel()