Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CSV Reader] [Skip Option] Tests and fixes #12213

Merged
merged 16 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Small fixes, all tests passing
  • Loading branch information
pdet committed May 23, 2024
commit 0e3e7658fbf3d5b6d1da13cf739712c6c1ec559d
13 changes: 7 additions & 6 deletions src/execution/operator/csv_scanner/scanner/base_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ BaseScanner::BaseScanner(shared_ptr<CSVBufferManager> buffer_manager_p, shared_p
shared_ptr<CSVErrorHandler> error_handler_p, bool sniffing_p,
shared_ptr<CSVFileScan> csv_file_scan_p, CSVIterator iterator_p)
: csv_file_scan(std::move(csv_file_scan_p)), sniffing(sniffing_p), error_handler(std::move(error_handler_p)),
state_machine(std::move(state_machine_p)), iterator(iterator_p), buffer_manager(std::move(buffer_manager_p)) {
state_machine(std::move(state_machine_p)), buffer_manager(std::move(buffer_manager_p)), iterator(iterator_p) {
D_ASSERT(buffer_manager);
D_ASSERT(state_machine);
// Initialize current buffer handle
Expand Down Expand Up @@ -41,14 +41,15 @@ bool BaseScanner::FinishedFile() {
return iterator.pos.buffer_pos + 1 == cur_buffer_handle->actual_size;
}

void BaseScanner::SkipCSVRows(idx_t rows_to_skip) {
CSVIterator BaseScanner::SkipCSVRows(shared_ptr<CSVBufferManager> buffer_manager,
const shared_ptr<CSVStateMachine> &state_machine, idx_t rows_to_skip) {
if (rows_to_skip == 0) {
return;
return {};
}
SkipScanner row_skipper(buffer_manager, state_machine, error_handler, rows_to_skip);
auto error_handler = make_shared_ptr<CSVErrorHandler>();
SkipScanner row_skipper(std::move(buffer_manager), state_machine, error_handler, rows_to_skip);
row_skipper.ParseChunk();
iterator.pos = row_skipper.GetIteratorPosition();
lines_read += row_skipper.GetLinesRead();
return row_skipper.GetIterator();
}

CSVIterator &BaseScanner::GetIterator() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ ColumnCountScanner::ColumnCountScanner(shared_ptr<CSVBufferManager> buffer_manag
}

unique_ptr<StringValueScanner> ColumnCountScanner::UpgradeToStringValueScanner() {
auto scanner = make_uniq<StringValueScanner>(0U, buffer_manager, state_machine, error_handler, nullptr, true);
scanner->SkipCSVRows(scanner->state_machine->dialect_options.skip_rows.GetValue());
return scanner;
auto iterator = SkipCSVRows(buffer_manager, state_machine, state_machine->dialect_options.skip_rows.GetValue());
if (iterator.done) {
return make_uniq<StringValueScanner>(0U, buffer_manager, state_machine, error_handler, nullptr, true);
}
return make_uniq<StringValueScanner>(0U, buffer_manager, state_machine, error_handler, nullptr, true, iterator);
}

ColumnCountResult &ColumnCountScanner::ParseChunk() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ idx_t CSVIterator::GetFileIdx() const {
}

idx_t CSVIterator::GetBufferIdx() const {
return boundary.buffer_idx;
return pos.buffer_idx;
}

idx_t CSVIterator::GetBoundaryIdx() const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ StringValueResult::StringValueResult(CSVStates &states, CSVStateMachine &state_m
if (iterator.first_one) {
lines_read +=
state_machine.dialect_options.skip_rows.GetValue() + state_machine.dialect_options.header.GetValue();
if (lines_read == 0){
if (lines_read == 0) {
SkipBOM();
}
}
Expand Down Expand Up @@ -948,7 +948,6 @@ void StringValueScanner::Flush(DataChunk &insert_chunk) {

void StringValueScanner::Initialize() {
states.Initialize();

if (result.result_size != 1 && !(sniffing && state_machine->options.null_padding &&
!state_machine->options.dialect_options.skip_rows.IsSetByUser())) {
SetStart();
Expand Down Expand Up @@ -1233,8 +1232,8 @@ bool StringValueScanner::MoveToNextBuffer() {
}

void StringValueResult::SkipBOM() {
if (buffer_size >= 3 && buffer_ptr[0] == '\xEF' && buffer_ptr[1] == '\xBB' &&
buffer_ptr[2] == '\xBF' && iterator.pos.buffer_pos == 0) {
if (buffer_size >= 3 && buffer_ptr[0] == '\xEF' && buffer_ptr[1] == '\xBB' && buffer_ptr[2] == '\xBF' &&
iterator.pos.buffer_pos == 0) {
iterator.pos.buffer_pos = 3;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,10 +326,10 @@ void CSVSniffer::SkipLines(vector<unique_ptr<ColumnCountScanner>> &csv_state_mac
auto &first_scanner = *csv_state_machines[0];
// We figure out the iterator position for the first scanner
if (options.dialect_options.skip_rows.IsSetByUser()) {
first_scanner.SkipCSVRows(options.dialect_options.skip_rows.GetValue());
// The iterator position is the same regardless of the scanner configuration, hence we apply the same iterator
// to the remaining scanners
const auto first_iterator = first_scanner.GetIterator();
const auto first_iterator = BaseScanner::SkipCSVRows(first_scanner.buffer_manager, first_scanner.state_machine,
options.dialect_options.skip_rows.GetValue());
for (idx_t i = 1; i < csv_state_machines.size(); i++) {
auto &cur_scanner = *csv_state_machines[i];
cur_scanner.SetIterator(first_iterator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,6 @@ void CSVSniffer::DetectTypes() {

// Reset candidate for parsing
auto candidate = candidate_cc->UpgradeToStringValueScanner();

// Parse chunk and read csv with info candidate
auto &data_chunk = candidate->ParseChunk().ToChunk();
idx_t row_idx = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,13 @@ class BaseScanner {

bool ever_quoted = false;

//! Shared pointer to the buffer_manager, this is shared across multiple scanners
shared_ptr<CSVBufferManager> buffer_manager;

//! Skips notes and/or parts of the data, starting from the top.
//! Notes are dirty lines at the top of the file, before the actual data.
void SkipCSVRows(idx_t rows_to_skip);
static CSVIterator SkipCSVRows(shared_ptr<CSVBufferManager> buffer_manager,
const shared_ptr<CSVStateMachine> &state_machine, idx_t rows_to_skip);

protected:
//! Boundaries of this scanner
Expand All @@ -110,9 +114,6 @@ class BaseScanner {
//! Hold the current buffer ptr
char *buffer_handle_ptr = nullptr;

//! Shared pointer to the buffer_manager, this is shared across multiple scanners
shared_ptr<CSVBufferManager> buffer_manager;

//! If this scanner has been initialized
bool initialized = false;
//! How many lines were read by this scanner
Expand Down
2 changes: 1 addition & 1 deletion test/sql/copy/csv/test_skip.test
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ SELECT * EXCLUDE (prompt) from sniff_csv('__TEST_DIR__/skip.csv',skip=3000)
query IIIIIIIIII
SELECT * EXCLUDE (prompt) from sniff_csv('__TEST_DIR__/skip.csv',skip=11000)
----
, " " \n 11000 0 [{'name': column0, 'type': VARCHAR}] NULL NULL skip=11000
, " " \n 11000 0 [{'name': column0, 'type': BIGINT}] NULL NULL skip=11000

# Test with different buffer sizes

Expand Down