-
Notifications
You must be signed in to change notification settings - Fork 3
/
main.nf
119 lines (87 loc) · 5.98 KB
/
main.nf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
nextflow.enable.dsl = 2
//================================================================================
// Include sub-workflows/modules and (soft) override workflow-level parameters
//================================================================================
include { CALL_WF } from './workflows/call_wf.nf'
include { VALIDATE_FASTQS_WF } from './workflows/validate_fastqs_wf.nf'
include { MAP_WF } from './workflows/map_wf.nf'
include { MERGE_WF } from './workflows/merge_wf.nf'
include { MINOR_VARIANT_ANALYSIS_WF } from './workflows/minor_variant_analysis_wf.nf'
include { QUALITY_CHECK_WF } from './workflows/quality_check_wf.nf'
include { REPORTS_WF } from './workflows/reports_wf.nf'
//================================================================================
// Main workflow
//================================================================================
workflow {
    // Entry workflow.
    //
    // Always validates the FASTQs listed in `params.input_samplesheet`. Unless
    // `params.only_validate_fastqs` is set, it then runs quality checks, mapping,
    // variant calling and minor-variant analysis, selects the gVCFs of the samples
    // approved by both CALL_WF and MINOR_VARIANT_ANALYSIS_WF, and hands them to
    // MERGE_WF and REPORTS_WF.

    // Validation is needed in both modes, so it is invoked exactly once here.
    validated_reads_ch = VALIDATE_FASTQS_WF( params.input_samplesheet )

    if ( !params.only_validate_fastqs ) {

        QUALITY_CHECK_WF( validated_reads_ch )

        MAP_WF( validated_reads_ch )

        CALL_WF( MAP_WF.out.sorted_reads_ch )

        MINOR_VARIANT_ANALYSIS_WF( CALL_WF.out.reformatted_lofreq_vcfs_tuple_ch )

        //---------------------------------------------------------------------------------
        // Filter the approved samples
        //---------------------------------------------------------------------------------

        //NOTE: Read the approved_samples tsv file (header row skipped) and emit one
        //      single-element list [sampleName] per row.
        //      The rows are deliberately NOT collected into one list: `join` keys on
        //      the first element of each emitted item, so a collected list would only
        //      ever be matched on its first sample name.
        approved_samples_minor_variants_ch = MINOR_VARIANT_ANALYSIS_WF.out.approved_samples_ch
            .splitCsv(header: false, skip: 1, sep: '\t')
            .map { row -> [ row.first() ] }
            .dump(tag:'MAIN: approved_samples_minor_variants_ch', pretty: true)

        //NOTE: Reshape the flattened output of gvcf_ch into groups of three, intended
        //      to be [sampleName, gvcf, gvcf.tbi] (assumes that is the order CALL_WF
        //      emits them in - confirm against CALL_WF).
        collated_gvcfs_ch = CALL_WF.out.gvcf_ch
            .flatten()
            .collate(3)
            .dump(tag:'MAIN: collated_gvcfs_ch', pretty: true)

        //FIXME: Refactor this to emit two different files and use only the approved samples
        //NOTE: Use the stats file for the entire cohort (from CALL_WF) and keep only the
        //      samples whose last column (ALL_THRESHOLDS_MET) equals 1.
        approved_call_wf_samples_ch = CALL_WF.out.cohort_stats_tsv
            .splitCsv(header: false, skip: 1, sep: '\t')
            .map { row ->
                [
                    row.first(),            // SAMPLE
                    row.last().toInteger()  // ALL_THRESHOLDS_MET
                ]
            }
            .filter { it[1] == 1 }  // keep samples which meet all the thresholds
            .map { [ it[0] ] }
            .dump(tag:'MAIN approved_call_wf_samples_ch', pretty: true)

        //NOTE: Intersect the approved samples from MINOR_VARIANT_ANALYSIS_WF and CALL_WF.
        //      Both channels emit [sampleName], so the join matches sample by sample.
        fully_approved_samples_ch = approved_samples_minor_variants_ch
            .join(approved_call_wf_samples_ch)
            .flatten()
            .dump(tag:'MAIN fully_approved_samples_ch', pretty: true)

        //NOTE: Join the fully approved samples with the gvcf channel to select files for
        //      MERGE_WF, then flatten so names and files are emitted individually.
        selected_gvcfs_ch = collated_gvcfs_ch
            .join(fully_approved_samples_ch)
            .flatten()
            .dump(tag:'MAIN selected_gvcfs_ch', pretty: true)

        //NOTE: Drop the sample names and keep only the file objects. Testing against
        //      the java.nio.file.Path interface (instead of a fixed list of concrete
        //      class names) keeps this working for any filesystem provider.
        filtered_selected_gvcfs_ch = selected_gvcfs_ch
            .filter { it instanceof java.nio.file.Path }
            .collect()
            .dump(tag:'MAIN filtered_selected_gvcfs_ch', pretty: true)

        //---------------------------------------------------------------------------------

        MERGE_WF( filtered_selected_gvcfs_ch, CALL_WF.out.reformatted_lofreq_vcfs_tuple_ch )

        REPORTS_WF(
            QUALITY_CHECK_WF.out.reports_fastqc_ch,
            MINOR_VARIANT_ANALYSIS_WF.out.minor_variants_results_ch,
            MERGE_WF.out.major_variants_results_ch
        )
    }
}