-
Notifications
You must be signed in to change notification settings - Fork 295
/
Copy patharrow.go
134 lines (124 loc) · 5.07 KB
/
arrow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package schema
import (
"fmt"
"github.com/apache/arrow/go/arrow"
"github.com/xitongsys/parquet-go/common"
"github.com/xitongsys/parquet-go/parquet"
)
// Tag templates and the root node name used when translating an arrow
// schema into parquet-go schema definitions.
const (
// convertedMetaDataTemplate is the tag format for fields described by both
// a parquet type and a converted type. Its arguments are, in order, the
// field name, the parquet type and the converted type.
convertedMetaDataTemplate = "name=%s, type=%s, convertedtype=%s"
// primitiveMetaDataTemplate is the tag format for fields described by a
// parquet type only. Its arguments are the field name and the parquet type.
primitiveMetaDataTemplate = "name=%s, type=%s"
// rootNodeName is the name given to the root schema element and its tag.
// NOTE(review): presumably "Parquet-go-root" with '-' written as its ASCII
// code 45 — confirm against the rest of the package.
rootNodeName = "Parquet45go45root"
)
// ConvertArrowToParquetSchema converts an arrow schema into the tag strings
// understood by the parquet-go library, one per field and in field order.
// The conversion is needed because the arrow format cannot be used directly:
// the parquet-go type carries metadata which the base writer uses to
// determine the size of the objects.
//
// It returns a nil slice and an error when schema is nil or when a field has
// an arrow type for which no mapping is defined here.
//
// NOTE(review): fields are matched on the arrow type name only. If that name
// does not encode the time unit (which appears to be the case for the time32
// and timestamp types), fields using other units are also tagged as
// TIME_MILLIS / TIMESTAMP_MILLIS. Likewise Date64 is tagged as an INT32 DATE.
// Confirm that the writer converts the values accordingly.
func ConvertArrowToParquetSchema(schema *arrow.Schema) ([]string, error) {
	// Report a missing schema as an error instead of panicking on Fields().
	if schema == nil {
		return nil, fmt.Errorf("arrow schema is nil")
	}

	// withConverted renders a tag holding a parquet type and a converted type.
	withConverted := func(name string, t parquet.Type,
		ct parquet.ConvertedType) string {
		return fmt.Sprintf(convertedMetaDataTemplate, name, t, ct)
	}
	// primitive renders a tag holding a parquet type only.
	primitive := func(name string, t parquet.Type) string {
		return fmt.Sprintf(primitiveMetaDataTemplate, name, t)
	}

	fields := schema.Fields()
	metaData := make([]string, len(fields))
	for i, field := range fields {
		typeName := field.Type.Name()
		switch typeName {
		case arrow.PrimitiveTypes.Int8.Name():
			metaData[i] = withConverted(field.Name,
				parquet.Type_INT32, parquet.ConvertedType_INT_8)
		case arrow.PrimitiveTypes.Int16.Name():
			metaData[i] = withConverted(field.Name,
				parquet.Type_INT32, parquet.ConvertedType_INT_16)
		case arrow.PrimitiveTypes.Int32.Name():
			metaData[i] = withConverted(field.Name,
				parquet.Type_INT32, parquet.ConvertedType_INT_32)
		case arrow.PrimitiveTypes.Int64.Name():
			metaData[i] = withConverted(field.Name,
				parquet.Type_INT64, parquet.ConvertedType_INT_64)
		case arrow.PrimitiveTypes.Uint8.Name():
			metaData[i] = withConverted(field.Name,
				parquet.Type_INT32, parquet.ConvertedType_UINT_8)
		case arrow.PrimitiveTypes.Uint16.Name():
			metaData[i] = withConverted(field.Name,
				parquet.Type_INT32, parquet.ConvertedType_UINT_16)
		case arrow.PrimitiveTypes.Uint32.Name():
			metaData[i] = withConverted(field.Name,
				parquet.Type_INT32, parquet.ConvertedType_UINT_32)
		case arrow.PrimitiveTypes.Uint64.Name():
			metaData[i] = withConverted(field.Name,
				parquet.Type_INT64, parquet.ConvertedType_UINT_64)
		case arrow.PrimitiveTypes.Float32.Name():
			metaData[i] = primitive(field.Name, parquet.Type_FLOAT)
		case arrow.PrimitiveTypes.Float64.Name():
			metaData[i] = primitive(field.Name, parquet.Type_DOUBLE)
		// The date types are listed under both PrimitiveTypes and
		// FixedWidthTypes; all of them map to the same tag.
		case arrow.PrimitiveTypes.Date32.Name(),
			arrow.PrimitiveTypes.Date64.Name(),
			arrow.FixedWidthTypes.Date32.Name(),
			arrow.FixedWidthTypes.Date64.Name():
			metaData[i] = withConverted(field.Name,
				parquet.Type_INT32, parquet.ConvertedType_DATE)
		case arrow.BinaryTypes.Binary.Name():
			metaData[i] = primitive(field.Name, parquet.Type_BYTE_ARRAY)
		case arrow.BinaryTypes.String.Name():
			metaData[i] = withConverted(field.Name,
				parquet.Type_BYTE_ARRAY, parquet.ConvertedType_UTF8)
		case arrow.FixedWidthTypes.Boolean.Name():
			metaData[i] = primitive(field.Name, parquet.Type_BOOLEAN)
		case arrow.FixedWidthTypes.Time32ms.Name():
			metaData[i] = withConverted(field.Name,
				parquet.Type_INT32, parquet.ConvertedType_TIME_MILLIS)
		case arrow.FixedWidthTypes.Timestamp_ms.Name():
			metaData[i] = withConverted(field.Name,
				parquet.Type_INT64, parquet.ConvertedType_TIMESTAMP_MILLIS)
		default:
			// The message text is kept unchanged so existing callers that
			// inspect it keep working.
			return nil,
				fmt.Errorf("Unsupported arrow format: %s", typeName)
		}
	}
	return metaData, nil
}
// NewSchemaHandlerFromArrow builds a SchemaHandler from an arrow schema.
// The base ParquetWriter does not understand arrow schemas, so the schema is
// first translated into the native parquet-go representation: a root element
// followed by one schema element and one tag per arrow field.
// Errors from the translation, from tag parsing or from schema element
// creation are returned unchanged, together with a nil handler.
func NewSchemaHandlerFromArrow(arrowSchema *arrow.Schema) (
	*SchemaHandler, error) {
	tagStrings, err := ConvertArrowToParquetSchema(arrowSchema)
	if err != nil {
		return nil, err
	}

	// Root schema element: required, with one child per converted field.
	childCount := int32(len(tagStrings))
	required := parquet.FieldRepetitionType_REQUIRED
	root := parquet.NewSchemaElement()
	root.Name = rootNodeName
	root.NumChildren = &childCount
	root.RepetitionType = &required

	// Matching tag for the root element.
	rootTag := common.NewTag()
	rootTag.InName = rootNodeName
	rootTag.ExName = rootNodeName
	rootTag.RepetitionType = parquet.FieldRepetitionType_REQUIRED

	// Both lists start with the root entry and grow by one entry per field.
	elements := []*parquet.SchemaElement{root}
	tags := []*common.Tag{rootTag}
	for _, tagString := range tagStrings {
		tag, err := common.StringToTag(tagString)
		if err != nil {
			return nil, err
		}
		tags = append(tags, tag)

		element, err := common.NewSchemaElementFromTagMap(tag)
		if err != nil {
			return nil, err
		}
		elements = append(elements, element)
	}

	handler := NewSchemaHandlerFromSchemaList(elements)
	handler.Infos = tags
	handler.CreateInExMap()
	return handler, nil
}