Skip to content

Commit

Permalink
insert and query influxdb
Browse files Browse the repository at this point in the history
  • Loading branch information
toandac committed Apr 18, 2022
1 parent 0a40e53 commit 57f11cd
Show file tree
Hide file tree
Showing 8 changed files with 203 additions and 90 deletions.
116 changes: 59 additions & 57 deletions handle/record_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,19 @@ import (
"demo-server/models"
"demo-server/repository"
"encoding/json"
"errors"
"log"
"net/http"
"text/template"
"time"

"github.com/go-chi/chi"
"github.com/mssola/user_agent"
"github.com/zippoxer/bow"
)

type RecordHandle struct {
RecordRepo repository.RecordRepo
URL string
// RecordRepo repository.RecordRepo
EventRepo repository.EventRepo
URL string
}

func (rc *RecordHandle) SaveRecord(w http.ResponseWriter, r *http.Request) {
Expand All @@ -29,20 +28,19 @@ func (rc *RecordHandle) SaveRecord(w http.ResponseWriter, r *http.Request) {
}

var rec models.Record
if err := rc.RecordRepo.Get(req.ID, &rec); err != nil {
if !errors.Is(err, bow.ErrNotFound) {
log.Println(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
// if err := rc.RecordRepo.Get(req.ID, &rec); err != nil {
// if !errors.Is(err, bow.ErrNotFound) {
// log.Println(err)
// http.Error(w, err.Error(), http.StatusInternalServerError)
// return
// }
// }

ua := user_agent.New(r.UserAgent())

rec.ID = req.ID
rec.Events = append(rec.Events, req.Events...)
rec.User = req.User
rec.Meta = req.Meta
rec.UpdatedAt = time.Now()

browserName, browserVersion := ua.Browser()
Expand All @@ -53,22 +51,21 @@ func (rc *RecordHandle) SaveRecord(w http.ResponseWriter, r *http.Request) {
Version: browserVersion,
}

if err := rc.RecordRepo.Put(rec); err != nil {
// if err := rc.RecordRepo.Put(rec); err != nil {
// log.Println(err)
// http.Error(w, err.Error(), http.StatusInternalServerError)
// return
// }

err := rc.EventRepo.Insert(rec)
if err != nil {
log.Println(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

w.WriteHeader(http.StatusOK)
}

func (rc *RecordHandle) RenderRecordScript(w http.ResponseWriter, r *http.Request) {
// tmplRecorder, err := template.New("recorder").Parse(rrwebRecord)
// if err != nil {
// log.Println(err)
// http.Error(w, err.Error(), http.StatusNotFound)
// return
// }
tmplRecorder := template.Must(template.ParseFiles("templates/record.js"))

err := tmplRecorder.Execute(w, struct{ URL string }{URL: rc.URL})
Expand All @@ -85,11 +82,11 @@ func (rc *RecordHandle) RenderRecordPlayer(w http.ResponseWriter, r *http.Reques

var record models.Record

if err := rc.RecordRepo.Get(id, &record); err != nil {
log.Println(err)
http.Error(w, err.Error(), http.StatusNotFound)
return
}
// if err := rc.RecordRepo.Get(id, &record); err != nil {
// log.Println(err)
// http.Error(w, err.Error(), http.StatusNotFound)
// return
// }

tmplPlayerHTML := template.Must(template.ParseFiles("templates/session_by_id.html"))

Expand All @@ -108,42 +105,47 @@ func (rc *RecordHandle) RenderRecordPlayer(w http.ResponseWriter, r *http.Reques
w.WriteHeader(http.StatusOK)
}

func (rc *RecordHandle) RendersRecordsList(w http.ResponseWriter, r *http.Request) {
var record models.Record

records, err := rc.RecordRepo.Iter(record)
if err != nil {
log.Println(err)
http.Error(w, err.Error(), http.StatusNotFound)
return
}

tmplListHTML := template.Must(template.ParseFiles("templates/session_list.html"))

err = tmplListHTML.Execute(w, struct {
Records []models.Record
URL string
}{
Records: records,
URL: rc.URL,
})
if err != nil {
log.Println(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

w.WriteHeader(http.StatusOK)
}
// func (rc *RecordHandle) RendersRecordsList(w http.ResponseWriter, r *http.Request) {
// var record models.Record

// records, err := rc.RecordRepo.Iter(record)
// if err != nil {
// log.Println(err)
// http.Error(w, err.Error(), http.StatusNotFound)
// return
// }

// tmplListHTML := template.Must(template.ParseFiles("templates/session_list.html"))

// err = tmplListHTML.Execute(w, struct {
// Records []models.Record
// URL string
// }{
// Records: records,
// URL: rc.URL,
// })
// if err != nil {
// log.Println(err)
// http.Error(w, err.Error(), http.StatusInternalServerError)
// return
// }

// w.WriteHeader(http.StatusOK)
// }

func (rc *RecordHandle) GetAllRecordByID(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
var record models.Record
// var record models.Record

if err := rc.RecordRepo.Get(id, &record); err != nil {
// if err := rc.RecordRepo.Get(id, &record); err != nil {
// log.Println(err)
// http.Error(w, err.Error(), http.StatusNotFound)
// return
// }

record, err := rc.EventRepo.Query(id)
if err != nil {
log.Println(err)
http.Error(w, err.Error(), http.StatusNotFound)
return
}

if err := json.NewEncoder(w).Encode(&record); err != nil {
Expand Down
36 changes: 18 additions & 18 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"demo-server/router"
"log"
"net/http"
"net/url"
"os"

"github.com/go-chi/chi"
Expand All @@ -17,26 +16,27 @@ import (
)

func run(c *cli.Context) error {
// influx := &database.InfluxDB{
// URL: "http://localhost:8086",
// Token: "162L5asvhPfFeE3hp4EWxT8Z6XXkNfzsh4XQ-R6XunRXrnYfJnd_AOlQ-dDyxcmC3OCQm829pbuWf_QNfgJOvA==",
// Bucket: "events_bucket",
// }
// influx.NewInfluxDB()
// defer influx.Close()

dbDSN, err := url.Parse(c.String("db-dsn"))
if err != nil {
log.Println(err)
influx := &database.InfluxDB{
URL: "http://localhost:8086",
Token: "162L5asvhPfFeE3hp4EWxT8Z6XXkNfzsh4XQ-R6XunRXrnYfJnd_AOlQ-dDyxcmC3OCQm829pbuWf_QNfgJOvA==",
Bucket: "records",
}
influx.NewInfluxDB()
defer influx.Close()

// dbDSN, err := url.Parse(c.String("db-dsn"))
// if err != nil {
// log.Println(err)
// }

badger := &database.BadgerDB{}
badger.NewBadgerDB(dbDSN)
defer badger.Close()
// badger := &database.BadgerDB{}
// badger.NewBadgerDB(dbDSN)
// defer badger.Close()

recordHandle := handle.RecordHandle{
RecordRepo: repoimpl.NewRecordRepo(badger),
URL: c.String("service-url"),
// RecordRepo: repoimpl.NewRecordRepo(badger),
EventRepo: repoimpl.NewEventRepo(influx),
URL: c.String("service-url"),
}

r := chi.NewRouter()
Expand All @@ -61,7 +61,7 @@ func main() {
Flags: []cli.Flag{
&cli.StringFlag{Name: "service-url", Value: "http://localhost:3000"},
&cli.StringFlag{Name: "address", Value: "127.0.0.1"},
&cli.StringFlag{Name: "db-dsn", Value: "badger:///tmp/badgerdb_1.3"},
// &cli.StringFlag{Name: "db-dsn", Value: "badger:///tmp/badgerdb_1.3"},
&cli.StringFlag{Name: "port", Value: "3000"},
},
Action: run,
Expand Down
14 changes: 7 additions & 7 deletions models/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,20 @@ package models
import "time"

type Client struct {
ClientID string `json:"client_id"`
UserAgent string `json:"userAgent"`
OS string `json:"os"`
Browser string `json:"browser"`
Version string `json:"version"`
}

type Record struct {
ID string `json:"id" bow:"key"`
Events []Events `json:"events"`
Meta map[string]string `json:"meta"`
User User `json:"user"`
Client Client `json:"client"`
ClientID string `json:"client_id"`
UpdatedAt time.Time `json:"updatedAt"`
ID string `json:"id" bow:"key"`
Events []Events `json:"events"`
User User `json:"user"`
Client Client `json:"client"`
ClientID string `json:"client_id"`
UpdatedAt time.Time `json:"updatedAt"`
}

type Events struct {
Expand Down
8 changes: 8 additions & 0 deletions repository/event_repo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package repository

import "demo-server/models"

type EventRepo interface {
Insert(record models.Record) error
Query(id string) (models.Record, error)
}
110 changes: 110 additions & 0 deletions repository/repo_impl/event_repo_impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package repoimpl

import (
"context"
"demo-server/database"
"demo-server/models"
"demo-server/repository"
"fmt"
"log"
"strings"

influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)

type EventRepoImpl struct {
influx *database.InfluxDB
}

func NewEventRepo(influx *database.InfluxDB) repository.EventRepo {
return &EventRepoImpl{
influx: influx,
}
}

func (e *EventRepoImpl) Insert(record models.Record) error {
for _, elem := range record.Events {
p := influxdb2.NewPointWithMeasurement("test3").
AddTag("clientID", record.Client.ClientID).
AddTag("sessionID", record.ID).
AddTag("userID", record.User.ID).
AddTag("email", record.User.Email).
AddTag("name", record.User.Name).
AddTag("browser", record.Client.Browser).
AddTag("os", record.Client.OS).
AddTag("userAgent", record.Client.UserAgent).
AddTag("version", record.Client.Version).
AddField("data", elem.Data).
AddField("type", elem.Type).
AddField("timestamp", elem.Timestamp).
SetTime(record.UpdatedAt)

writeAPI := e.influx.Client.WriteAPIBlocking("tanda_organization", "records")
err := writeAPI.WritePoint(context.Background(), p)
if err != nil {
log.Println("Influxdb fails insert: ", err)
return err
}
}

return nil
}

func (e *EventRepoImpl) Query(id string) (models.Record, error) {
var record models.Record
var event models.Events
// var events []string
queryAPI := e.influx.Client.QueryAPI("tanda_organization")

query := `from(bucket: "records")
|> range(start: -1d)
|> filter(fn: (r) => r["_measurement"] == "test3")`

// get QueryTableResult
result, err := queryAPI.Query(context.Background(), query)
if err == nil {
// Iterate over query response
for result.Next() {
// Access data

values := result.Record().Values()
record.ID = values["sessionID"].(string)
// record.ClientID = values["clientID"].(string)

record.Client.OS = values["os"].(string)
record.Client.UserAgent = values["userAgent"].(string)
record.Client.Version = values["version"].(string)

// record.User.ID = values["userID"].(string)
record.User.Email = values["email"].(string)
record.User.Name = values["name"].(string)

record.Client.Browser = values["browser"].(string)
record.Client.Browser = values["browser"].(string)

if strings.Contains(result.Record().Field(), "type") {
event.Type = result.Record().Value().(int64)
}

if strings.Contains(result.Record().Field(), "data") {
event.Data = result.Record().Value()
}

if strings.Contains(result.Record().Field(), "timestamp") {
event.Timestamp = result.Record().Value().(int64)
}

record.Events = append(record.Events, event)

}
// check for an error
if result.Err() != nil {
fmt.Printf("query parsing error: %s\n", result.Err().Error())
}

} else {
log.Println(err)
}

return record, nil
}
2 changes: 1 addition & 1 deletion router/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type API struct {
}

func (api *API) SetupRouter() {
api.Chi.Get("/", api.RecordHandle.RendersRecordsList)
// api.Chi.Get("/", api.RecordHandle.RendersRecordsList)
api.Chi.Post("/sessions", api.RecordHandle.SaveRecord)
api.Chi.Get("/sessions/{id}", api.RecordHandle.RenderRecordPlayer)
api.Chi.Get("/api/v1/sessions/{id}", api.RecordHandle.GetAllRecordByID)
Expand Down
Loading

0 comments on commit 57f11cd

Please sign in to comment.