diff --git a/src/bin/commands/demux.rs b/src/bin/commands/demux.rs index 4574772..afeb22e 100644 --- a/src/bin/commands/demux.rs +++ b/src/bin/commands/demux.rs @@ -16,11 +16,13 @@ use read_structure::SegmentType; use seq_io::fastq::Reader as FastqReader; use seq_io::fastq::Record; use serde::{Deserialize, Serialize}; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; +use std::fmt::Display; use std::fs::File; use std::io::{BufRead, BufWriter, Write}; use std::iter::Filter; use std::slice::Iter; +use std::str::FromStr; use std::{ fs, path::{Path, PathBuf}, @@ -50,13 +52,40 @@ struct FastqSegment { // ReadSet and it's impls //////////////////////////////////////////////////////////////////////////////// +#[derive(Eq, Hash, PartialEq, Debug, Clone, Copy)] +enum SkipReason { + /// The read had to few bases for the segment. + TooFewBases, +} + +impl Display for SkipReason { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SkipReason::TooFewBases => write!(f, "Too few bases"), + } + } +} + +impl FromStr for SkipReason { + type Err = anyhow::Error; + fn from_str(string: &str) -> Result { + match string { + "too few bases" | "too-few-bases" | "toofewbases" => Ok(SkipReason::TooFewBases), + _ => Err(anyhow!("Invalid skip reason: {}", string)), + } + } +} + /// One unit of FASTQ records separated into their component read segments. -#[derive(PartialEq, Eq, Debug, Clone)] +#[derive(PartialEq, Debug, Clone)] struct ReadSet { /// Header of the FASTQ record header: Vec, /// Segments of reads segments: Vec, + /// The reason the read should be skipped for demultiplexing, or None if it should be + /// demultiplexed. + skip_reason: Option, } impl ReadSet { @@ -243,6 +272,8 @@ struct ReadSetIterator { read_structure: ReadStructure, /// The file containing FASTQ records. source: FastqReader>, + /// Valid reasons for skipping reads, otherwise panic! + skip_reasons: Vec, } impl Iterator for ReadSetIterator { @@ -252,25 +283,53 @@ impl Iterator for ReadSetIterator { if let Some(rec) = self.source.next() { let mut segments = Vec::with_capacity(self.read_structure.number_of_segments()); let next_fq_rec = rec.expect("Unexpected error parsing FASTQs"); - let read_name = next_fq_rec.head().to_vec(); + let read_name = next_fq_rec.head(); + let bases = next_fq_rec.seq(); + let quals = next_fq_rec.qual(); + + // Check that we have enough bases for this segment. For variable length segments, + // we must have at least one base. + let min_len = self.read_structure.iter().map(|s| s.length().unwrap_or(1)).sum(); + if bases.len() < min_len { + if self.skip_reasons.contains(&SkipReason::TooFewBases) { + return Some(ReadSet { + header: read_name.to_vec(), + segments: vec![], + skip_reason: Some(SkipReason::TooFewBases), + }); + } + panic!( + "Read {} had too few bases to demux {} vs. {} needed in read structure {}.", + String::from_utf8(read_name.to_vec()).unwrap(), + bases.len(), + min_len, + self.read_structure + ); + } - for read_segment in self.read_structure.iter() { - let (seq, quals) = read_segment - .extract_bases_and_quals(next_fq_rec.seq(), next_fq_rec.qual()) - .unwrap_or_else(|e| { + for (read_segment_index, read_segment) in self.read_structure.iter().enumerate() { + // Extract the bases and qualities for this segment + let (seq, quals) = + read_segment.extract_bases_and_quals(bases, quals).unwrap_or_else(|e| { panic!( - "Error extracting read segment bases or quals from FASTQ record {}; {}", - String::from_utf8(next_fq_rec.head().to_vec()).unwrap(), + "Error extracting bases (len: {}) or quals (len: {}) for the {}th read segment ({}) in read structure ({}) from FASTQ record with name {}; {}", + bases.len(), + quals.len(), + read_segment_index, + read_segment, + self.read_structure, + String::from_utf8(read_name.to_vec()).unwrap(), e ) }); + // Add a new FastqSegment segments.push(FastqSegment { seq: seq.to_vec(), quals: quals.to_vec(), segment_type: read_segment.kind, }); } - Some(ReadSet { header: read_name, segments }) + Some(ReadSet { header: read_name.to_vec(), segments, skip_reason: None }) } else { None } @@ -283,8 +342,9 @@ impl ReadSetIterator { pub fn new( read_structure: ReadStructure, source: FastqReader>, + skip_reasons: Vec, ) -> Self { - Self { read_structure, source } + Self { read_structure, source, skip_reasons } } } @@ -467,7 +527,7 @@ impl DemuxMetric { /// sample barcodes extracted from the bases in the sample metadata and associated read structures. /// /// An observed barcode matches an expected barcocde if all the following are true: -/// 1. The number of mismatches (edits/substitutions) is less than or equal to the maximum +/// 1. The number of mismatches (edits/substitutions) is less than or equal to the maximum /// mismatches (see `--max-mismatches`). /// 2. The difference between number of mismatches in the best and second best barcodes is greater /// than or equal to the minimum mismatch delta (`--min-mismatch-delta`). @@ -551,6 +611,14 @@ pub(crate) struct Demux { /// The level of compression to use to compress outputs. #[clap(long, short = 'c', default_value = "5")] compression_level: usize, + + /// Skip demultiplexing reads for any of the following reasons, otherwise panic. + /// + /// 1. `too-few-bases`: there are too few bases or qualities to extract given the read + /// structures. For example, if a read is 8bp long but the read structure is `10B`, or + /// if a read is empty and the read structure is `+T`. + #[clap(long, short = 'S')] + skip_reasons: Vec, } impl Demux { @@ -768,6 +836,7 @@ impl Demux { } impl Command for Demux { + #[allow(clippy::too_many_lines)] /// Executes the demux command fn execute(&self) -> Result<()> { let (fq_readers, output_segment_types) = self.validate_and_prepare_inputs()?; @@ -819,7 +888,8 @@ impl Command for Demux { let mut fq_iterators = fq_sources .zip(self.read_structures.clone().into_iter()) .map(|(source, read_structure)| { - ReadSetIterator::new(read_structure, source).read_ahead(1000, 1000) + ReadSetIterator::new(read_structure, source, self.skip_reasons.clone()) + .read_ahead(1000, 1000) }) .collect::>(); @@ -831,7 +901,7 @@ impl Command for Demux { .count_formatter(CountFormatterKind::Comma) .level(log::Level::Info) .build(); - + let mut skip_reasons: HashMap = HashMap::new(); loop { let mut next_read_sets = Vec::with_capacity(fq_iterators.len()); for iter in &mut fq_iterators { @@ -839,7 +909,13 @@ impl Command for Demux { next_read_sets.push(rec); } } - if next_read_sets.is_empty() { + // Either skip the reason if any FASTQ record could not be destructured, or break the + // loop if no read were found indicating EOF. + if let Some(reason) = next_read_sets.iter().find_map(|read_set| read_set.skip_reason) { + let previous = skip_reasons.get(&reason).unwrap_or(&0); + skip_reasons.insert(reason, 1 + previous); + continue; + } else if next_read_sets.is_empty() { break; } assert_eq!( @@ -866,6 +942,15 @@ impl Command for Demux { pool.stop_pool()?; info!("Output FASTQ writing complete."); + // Write out reasons for skipping records + if skip_reasons.is_empty() { + info!("No records were skipped."); + } else { + for (reason, count) in skip_reasons.iter().sorted_by_key(|(_, c)| *c) { + info!("{} records were skipped due to {}", count, reason); + } + } + // Write out the metrics let metrics_path = self.output.join("demux-metrics.txt"); DemuxMetric::update(sample_metrics.as_mut_slice(), &mut unmatched_metric); @@ -967,6 +1052,10 @@ mod tests { ); } + fn read_set(segments: Vec) -> ReadSet { + ReadSet { header: "NOT_IMPORTANT".as_bytes().to_owned(), segments, skip_reason: None } + } + // ############################################################################################ // Test that ``Demux:: execute`` can succeed. // ############################################################################################ @@ -1000,6 +1089,7 @@ mod tests { min_mismatch_delta: 2, threads: 5, compression_level: 5, + skip_reasons: vec![], }; demux_inputs.execute().unwrap(); } @@ -1042,6 +1132,7 @@ mod tests { min_mismatch_delta: 2, threads: 5, compression_level: 5, + skip_reasons: vec![], }; demux_inputs.execute().unwrap(); } @@ -1079,6 +1170,7 @@ mod tests { min_mismatch_delta: 2, threads: 5, compression_level: 5, + skip_reasons: vec![], }; let demux_result = demux_inputs.execute(); permissions.set_readonly(false); @@ -1116,6 +1208,7 @@ mod tests { min_mismatch_delta: 2, threads: 2, compression_level: 5, + skip_reasons: vec![], }; demux_inputs.execute().unwrap(); } @@ -1150,6 +1243,7 @@ mod tests { min_mismatch_delta: 2, threads: 2, compression_level: 5, + skip_reasons: vec![], }; demux_inputs.execute().unwrap(); } @@ -1179,6 +1273,7 @@ mod tests { min_mismatch_delta: 2, threads: 5, compression_level: 5, + skip_reasons: vec![], }; demux_inputs.execute().unwrap(); @@ -1202,12 +1297,14 @@ mod tests { let read_structures = vec![ReadStructure::from_str("10M8B100T").unwrap()]; let s1_barcode = "AAAAAAAA"; let s1_umi = "ATCGATCGAT"; - let sample_metadata = metadata_file( + let sample_metadata = + metadata_file(&tmp, &[s1_barcode, "CCCCCCCC", "GGGGGGGG", "TTTTTTTT"]); + let input_files = vec![fastq_file( &tmp, - &[s1_barcode, "CCCCCCCC", "GGGGGGGG", "TTTTTTTT"], - ); - let input_files = - vec![fastq_file(&tmp, "ex", "ex", &[&(s1_umi.to_owned() + &*s1_barcode.to_owned() + &"A".repeat(100))])]; + "ex", + "ex", + &[&(s1_umi.to_owned() + &*s1_barcode.to_owned() + &"A".repeat(100))], + )]; let output_dir = tmp.path().to_path_buf().join("output"); @@ -1222,6 +1319,7 @@ mod tests { min_mismatch_delta: 2, threads: 5, compression_level: 5, + skip_reasons: vec![], }; demux_inputs.execute().unwrap(); @@ -1303,6 +1401,7 @@ mod tests { min_mismatch_delta: 2, threads: 5, compression_level: 5, + skip_reasons: vec![], }; demux_inputs.execute().unwrap(); @@ -1354,7 +1453,7 @@ mod tests { fastq_file(&tmp, "ex_I2", "ex", &[&s1_barcode[8..]]), ]; - let output_dir = tmp.path().to_path_buf().join("output"); + let output_dir: PathBuf = tmp.path().to_path_buf().join("output"); let demux = Demux { inputs: input_files, @@ -1367,6 +1466,7 @@ mod tests { min_mismatch_delta: 2, threads: 5, compression_level: 5, + skip_reasons: vec![], }; demux.execute().unwrap(); @@ -1430,6 +1530,7 @@ mod tests { min_mismatch_delta: 2, threads: 5, compression_level: 5, + skip_reasons: vec![], }; demux.execute().unwrap(); @@ -1493,6 +1594,7 @@ mod tests { min_mismatch_delta: 2, threads: 5, compression_level: 5, + skip_reasons: vec![], }; demux_inputs.execute().unwrap(); @@ -1566,6 +1668,7 @@ mod tests { min_mismatch_delta: 2, threads: 5, compression_level: 5, + skip_reasons: vec![], }; demux_inputs.execute().unwrap(); } @@ -1599,6 +1702,7 @@ mod tests { min_mismatch_delta: 2, threads: 5, compression_level: 5, + skip_reasons: vec![], }; demux_inputs.execute().unwrap(); } @@ -1632,10 +1736,103 @@ mod tests { min_mismatch_delta: 2, threads: 5, compression_level: 5, + skip_reasons: vec![], + }; + demux_inputs.execute().unwrap(); + } + + #[test] + #[should_panic( + expected = "Read ex_2 had too few bases to demux 0 vs. 1 needed in read structure +T." + )] + fn test_fails_if_reads_too_short() { + let tmp = TempDir::new().unwrap(); + let read_structures = + vec![ReadStructure::from_str("+T").unwrap(), ReadStructure::from_str("7B").unwrap()]; + + let records = vec![ + vec!["AAAAAAA", &SAMPLE1_BARCODE[0..7]], // barcode too short + vec!["CCCCCCC", SAMPLE1_BARCODE], // barcode the correct length + vec!["", SAMPLE1_BARCODE], // template basese too short + vec!["G", SAMPLE1_BARCODE], // barcode the correct length + ]; + + let input_files = vec![ + fastq_file(&tmp, "read1", "ex", &[records[0][0], records[1][0], records[2][0]]), + fastq_file(&tmp, "index1", "ex", &[records[0][1], records[1][1], records[2][1]]), + ]; + let sample_metadata = metadata(&tmp); + let output_dir: PathBuf = tmp.path().to_path_buf().join("output"); + + let demux_inputs = Demux { + inputs: input_files, + read_structures, + sample_metadata, + output_types: vec!['T', 'B'], + output: output_dir.clone(), + unmatched_prefix: "unmatched".to_owned(), + max_mismatches: 1, + min_mismatch_delta: 2, + threads: 5, + compression_level: 5, + skip_reasons: vec![], }; demux_inputs.execute().unwrap(); } + #[test] + fn test_skip_reads_too_short() { + let tmp = TempDir::new().unwrap(); + let read_structures = + vec![ReadStructure::from_str("+T").unwrap(), ReadStructure::from_str("7B").unwrap()]; + + let records = vec![ + vec!["AAAAAAA", &SAMPLE1_BARCODE[0..7]], // barcode too short + vec!["CCCCCCC", SAMPLE1_BARCODE], // barcode the correct length + vec!["", SAMPLE1_BARCODE], // template basese too short + vec!["G", SAMPLE1_BARCODE], // barcode the correct length + ]; + + let input_files = vec![ + fastq_file(&tmp, "read1", "ex", &[records[0][0], records[1][0], records[2][0]]), + fastq_file(&tmp, "index1", "ex", &[records[0][1], records[1][1], records[2][1]]), + ]; + let sample_metadata = metadata(&tmp); + let output_dir: PathBuf = tmp.path().to_path_buf().join("output"); + + let demux_inputs = Demux { + inputs: input_files, + read_structures, + sample_metadata, + output_types: vec!['T', 'B'], + output: output_dir.clone(), + unmatched_prefix: "unmatched".to_owned(), + max_mismatches: 1, + min_mismatch_delta: 2, + threads: 5, + compression_level: 5, + skip_reasons: vec![SkipReason::TooFewBases], + }; + demux_inputs.execute().unwrap(); + + // Write out the metrics + let metrics_path = output_dir.join("demux-metrics.txt"); + let metrics: Vec = DelimFile::default().read_tsv(&metrics_path).unwrap(); + let demuxed_reads: usize = metrics.iter().map(|m| m.templates).sum(); + let sample0000_reads = + metrics.iter().find(|m| m.sample_id == "Sample0000").unwrap().templates; + assert_eq!(demuxed_reads, 2); + assert_eq!(sample0000_reads, 2); + + let r1_path = output_dir.join("Sample0000.R1.fq.gz"); + let r1_reads = read_fastq(&r1_path); + assert_eq!(r1_reads.len(), 2); + + let r2_path = output_dir.join("Sample0000.I1.fq.gz"); + let r2_reads = read_fastq(&r2_path); + assert_eq!(r2_reads.len(), 2); + } + //////////////////////////////////////////////////////////////////////////// // Tests for ReadSet::write_header_internal //////////////////////////////////////////////////////////////////////////// @@ -1772,7 +1969,7 @@ mod tests { seg("GACCCC".as_bytes(), SegmentType::MolecularBarcode), ]; - let read_set = ReadSet { header: "NOT_IMPORTANT".as_bytes().to_owned(), segments }; + let read_set = read_set(segments); assert_eq!(read_set.sample_barcode_sequence(), "GATACAC".as_bytes().to_owned()); } @@ -1789,7 +1986,7 @@ mod tests { seg("CAC".as_bytes(), SegmentType::Template), ]; - let read_set = ReadSet { header: "NOT_IMPORTANT".as_bytes().to_owned(), segments }; + let read_set = read_set(segments); assert_eq!(expected, read_set.template_segments().cloned().collect::>()); } @@ -1806,7 +2003,7 @@ mod tests { seg("CAC".as_bytes(), SegmentType::SampleBarcode), ]; - let read_set = ReadSet { header: "NOT_IMPORTANT".as_bytes().to_owned(), segments }; + let read_set = read_set(segments); assert_eq!(expected, read_set.sample_barcode_segments().cloned().collect::>()); } @@ -1823,7 +2020,7 @@ mod tests { seg("CAC".as_bytes(), SegmentType::MolecularBarcode), ]; - let read_set = ReadSet { header: "NOT_IMPORTANT".as_bytes().to_owned(), segments }; + let read_set = read_set(segments); assert_eq!(expected, read_set.molecular_barcode_segments().cloned().collect::>()); } @@ -1836,31 +2033,27 @@ mod tests { seg("C".as_bytes(), SegmentType::MolecularBarcode), seg("T".as_bytes(), SegmentType::SampleBarcode), ]; - let read_set1 = - ReadSet { header: "NOT_IMPORTANT".as_bytes().to_owned(), segments: segments1.clone() }; + let read_set1 = read_set(segments1.clone()); let segments2 = vec![ seg("AA".as_bytes(), SegmentType::Template), seg("AG".as_bytes(), SegmentType::Template), seg("AC".as_bytes(), SegmentType::MolecularBarcode), seg("AT".as_bytes(), SegmentType::SampleBarcode), ]; - let read_set2 = - ReadSet { header: "NOT_IMPORTANT".as_bytes().to_owned(), segments: segments2.clone() }; + let read_set2 = read_set(segments2.clone()); let segments3 = vec![ seg("AAA".as_bytes(), SegmentType::Template), seg("AAG".as_bytes(), SegmentType::Template), seg("AAC".as_bytes(), SegmentType::MolecularBarcode), seg("AAT".as_bytes(), SegmentType::SampleBarcode), ]; - let read_set3 = - ReadSet { header: "NOT_IMPORTANT".as_bytes().to_owned(), segments: segments3.clone() }; + let read_set3 = read_set(segments3.clone()); let mut expected_segments = Vec::new(); expected_segments.extend(segments1); expected_segments.extend(segments2); expected_segments.extend(segments3); - let expected = - ReadSet { header: "NOT_IMPORTANT".as_bytes().to_owned(), segments: expected_segments }; + let expected = read_set(expected_segments); let result = ReadSet::combine_readsets(vec![read_set1, read_set2, read_set3]); diff --git a/src/lib/barcode_matching.rs b/src/lib/barcode_matching.rs index 18eab8f..4372c7c 100644 --- a/src/lib/barcode_matching.rs +++ b/src/lib/barcode_matching.rs @@ -87,6 +87,11 @@ impl BarcodeMatcher { u8::try_from(count).expect("Overflow on number of mismatch bases") } + /// Returns the expected barcode length, assuming a fixed length for all samples. + fn expected_barcode_length(&self) -> usize { + self.sample_barcodes[0].len() + } + /// Assigns the barcode that best matches the provided ``read_bases``. #[must_use] fn assign_internal(&self, read_bases: &[u8]) -> Option { @@ -122,6 +127,10 @@ impl BarcodeMatcher { /// if configured to do so and skipping calculation for reads that cannot match any barcode ( /// due to having too many no-called bases). pub fn assign(&mut self, read_bases: &[u8]) -> Option { + // do not try matching if there are not enough bases + if read_bases.len() < self.expected_barcode_length() { + return None; + } let num_no_calls = read_bases.iter().filter(|&&b| byte_is_nocall(b)).count(); if num_no_calls > self.max_mismatches as usize { None