diff --git a/meltano.yml b/meltano.yml index c424233..f73c468 100644 --- a/meltano.yml +++ b/meltano.yml @@ -18,16 +18,19 @@ plugins: file: alphabet.csv keys: - col1 + add_metadata_columns: false settings: - - description: Array of objects containing keys - `entity`, `file`, `keys`, `encoding` (Optional), `delimiter` (Optional), `doublequote` (Optional), `escapechar` (Optional), `quotechar` (Optional), `skipinitialspace` (Optional), `strict` (Optional) + - name: files + description: Array of objects containing keys - `entity`, `file`, `keys`, `encoding` (Optional), `delimiter` (Optional), `doublequote` (Optional), `escapechar` (Optional), `quotechar` (Optional), `skipinitialspace` (Optional), `strict` (Optional) kind: array - name: files - - description: Project-relative path to JSON file holding array of objects with - keys: `entity`, `file`, `keys`, and `encoding` (Optional). + - name: csv_files_definition + description: "Project-relative path to JSON file holding array of objects with keys: `entity`, `file`, `keys`, and `encoding` (Optional)." documentation: https://gitlab.com/meltano/tap-csv#run label: CSV Files Definition - name: csv_files_definition placeholder: Ex. files-def.json + - name: add_metadata_columns + description: Specifies whether to add the metadata columns (`_sdc_source_file`, `_sdc_source_file_mtime`, `_sdc_source_lineno`) to stream output + kind: boolean loaders: - name: target-jsonl variant: andyh1203 diff --git a/tap_csv/client.py b/tap_csv/client.py index 0745db7..c981dcb 100644 --- a/tap_csv/client.py +++ b/tap_csv/client.py @@ -2,16 +2,22 @@ import csv import os +from datetime import datetime, timezone from typing import Iterable, List, Optional from singer_sdk import typing as th from singer_sdk.streams import Stream +SDC_SOURCE_FILE_COLUMN = "_sdc_source_file" +SDC_SOURCE_LINENO_COLUMN = "_sdc_source_lineno" +SDC_SOURCE_FILE_MTIME_COLUMN = "_sdc_source_file_mtime" + class CSVStream(Stream): """Stream class for CSV streams.""" file_paths: List[str] = [] + header: List[str] = [] def __init__(self, *args, **kwargs): """Init CSVStram.""" @@ -27,12 +33,20 @@ def get_records(self, context: Optional[dict]) -> Iterable[dict]: require partitioning and should ignore the `context` argument. """ for file_path in self.get_file_paths(): - headers: List[str] = [] + file_last_modified = datetime.fromtimestamp(os.path.getmtime(file_path), timezone.utc) + + file_lineno = -1 + for row in self.get_rows(file_path): - if not headers: - headers = row + file_lineno += 1 + + if not file_lineno: continue - yield dict(zip(headers, row)) + + if self.config.get("add_metadata_columns", False): + row = [file_path, file_last_modified, file_lineno] + row + + yield dict(zip(self.header, row)) def get_file_paths(self) -> list: """Return a list of file paths to read. @@ -73,9 +87,7 @@ def is_valid_filename(self, file_path: str) -> bool: if file_path[-4:] != ".csv": is_valid = False self.logger.warning(f"Skipping non-csv file '{file_path}'") - self.logger.warning( - "Please provide a CSV file that ends with '.csv'; e.g. 'users.csv'" - ) + self.logger.warning("Please provide a CSV file that ends with '.csv'; e.g. 'users.csv'") return is_valid def get_rows(self, file_path: str) -> Iterable[list]: @@ -110,8 +122,16 @@ def schema(self) -> dict: break break + # If enabled, add file's metadata to output + if self.config.get("add_metadata_columns", False): + header = [SDC_SOURCE_FILE_COLUMN, SDC_SOURCE_FILE_MTIME_COLUMN, SDC_SOURCE_LINENO_COLUMN] + header + for column in header: # Set all types to string # TODO: Try to be smarter about inferring types. properties.append(th.Property(column, th.StringType())) + + # Cache header for future use + self.header = header + return th.PropertiesList(*properties).to_dict() diff --git a/tap_csv/tap.py b/tap_csv/tap.py index a2514e2..257d0c5 100644 --- a/tap_csv/tap.py +++ b/tap_csv/tap.py @@ -42,6 +42,7 @@ class TapCSV(Tap): th.StringType, description="A path to the JSON file holding an array of file settings.", ), + th.Property("add_metadata_columns", th.BooleanType, required=False, default=False), ).to_dict() @classproperty diff --git a/tap_csv/tests/test_core.py b/tap_csv/tests/test_core.py index 8f0b902..cc830fb 100644 --- a/tap_csv/tests/test_core.py +++ b/tap_csv/tests/test_core.py @@ -69,3 +69,21 @@ def test_standard_tap_tests_csv_dialect(): tests = get_standard_tap_tests(TapCSV, config=SAMPLE_CONFIG) for test in tests: test() + +# Run standard built-in tap tests from the SDK, with metadata columns included: +def test_standard_tap_tests(): + """Run standard tap tests from the SDK, with metadata columns included""" + test_data_dir = os.path.dirname(os.path.abspath(__file__)) + SAMPLE_CONFIG = { + "add_metadata_columns": True, + "files": [ + { + "entity": "test", + "path": f"{test_data_dir}/data/alphabet.csv", + "keys": [], + } + ] + } + tests = get_standard_tap_tests(TapCSV, config=SAMPLE_CONFIG) + for test in tests: + test()