Skip to content

Commit

Permalink
Merge pull request MeltanoLabs#126 from TyShkan/feat_add_source_file_…
Browse files Browse the repository at this point in the history
…metadata

feat: add source file metadata columns
  • Loading branch information
pnadolny13 authored Mar 21, 2023
2 parents c53e43c + 255a6dc commit 2d2d5e2
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 9 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Note: This tap currently does not support incremental state.
|:--------------------|:--------:|:-------:|:------------|
| files | False | None | An array of csv file stream settings. |
| csv_files_definition| False | None | A path to the JSON file holding an array of file settings. |
| add_metadata_columns| False | False | When True, add the metadata columns (`_sdc_source_file`, `_sdc_source_file_mtime`, `_sdc_source_lineno`) to output. |

A full list of supported settings and capabilities is available by running: `tap-csv --about`

Expand Down
14 changes: 9 additions & 5 deletions meltano.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,22 @@ plugins:
files:
- entity: alphabet
file: alphabet.csv
path: ./tap_csv/tests/data/
keys:
- col1
add_metadata_columns: false
settings:
- description: Array of objects containing keys - `entity`, `file`, `keys`, `encoding` (Optional), `delimiter` (Optional), `doublequote` (Optional), `escapechar` (Optional), `quotechar` (Optional), `skipinitialspace` (Optional), `strict` (Optional)
- name: files
description: Array of objects containing keys - `entity`, `file`, `keys`, `encoding` (Optional), `delimiter` (Optional), `doublequote` (Optional), `escapechar` (Optional), `quotechar` (Optional), `skipinitialspace` (Optional), `strict` (Optional)
kind: array
name: files
- description: Project-relative path to JSON file holding array of objects with
keys: `entity`, `file`, `keys`, and `encoding` (Optional).
- name: csv_files_definition
description: "Project-relative path to JSON file holding array of objects with keys: `entity`, `file`, `keys`, and `encoding` (Optional)."
documentation: https://gitlab.com/meltano/tap-csv#run
label: CSV Files Definition
name: csv_files_definition
placeholder: Ex. files-def.json
- name: add_metadata_columns
description: When True, add the metadata columns (`_sdc_source_file`, `_sdc_source_file_mtime`, `_sdc_source_lineno`) to output.
kind: boolean
loaders:
- name: target-jsonl
variant: andyh1203
Expand Down
36 changes: 32 additions & 4 deletions tap_csv/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,22 @@

import csv
import os
from datetime import datetime, timezone
from typing import Iterable, List, Optional

from singer_sdk import typing as th
from singer_sdk.streams import Stream

SDC_SOURCE_FILE_COLUMN = "_sdc_source_file"
SDC_SOURCE_LINENO_COLUMN = "_sdc_source_lineno"
SDC_SOURCE_FILE_MTIME_COLUMN = "_sdc_source_file_mtime"


class CSVStream(Stream):
"""Stream class for CSV streams."""

file_paths: List[str] = []
header: List[str] = []

def __init__(self, *args, **kwargs):
"""Init CSVStram."""
Expand All @@ -27,12 +33,22 @@ def get_records(self, context: Optional[dict]) -> Iterable[dict]:
require partitioning and should ignore the `context` argument.
"""
for file_path in self.get_file_paths():
headers: List[str] = []
file_last_modified = datetime.fromtimestamp(
os.path.getmtime(file_path), timezone.utc
)

file_lineno = -1

for row in self.get_rows(file_path):
if not headers:
headers = row
file_lineno += 1

if not file_lineno:
continue
yield dict(zip(headers, row))

if self.config.get("add_metadata_columns", False):
row = [file_path, file_last_modified, file_lineno] + row

yield dict(zip(self.header, row))

def get_file_paths(self) -> list:
"""Return a list of file paths to read.
Expand Down Expand Up @@ -110,8 +126,20 @@ def schema(self) -> dict:
break
break

# If enabled, add file's metadata to output
if self.config.get("add_metadata_columns", False):
header = [
SDC_SOURCE_FILE_COLUMN,
SDC_SOURCE_FILE_MTIME_COLUMN,
SDC_SOURCE_LINENO_COLUMN,
] + header

for column in header:
# Set all types to string
# TODO: Try to be smarter about inferring types.
properties.append(th.Property(column, th.StringType()))

# Cache header for future use
self.header = header

return th.PropertiesList(*properties).to_dict()
10 changes: 10 additions & 0 deletions tap_csv/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ class TapCSV(Tap):
th.StringType,
description="A path to the JSON file holding an array of file settings.",
),
th.Property(
"add_metadata_columns",
th.BooleanType,
required=False,
default=False,
description=(
"When True, add the metadata columns (`_sdc_source_file`, "
"`_sdc_source_file_mtime`, `_sdc_source_lineno`) to output."
),
),
).to_dict()

@classproperty
Expand Down
19 changes: 19 additions & 0 deletions tap_csv/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,22 @@ def test_standard_tap_tests_csv_dialect():
tests = get_standard_tap_tests(TapCSV, config=SAMPLE_CONFIG)
for test in tests:
test()


# Run standard built-in tap tests from the SDK, with metadata columns included:
def test_standard_tap_tests_metadata_cols():
"""Run standard tap tests from the SDK, with metadata columns included"""
test_data_dir = os.path.dirname(os.path.abspath(__file__))
SAMPLE_CONFIG = {
"add_metadata_columns": True,
"files": [
{
"entity": "test",
"path": f"{test_data_dir}/data/alphabet.csv",
"keys": [],
}
],
}
tests = get_standard_tap_tests(TapCSV, config=SAMPLE_CONFIG)
for test in tests:
test()

0 comments on commit 2d2d5e2

Please sign in to comment.