Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev #350

Merged
merged 2 commits into from
Jan 29, 2021
Merged

Dev #350

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ var jsonSchema string = `
* Parquet-go reads data into Go objects, so every field must be a public (exported) field, i.e. its name must start with an uppercase letter. We call this Go field name the `InName`, and the field name in the parquet file the `ExName`. The function `common.HeadToUpper` converts an `ExName` to an `InName`. There are some restrictions:
1. It's not allowed if two field names are only different at their first letter case. Such as `name` and `Name`.
2. `PARGO_PREFIX_` is a reserved string; do not use it as a name prefix. ([#294](https://github.com/xitongsys/parquet-go/issues/294))
3. Path strings use `\x01` as the delimiter between fields, so that a field name may itself contain `.`. ([dot_in_name.go](https://github.com/xitongsys/parquet-go/blob/master/example/dot_in_name.go), [#349](https://github.com/xitongsys/parquet-go/issues/349))

## Concurrency

Expand Down Expand Up @@ -368,6 +369,7 @@ func NewCSVWriter(md []string, pfile ParquetFile.ParquetFile, np int64) (*CSVWri
|[type_alias.go](https://github.com/xitongsys/parquet-go/blob/master/example/type_alias.go)|example for type alias|
|[writer.go](https://github.com/xitongsys/parquet-go/blob/master/example/writer.go)|create ParquetWriter from io.Writer|
|[keyvalue_metadata.go](https://github.com/xitongsys/parquet-go/blob/master/example/keyvalue_metadata.go)|write keyvalue metadata|
|[dot_in_name.go](https://github.com/xitongsys/parquet-go/blob/master/example/dot_in_name.go)|`.` in field name|



Expand Down
13 changes: 10 additions & 3 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,17 +915,24 @@ func SizeOf(val reflect.Value) int64 {
return 4
}

// PAR_GO_PATH_DELIMITER is the separator placed between the components of a
// path string. It is "\x01" rather than "." so that a field name may itself
// contain a dot.
const PAR_GO_PATH_DELIMITER = "\x01"

// ReformPathStr converts a dot-separated path string into the internal form
// by replacing every "." with PAR_GO_PATH_DELIMITER.
//
// Every dot is treated as a separator, so this helper must not be used for
// paths in which a field name contains a literal "."; build those paths with
// PAR_GO_PATH_DELIMITER directly.
func ReformPathStr(pathStr string) string {
	// Use the constant rather than a second "\x01" literal so the delimiter
	// is defined in exactly one place.
	return strings.ReplaceAll(pathStr, ".", PAR_GO_PATH_DELIMITER)
}

// PathToStr joins the elements of path into a single path string.
// NOTE(review): the two return lines below look like the removed and the
// added side of a diff hunk; presumably only the PAR_GO_PATH_DELIMITER
// version remains after the merge — confirm against common/common.go.
func PathToStr(path []string) string {
return strings.Join(path, ".")
return strings.Join(path, PAR_GO_PATH_DELIMITER)
}

// StrToPath splits a path string into its path elements.
// NOTE(review): the two return lines below look like the removed and the
// added side of a diff hunk; presumably only the PAR_GO_PATH_DELIMITER
// version remains after the merge — confirm against common/common.go.
func StrToPath(str string) []string {
return strings.Split(str, ".")
return strings.Split(str, PAR_GO_PATH_DELIMITER)
}

// PathStrIndex returns the number of elements obtained by splitting the path
// string on the path separator.
// NOTE(review): the two return lines below look like the removed and the
// added side of a diff hunk; presumably only the PAR_GO_PATH_DELIMITER
// version remains after the merge — confirm against common/common.go.
func PathStrIndex(str string) int {
return len(strings.Split(str, "."))
return len(strings.Split(str, PAR_GO_PATH_DELIMITER))
}
11 changes: 6 additions & 5 deletions example/column_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/common"
"github.com/xitongsys/parquet-go/reader"
"github.com/xitongsys/parquet-go/writer"
)
Expand Down Expand Up @@ -72,15 +73,15 @@ func main() {
}
num = int64(pr.GetNumRows())

pr.SkipRowsByPath("parquet_go_root.name", 5) //skip the first five rows
names, rls, dls, err = pr.ReadColumnByPath("parquet_go_root.name", num)
pr.SkipRowsByPath(common.ReformPathStr("parquet_go_root.name"), 5) //skip the first five rows
names, rls, dls, err = pr.ReadColumnByPath(common.ReformPathStr("parquet_go_root.name"), num)
log.Println("name", names, rls, dls, err)

classes, rls, dls, err = pr.ReadColumnByPath("parquet_go_root.class.list.element", num)
classes, rls, dls, err = pr.ReadColumnByPath(common.ReformPathStr("parquet_go_root.class.list.element"), num)
log.Println("class", classes, rls, dls, err)

scores_key, rls, dls, err = pr.ReadColumnByPath("parquet_go_root.score.key_value.key", num)
scores_value, rls, dls, err = pr.ReadColumnByPath("parquet_go_root.score.key_value.value", num)
scores_key, rls, dls, err = pr.ReadColumnByPath(common.ReformPathStr("parquet_go_root.score.key_value.key"), num)
scores_value, rls, dls, err = pr.ReadColumnByPath(common.ReformPathStr("parquet_go_root.score.key_value.value"), num)
log.Println("parquet_go_root.scores_key", scores_key, err)
log.Println("parquet_go_root.scores_value", scores_value, err)

Expand Down
103 changes: 103 additions & 0 deletions example/dot_in_name.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package main

import (
"log"

"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/parquet"
"github.com/xitongsys/parquet-go/reader"
"github.com/xitongsys/parquet-go/writer"
)

// A is the record type this example writes to and reads back from a.parquet.
// Its tags are chosen so that a field literally named "b.c" coexists with a
// nested field "c" inside a group named "b".
type A struct {
// V1 is tagged with the name "b.c": one field whose name contains a dot.
V1 int32 `parquet:"name=b.c, type=INT32, encoding=PLAIN"`
// V2 is tagged with the name "b" and has struct type B, whose field is named "c".
V2 B `parquet:"name=b"`
// V3 is tagged with the name "c" at the top level of the record.
V3 int32 `parquet:"name=c, type=INT32, encoding=PLAIN"`
}

// B is the struct type of field V2 in A; it holds a single INT32 field.
type B struct {
// C is tagged with the name "c".
C int32 `parquet:"name=c, type=INT32, encoding=PLAIN"`
}

// main demonstrates field names that contain a dot. It writes ten rows of A
// to a.parquet, reads them back as whole rows, and finally reads each of the
// three leaf columns individually by path.
//
// Every failure is logged together with its error, and the open file handle
// is closed on every exit path.
func main() {
	fw, err := local.NewLocalFileWriter("a.parquet")
	if err != nil {
		log.Println("Can't create local file", err)
		return
	}

	//write
	pw, err := writer.NewParquetWriter(fw, new(A), 4)
	if err != nil {
		log.Println("Can't create parquet writer", err)
		fw.Close() // best-effort cleanup; the creation error is what matters
		return
	}

	pw.RowGroupSize = 128 * 1024 * 1024 //128M
	pw.PageSize = 8 * 1024              //8K
	pw.CompressionType = parquet.CompressionCodec_SNAPPY
	num := 10
	for i := 0; i < num; i++ {
		o := A{
			V1: 1,
			V2: B{
				C: 2,
			},
			V3: 3,
		}
		if err = pw.Write(o); err != nil {
			log.Println("Write error", err)
		}
	}
	if err = pw.WriteStop(); err != nil {
		log.Println("WriteStop error", err)
		fw.Close() // best-effort cleanup; the file is incomplete anyway
		return
	}
	log.Println("Write Finished")
	// A failed close can mean the data never reached disk, so do not try to
	// read the file back in that case.
	if err = fw.Close(); err != nil {
		log.Println("Can't close file", err)
		return
	}

	///read all
	fr, err := local.NewLocalFileReader("a.parquet")
	if err != nil {
		log.Println("Can't open file", err)
		return
	}

	pr, err := reader.NewParquetReader(fr, new(A), 4)
	if err != nil {
		log.Println("Can't create parquet reader", err)
		fr.Close()
		return
	}
	num = int(pr.GetNumRows())
	rows := make([]A, num)

	if err = pr.Read(&rows); err != nil {
		log.Println("Read error", err)
	}
	log.Println(rows)

	pr.ReadStop()
	if err = fr.Close(); err != nil {
		log.Println("Can't close file", err)
	}

	///read column by path
	fr, err = local.NewLocalFileReader("a.parquet")
	if err != nil {
		log.Println("Can't open file", err)
		return
	}

	pr, err = reader.NewParquetReader(fr, new(A), 4)
	if err != nil {
		log.Println("Can't create parquet reader", err)
		fr.Close()
		return
	}
	cn := pr.GetNumRows()

	// Path elements are separated by "\x01", so "b.c" below is one field
	// name, while "b\x01c" is field "c" inside group "b".
	v1, _, _, err := pr.ReadColumnByPath("parquet_go_root\x01b.c", cn)
	if err != nil {
		log.Println("Can't read column b.c", err)
	}
	v2, _, _, err := pr.ReadColumnByPath("parquet_go_root\x01b\x01c", cn)
	if err != nil {
		log.Println("Can't read column b -> c", err)
	}
	v3, _, _, err := pr.ReadColumnByPath("parquet_go_root\x01c", cn)
	if err != nil {
		log.Println("Can't read column c", err)
	}
	log.Println(v1, v2, v3)

	pr.ReadStop()
	if err = fr.Close(); err != nil {
		log.Println("Can't close file", err)
	}
}
5 changes: 3 additions & 2 deletions example/read_partial2.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
"time"

"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/common"
"github.com/xitongsys/parquet-go/parquet"
"github.com/xitongsys/parquet-go/reader"
"github.com/xitongsys/parquet-go/writer"
"github.com/xitongsys/parquet-go/parquet"
)

type Student struct {
Expand Down Expand Up @@ -80,7 +81,7 @@ func main() {
num = int(pr.GetNumRows())
//only read scores
scores := make([]map[string]int32, num)
pr.ReadPartial(&scores, "parquet_go_root.scores")
pr.ReadPartial(&scores, common.ReformPathStr("parquet_go_root.scores"))
log.Println(scores)

pr.ReadStop()
Expand Down
5 changes: 3 additions & 2 deletions example/read_partial_without_schema_predefined.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
"time"

"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/common"
"github.com/xitongsys/parquet-go/parquet"
"github.com/xitongsys/parquet-go/reader"
"github.com/xitongsys/parquet-go/writer"
"github.com/xitongsys/parquet-go/parquet"
)

type Student struct {
Expand Down Expand Up @@ -79,7 +80,7 @@ func main() {

num = int(pr.GetNumRows())
//only read scores
res, err := pr.ReadPartialByNumber(num, "parquet_go_root.scores")
res, err := pr.ReadPartialByNumber(num, common.ReformPathStr("parquet_go_root.scores"))
if err != nil {
log.Println("Can't read", err)
return
Expand Down
7 changes: 3 additions & 4 deletions layout/page.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"io"
"math/bits"
"strings"

"github.com/apache/thrift/lib/go/thrift"
"github.com/xitongsys/parquet-go/common"
Expand Down Expand Up @@ -510,7 +509,7 @@ func (self *Page) GetRLDLFromRawData(schemaHandler *schema.SchemaHandler) (int64

table := new(Table)
table.Path = self.Path
name := strings.Join(self.Path, ".")
name := common.PathToStr(self.Path)
table.RepetitionType = schemaHandler.SchemaElements[schemaHandler.MapIndex[name]].GetRepetitionType()
table.MaxRepetitionLevel = maxRepetitionLevel
table.MaxDefinitionLevel = maxDefinitionLevel
Expand Down Expand Up @@ -576,7 +575,7 @@ func (self *Page) GetValueFromRawData(schemaHandler *schema.SchemaHandler) error
numNulls++
}
}
name := strings.Join(self.DataTable.Path, ".")
name := common.PathToStr(self.DataTable.Path)
var values []interface{}
var ct parquet.ConvertedType = -1
if schemaHandler.SchemaElements[schemaHandler.MapIndex[name]].IsSetConvertedType() {
Expand Down Expand Up @@ -768,7 +767,7 @@ func ReadPage(thriftReader *thrift.TBufferedTransport, schemaHandler *schema.Sch
path := make([]string, 0)
path = append(path, schemaHandler.GetRootInName())
path = append(path, colMetaData.GetPathInSchema()...)
name := strings.Join(path, ".")
name := common.PathToStr(path)

if pageHeader.GetType() == parquet.PageType_DICTIONARY_PAGE {
page = NewDictPage()
Expand Down
2 changes: 1 addition & 1 deletion marshal/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func MarshalCSV(records []interface{}, schemaHandler *schema.SchemaHandler) (*ma
}

for i := 0; i < len(records[0].([]interface{})); i++ {
pathStr := schemaHandler.GetRootInName() + "." + schemaHandler.Infos[i+1].InName
pathStr := schemaHandler.GetRootInName() + common.PAR_GO_PATH_DELIMITER + schemaHandler.Infos[i+1].InName
table := layout.NewEmptyTable()
res[pathStr] = table
table.Path = common.StrToPath(pathStr)
Expand Down
12 changes: 6 additions & 6 deletions marshal/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ func MarshalJSON(ss []interface{}, schemaHandler *schema.SchemaHandler) (tb *map
keys := node.Val.MapKeys()

if schema.GetConvertedType() == parquet.ConvertedType_MAP { //real map
pathStr = pathStr + ".Key_value"
pathStr = pathStr + common.PAR_GO_PATH_DELIMITER + "Key_value"
if len(keys) <= 0 {
for key, table := range res {
if strings.HasPrefix(key, node.PathMap.Path) &&
(len(key) == len(node.PathMap.Path) || key[len(node.PathMap.Path)] == '.'){
(len(key) == len(node.PathMap.Path) || key[len(node.PathMap.Path)] == common.PAR_GO_PATH_DELIMITER[0]){
table.Values = append(table.Values, nil)
table.DefinitionLevels = append(table.DefinitionLevels, node.DL)
table.RepetitionLevels = append(table.RepetitionLevels, node.RL)
Expand Down Expand Up @@ -160,7 +160,7 @@ func MarshalJSON(ss []interface{}, schemaHandler *schema.SchemaHandler) (tb *map
newPathStr := node.PathMap.Children[key].Path
for path, table := range res {
if strings.HasPrefix(path, newPathStr) &&
(len(path) == len(newPathStr) || path[len(newPathStr)] == '.') {
(len(path) == len(newPathStr) || path[len(newPathStr)] == common.PAR_GO_PATH_DELIMITER[0]) {

table.Values = append(table.Values, nil)
table.DefinitionLevels = append(table.DefinitionLevels, node.DL)
Expand All @@ -175,11 +175,11 @@ func MarshalJSON(ss []interface{}, schemaHandler *schema.SchemaHandler) (tb *map
ln := node.Val.Len()

if schema.GetConvertedType() == parquet.ConvertedType_LIST { // real LIST
pathStr = pathStr + ".List" + ".Element"
pathStr = pathStr + common.PAR_GO_PATH_DELIMITER + "List" + common.PAR_GO_PATH_DELIMITER + "Element"
if ln <= 0 {
for key, table := range res {
if strings.HasPrefix(key, node.PathMap.Path) &&
(len(key) == len(node.PathMap.Path) || key[len(node.PathMap.Path)] == '.'){
(len(key) == len(node.PathMap.Path) || key[len(node.PathMap.Path)] == common.PAR_GO_PATH_DELIMITER[0]){
table.Values = append(table.Values, nil)
table.DefinitionLevels = append(table.DefinitionLevels, node.DL)
table.RepetitionLevels = append(table.RepetitionLevels, node.RL)
Expand Down Expand Up @@ -213,7 +213,7 @@ func MarshalJSON(ss []interface{}, schemaHandler *schema.SchemaHandler) (tb *map
if ln <= 0 {
for key, table := range res {
if strings.HasPrefix(key, node.PathMap.Path) &&
(len(key) == len(node.PathMap.Path) || key[len(node.PathMap.Path)] == '.'){
(len(key) == len(node.PathMap.Path) || key[len(node.PathMap.Path)] == common.PAR_GO_PATH_DELIMITER[0]){
table.Values = append(table.Values, nil)
table.DefinitionLevels = append(table.DefinitionLevels, node.DL)
table.RepetitionLevels = append(table.RepetitionLevels, node.RL)
Expand Down
6 changes: 3 additions & 3 deletions marshal/marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (p *ParquetSlice) Marshal(node *Node, nodeBuf *NodeBufType) []*Node {
path := node.PathMap.Path
if *p.schemaHandler.SchemaElements[p.schemaHandler.MapIndex[node.PathMap.Path]].RepetitionType != parquet.FieldRepetitionType_REPEATED {
pathMap = pathMap.Children["List"].Children["Element"]
path += ".List" + ".Element"
path = path + common.PAR_GO_PATH_DELIMITER + "List" + common.PAR_GO_PATH_DELIMITER + "Element"
}
if ln <= 0 {
return nodes
Expand Down Expand Up @@ -158,7 +158,7 @@ type ParquetMap struct {

func (p *ParquetMap) Marshal(node *Node, nodeBuf *NodeBufType) []*Node {
nodes := make([]*Node, 0)
path := node.PathMap.Path + ".Key_value"
path := node.PathMap.Path + common.PAR_GO_PATH_DELIMITER + "Key_value"
keys := node.Val.MapKeys()
if len(keys) <= 0 {
return nodes
Expand Down Expand Up @@ -281,7 +281,7 @@ func Marshal(srcInterface []interface{}, schemaHandler *schema.SchemaHandler) (t
if numChildren > int32(0) {
for key, table := range res {
if strings.HasPrefix(key, path) &&
(len(key) == len(path) || key[len(path)] == '.') {
(len(key) == len(path) || key[len(path)] == common.PAR_GO_PATH_DELIMITER[0]) {
table.Values = append(table.Values, nil)
table.DefinitionLevels = append(table.DefinitionLevels, node.DL)
table.RepetitionLevels = append(table.RepetitionLevels, node.RL)
Expand Down
2 changes: 1 addition & 1 deletion schema/schemahandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (self *PathMapType) Add(path []string) {
}
c := path[1]
if _, ok := self.Children[c]; !ok {
self.Children[c] = NewPathMap(self.Path + "." + c)
self.Children[c] = NewPathMap(self.Path + common.PAR_GO_PATH_DELIMITER + c)
}
self.Children[c].Add(path[1:])
}
Expand Down