Skip to content

Commit

Permalink
feat: add source file metadata columns
Browse files Browse the repository at this point in the history
  • Loading branch information
TyShkan committed Mar 21, 2023
1 parent c53e43c commit 8b16293
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 12 deletions.
13 changes: 8 additions & 5 deletions meltano.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@ plugins:
file: alphabet.csv
keys:
- col1
add_metadata_columns: false
settings:
- description: Array of objects containing keys - `entity`, `file`, `keys`, `encoding` (Optional), `delimiter` (Optional), `doublequote` (Optional), `escapechar` (Optional), `quotechar` (Optional), `skipinitialspace` (Optional), `strict` (Optional)
- name: files
description: Array of objects containing keys - `entity`, `file`, `keys`, `encoding` (Optional), `delimiter` (Optional), `doublequote` (Optional), `escapechar` (Optional), `quotechar` (Optional), `skipinitialspace` (Optional), `strict` (Optional)
kind: array
name: files
- description: Project-relative path to JSON file holding array of objects with
keys: `entity`, `file`, `keys`, and `encoding` (Optional).
- name: csv_files_definition
description: "Project-relative path to JSON file holding array of objects with keys: `entity`, `file`, `keys`, and `encoding` (Optional)."
documentation: https://gitlab.com/meltano/tap-csv#run
label: CSV Files Definition
name: csv_files_definition
placeholder: Ex. files-def.json
- name: add_metadata_columns
description: Specifies whether to add the metadata columns (`_sdc_source_file`, `_sdc_source_file_mtime`, `_sdc_source_lineno`) to stream output
kind: boolean
loaders:
- name: target-jsonl
variant: andyh1203
Expand Down
34 changes: 27 additions & 7 deletions tap_csv/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,22 @@

import csv
import os
from datetime import datetime, timezone
from typing import Iterable, List, Optional

from singer_sdk import typing as th
from singer_sdk.streams import Stream

SDC_SOURCE_FILE_COLUMN = "_sdc_source_file"
SDC_SOURCE_LINENO_COLUMN = "_sdc_source_lineno"
SDC_SOURCE_FILE_MTIME_COLUMN = "_sdc_source_file_mtime"


class CSVStream(Stream):
"""Stream class for CSV streams."""

file_paths: List[str] = []
header: List[str] = []

def __init__(self, *args, **kwargs):
"""Init CSVStream."""
Expand All @@ -27,12 +33,20 @@ def get_records(self, context: Optional[dict]) -> Iterable[dict]:
require partitioning and should ignore the `context` argument.
"""
for file_path in self.get_file_paths():
headers: List[str] = []
file_last_modified = datetime.fromtimestamp(os.path.getmtime(file_path), timezone.utc)

file_lineno = -1

for row in self.get_rows(file_path):
if not headers:
headers = row
file_lineno += 1

if not file_lineno:
continue
yield dict(zip(headers, row))

if self.config.get("add_metadata_columns", False):
row = [file_path, file_last_modified, file_lineno] + row

yield dict(zip(self.header, row))

def get_file_paths(self) -> list:
"""Return a list of file paths to read.
Expand Down Expand Up @@ -73,9 +87,7 @@ def is_valid_filename(self, file_path: str) -> bool:
if file_path[-4:] != ".csv":
is_valid = False
self.logger.warning(f"Skipping non-csv file '{file_path}'")
self.logger.warning(
"Please provide a CSV file that ends with '.csv'; e.g. 'users.csv'"
)
self.logger.warning("Please provide a CSV file that ends with '.csv'; e.g. 'users.csv'")
return is_valid

def get_rows(self, file_path: str) -> Iterable[list]:
Expand Down Expand Up @@ -110,8 +122,16 @@ def schema(self) -> dict:
break
break

# If enabled, add file's metadata to output
if self.config.get("add_metadata_columns", False):
header = [SDC_SOURCE_FILE_COLUMN, SDC_SOURCE_FILE_MTIME_COLUMN, SDC_SOURCE_LINENO_COLUMN] + header

for column in header:
# Set all types to string
# TODO: Try to be smarter about inferring types.
properties.append(th.Property(column, th.StringType()))

# Cache header for future use
self.header = header

return th.PropertiesList(*properties).to_dict()
1 change: 1 addition & 0 deletions tap_csv/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class TapCSV(Tap):
th.StringType,
description="A path to the JSON file holding an array of file settings.",
),
th.Property("add_metadata_columns", th.BooleanType, required=False, default=False),
).to_dict()

@classproperty
Expand Down
18 changes: 18 additions & 0 deletions tap_csv/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,21 @@ def test_standard_tap_tests_csv_dialect():
tests = get_standard_tap_tests(TapCSV, config=SAMPLE_CONFIG)
for test in tests:
test()

# Run standard built-in tap tests from the SDK, with metadata columns included:
def test_standard_tap_tests():
    """Run the SDK's standard tap tests with metadata columns enabled.

    Builds a tap config that points at ``data/alphabet.csv`` next to this
    test module, switches ``add_metadata_columns`` on, and then executes
    every test callable returned by ``get_standard_tap_tests``.
    """
    # NOTE(review): confirm this name does not shadow another
    # ``test_standard_tap_tests`` defined earlier in this module; if it does,
    # only the last definition would be collected by the test runner.
    here = os.path.dirname(os.path.abspath(__file__))
    metadata_config = {
        "add_metadata_columns": True,
        "files": [
            {
                "entity": "test",
                "path": f"{here}/data/alphabet.csv",
                "keys": [],
            },
        ],
    }
    for run_test in get_standard_tap_tests(TapCSV, config=metadata_config):
        run_test()

0 comments on commit 8b16293

Please sign in to comment.