diff --git a/src/bin/commands/demux.rs b/src/bin/commands/demux.rs
index 4574772..afeb22e 100644
--- a/src/bin/commands/demux.rs
+++ b/src/bin/commands/demux.rs
@@ -16,11 +16,13 @@ use read_structure::SegmentType;
use seq_io::fastq::Reader as FastqReader;
use seq_io::fastq::Record;
use serde::{Deserialize, Serialize};
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
+use std::fmt::Display;
use std::fs::File;
use std::io::{BufRead, BufWriter, Write};
use std::iter::Filter;
use std::slice::Iter;
+use std::str::FromStr;
use std::{
fs,
path::{Path, PathBuf},
@@ -50,13 +52,40 @@ struct FastqSegment {
// ReadSet and it's impls
////////////////////////////////////////////////////////////////////////////////
+#[derive(Eq, Hash, PartialEq, Debug, Clone, Copy)]
+enum SkipReason {
+ /// The read had to few bases for the segment.
+ TooFewBases,
+}
+
+impl Display for SkipReason {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ SkipReason::TooFewBases => write!(f, "Too few bases"),
+ }
+ }
+}
+
+impl FromStr for SkipReason {
+ type Err = anyhow::Error;
+ fn from_str(string: &str) -> Result {
+ match string {
+ "too few bases" | "too-few-bases" | "toofewbases" => Ok(SkipReason::TooFewBases),
+ _ => Err(anyhow!("Invalid skip reason: {}", string)),
+ }
+ }
+}
+
/// One unit of FASTQ records separated into their component read segments.
-#[derive(PartialEq, Eq, Debug, Clone)]
+#[derive(PartialEq, Debug, Clone)]
struct ReadSet {
/// Header of the FASTQ record
header: Vec,
/// Segments of reads
segments: Vec,
+ /// The reason the read should be skipped for demultiplexing, or None if it should be
+ /// demultiplexed.
+ skip_reason: Option,
}
impl ReadSet {
@@ -243,6 +272,8 @@ struct ReadSetIterator {
read_structure: ReadStructure,
/// The file containing FASTQ records.
source: FastqReader>,
+ /// Valid reasons for skipping reads, otherwise panic!
+ skip_reasons: Vec,
}
impl Iterator for ReadSetIterator {
@@ -252,25 +283,53 @@ impl Iterator for ReadSetIterator {
if let Some(rec) = self.source.next() {
let mut segments = Vec::with_capacity(self.read_structure.number_of_segments());
let next_fq_rec = rec.expect("Unexpected error parsing FASTQs");
- let read_name = next_fq_rec.head().to_vec();
+ let read_name = next_fq_rec.head();
+ let bases = next_fq_rec.seq();
+ let quals = next_fq_rec.qual();
+
+ // Check that we have enough bases for this segment. For variable length segments,
+ // we must have at least one base.
+ let min_len = self.read_structure.iter().map(|s| s.length().unwrap_or(1)).sum();
+ if bases.len() < min_len {
+ if self.skip_reasons.contains(&SkipReason::TooFewBases) {
+ return Some(ReadSet {
+ header: read_name.to_vec(),
+ segments: vec![],
+ skip_reason: Some(SkipReason::TooFewBases),
+ });
+ }
+ panic!(
+ "Read {} had too few bases to demux {} vs. {} needed in read structure {}.",
+ String::from_utf8(read_name.to_vec()).unwrap(),
+ bases.len(),
+ min_len,
+ self.read_structure
+ );
+ }
- for read_segment in self.read_structure.iter() {
- let (seq, quals) = read_segment
- .extract_bases_and_quals(next_fq_rec.seq(), next_fq_rec.qual())
- .unwrap_or_else(|e| {
+ for (read_segment_index, read_segment) in self.read_structure.iter().enumerate() {
+ // Extract the bases and qualities for this segment
+ let (seq, quals) =
+ read_segment.extract_bases_and_quals(bases, quals).unwrap_or_else(|e| {
panic!(
- "Error extracting read segment bases or quals from FASTQ record {}; {}",
- String::from_utf8(next_fq_rec.head().to_vec()).unwrap(),
+ "Error extracting bases (len: {}) or quals (len: {}) for the {}th read segment ({}) in read structure ({}) from FASTQ record with name {}; {}",
+ bases.len(),
+ quals.len(),
+ read_segment_index,
+ read_segment,
+ self.read_structure,
+ String::from_utf8(read_name.to_vec()).unwrap(),
e
)
});
+ // Add a new FastqSegment
segments.push(FastqSegment {
seq: seq.to_vec(),
quals: quals.to_vec(),
segment_type: read_segment.kind,
});
}
- Some(ReadSet { header: read_name, segments })
+ Some(ReadSet { header: read_name.to_vec(), segments, skip_reason: None })
} else {
None
}
@@ -283,8 +342,9 @@ impl ReadSetIterator {
pub fn new(
read_structure: ReadStructure,
source: FastqReader>,
+ skip_reasons: Vec,
) -> Self {
- Self { read_structure, source }
+ Self { read_structure, source, skip_reasons }
}
}
@@ -467,7 +527,7 @@ impl DemuxMetric {
/// sample barcodes extracted from the bases in the sample metadata and associated read structures.
///
/// An observed barcode matches an expected barcocde if all the following are true:
-/// 1. The number of mismatches (edits/substitutions) is less than or equal to the maximum
+/// 1. The number of mismatches (edits/substitutions) is less than or equal to the maximum
/// mismatches (see `--max-mismatches`).
/// 2. The difference between number of mismatches in the best and second best barcodes is greater
/// than or equal to the minimum mismatch delta (`--min-mismatch-delta`).
@@ -551,6 +611,14 @@ pub(crate) struct Demux {
/// The level of compression to use to compress outputs.
#[clap(long, short = 'c', default_value = "5")]
compression_level: usize,
+
+ /// Skip demultiplexing reads for any of the following reasons, otherwise panic.
+ ///
+ /// 1. `too-few-bases`: there are too few bases or qualities to extract given the read
+ /// structures. For example, if a read is 8bp long but the read structure is `10B`, or
+ /// if a read is empty and the read structure is `+T`.
+ #[clap(long, short = 'S')]
+ skip_reasons: Vec,
}
impl Demux {
@@ -768,6 +836,7 @@ impl Demux {
}
impl Command for Demux {
+ #[allow(clippy::too_many_lines)]
/// Executes the demux command
fn execute(&self) -> Result<()> {
let (fq_readers, output_segment_types) = self.validate_and_prepare_inputs()?;
@@ -819,7 +888,8 @@ impl Command for Demux {
let mut fq_iterators = fq_sources
.zip(self.read_structures.clone().into_iter())
.map(|(source, read_structure)| {
- ReadSetIterator::new(read_structure, source).read_ahead(1000, 1000)
+ ReadSetIterator::new(read_structure, source, self.skip_reasons.clone())
+ .read_ahead(1000, 1000)
})
.collect::>();
@@ -831,7 +901,7 @@ impl Command for Demux {
.count_formatter(CountFormatterKind::Comma)
.level(log::Level::Info)
.build();
-
+ let mut skip_reasons: HashMap = HashMap::new();
loop {
let mut next_read_sets = Vec::with_capacity(fq_iterators.len());
for iter in &mut fq_iterators {
@@ -839,7 +909,13 @@ impl Command for Demux {
next_read_sets.push(rec);
}
}
- if next_read_sets.is_empty() {
+ // Either skip the reason if any FASTQ record could not be destructured, or break the
+ // loop if no read were found indicating EOF.
+ if let Some(reason) = next_read_sets.iter().find_map(|read_set| read_set.skip_reason) {
+ let previous = skip_reasons.get(&reason).unwrap_or(&0);
+ skip_reasons.insert(reason, 1 + previous);
+ continue;
+ } else if next_read_sets.is_empty() {
break;
}
assert_eq!(
@@ -866,6 +942,15 @@ impl Command for Demux {
pool.stop_pool()?;
info!("Output FASTQ writing complete.");
+ // Write out reasons for skipping records
+ if skip_reasons.is_empty() {
+ info!("No records were skipped.");
+ } else {
+ for (reason, count) in skip_reasons.iter().sorted_by_key(|(_, c)| *c) {
+ info!("{} records were skipped due to {}", count, reason);
+ }
+ }
+
// Write out the metrics
let metrics_path = self.output.join("demux-metrics.txt");
DemuxMetric::update(sample_metrics.as_mut_slice(), &mut unmatched_metric);
@@ -967,6 +1052,10 @@ mod tests {
);
}
+ fn read_set(segments: Vec) -> ReadSet {
+ ReadSet { header: "NOT_IMPORTANT".as_bytes().to_owned(), segments, skip_reason: None }
+ }
+
// ############################################################################################
// Test that ``Demux:: execute`` can succeed.
// ############################################################################################
@@ -1000,6 +1089,7 @@ mod tests {
min_mismatch_delta: 2,
threads: 5,
compression_level: 5,
+ skip_reasons: vec![],
};
demux_inputs.execute().unwrap();
}
@@ -1042,6 +1132,7 @@ mod tests {
min_mismatch_delta: 2,
threads: 5,
compression_level: 5,
+ skip_reasons: vec![],
};
demux_inputs.execute().unwrap();
}
@@ -1079,6 +1170,7 @@ mod tests {
min_mismatch_delta: 2,
threads: 5,
compression_level: 5,
+ skip_reasons: vec![],
};
let demux_result = demux_inputs.execute();
permissions.set_readonly(false);
@@ -1116,6 +1208,7 @@ mod tests {
min_mismatch_delta: 2,
threads: 2,
compression_level: 5,
+ skip_reasons: vec![],
};
demux_inputs.execute().unwrap();
}
@@ -1150,6 +1243,7 @@ mod tests {
min_mismatch_delta: 2,
threads: 2,
compression_level: 5,
+ skip_reasons: vec![],
};
demux_inputs.execute().unwrap();
}
@@ -1179,6 +1273,7 @@ mod tests {
min_mismatch_delta: 2,
threads: 5,
compression_level: 5,
+ skip_reasons: vec![],
};
demux_inputs.execute().unwrap();
@@ -1202,12 +1297,14 @@ mod tests {
let read_structures = vec![ReadStructure::from_str("10M8B100T").unwrap()];
let s1_barcode = "AAAAAAAA";
let s1_umi = "ATCGATCGAT";
- let sample_metadata = metadata_file(
+ let sample_metadata =
+ metadata_file(&tmp, &[s1_barcode, "CCCCCCCC", "GGGGGGGG", "TTTTTTTT"]);
+ let input_files = vec![fastq_file(
&tmp,
- &[s1_barcode, "CCCCCCCC", "GGGGGGGG", "TTTTTTTT"],
- );
- let input_files =
- vec![fastq_file(&tmp, "ex", "ex", &[&(s1_umi.to_owned() + &*s1_barcode.to_owned() + &"A".repeat(100))])];
+ "ex",
+ "ex",
+ &[&(s1_umi.to_owned() + &*s1_barcode.to_owned() + &"A".repeat(100))],
+ )];
let output_dir = tmp.path().to_path_buf().join("output");
@@ -1222,6 +1319,7 @@ mod tests {
min_mismatch_delta: 2,
threads: 5,
compression_level: 5,
+ skip_reasons: vec![],
};
demux_inputs.execute().unwrap();
@@ -1303,6 +1401,7 @@ mod tests {
min_mismatch_delta: 2,
threads: 5,
compression_level: 5,
+ skip_reasons: vec![],
};
demux_inputs.execute().unwrap();
@@ -1354,7 +1453,7 @@ mod tests {
fastq_file(&tmp, "ex_I2", "ex", &[&s1_barcode[8..]]),
];
- let output_dir = tmp.path().to_path_buf().join("output");
+ let output_dir: PathBuf = tmp.path().to_path_buf().join("output");
let demux = Demux {
inputs: input_files,
@@ -1367,6 +1466,7 @@ mod tests {
min_mismatch_delta: 2,
threads: 5,
compression_level: 5,
+ skip_reasons: vec![],
};
demux.execute().unwrap();
@@ -1430,6 +1530,7 @@ mod tests {
min_mismatch_delta: 2,
threads: 5,
compression_level: 5,
+ skip_reasons: vec![],
};
demux.execute().unwrap();
@@ -1493,6 +1594,7 @@ mod tests {
min_mismatch_delta: 2,
threads: 5,
compression_level: 5,
+ skip_reasons: vec![],
};
demux_inputs.execute().unwrap();
@@ -1566,6 +1668,7 @@ mod tests {
min_mismatch_delta: 2,
threads: 5,
compression_level: 5,
+ skip_reasons: vec![],
};
demux_inputs.execute().unwrap();
}
@@ -1599,6 +1702,7 @@ mod tests {
min_mismatch_delta: 2,
threads: 5,
compression_level: 5,
+ skip_reasons: vec![],
};
demux_inputs.execute().unwrap();
}
@@ -1632,10 +1736,103 @@ mod tests {
min_mismatch_delta: 2,
threads: 5,
compression_level: 5,
+ skip_reasons: vec![],
+ };
+ demux_inputs.execute().unwrap();
+ }
+
+ #[test]
+ #[should_panic(
+ expected = "Read ex_2 had too few bases to demux 0 vs. 1 needed in read structure +T."
+ )]
+ fn test_fails_if_reads_too_short() {
+ let tmp = TempDir::new().unwrap();
+ let read_structures =
+ vec![ReadStructure::from_str("+T").unwrap(), ReadStructure::from_str("7B").unwrap()];
+
+ let records = vec![
+ vec!["AAAAAAA", &SAMPLE1_BARCODE[0..7]], // barcode too short
+ vec!["CCCCCCC", SAMPLE1_BARCODE], // barcode the correct length
+ vec!["", SAMPLE1_BARCODE], // template basese too short
+ vec!["G", SAMPLE1_BARCODE], // barcode the correct length
+ ];
+
+ let input_files = vec![
+ fastq_file(&tmp, "read1", "ex", &[records[0][0], records[1][0], records[2][0]]),
+ fastq_file(&tmp, "index1", "ex", &[records[0][1], records[1][1], records[2][1]]),
+ ];
+ let sample_metadata = metadata(&tmp);
+ let output_dir: PathBuf = tmp.path().to_path_buf().join("output");
+
+ let demux_inputs = Demux {
+ inputs: input_files,
+ read_structures,
+ sample_metadata,
+ output_types: vec!['T', 'B'],
+ output: output_dir.clone(),
+ unmatched_prefix: "unmatched".to_owned(),
+ max_mismatches: 1,
+ min_mismatch_delta: 2,
+ threads: 5,
+ compression_level: 5,
+ skip_reasons: vec![],
};
demux_inputs.execute().unwrap();
}
+ #[test]
+ fn test_skip_reads_too_short() {
+ let tmp = TempDir::new().unwrap();
+ let read_structures =
+ vec![ReadStructure::from_str("+T").unwrap(), ReadStructure::from_str("7B").unwrap()];
+
+ let records = vec![
+ vec!["AAAAAAA", &SAMPLE1_BARCODE[0..7]], // barcode too short
+ vec!["CCCCCCC", SAMPLE1_BARCODE], // barcode the correct length
+ vec!["", SAMPLE1_BARCODE], // template basese too short
+ vec!["G", SAMPLE1_BARCODE], // barcode the correct length
+ ];
+
+ let input_files = vec![
+ fastq_file(&tmp, "read1", "ex", &[records[0][0], records[1][0], records[2][0]]),
+ fastq_file(&tmp, "index1", "ex", &[records[0][1], records[1][1], records[2][1]]),
+ ];
+ let sample_metadata = metadata(&tmp);
+ let output_dir: PathBuf = tmp.path().to_path_buf().join("output");
+
+ let demux_inputs = Demux {
+ inputs: input_files,
+ read_structures,
+ sample_metadata,
+ output_types: vec!['T', 'B'],
+ output: output_dir.clone(),
+ unmatched_prefix: "unmatched".to_owned(),
+ max_mismatches: 1,
+ min_mismatch_delta: 2,
+ threads: 5,
+ compression_level: 5,
+ skip_reasons: vec![SkipReason::TooFewBases],
+ };
+ demux_inputs.execute().unwrap();
+
+ // Write out the metrics
+ let metrics_path = output_dir.join("demux-metrics.txt");
+ let metrics: Vec = DelimFile::default().read_tsv(&metrics_path).unwrap();
+ let demuxed_reads: usize = metrics.iter().map(|m| m.templates).sum();
+ let sample0000_reads =
+ metrics.iter().find(|m| m.sample_id == "Sample0000").unwrap().templates;
+ assert_eq!(demuxed_reads, 2);
+ assert_eq!(sample0000_reads, 2);
+
+ let r1_path = output_dir.join("Sample0000.R1.fq.gz");
+ let r1_reads = read_fastq(&r1_path);
+ assert_eq!(r1_reads.len(), 2);
+
+ let r2_path = output_dir.join("Sample0000.I1.fq.gz");
+ let r2_reads = read_fastq(&r2_path);
+ assert_eq!(r2_reads.len(), 2);
+ }
+
////////////////////////////////////////////////////////////////////////////
// Tests for ReadSet::write_header_internal
////////////////////////////////////////////////////////////////////////////
@@ -1772,7 +1969,7 @@ mod tests {
seg("GACCCC".as_bytes(), SegmentType::MolecularBarcode),
];
- let read_set = ReadSet { header: "NOT_IMPORTANT".as_bytes().to_owned(), segments };
+ let read_set = read_set(segments);
assert_eq!(read_set.sample_barcode_sequence(), "GATACAC".as_bytes().to_owned());
}
@@ -1789,7 +1986,7 @@ mod tests {
seg("CAC".as_bytes(), SegmentType::Template),
];
- let read_set = ReadSet { header: "NOT_IMPORTANT".as_bytes().to_owned(), segments };
+ let read_set = read_set(segments);
assert_eq!(expected, read_set.template_segments().cloned().collect::>());
}
@@ -1806,7 +2003,7 @@ mod tests {
seg("CAC".as_bytes(), SegmentType::SampleBarcode),
];
- let read_set = ReadSet { header: "NOT_IMPORTANT".as_bytes().to_owned(), segments };
+ let read_set = read_set(segments);
assert_eq!(expected, read_set.sample_barcode_segments().cloned().collect::>());
}
@@ -1823,7 +2020,7 @@ mod tests {
seg("CAC".as_bytes(), SegmentType::MolecularBarcode),
];
- let read_set = ReadSet { header: "NOT_IMPORTANT".as_bytes().to_owned(), segments };
+ let read_set = read_set(segments);
assert_eq!(expected, read_set.molecular_barcode_segments().cloned().collect::>());
}
@@ -1836,31 +2033,27 @@ mod tests {
seg("C".as_bytes(), SegmentType::MolecularBarcode),
seg("T".as_bytes(), SegmentType::SampleBarcode),
];
- let read_set1 =
- ReadSet { header: "NOT_IMPORTANT".as_bytes().to_owned(), segments: segments1.clone() };
+ let read_set1 = read_set(segments1.clone());
let segments2 = vec![
seg("AA".as_bytes(), SegmentType::Template),
seg("AG".as_bytes(), SegmentType::Template),
seg("AC".as_bytes(), SegmentType::MolecularBarcode),
seg("AT".as_bytes(), SegmentType::SampleBarcode),
];
- let read_set2 =
- ReadSet { header: "NOT_IMPORTANT".as_bytes().to_owned(), segments: segments2.clone() };
+ let read_set2 = read_set(segments2.clone());
let segments3 = vec![
seg("AAA".as_bytes(), SegmentType::Template),
seg("AAG".as_bytes(), SegmentType::Template),
seg("AAC".as_bytes(), SegmentType::MolecularBarcode),
seg("AAT".as_bytes(), SegmentType::SampleBarcode),
];
- let read_set3 =
- ReadSet { header: "NOT_IMPORTANT".as_bytes().to_owned(), segments: segments3.clone() };
+ let read_set3 = read_set(segments3.clone());
let mut expected_segments = Vec::new();
expected_segments.extend(segments1);
expected_segments.extend(segments2);
expected_segments.extend(segments3);
- let expected =
- ReadSet { header: "NOT_IMPORTANT".as_bytes().to_owned(), segments: expected_segments };
+ let expected = read_set(expected_segments);
let result = ReadSet::combine_readsets(vec![read_set1, read_set2, read_set3]);
diff --git a/src/lib/barcode_matching.rs b/src/lib/barcode_matching.rs
index 18eab8f..4372c7c 100644
--- a/src/lib/barcode_matching.rs
+++ b/src/lib/barcode_matching.rs
@@ -87,6 +87,11 @@ impl BarcodeMatcher {
u8::try_from(count).expect("Overflow on number of mismatch bases")
}
+ /// Returns the expected barcode length, assuming a fixed length for all samples.
+ fn expected_barcode_length(&self) -> usize {
+ self.sample_barcodes[0].len()
+ }
+
/// Assigns the barcode that best matches the provided ``read_bases``.
#[must_use]
fn assign_internal(&self, read_bases: &[u8]) -> Option {
@@ -122,6 +127,10 @@ impl BarcodeMatcher {
/// if configured to do so and skipping calculation for reads that cannot match any barcode (
/// due to having too many no-called bases).
pub fn assign(&mut self, read_bases: &[u8]) -> Option {
+ // do not try matching if there are not enough bases
+ if read_bases.len() < self.expected_barcode_length() {
+ return None;
+ }
let num_no_calls = read_bases.iter().filter(|&&b| byte_is_nocall(b)).count();
if num_no_calls > self.max_mismatches as usize {
None