Skip to content

Commit

Permalink
Add GFA support for uncolored graphs, fixes #20
Browse files Browse the repository at this point in the history
  • Loading branch information
Guilucand committed Nov 1, 2024
1 parent dc91c77 commit e5beab3
Show file tree
Hide file tree
Showing 14 changed files with 376 additions and 89 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 24 additions & 1 deletion crates/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ pub use ggcat_logging::MessageLevel;
use ggcat_logging::UnrecoverableErrorLogging;
use hashes::MinimizerHashFunctionFactory;
use hashes::{cn_nthash::CanonicalNtHashIteratorFactory, fw_nthash::ForwardNtHashIteratorFactory};
use io::concurrent::structured_sequences::fasta::FastaWriterWrapper;
use io::concurrent::structured_sequences::gfa::GFAWriterWrapper;
use io::concurrent::structured_sequences::StructuredSequenceBackendWrapper;
use io::sequences_stream::fasta::FastaFileSequencesStream;
use io::sequences_stream::GenericSequencesStream;
use parallel_processor::enable_counters_logging;
Expand Down Expand Up @@ -86,6 +89,9 @@ pub struct GGCATConfig {

/// The messages callback, if present, no output will be automatically written to stdout
pub messages_callback: Option<fn(MessageLevel, &str)>,

/// Output GFA format instead of FASTA
pub gfa_output: bool,
}

#[derive(Copy, Clone, Eq, PartialEq, Debug)]
Expand Down Expand Up @@ -217,6 +223,8 @@ impl GGCATInstance {
min_multiplicity: usize,

extra_elab: ExtraElaboration,

gfa_output: bool,
) -> anyhow::Result<PathBuf> {
let bucketing_hash_dispatch = if forward_only {
<ForwardNtHashIteratorFactory as MinimizerHashFunctionFactory>::dynamic_dispatch_id()
Expand All @@ -236,10 +244,25 @@ impl GGCATInstance {
NonColoredManager::dynamic_dispatch_id()
};

if gfa_output && colors {
anyhow::bail!("GFA output is not supported with colors");
}

let output_mode = if gfa_output {
GFAWriterWrapper::dynamic_dispatch_id()
} else {
FastaWriterWrapper::dynamic_dispatch_id()
};

let temp_dir = create_tempdir(self.0.temp_dir.clone());

let output_file = assembler::dynamic_dispatch::run_assembler(
(bucketing_hash_dispatch, merging_hash_dispatch, colors_hash),
(
bucketing_hash_dispatch,
merging_hash_dispatch,
colors_hash,
output_mode,
),
kmer_length,
minimizer_length.unwrap_or(::utils::compute_best_m(kmer_length)),
debug::DEBUG_ASSEMBLER_FIRST_STEP.lock().clone(),
Expand Down
124 changes: 72 additions & 52 deletions crates/assembler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@ use config::{
};
use hashes::{HashFunctionFactory, MinimizerHashFunctionFactory};
use io::concurrent::structured_sequences::binary::StructSeqBinaryWriter;
use io::concurrent::structured_sequences::fasta::FastaWriter;
use io::concurrent::structured_sequences::StructuredSequenceWriter;
use io::concurrent::structured_sequences::fasta::FastaWriterWrapper;
use io::concurrent::structured_sequences::gfa::GFAWriterWrapper;
use io::concurrent::structured_sequences::{
IdentSequenceWriter, StructuredSequenceBackend, StructuredSequenceBackendInit,
StructuredSequenceBackendWrapper, StructuredSequenceWriter,
};
use io::sequences_stream::general::GeneralSequenceBlockData;
use io::{compute_stats_from_input_blocks, generate_bucket_names};
use parallel_processor::buckets::concurrent::BucketsThreadBuffer;
Expand Down Expand Up @@ -49,6 +53,23 @@ pub enum AssemblerStartingStep {
MaximalUnitigsLinks = 6,
}

fn get_writer<
C: IdentSequenceWriter,
L: IdentSequenceWriter,
W: StructuredSequenceBackend<C, L> + StructuredSequenceBackendInit,
>(
output_file: &PathBuf,
) -> W {
match output_file.extension() {
Some(ext) => match ext.to_string_lossy().to_string().as_str() {
"lz4" => W::new_compressed_lz4(&output_file, 2),
"gz" => W::new_compressed_gzip(&output_file, 2),
_ => W::new_plain(&output_file),
},
None => W::new_plain(&output_file),
}
}

#[dynamic_dispatch(BucketingHash = [
hashes::cn_nthash::CanonicalNtHashIteratorFactory,
#[cfg(not(feature = "devel-build"))] hashes::fw_nthash::ForwardNtHashIteratorFactory
Expand All @@ -70,11 +91,15 @@ pub enum AssemblerStartingStep {
], AssemblerColorsManager = [
#[cfg(not(feature = "devel-build"))] colors::bundles::multifile_building::ColorBundleMultifileBuilding,
colors::non_colored::NonColoredManager,
], OutputMode = [
FastaWriterWrapper,
#[cfg(not(feature = "devel-build"))] GFAWriterWrapper
])]
pub fn run_assembler<
BucketingHash: MinimizerHashFunctionFactory,
MergingHash: HashFunctionFactory,
AssemblerColorsManager: ColorsManager,
OutputMode: StructuredSequenceBackendWrapper,
>(
k: usize,
m: usize,
Expand Down Expand Up @@ -336,14 +361,7 @@ pub fn run_assembler<
}

let final_unitigs_file = StructuredSequenceWriter::new(
match output_file.extension() {
Some(ext) => match ext.to_string_lossy().to_string().as_str() {
"lz4" => FastaWriter::new_compressed_lz4(&output_file, 2),
"gz" => FastaWriter::new_compressed_gzip(&output_file, 2),
_ => FastaWriter::new_plain(&output_file),
},
None => FastaWriter::new_plain(&output_file),
},
get_writer::<_, _, OutputMode::Backend<_, _>>(&output_file),
k,
);

Expand All @@ -365,40 +383,44 @@ pub fn run_assembler<
None
};

let (reorganized_reads, _final_unitigs_bucket) = if step
<= AssemblerStartingStep::ReorganizeReads
{
if generate_maximal_unitigs_links || compute_tigs_mode.is_some() {
reorganize_reads::<
BucketingHash,
MergingHash,
AssemblerColorsManager,
StructSeqBinaryWriter<_, _>,
>(
sequences,
reads_map,
temp_dir.as_path(),
compressed_temp_unitigs_file.as_ref().unwrap(),
buckets_count,
)
let (reorganized_reads, _final_unitigs_bucket) =
if step <= AssemblerStartingStep::ReorganizeReads {
if generate_maximal_unitigs_links || compute_tigs_mode.is_some() {
reorganize_reads::<
BucketingHash,
MergingHash,
AssemblerColorsManager,
StructSeqBinaryWriter<_, _>,
>(
sequences,
reads_map,
temp_dir.as_path(),
compressed_temp_unitigs_file.as_ref().unwrap(),
buckets_count,
)
} else {
reorganize_reads::<
BucketingHash,
MergingHash,
AssemblerColorsManager,
OutputMode::Backend<_, _>,
>(
sequences,
reads_map,
temp_dir.as_path(),
&final_unitigs_file,
buckets_count,
)
}
} else {
reorganize_reads::<BucketingHash, MergingHash, AssemblerColorsManager, FastaWriter<_, _>>(
sequences,
reads_map,
temp_dir.as_path(),
&final_unitigs_file,
buckets_count,
(
generate_bucket_names(temp_dir.join("reads_bucket"), buckets_count, Some("tmp")),
(generate_bucket_names(temp_dir.join("reads_bucket_lonely"), 1, Some("tmp"))
.into_iter()
.next()
.unwrap()),
)
}
} else {
(
generate_bucket_names(temp_dir.join("reads_bucket"), buckets_count, Some("tmp")),
(generate_bucket_names(temp_dir.join("reads_bucket_lonely"), 1, Some("tmp"))
.into_iter()
.next()
.unwrap()),
)
};
};

if last_step <= AssemblerStartingStep::ReorganizeReads {
PHASES_TIMES_MONITOR
Expand Down Expand Up @@ -427,7 +449,12 @@ pub fn run_assembler<
k,
);
} else {
build_unitigs::<BucketingHash, MergingHash, AssemblerColorsManager, FastaWriter<_, _>>(
build_unitigs::<
BucketingHash,
MergingHash,
AssemblerColorsManager,
OutputMode::Backend<_, _>,
>(
reorganized_reads,
unitigs_map,
temp_dir.as_path(),
Expand Down Expand Up @@ -483,22 +510,15 @@ pub fn run_assembler<
final_unitigs_file.finalize();

let final_unitigs_file = StructuredSequenceWriter::new(
match output_file.extension() {
Some(ext) => match ext.to_string_lossy().to_string().as_str() {
"lz4" => FastaWriter::new_compressed_lz4(&output_file, 2),
"gz" => FastaWriter::new_compressed_gzip(&output_file, 2),
_ => FastaWriter::new_plain(&output_file),
},
None => FastaWriter::new_plain(&output_file),
},
get_writer::<_, _, OutputMode::Backend<_, _>>(&output_file),
k,
);

build_maximal_unitigs_links::<
BucketingHash,
MergingHash,
AssemblerColorsManager,
FastaWriter<_, _>,
OutputMode::Backend<_, _>,
>(temp_path, temp_dir.as_path(), &final_unitigs_file, k);
final_unitigs_file.finalize();
}
Expand Down
8 changes: 4 additions & 4 deletions crates/assembler/src/pipeline/maximal_unitig_links.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ pub fn build_maximal_unitigs_links<

let buckets_count = 1 << DEFAULT_BUCKET_HASHES_SIZE_LOG;


let self_complemental_unitigs = DashSet::new();

// Hash all the extremities
Expand Down Expand Up @@ -137,7 +136,7 @@ pub fn build_maximal_unitigs_links<
let first_hash_unx = first_hash.to_unextendable();
let last_hash_unx = last_hash.to_unextendable();

let self_complemental = (first_hash_unx == last_hash_unx)
let self_complemental = (first_hash_unx == last_hash_unx)
&& (first_hash.is_rc_symmetric() || (first_hash.is_forward() != last_hash.is_forward()));

if self_complemental {
Expand Down Expand Up @@ -308,8 +307,9 @@ pub fn build_maximal_unitigs_links<

// Rewrite the output file to include found links
{
let self_complemental_unitigs = self_complemental_unitigs.into_iter().collect::<HashSet<_>>();

let self_complemental_unitigs = self_complemental_unitigs
.into_iter()
.collect::<HashSet<_>>();

PHASES_TIMES_MONITOR
.write()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,28 @@ impl IdentSequenceWriter for DoubleMaximalUnitigLinks {
}

#[allow(unused_variables)]
fn write_as_gfa(&self, stream: &mut impl Write, extra_buffer: &Self::TempBuffer) {
todo!()
fn write_as_gfa(
&self,
k: u64,
index: u64,
stream: &mut impl Write,
extra_buffer: &Self::TempBuffer,
) {
for entries in &self.links {
let entries = entries.entries.get_slice(extra_buffer);
for entry in entries {
writeln!(
stream,
"L\t{}\t{}\t{}\t{}\t{}M",
index,
if entry.flags.flip_current() { "-" } else { "+" },
entry.index,
if entry.flags.flip_other() { "-" } else { "+" },
k - 1
)
.unwrap();
}
}
}

fn parse_as_ident<'a>(_ident: &[u8], _extra_buffer: &mut Self::TempBuffer) -> Option<Self> {
Expand Down
14 changes: 9 additions & 5 deletions crates/cmdline/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ struct AssemblerArgs {

#[structopt(flatten)]
pub common_args: CommonArgs,

/// Output the graph in GFA format
#[structopt(short = "h", long = "gfa")]
pub gfa_output: bool,
}

#[derive(StructOpt, Debug)]
Expand Down Expand Up @@ -240,7 +244,7 @@ struct QueryArgs {
// #[cfg(feature = "mem-analysis")]
// static DEBUG_ALLOCATOR: DebugAllocator = DebugAllocator::new();

fn initialize(args: &CommonArgs, out_file: &PathBuf) -> &'static GGCATInstance {
fn initialize(args: &CommonArgs, out_file: &PathBuf, gfa_output: bool) -> &'static GGCATInstance {
let instance = GGCATInstance::create(GGCATConfig {
temp_dir: Some(args.temp_dir.clone()),
memory: args.memory,
Expand All @@ -249,6 +253,7 @@ fn initialize(args: &CommonArgs, out_file: &PathBuf) -> &'static GGCATInstance {
intermediate_compression_level: args.intermediate_compression_level,
stats_file: Some(out_file.with_extension("stats.log")),
messages_callback: None,
gfa_output,
});

ggcat_api::debug::DEBUG_KEEP_FILES.store(args.keep_temp_files, Ordering::Relaxed);
Expand Down Expand Up @@ -428,6 +433,7 @@ fn run_assembler_from_args(instance: &GGCATInstance, args: AssemblerArgs) {
} else {
ExtraElaboration::None
},
args.gfa_output,
)
.unwrap();

Expand Down Expand Up @@ -501,7 +507,7 @@ fn main() {
&["ix86arch::INSTRUCTION_RETIRED", "ix86arch::LLC_MISSES"],
);

let instance = initialize(&args.common_args, &args.output_file);
let instance = initialize(&args.common_args, &args.output_file, args.gfa_output);

run_assembler_from_args(&instance, args);
}
Expand All @@ -526,8 +532,6 @@ fn main() {
return; // Skip final memory deallocation
}
CliArgs::Query(args) => {
initialize(&args.common_args, &args.output_file_prefix);

if !args.colors && args.colored_query_output_format.is_some() {
println!("Warning: colored query output format is specified, but the graph is not colored");
}
Expand All @@ -537,7 +541,7 @@ fn main() {
&["ix86arch::INSTRUCTION_RETIRED", "ix86arch::LLC_MISSES"],
);

let instance = initialize(&args.common_args, &args.output_file_prefix);
let instance = initialize(&args.common_args, &args.output_file_prefix, false);

let output_file_name = run_querier_from_args(&instance, args);
println!("Final output saved to: {}", output_file_name.display());
Expand Down
8 changes: 7 additions & 1 deletion crates/colors/src/managers/multiple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,13 @@ impl IdentSequenceWriter for UnitigColorData {
}

#[allow(unused_variables)]
fn write_as_gfa(&self, stream: &mut impl Write, extra_buffer: &Self::TempBuffer) {
fn write_as_gfa(
&self,
_k: u64,
_index: u64,
stream: &mut impl Write,
extra_buffer: &Self::TempBuffer,
) {
if self.slice.len() > 0 {
write!(stream, "CS",).unwrap();
}
Expand Down
Loading

0 comments on commit e5beab3

Please sign in to comment.