Skip to content

Commit

Permalink
bug fixes and jsonl command
Browse files Browse the repository at this point in the history
  • Loading branch information
kdm9 committed Apr 2, 2024
1 parent 2b5acb2 commit 1a5bc83
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 39 deletions.
29 changes: 16 additions & 13 deletions blsl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

cmds = {}


from .telogrep import telogrep_main
cmds["telogrep"] = telogrep_main

Expand All @@ -22,13 +23,12 @@
from .mask2bed import mask2bed_main
cmds["mask2bed"] = mask2bed_main

from .genigvjs import genigvjs_main
cmds["genigvjs"] = genigvjs_main


from .pansn_rename import main as pansn_rename_main
cmds["pansn-rename"] = pansn_rename_main

from .genigvjs import genigvjs_main
cmds["genigvjs"] = genigvjs_main

from .ildemux import main as ildemux_main
cmds["ildemux"] = ildemux_main

Expand Down Expand Up @@ -62,32 +62,31 @@
from .farename import farename_main
cmds["farename"] = farename_main

from .ebiosra2rl2s import main as rl2s_main
cmds["ebiosra2rl2s"] = rl2s_main

from .galhist import main as galhist_main
cmds["galhist"] = galhist_main

from .gffcat import gffcat_main
cmds["gffcat"] = gffcat_main

from .gffparse import gffparse_main
cmds["gffparse"] = gffparse_main


from .gffcsqify import main as gffcsqify_main
cmds["gffcsqify"] = gffcsqify_main

from .gfftagsane import main as gfftagsane_main
cmds["gfftagsane"] = gfftagsane_main

from .liftoff_gff3 import liftoff_gff3_main
cmds["liftoff-gff3"] = liftoff_gff3_main

from .gfftagsane import main as gfftagsane_main
cmds["gfftagsane"] = gfftagsane_main
from .ebiosra2rl2s import main as rl2s_main
cmds["ebiosra2rl2s"] = rl2s_main

from .galhist import main as galhist_main
cmds["galhist"] = galhist_main

from .pairslash import main as pairslash_main
cmds["pairslash"] = pairslash_main


try:
from .vcfstats import main as vcfstats_main
cmds["vcfstats"] = vcfstats_main
Expand All @@ -104,6 +103,10 @@
from .fastasanitiser import main as fastasanitiser_main
cmds["fastasanitiser"] = fastasanitiser_main

from .jsonl2csv import main as jsonl2csv_main
cmds["jsonl2csv"] = jsonl2csv_main


def mainhelp(argv=None):
"""Print this help message"""
print("USAGE: blsl <subtool> [options...]\n\n")
Expand Down
Empty file modified blsl/gffcsqify.py
100755 → 100644
Empty file.
57 changes: 57 additions & 0 deletions blsl/jsonl2csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import argparse
import random
import time
import os
from tqdm import tqdm
try:
import orjson as json
except ImportError:
import json

def main(argv=None):
    """Convert a JSON-lines file to CSV/TSV, optionally subsampling rows.

    Makes two passes over the input: the first collects the union of all
    keys to use as the output header, the second writes one output row per
    (selected) input line.  The RNG is re-seeded identically before each
    pass so both passes select exactly the same subset of lines.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--fraction", "-f", type=float,
                    help="Subsample this fraction of rows (0-1)")
    ap.add_argument("--seed", type=float, default=time.time() + os.getpid(),
                    help="Seed the RNG with this seed (seeded adequately by default)")
    ap.add_argument("--header-same", "-s", action="store_true",
                    help="Assume all rows have the same header, just use the first row's header")
    ap.add_argument("--out-csv", "-o", type=argparse.FileType("wt"), default="/dev/stdout",
                    help="Output file")
    ap.add_argument("in_json")

    args = ap.parse_args(argv)
    if args.fraction is not None and not 0 < args.fraction <= 1:
        ap.error("--fraction must be in (0, 1]")

    # dict used as an ordered set: deterministic (first-seen) column order,
    # unlike set(), whose iteration order varies with hash randomisation.
    header = {}
    first = True

    rand = random.Random(args.seed)
    with open(args.in_json) as fh:
        for line in tqdm(fh, desc="Read Headers", unit="lines"):
            # Always consume exactly one draw per line when subsampling, so
            # this pass's RNG stream stays in lockstep with the data pass
            # below (the original short-circuited the draw on the first
            # line, desynchronising the two passes).  The first line is
            # inspected regardless of the draw.
            keep = args.fraction is None or rand.random() < args.fraction
            if first or keep:
                for key in json.loads(line):
                    header[key] = None
                if first and args.header_same:
                    break
                first = False

    # File objects expose the path as .name (not .filename, which never
    # existed — the old bare except always fell back to ",").  .name can be
    # an int fd, hence the isinstance guard.
    outname = getattr(args.out_csv, "name", "")
    sep = "\t" if isinstance(outname, str) and outname.endswith("tsv") else ","

    # Re-seed so the data pass selects exactly the same lines as pass one.
    rand = random.Random(args.seed)
    print(*header, sep=sep, file=args.out_csv)
    with open(args.in_json) as fh:
        for line in tqdm(fh, desc="Read Data", unit="lines"):
            if args.fraction is None or rand.random() < args.fraction:
                dat = json.loads(line)
                # NOTE(review): values are written unquoted; a field that
                # contains the separator will break the output.  Switch to
                # the csv module if that matters for your data.
                outline = [dat.get(key, "") for key in header]
                print(*outline, sep=sep, file=args.out_csv)


# Allow running this module directly as a script, in addition to being
# dispatched via the blsl subcommand table.
if __name__ == "__main__":
    main()
Empty file modified blsl/vcfparallel.py
100755 → 100644
Empty file.
49 changes: 23 additions & 26 deletions blsl/vcfstats.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
#!/usr/bin/env python3
from tqdm import tqdm
from cyvcf2 import VCF
import pandas as pd

import sys
from sys import stdin, stdout, stderr
from subprocess import Popen, PIPE
import argparse
import math
import json
from concurrent.futures import ProcessPoolExecutor, as_completed

def parallel_regions(vcf, cores=1):
V = VCF(vcf)
# 10 chunks per chrom per core
chunks = len(V.seqlens)*10*cores
chunks = len(V.seqlens)*100*cores
for cname, clen in zip(V.seqnames, V.seqlens):
chunk = int(math.ceil(clen/chunks))
chunk = 10000#int(math.ceil(clen/chunks))
for start in range(0, clen, chunk):
s = start+1
e = start+chunk+1
yield f"{cname}:{s}-{e}"

def variant2dict(v):
def variant2dict(v, fields=None):
for i, alt in enumerate(v.ALT):
dat = {"CHROM": v.CHROM, "POS": v.POS, "REF": v.REF, "ALT": alt, "QUAL": v.QUAL}
dat["call_rate"] = v.call_rate
Expand All @@ -31,38 +31,35 @@ def variant2dict(v):
if isinstance(val, tuple) or isinstance(val, list):
val = val[i]
dat[f"INFO_{key}"] = val
yield dat
if fields:
dat = {K:V for K, V in dat.items() if K in fields}
yield json.dumps(dat)

def bcftools_info_with_tags(vbcf):
res = []
cmd=f"bcftools +fill-tags {vbcf} -Ou -- -d -t all,F_MISSING"
with Popen(cmd, shell=True, stdout=PIPE) as proc:
for v in tqdm(VCF(proc.stdout)):
for r in variant2dict(v):
res.append(r)
return res

def one_chunk_stats(vcf, chunk, fill=True):
def one_chunk_stats(vcf, chunk, fill=True, fields=None):
cmd=f"bcftools view -r {chunk} {vcf} -Ou"
if fill:
cmd = f"{cmd} | bcftools +fill-tags - -Ou -- -d -t all,F_MISSING"
res = []
with Popen(cmd, shell=True, stdout=PIPE) as proc:
for v in VCF(proc.stdout):
for r in variant2dict(v):
vcf = VCF(proc.stdout)
for v in vcf:
for r in variant2dict(v, fields=fields):
res.append(r)
del vcf
return res

def chunkwise_bcfools_stats(args):
with ProcessPoolExecutor(args.threads) as exc:
jobs = []
for region in parallel_regions(args.vcf):
jobs.append(exc.submit(one_chunk_stats, args.vcf, region, fill=args.fill_tags_first))
for job in tqdm(as_completed(jobs), total=len(jobs), unit="chunk"):
for res in job.result()
if args.fields:
res = {k: dat[k] for k in fields if k in dat}
print(json.dumps(res), file=args.output)
regions = list(parallel_regions(args.vcf))
with tqdm(total=len(regions), unit="chunk") as pbar:
for i in range(0, len(regions), 10000):
to=min(len(regions), i+10000)
with ProcessPoolExecutor(args.threads) as exc:
jobs = (exc.submit(one_chunk_stats, args.vcf, region, fill=args.fill_tags_first, fields=args.fields) for region in regions[i:to])
for job in as_completed(jobs):
pbar.update(1)
for res in job.result():
args.output.write(res)
args.output.write("\n")


def main(argv=None):
Expand Down

0 comments on commit 1a5bc83

Please sign in to comment.