move reproduce to mine.py
gwenzek committed May 11, 2020
1 parent b23c1dd commit 9118ce3
Showing 10 changed files with 204 additions and 478 deletions.
Makefile: 1 addition & 1 deletion
@@ -185,7 +185,7 @@ test:

test2:
python -m cc_net mine --config config/test_segment.json
python -m cc_net mine --config config/test_segment.json --metadata test_data2/mined
python -m cc_net mine --config config/test_segment.json -p fetch_metadata split
diff \
<(zcat test_data/mined/2019-09/fr_head_0000.json.gz | jq -c 'select(.cc_segment == "crawl-data/CC-MAIN-2019-09/segments/1550247479101.30/wet/CC-MAIN-20190215183319-20190215205319-00000.warc.wet.gz") | {url, perplexity}' | sort) \
<(zcat test_data2/mined/2019-09/CC-MAIN-20190215183319-20190215205319-00000.json.gz | jq -c 'select(.bucket == "head" and .language == "fr") | {url, perplexity}' | sort) \
README.md: 13 additions & 0 deletions
@@ -126,3 +126,16 @@ You can peek at those files using UNIX tools `zcat` and [`jq`](https://stedolan.

By contributing to `cc_net`, you agree that your contributions will be licensed
under the LICENSE file in the root directory of this source tree.


## Output

```
data/mined_by_segment/{dump}
    seg_000_000.json.gz
    seg_000_001.json.gz
    seg_000_002.json.gz
data/reproduce_by_lang/{dump}
    en_head_0000.json.gz
```
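
These shards are gzipped newline-delimited JSON, so they can be inspected from Python as well as with `zcat` and `jq`. A minimal sketch, assuming the layout above; the dump id `2019-09` and the `url`/`perplexity` fields are taken from the test config and Makefile elsewhere in this commit:

```python
# Stream one reproduced shard and print each document's url and perplexity.
import gzip
import json
from pathlib import Path

shard = Path("data/reproduce_by_lang/2019-09/en_head_0000.json.gz")
with gzip.open(shard, "rt", encoding="utf-8") as f:
    for line in f:
        doc = json.loads(line)
        print(doc["url"], doc.get("perplexity"))
```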
cc_net/__main__.py: 2 additions & 3 deletions
@@ -9,13 +9,12 @@
import func_argparse

import cc_net.mine
import cc_net.minify


def main():
parser = func_argparse.multi_argparser(
mine=cc_net.mine.get_main_parser(),
reproduce=func_argparse.func_argparser(cc_net.minify.reproduce),
mine=cc_net.mine.get_main_parser("mine"),
reproduce=cc_net.mine.get_main_parser("reproduce"),
)
func_argparse.parse_and_call(parser)

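With both entry points now built from the same `Config`-driven parser, they accept the same flags; usage mirrors the Makefile targets above (paths illustrative):

```
python -m cc_net mine --config test
python -m cc_net reproduce --config test --metadata test_data2/mined
```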
cc_net/execution.py: 5 additions & 8 deletions
@@ -18,7 +18,7 @@


class Executor(Protocol):
def __call__(self, function: Callable[..., Optional[str]], *args: Iterable) -> None:
def __call__(self, function: Callable[..., str], *args: Iterable) -> None:
...


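Tightening the `Executor` Protocol to `Callable[..., str]` means every job now returns a printable status message. A minimal conforming executor, as a sketch (not the repo's actual implementation):

```python
from typing import Callable, Iterable


def serial_executor(function: Callable[..., str], *args: Iterable) -> None:
    # One iterable per function argument, zipped into per-job tuples;
    # print the status message each call returns.
    for xs in zip(*args):
        print(function(*xs))
```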
@@ -88,7 +88,7 @@ def __call__(self, *args, **kwargs):
**options,
)

def submit_and_wait(function: Callable[..., Optional[str]], *args: Iterable):
def submit_and_wait(function: Callable[..., str], *args: Iterable):
f_name = function.__name__

assert len(args) > 0, f"No arguments passed to {f_name}"
@@ -105,15 +105,12 @@ def submit_and_wait(function: Callable[..., Optional[str]], *args: Iterable):
print(f"Started {f_name} in job array {job_array_id} ({len(jobs)} jobs).")
for job in submitit.helpers.as_completed(jobs):
done += 1
print(f"Finished job {job.job_id} ({done} / {total}).")
e = job.exception()
if not e:
message = job.result()
if message is not None:
print(message)
print(f"Finished job {job.job_id} ({done} / {total}).", job.result())
continue

print(f"Failed job {job.job_id}:", e)
print(f"Failed job {job.job_id} ({done} / {total}):", e)
failed_jobs.append(job)

if failed_jobs:
@@ -137,7 +134,7 @@ def debug_executor(function: Callable[..., Optional[str]], *args: Iterable) -> N
message = function(*x)
except Exception:
try:
import ipdb as pdb
import ipdb as pdb # type: ignore
except ImportError:
import pdb # type: ignore
import traceback
cc_net/mine.py: 87 additions & 84 deletions
@@ -51,7 +51,8 @@ class Config(NamedTuple):
config_name
dump: CC dump id
output_dir: where to write the dataset
output_dir: working directory
mined_dir: name of the destination folder, full path will be {output_dir}/{mined_dir}/{dump_id}
execution: choose how to parallelize the execution
num_shards: number of shards to split the dump
num_segments_per_shard: allows downloading only a small portion of CC (e.g. for tests)
@@ -74,6 +75,7 @@ class Config(NamedTuple):
config_name: str = "base"
dump: str = "2017-51"
output_dir: Path = Path("data")
mined_dir: str = "mined"
execution: str = "slurm"
num_shards: int = 1600
num_segments_per_shard: int = -1
@@ -135,7 +137,7 @@ def from_json(cls, json_file: Path) -> "Config":

@property
def will_split(self) -> bool:
return "split" in self.pipeline or "split_by_segment" in self.pipeline
return "split_by_lang" in self.pipeline or "split_by_segment" in self.pipeline

def get_lm_languages(self) -> Sequence[str]:
if self.lm_languages is not None:
@@ -154,9 +156,18 @@ def _get_dir(self, name: str, regroup: bool = False) -> Path:
return self.output_dir / f"{name}_split" / self.dump
return self.output_dir / name / self.dump

def get_mined_dir(self) -> Path:
return self._get_dir(self.mined_dir)


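With the defaults above (`output_dir="data"`, `mined_dir="mined"`, `dump="2017-51"`), the new helper resolves as sketched below, assuming a pipeline with no split step; when splitting, `_get_dir` switches to a `{name}_split` directory instead:

```python
conf = Config()
conf.get_mined_dir()
# -> Path("data/mined/2017-51")

conf._replace(mined_dir="mined_by_lang").get_mined_dir()
# -> Path("data/mined_by_lang/2017-51")
```
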
BASE_CONFIG = Config()

BYLANG_CONFIG = Config(
config_name="by_lang",
mined_dir="mined_by_lang",
pipeline=list(BASE_CONFIG.pipeline[:-1]) + ["split_by_lang"],
)

TEST_CONFIG = BASE_CONFIG._replace(
config_name="test",
dump="2019-09",
@@ -174,6 +185,7 @@ def _get_dir(self, name: str, regroup: bool = False) -> Path:

PREDEF_CONFIGS = {
"base": BASE_CONFIG,
"by_lang": BYLANG_CONFIG,
"test": TEST_CONFIG,
"test_slurm": TEST_CONFIG._replace(execution="slurm,partition=dev"),
"debug": TEST_CONFIG._replace(config_name="debug", mine_num_processes=0),
@@ -248,13 +260,12 @@ def _hashes_shard(conf: Config, shard: int, output: Path):

def mine(conf: Config) -> List[Path]:
"""Remove dups, run LID and LMs, and split by lang and quality."""
mined_dir = conf.get_mined_dir()
if conf.will_split:
# Give directories when splitting
mined_dir = conf.output_dir / "mined_split" / conf.dump
outputs = [mined_dir / f"{shard:04d}" for shard in range(conf.num_shards)]
else:
# Files otherwise
mined_dir = conf.output_dir / "mined" / conf.dump
outputs = [
mined_dir / f"{shard:04d}.json.gz" for shard in range(conf.num_shards)
]
@@ -370,7 +381,7 @@ def _mine_shard(conf: Config, hashes: List[Path], shard: int, output: Path) -> s
steps["drop"] = perplexity.DropKeys(tok_field)

pattern = str(tmp_output / "{language}_{bucket}.json.gz")
steps["split"] = jsonql.split(pattern=str(pattern), mkdir=True)
steps["split_by_lang"] = jsonql.split(pattern=str(pattern), mkdir=True)

steps["split_by_segment"] = jsonql.split(
split_fn=lambda doc: _get_segment(tmp_output, doc), mkdir=True
@@ -391,54 +402,6 @@ def _mine_shard(conf: Config, hashes: List[Path], shard: int, output: Path) -> s
return f"Mined {output}"

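Renaming the step from `split` to `split_by_lang` keeps `jsonql.split`'s pattern-based routing, where each document presumably lands in the file named by formatting the pattern with its own fields. A sketch of that assumption:

```python
# Assumed routing of jsonql.split(pattern=...): fill the pattern from each
# document's fields to pick its output file.
doc = {"language": "fr", "bucket": "head"}
print("{language}_{bucket}.json.gz".format(**doc))  # fr_head.json.gz
```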

def reproduce(conf: Config) -> List[Path]:
reproduce_dir = conf._get_dir("reproduce")
reproduce_dir.mkdir(parents=True, exist_ok=True)
if conf.will_split:
# Give directories when splitting
outputs = [reproduce_dir / f"{shard:04d}" for shard in range(conf.num_shards)]
else:
# Files otherwise
outputs = [
reproduce_dir / f"{shard:04d}.json.gz" for shard in range(conf.num_shards)
]
missing_outputs = [(shard, o) for shard, o in enumerate(outputs) if not o.exists()]
if not missing_outputs:
return outputs

ex = conf.get_executor("reproduce", timeout_hour=2, mem_gb=2, cpus=2)
ex(_reproduce_shard, repeat(conf), *_transpose(missing_outputs))
return outputs


def _reproduce_shard(conf: Config, shard: int, output: Path) -> str:
from cc_net import transpose

assert conf.metadata is not None
tmp_output = tmp(output)
cc = process_wet_file.CCShardReader(
conf.dump,
shard,
num_shards=conf.num_shards,
num_segments_per_shard=conf.num_segments_per_shard,
cache_dir=conf.cache_dir,
)

unminifier = transpose.LinearUnminifier(conf.metadata / conf.dump)
# TODO: we should look at the conf to see how to split
pipeline: List[jsonql.Transformer] = [unminifier]

if conf.will_split:
pattern = str(tmp_output / "{language}_{bucket}.json.gz")
pipeline.append(jsonql.split(pattern=str(pattern), mkdir=True))

jsonql.run_pipes(
*pipeline, file=cc, output=tmp_output if not conf.will_split else None
)
tmp_output.rename(output)
return f"Unminified {output}"


def regroup(conf: Config, before: Callable[[Config], List[Path]], dirname: str) -> Path:
"""Reshards each language/quality after 'mine'."""
mined_dir = conf.output_dir / f"{dirname}_split" / conf.dump
Expand All @@ -450,7 +413,6 @@ def regroup(conf: Config, before: Callable[[Config], List[Path]], dirname: str)
print(f"No files found in {mined_dir} for regroup. Exiting.")
return regroup_dir

# check that mining is over.
all_files = [f for d in before(conf) for f in d.glob("*.json.gz")]
assert all_files, f"No files found inside mined dir: {mined_dir}"

@@ -535,7 +497,7 @@ def move_segments(conf: Config, first_stage: Callable, dirname: str) -> Path:
regroup_dir.mkdir(exist_ok=True)
ex = conf.get_executor(f"moveseg_{conf.dump}", mem_gb=1, timeout_hour=1, cpus=2)

def _move_segments(subdir: Path, regroup_dir: Path) -> Optional[str]:
def _move_segments(subdir: Path, regroup_dir: Path) -> str:
n = 0
for f in subdir.iterdir():
if not f.is_file() or f.is_symlink():
@@ -549,7 +511,7 @@ def _move_segments(subdir: Path, regroup_dir: Path) -> Optional[str]:
f.symlink_to(target)

if n == 0:
return None
return ""

return f"Moved {n} .json.gz files from {subdir} to {regroup_dir}"

@@ -615,19 +577,70 @@ def dump(x):


def get_main_parser() -> ArgumentParser:
# Generates the 'main' parser by patching a 'Config' parser
p = func_argparse.func_argparser(Config)
def _parser(entry_point: str) -> ArgumentParser:
# Generates the 'main' parser by patching a 'Config' parser
p = func_argparse.func_argparser(Config)

# Override default values to None, so we know what was set by the user.
# Note that it will keep the original default values in the help message.
p.set_defaults(**{f: None for f in Config._fields})

p.add_argument("--config", type=str, default="base")
p.set_defaults(__command=main)
p.set_defaults(entry_point=entry_point)
return p

return func_argparse.multi_argparser(
mine=_parser("mine"),
# TODO: we should hide parameters not used in `reproduce`
reproduce=_parser("reproduce"),
)

# Override default values to None, so we know what was set by the user.
# Note that it will keep the original default values in the help message.
p.set_defaults(**{f: None for f in Config._fields})

p.add_argument("--config", type=str, default="base")
p.set_defaults(__command=main)
return p
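
The refactor builds one `Config`-backed parser per entry point and mounts both as subcommands. The same `func_argparse` pattern in a standalone sketch (the `greet` function is a toy example, not repo code):

```python
import func_argparse


def greet(name: str = "world", loud: bool = False) -> None:
    msg = f"hello {name}"
    print(msg.upper() if loud else msg)


parser = func_argparse.multi_argparser(hi=func_argparse.func_argparser(greet))
func_argparse.parse_and_call(parser)  # e.g. argv: hi --name cc_net --loud
```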
def reproduce(conf: Config) -> List[Path]:
reproduce_dir = conf._get_dir("reproduce")
reproduce_dir.mkdir(parents=True, exist_ok=True)
if conf.will_split:
# Give directories when splitting
outputs = [reproduce_dir / f"{shard:04d}" for shard in range(conf.num_shards)]
else:
# Files otherwise
outputs = [
reproduce_dir / f"{shard:04d}.json.gz" for shard in range(conf.num_shards)
]
missing_outputs = [(shard, o) for shard, o in enumerate(outputs) if not o.exists()]
if not missing_outputs:
return outputs

ex = conf.get_executor("reproduce", timeout_hour=2, mem_gb=2, cpus=2)
ex(_reproduce_shard, repeat(conf), *_transpose(missing_outputs))
return outputs

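`reproduce` reuses the resume idiom from `mine`: enumerate every expected output, keep only the shards whose files are missing, and transpose the pairs into one iterable per executor argument. Distilled into a sketch (consistent with the `_transpose` call above, whose exact signature is assumed):

```python
from pathlib import Path
from typing import Iterable, List, Tuple


def missing_shards(outputs: List[Path]) -> Tuple[Iterable[int], Iterable[Path]]:
    # Keep (shard, output) pairs whose file does not exist yet, then
    # transpose them into parallel iterables for the executor.
    pairs = [(i, o) for i, o in enumerate(outputs) if not o.exists()]
    return [i for i, _ in pairs], [o for _, o in pairs]
```
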
def main(config: str = "base", **config_as_dict: Any) -> None:

def _reproduce_shard(conf: Config, shard: int, output: Path) -> str:
metadata = conf.metadata
if metadata is None and (conf.output_dir / "mined").exists():
# TODO: better default
metadata = conf.output_dir / "mined"
print(f"Will use {metadata} as metadata source")
assert metadata is not None, "Need to set 'metadata' for reproduce"
cc = conf.get_cc_shard(shard)

unminifier = minify.MetadataFetcher(metadata / conf.dump)
# TODO: we should look at the conf to see how to split
pipeline: List[jsonql.Transformer] = [unminifier]

tmp_output = tmp(output)
if conf.will_split:
pattern = str(tmp_output / "{language}_{bucket}.json.gz")
pipeline.append(jsonql.split(pattern=str(pattern), mkdir=True))

jsonql.run_pipes(*pipeline, file=cc, output=None if conf.will_split else tmp_output)
tmp_output.rename(output)
return f"Unminified {output}"


def main(entry_point: str, config: str = "base", **config_as_dict: Any) -> None:
# Use the given 'config' as default value.
config_base = config
if config_base in PREDEF_CONFIGS:
@@ -640,32 +653,22 @@ def main(config: str = "base", **config_as_dict: Any) -> None:
f"Choose from ({', '.join(PREDEF_CONFIGS)}) or give an existing .json file."
)
conf = conf._replace(**{k: v for (k, v) in config_as_dict.items() if v is not None})
print("Will run mine.py with the following config:", conf)

# Decide if we need to mine or if we have metadata available

if conf.metadata:
conf = conf._replace(pipeline=["split"])
# this is not very clean. We should either:
# - move back to the reproduce command
# - add an 'unminify' step that read conf.metadata

print(f"Will use pre-computed metadata from {conf.metadata}")
first_stage = reproduce
dir_name = "reproduce"

else:
first_stage = mine
dir_name = "mined"
print(f"Will run cc_net.mine.{entry_point} with the following config:", conf)
first_stage = {"mine": mine, "reproduce": reproduce}[entry_point]
dir_name = entry_point
regroup_dir = conf._get_dir(dir_name, regroup=True)
if "split" in conf.pipeline:
# Only regroup if we split the shards.

if "split_by_lang" in conf.pipeline:
# Only try regrouping if we split the shards.
regroup(conf, first_stage, dir_name)
elif "split_by_segment" in conf.pipeline:
# If we split by segment then regrouping is trivial, since segments appear in only one shard.
move_segments(conf, first_stage, dir_name)
else:
first_stage(conf)

if config_base == "test":
if conf.config_name == "test":
_validate_test(conf, regroup_dir)

