forked from MeltanoLabs/tap-csv
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtap.py
95 lines (82 loc) · 3.2 KB
/
tap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
"""CSV tap class."""
import json
import os
from typing import List
from singer_sdk import Stream, Tap
from singer_sdk import typing as th # JSON schema typing helpers
from singer_sdk.helpers._classproperty import classproperty
from tap_csv.client import CSVStream
class TapCSV(Tap):
"""CSV tap class."""
name = "tap-csv"
config_jsonschema = th.PropertiesList(
th.Property(
"files",
th.ArrayType(
th.ObjectType(
th.Property("entity", th.StringType, required=True),
th.Property("path", th.StringType, required=True),
th.Property("keys", th.ArrayType(th.StringType), required=True),
th.Property(
"encoding", th.StringType, required=False, default="utf-8"
),
th.Property("delimiter", th.StringType, required=False),
th.Property("doublequote", th.BooleanType, required=False),
th.Property("escapechar", th.StringType, required=False),
th.Property("quotechar", th.StringType, required=False),
th.Property("skipinitialspace", th.BooleanType, required=False),
th.Property("strict", th.BooleanType, required=False),
)
),
description="An array of csv file stream settings.",
),
th.Property(
"csv_files_definition",
th.StringType,
description="A path to the JSON file holding an array of file settings.",
),
th.Property(
"add_metadata_columns",
th.BooleanType,
required=False,
default=False,
description=(
"When True, add the metadata columns (`_sdc_source_file`, "
"`_sdc_source_file_mtime`, `_sdc_source_lineno`) to output."
),
),
).to_dict()
@classproperty
def capabilities(self) -> List[str]:
"""Get tap capabilites."""
return ["sync", "catalog", "discover"]
def get_file_configs(self) -> List[dict]:
"""Return a list of file configs.
Either directly from the config.json or in an external file
defined by csv_files_definition.
"""
csv_files = self.config.get("files")
csv_files_definition = self.config.get("csv_files_definition")
if csv_files_definition:
if os.path.isfile(csv_files_definition):
with open(csv_files_definition, "r") as f:
csv_files = json.load(f)
else:
self.logger.error(f"tap-csv: '{csv_files_definition}' file not found")
exit(1)
if not csv_files:
self.logger.error("No CSV file definitions found.")
exit(1)
return csv_files
def discover_streams(self) -> List[Stream]:
"""Return a list of discovered streams."""
return [
CSVStream(
tap=self,
name=file_config.get("entity"),
file_config=file_config,
)
for file_config in self.get_file_configs()
]
if __name__ == "__main__":
TapCSV.cli()