Skip to content

Commit

Permalink
fix type
Browse files: browse the repository at this point in the history
  • Loading branch information
xitongsys committed Dec 19, 2017
1 parent 744b96e commit aee4ee1
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 36 deletions.
6 changes: 4 additions & 2 deletions Marshal/Unmarshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ func Unmarshal(tableMap *map[string]*Layout.Table, bgn int, end int, dstInterfac
for name, table := range *tableMap {
path := table.Path
end := tableEnd[name]
schemaIndex := schemaHandler.MapIndex[Common.PathToStr(path)]
pT, cT := schemaHandler.SchemaElements[schemaIndex].Type, schemaHandler.SchemaElements[schemaIndex].ConvertedType

if tableIndex[name] >= end {
continue
Expand Down Expand Up @@ -185,7 +187,7 @@ func Unmarshal(tableMap *map[string]*Layout.Table, bgn int, end int, dstInterfac
mapRecord[po].KeyValues = append(mapRecord[po].KeyValues,
KeyValue{Key: reflect.ValueOf(nil), Value: reflect.ValueOf(nil)})
}
mapRecord[po].KeyValues[mapRecord[po].Index].Key = reflect.ValueOf(ParquetType.ParquetTypeToGoType(table.Values[tableIndex[name]]))
mapRecord[po].KeyValues[mapRecord[po].Index].Key = reflect.ValueOf(ParquetType.ParquetTypeToGoType(table.Values[tableIndex[name]], pT, cT))
break
}
} else {
Expand All @@ -203,7 +205,7 @@ func Unmarshal(tableMap *map[string]*Layout.Table, bgn int, end int, dstInterfac
po = po.Elem()

} else {
po.Set(reflect.ValueOf(ParquetType.ParquetTypeToGoType(table.Values[tableIndex[name]])))
po.Set(reflect.ValueOf(ParquetType.ParquetTypeToGoType(table.Values[tableIndex[name]], pT, cT)))
break
}
} //for pathIndex < len(path)
Expand Down
87 changes: 61 additions & 26 deletions ParquetType/ParquetType.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"github.com/xitongsys/parquet-go/parquet"
"log"
"reflect"
)

//base type
Expand Down Expand Up @@ -35,30 +34,66 @@ type TIMESTAMP_MICROS int64
type INTERVAL string // length=12
type DECIMAL string

func ParquetTypeToGoType(value interface{}) interface{} {
if value == nil {
func ParquetTypeToGoType(src interface{}, pT *parquet.Type, cT *parquet.ConvertedType) interface{} {
if src == nil {
return nil
}
typeName := reflect.TypeOf(value).Name()
switch typeName {
case "BOOLEAN":
return bool(value.(BOOLEAN))
case "INT32":
return int32(value.(INT32))
case "INT64":
return int64(value.(INT64))
case "INT96":
return string(value.(INT96))
case "FLOAT":
return float32(value.(FLOAT))
case "DOUBLE":
return float64(value.(DOUBLE))
case "BYTE_ARRAY":
return string(value.(BYTE_ARRAY))
case "FIXED_LEN_BYTE_ARRAY":
return string(value.(FIXED_LEN_BYTE_ARRAY))
if cT == nil {
if *pT == parquet.Type_BOOLEAN {
return bool(src.(BOOLEAN))
} else if *pT == parquet.Type_INT32 {
return int32(src.(INT32))
} else if *pT == parquet.Type_INT64 {
return int64(src.(INT64))
} else if *pT == parquet.Type_INT96 {
return string(src.(INT96))
} else if *pT == parquet.Type_FLOAT {
return float32(src.(FLOAT))
} else if *pT == parquet.Type_DOUBLE {
return float64(src.(DOUBLE))
} else if *pT == parquet.Type_BYTE_ARRAY {
return string(src.(BYTE_ARRAY))
} else if *pT == parquet.Type_FIXED_LEN_BYTE_ARRAY {
return string(src.(FIXED_LEN_BYTE_ARRAY))
}
return nil
}

if *cT == parquet.ConvertedType_UTF8 {
return string(src.(BYTE_ARRAY))
} else if *cT == parquet.ConvertedType_INT_8 {
return int32(src.(INT32))
} else if *cT == parquet.ConvertedType_INT_16 {
return int32(src.(INT32))
} else if *cT == parquet.ConvertedType_INT_32 {
return int32(src.(INT32))
} else if *cT == parquet.ConvertedType_INT_64 {
return int64(src.(INT64))
} else if *cT == parquet.ConvertedType_UINT_8 {
return uint32(src.(INT32))
} else if *cT == parquet.ConvertedType_UINT_16 {
return uint32(src.(INT32))
} else if *cT == parquet.ConvertedType_UINT_32 {
return uint32(src.(INT32))
} else if *cT == parquet.ConvertedType_UINT_64 {
return uint64(src.(INT64))
} else if *cT == parquet.ConvertedType_DATE {
return int32(src.(INT32))
} else if *cT == parquet.ConvertedType_TIME_MILLIS {
return int32(src.(INT32))
} else if *cT == parquet.ConvertedType_TIME_MICROS {
return int64(src.(INT64))
} else if *cT == parquet.ConvertedType_TIMESTAMP_MILLIS {
return int64(src.(INT64))
} else if *cT == parquet.ConvertedType_TIMESTAMP_MICROS {
return int64(src.(INT64))
} else if *cT == parquet.ConvertedType_INTERVAL {
return string(src.(FIXED_LEN_BYTE_ARRAY))
} else if *cT == parquet.ConvertedType_DECIMAL {
return string(src.(BYTE_ARRAY))
} else {
return nil
}
return nil
}

//Scan a string to parquet value
Expand Down Expand Up @@ -139,13 +174,13 @@ func GoTypeToParquetType(src interface{}, pT *parquet.Type, cT *parquet.Converte
} else if *cT == parquet.ConvertedType_INT_64 {
return INT64(src.(int64))
} else if *cT == parquet.ConvertedType_UINT_8 {
return INT32(src.(int32))
return INT32(src.(uint32))
} else if *cT == parquet.ConvertedType_UINT_16 {
return INT32(src.(int32))
return INT32(src.(uint32))
} else if *cT == parquet.ConvertedType_UINT_32 {
return INT32(src.(int32))
return INT32(src.(uint32))
} else if *cT == parquet.ConvertedType_UINT_64 {
return INT64(src.(int64))
return INT64(src.(uint64))
} else if *cT == parquet.ConvertedType_DATE {
return INT32(src.(int32))
} else if *cT == parquet.ConvertedType_TIME_MILLIS {
Expand Down
16 changes: 8 additions & 8 deletions example/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ type TypeList struct {
Int_16 int32 `parquet:"name=int_16, type=INT_16"`
Int_32 int32 `parquet:"name=int_32, type=INT_32"`
Int_64 int64 `parquet:"name=int_64, type=INT_64"`
Uint_8 int32 `parquet:"name=uint_8, type=UINT_8"`
Uint_16 int32 `parquet:"name=uint_16, type=UINT_16"`
Uint_32 int32 `parquet:"name=uint_32, type=UINT_32"`
Uint_64 int64 `parquet:"name=uint_64, type=UINT_64"`
Uint_8 uint32 `parquet:"name=uint_8, type=UINT_8"`
Uint_16 uint32 `parquet:"name=uint_16, type=UINT_16"`
Uint_32 uint32 `parquet:"name=uint_32, type=UINT_32"`
Uint_64 uint64 `parquet:"name=uint_64, type=UINT_64"`
Date int32 `parquet:"name=date, type=DATE"`
TimeMillis int32 `parquet:"name=timemillis, type=TIME_MILLIS"`
TimeMicros int64 `parquet:"name=timemicros, type=TIME_MICROS"`
Expand Down Expand Up @@ -56,10 +56,10 @@ func main() {
Int_16: int32(i),
Int_32: int32(i),
Int_64: int64(i),
Uint_8: int32(i),
Uint_16: int32(i),
Uint_32: int32(i),
Uint_64: int64(i),
Uint_8: uint32(i),
Uint_16: uint32(i),
Uint_32: uint32(i),
Uint_64: uint64(i),
Date: int32(i),
TimeMillis: int32(i),
TimeMicros: int64(i),
Expand Down

0 comments on commit aee4ee1

Please sign in to comment.