|
// This is an application that pulls messages from a message queue, applies a transformation, and |
|
// then stores the resulting data in a datastore. |
|
|
|
package main |
|
|
|
import ( |
|
"context" |
|
"fmt" |
|
"hash/fnv" |
|
"strconv" |
|
"time" |
|
|
|
"go-etl-review/backend" |
|
) |
|
|
|
func main() { |
|
ctx, cancel := context.WithTimeout(context.Background(), time.Hour) |
|
defer cancel() |
|
|
|
for { |
|
mqClient := GetNewMQClient("my-queue") |
|
dbClient := GetNewDBClient("my-db") |
|
|
|
err := mqClient.Receive( |
|
ctx, func(ctx context.Context, msg *backend.Message) { |
|
fmt.Printf("Received message: %q\n", msg.Data) |
|
|
|
// Apply transformation to the message data |
|
transformedMsg, err := messageFormatter(msg) |
|
if err != nil { |
|
fmt.Printf("Error formatting message content: %v\n", err.Error()) |
|
msg.Reject() |
|
return |
|
} |
|
|
|
// Create a row to insert into the database |
|
row := &backend.Row{Content: string(transformedMsg.Data)} |
|
|
|
// Use the database client to insert the row into the table |
|
if err = dbClient.Insert(ctx, "my-table", row); err != nil { |
|
println(fmt.Sprintf("Failed to insert data to database: %v", err)) |
|
} |
|
|
|
// Acknowledge the message |
|
msg.Acknowledge() |
|
}, |
|
) |
|
|
|
if err != nil { |
|
panic("Failed to receive messages") |
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
func GetNewMQClient(queue string) backend.MQClient { |
|
ctx := context.Background() |
|
// Initialize a new MQ client |
|
mqClient, err := backend.NewMQClient(ctx, queue) |
|
if err != nil { |
|
fmt.Printf("Failed to create message queue client: %v", err) |
|
return nil |
|
} |
|
return mqClient |
|
} |
|
|
|
func GetNewDBClient(table string) backend.DBClient { |
|
ctx := context.Background() |
|
// Initialize a new database client |
|
dbClient, err := backend.NewDBClient(ctx, table) |
|
if err != nil { |
|
fmt.Printf("Failed to create database client: %v", err) |
|
return nil |
|
} |
|
return dbClient |
|
} |
|
|
|
func messageFormatter(msg *backend.Message) (*backend.Message, error) { |
|
h := fnv.New32a() |
|
_, _ = h.Write(msg.Data) |
|
hash := h.Sum32() |
|
newData := backend.Message{ |
|
ID: strconv.Itoa(int(hash)), |
|
} |
|
newData.Data = []byte(string(msg.Data) + " - This is the transformed data") |
|
return &newData, nil |
|
} |