Fix extract features training code (facebookresearch#157)
Summary:
Fix the extract features training code and add tests to ensure that it does not regress

Pull Request resolved: fairinternal/ssl_scaling#157

Reviewed By: prigoyal

Differential Revision: D29232182

Pulled By: QuentinDuval

fbshipit-source-id: f66dfea202168d25e577949729e7d28e83296b73
QuentinDuval authored and facebook-github-bot committed Jun 23, 2021
1 parent 7ad8eca commit bb78ee7
Showing 7 changed files with 212 additions and 39 deletions.
37 changes: 37 additions & 0 deletions configs/config/feature_extraction/dataset/imagenette160.yaml
@@ -0,0 +1,37 @@
# @package _global_
config:
  DATA:
    NUM_DATALOADER_WORKERS: 5
    TRAIN:
      DATA_SOURCES: [disk_folder]
      LABEL_SOURCES: [disk_folder]
      DATASET_NAMES: [imagenette_160_folder]
      BATCHSIZE_PER_REPLICA: 256
      TRANSFORMS:
        - name: RandomResizedCrop
          size: 128
        - name: RandomHorizontalFlip
        - name: ToTensor
        - name: Normalize
          mean: [0.485, 0.456, 0.406]
          std: [0.229, 0.224, 0.225]
      MMAP_MODE: True
      COPY_TO_LOCAL_DISK: False
      COPY_DESTINATION_DIR: /tmp/imagenette_160/
    TEST:
      DATA_SOURCES: [disk_folder]
      LABEL_SOURCES: [disk_folder]
      DATASET_NAMES: [imagenette_160_folder]
      BATCHSIZE_PER_REPLICA: 256
      TRANSFORMS:
        - name: Resize
          size: 160
        - name: CenterCrop
          size: 128
        - name: ToTensor
        - name: Normalize
          mean: [0.485, 0.456, 0.406]
          std: [0.229, 0.224, 0.225]
      MMAP_MODE: True
      COPY_TO_LOCAL_DISK: False
      COPY_DESTINATION_DIR: /tmp/imagenette_160/
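
For orientation, the TRANSFORMS lists above map onto standard torchvision ops. A minimal sketch of the equivalent train/test pipelines, assuming plain torchvision (VISSL composes these through its own transform registry, so this is illustrative only):

# Rough torchvision equivalent of the TRAIN/TEST TRANSFORMS above.
from torchvision import transforms

IMAGENET_MEAN, IMAGENET_STD = [0.485, 0.456, 0.406], [0.229, 0.224, 0.225]

train_transform = transforms.Compose([
    transforms.RandomResizedCrop(128),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
])

test_transform = transforms.Compose([
    transforms.Resize(160),       # resize shorter side to 160
    transforms.CenterCrop(128),   # then take the central 128x128 crop
    transforms.ToTensor(),
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
])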
15 changes: 15 additions & 0 deletions configs/config/feature_extraction/with_head/rn50_swav.yaml
@@ -0,0 +1,15 @@
# @package _global_
config:
  MODEL:
    FEATURE_EVAL_SETTINGS:
      EVAL_MODE_ON: True
      FREEZE_TRUNK_AND_HEAD: True
      EVAL_TRUNK_AND_HEAD: True
    TRUNK:
      NAME: resnet
      RESNETS:
        DEPTH: 50
    HEAD:
      PARAMS: [
        ["swav_head", {"dims": [2048, 2048, 128], "use_bn": True, "num_clusters": [3000]}],
      ]
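
The PARAMS entry configures a SwAV projection head on top of the frozen ResNet-50 trunk; with EVAL_TRUNK_AND_HEAD: True, features are extracted after this head rather than from the trunk. A rough sketch of the module those parameters describe, assuming the usual SwAV layout (MLP projector with BatchNorm, L2-normalized embedding, bias-free linear prototypes); VISSL's actual SwAVPrototypesHead has more options, so treat this as orientation only:

import torch
import torch.nn as nn
import torch.nn.functional as F

class SwavHeadSketch(nn.Module):
    def __init__(self, dims=(2048, 2048, 128), num_clusters=3000):
        super().__init__()
        # MLP projector: 2048 -> 2048 -> 128, with BN as use_bn=True suggests
        self.projector = nn.Sequential(
            nn.Linear(dims[0], dims[1]),
            nn.BatchNorm1d(dims[1]),
            nn.ReLU(inplace=True),
            nn.Linear(dims[1], dims[2]),
        )
        # prototypes: scores of each embedding against 3000 cluster centroids
        self.prototypes = nn.Linear(dims[2], num_clusters, bias=False)

    def forward(self, x):
        z = F.normalize(self.projector(x), dim=1, p=2)
        return self.prototypes(z)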
1 change: 1 addition & 0 deletions dev/run_quick_tests.sh
@@ -17,6 +17,7 @@ TEST_LIST=(
     "test_regnet_fsdp_integration.py"
     "test_state_checkpointing.py"
     "test_layer_memory_tracking.py"
+    "test_extract_features.py"
 )
 
 echo "========================================================================"
107 changes: 107 additions & 0 deletions tests/test_extract_features.py
@@ -0,0 +1,107 @@
# Copyright (c) Facebook, Inc. and its affiliates.

# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.


import os
import unittest

from hydra.experimental import compose, initialize_config_module
from vissl.utils.hydra_config import convert_to_attrdict
from vissl.utils.test_utils import (
    gpu_test,
    in_temporary_directory,
    run_integration_test,
)


class TestExtractClusterWorkflow(unittest.TestCase):
    @staticmethod
    def _create_pretraining_config(num_gpu: int = 2):
        with initialize_config_module(config_module="vissl.config"):
            cfg = compose(
                "defaults",
                overrides=[
                    "config=test/integration_test/quick_swav",
                    "config.DATA.TRAIN.DATA_SOURCES=[synthetic]",
                    "config.DATA.TRAIN.DATA_LIMIT=40",
                    "config.SEED_VALUE=0",
                    "config.MODEL.AMP_PARAMS.USE_AMP=False",
                    "config.MODEL.SYNC_BN_CONFIG.CONVERT_BN_TO_SYNC_BN=True",
                    "config.MODEL.SYNC_BN_CONFIG.SYNC_BN_TYPE=pytorch",
                    "config.MODEL.AMP_PARAMS.AMP_TYPE=pytorch",
                    "config.LOSS.swav_loss.epsilon=0.03",
                    "config.MODEL.FSDP_CONFIG.flatten_parameters=True",
                    "config.MODEL.FSDP_CONFIG.mixed_precision=False",
                    "config.MODEL.FSDP_CONFIG.fp32_reduce_scatter=False",
                    "config.MODEL.FSDP_CONFIG.compute_dtype=float32",
                    f"config.DISTRIBUTED.NUM_PROC_PER_NODE={num_gpu}",
                    "config.LOG_FREQUENCY=1",
                    "config.OPTIMIZER.construct_single_param_group_only=True",
                    "config.DATA.TRAIN.BATCHSIZE_PER_REPLICA=4",
                    "config.OPTIMIZER.use_larc=False",
                ],
            )

        args, config = convert_to_attrdict(cfg)
        return config

    @staticmethod
    def _create_extract_features_config(checkpoint_path: str, num_gpu: int = 2):
        with initialize_config_module(config_module="vissl.config"):
            cfg = compose(
                "defaults",
                overrides=[
                    "config=feature_extraction/extract_resnet_in1k_8gpu",
                    "+config/feature_extraction/with_head=rn50_swav",
                    f"config.MODEL.WEIGHTS_INIT.PARAMS_FILE={checkpoint_path}",
                    "config.DATA.TRAIN.DATA_SOURCES=[synthetic]",
                    "config.DATA.TRAIN.LABEL_SOURCES=[synthetic]",
                    "config.DATA.TEST.DATA_SOURCES=[synthetic]",
                    "config.DATA.TEST.LABEL_SOURCES=[synthetic]",
                    "config.DATA.TRAIN.DATA_LIMIT=40",
                    "config.DATA.TEST.DATA_LIMIT=40",
                    "config.SEED_VALUE=0",
                    "config.MODEL.AMP_PARAMS.USE_AMP=False",
                    "config.MODEL.SYNC_BN_CONFIG.CONVERT_BN_TO_SYNC_BN=True",
                    "config.MODEL.SYNC_BN_CONFIG.SYNC_BN_TYPE=pytorch",
                    "config.MODEL.AMP_PARAMS.AMP_TYPE=pytorch",
                    "config.LOSS.swav_loss.epsilon=0.03",
                    "config.MODEL.FSDP_CONFIG.flatten_parameters=True",
                    "config.MODEL.FSDP_CONFIG.mixed_precision=False",
                    "config.MODEL.FSDP_CONFIG.fp32_reduce_scatter=False",
                    "config.MODEL.FSDP_CONFIG.compute_dtype=float32",
                    f"config.DISTRIBUTED.NUM_PROC_PER_NODE={num_gpu}",
                    "config.LOG_FREQUENCY=1",
                    "config.OPTIMIZER.construct_single_param_group_only=True",
                    "config.DATA.TRAIN.BATCHSIZE_PER_REPLICA=4",
                    "config.OPTIMIZER.use_larc=False",
                ],
            )
        args, config = convert_to_attrdict(cfg)
        return config

    @gpu_test(gpu_count=2)
    def test_extract_cluster_assignment_ddp(self):
        with in_temporary_directory() as pretrain_dir:

            pretrain_config = self._create_pretraining_config()
            run_integration_test(pretrain_config)

            with in_temporary_directory() as extract_dir:
                extract_config = self._create_extract_features_config(
                    checkpoint_path=os.path.join(pretrain_dir, "checkpoint.torch")
                )

                run_integration_test(extract_config, engine_name="extract_features")
                folder_content = os.listdir(extract_dir)
                print(folder_content)
                for rank in [0, 1]:
                    for feat_name in ["heads"]:
                        for file in [
                            f"rank{rank}_train_{feat_name}_features.npy",
                            f"rank{rank}_train_{feat_name}_inds.npy",
                            f"rank{rank}_train_{feat_name}_targets.npy",
                        ]:
                            self.assertIn(file, folder_content)
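
The test is wired into dev/run_quick_tests.sh above; to run it in isolation, something like the following should work from the repository root (the @gpu_test decorator is expected to skip the test on machines with fewer than 2 GPUs):

# Load and run the new test case via unittest; assumes the repo root is on
# the Python path so that `tests.test_extract_features` is importable.
import unittest

suite = unittest.defaultTestLoader.loadTestsFromName(
    "tests.test_extract_features.TestExtractClusterWorkflow"
)
unittest.TextTestRunner(verbosity=2).run(suite)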
74 changes: 46 additions & 28 deletions vissl/engines/extract_features.py
@@ -7,14 +7,18 @@
 import os
 from typing import Any, Callable, List
 
+import torch
 from classy_vision.hooks import ClassyHook
 from vissl.config import AttrDict
 from vissl.engines.engine_registry import Engine, register_engine
 from vissl.hooks import default_hook_generator
 from vissl.trainer import SelfSupervisionTrainer
-from vissl.utils.checkpoint import get_checkpoint_folder
 from vissl.utils.collect_env import collect_env_info
-from vissl.utils.env import get_machine_local_and_dist_rank, set_env_vars
+from vissl.utils.env import (
+    get_machine_local_and_dist_rank,
+    print_system_env_info,
+    set_env_vars,
+)
 from vissl.utils.hydra_config import print_cfg
 from vissl.utils.io import save_file
 from vissl.utils.logger import setup_logging, shutdown_logging
@@ -33,11 +37,17 @@ def run_engine(
     node_id: int = 0,
     hook_generator: Callable[[Any], List[ClassyHook]] = default_hook_generator,
 ):
-    extract_main(cfg, dist_run_id, local_rank, node_id)
+    extract_main(
+        cfg, dist_run_id, checkpoint_folder, local_rank=local_rank, node_id=node_id
+    )
 
 
 def extract_main(
-    cfg: AttrDict, dist_run_id: str, local_rank: int = 0, node_id: int = 0
+    cfg: AttrDict,
+    dist_run_id: str,
+    checkpoint_folder: str,
+    local_rank: int = 0,
+    node_id: int = 0,
 ):
     """
     Sets up and executes feature extraction workflow per machine.
@@ -58,59 +68,67 @@
         node_id (int): id of the current machine. starts from 0. valid for multi-gpu
     """
 
-    # setup logging
-    setup_logging(__name__)
     # setup the environment variables
     set_env_vars(local_rank, node_id, cfg)
+    dist_rank = int(os.environ["RANK"])
+
+    # setup logging
+    setup_logging(__name__, output_dir=checkpoint_folder, rank=dist_rank)
+
     logging.info(f"Env set for rank: {local_rank}, dist_rank: {dist_rank}")
+    # print the environment info for the current node
+    if local_rank == 0:
+        current_env = os.environ.copy()
+        print_system_env_info(current_env)
 
     # setup the multiprocessing to be forkserver.
     # See https://fb.quip.com/CphdAGUaM5Wf
     setup_multiprocessing_method(cfg.MULTI_PROCESSING_METHOD)
 
     # set seeds
     logging.info("Setting seed....")
-    dist_rank = int(os.environ["RANK"])
     set_seeds(cfg, dist_rank)
 
+    # We set the CUDA device here as well as a safe solution for all downstream
+    # `torch.cuda.current_device()` calls to return correct device.
+    if cfg.MACHINE.DEVICE == "gpu" and torch.cuda.is_available():
+        local_rank, _ = get_machine_local_and_dist_rank()
+        torch.cuda.set_device(local_rank)
+
     # print the training settings and system settings
-    local_rank, _ = get_machine_local_and_dist_rank()
     if local_rank == 0:
         print_cfg(cfg)
         logging.info("System config:\n{}".format(collect_env_info()))
 
-    output_dir = get_checkpoint_folder(cfg)
     trainer = SelfSupervisionTrainer(cfg, dist_run_id)
     features = trainer.extract()
 
     for split in features.keys():
         logging.info(f"============== Split: {split} =======================")
-        layers = features[split].keys()
-        for layer in layers:
-            out_feat_file = (
-                f"{output_dir}/rank{local_rank}_{split}_{layer}_features.npy"
+        for layer_name, layer_features in features[split].items():
+            out_feat_file = os.path.join(
+                checkpoint_folder, f"rank{dist_rank}_{split}_{layer_name}_features.npy"
             )
-            out_target_file = (
-                f"{output_dir}/rank{local_rank}_{split}_{layer}_targets.npy"
+            out_target_file = os.path.join(
+                checkpoint_folder, f"rank{dist_rank}_{split}_{layer_name}_targets.npy"
            )
-            out_inds_file = f"{output_dir}/rank{local_rank}_{split}_{layer}_inds.npy"
+            out_inds_file = os.path.join(
+                checkpoint_folder, f"rank{dist_rank}_{split}_{layer_name}_inds.npy"
+            )
+            feat_shape = layer_features["features"].shape
             logging.info(
-                "Saving extracted features: {} {} to: {}".format(
-                    layer, features[split][layer]["features"].shape, out_feat_file
-                )
+                f"Saving extracted features of {layer_name} with shape {feat_shape} to: {out_feat_file}"
            )
-            save_file(features[split][layer]["features"], out_feat_file)
+            save_file(layer_features["features"], out_feat_file)
             logging.info(
-                "Saving extracted targets: {} to: {}".format(
-                    features[split][layer]["targets"].shape, out_target_file
-                )
+                f"Saving extracted targets of {layer_name} to: {out_target_file}"
            )
-            save_file(features[split][layer]["targets"], out_target_file)
+            save_file(layer_features["targets"], out_target_file)
             logging.info(
-                "Saving extracted indices: {} to: {}".format(
-                    features[split][layer]["inds"].shape, out_inds_file
-                )
+                f"Saving extracted indices of {layer_name} to: {out_inds_file}"
            )
-            save_file(features[split][layer]["inds"], out_inds_file)
+            save_file(layer_features["inds"], out_inds_file)
 
     logging.info("All Done!")
     # close the logging streams including the filehandlers
     shutdown_logging()
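
The engine now writes one features/inds/targets triple per rank, per split, per layer into the checkpoint folder, following the rank{rank}_{split}_{layer}_{features|inds|targets}.npy pattern asserted in the test above. A sketch of how these shards could be merged back into dataset order downstream; the helper below is illustrative, not a VISSL API, and assumes the inds files hold the original dataset indices of each row:

import os
import numpy as np

def load_and_merge(checkpoint_folder: str, split: str, layer: str, num_ranks: int):
    feats, inds, targets = [], [], []
    for rank in range(num_ranks):
        prefix = os.path.join(checkpoint_folder, f"rank{rank}_{split}_{layer}")
        feats.append(np.load(f"{prefix}_features.npy"))
        inds.append(np.load(f"{prefix}_inds.npy"))
        targets.append(np.load(f"{prefix}_targets.npy"))
    feats = np.concatenate(feats)
    inds = np.concatenate(inds)
    targets = np.concatenate(targets)
    # Distributed samplers shard the dataset across ranks, so rows arrive
    # out of order; sort by the saved dataset indices to restore it.
    order = np.argsort(inds)
    return feats[order], targets[order]

# Example (hypothetical paths): merge the SwAV head features from the test run.
# features, targets = load_and_merge("/path/to/checkpoints", "train", "heads", num_ranks=2)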
6 changes: 2 additions & 4 deletions vissl/models/model_helpers.py
@@ -56,13 +56,11 @@ def is_feature_extractor_model(model_config):
     - trunk is frozen
     - number of features specified for features extraction > 0
     """
-    if (
+    return (
         model_config.FEATURE_EVAL_SETTINGS.EVAL_MODE_ON
         and model_config.FEATURE_EVAL_SETTINGS.FREEZE_TRUNK_ONLY
         and len(model_config.FEATURE_EVAL_SETTINGS.LINEAR_EVAL_FEAT_POOL_OPS_MAP) > 0
-    ):
-        return True
-    return False
+    )
 
 
 def get_trunk_output_feature_names(model_config):
11 changes: 4 additions & 7 deletions vissl/trainer/trainer_main.py
@@ -359,9 +359,7 @@ def extract(self):
         for split in self.task.available_splits:
             logging.info(f"Extracting features for partition: {split.lower()}")
             self.task.data_iterator = iter(self.task.dataloaders[split.lower()])
-            features[split.lower()] = self._get_split_features(
-                feat_names, self.cfg, self.task
-            )
+            features[split.lower()] = self._get_split_features(feat_names, self.task)
             logging.info(f"Done getting features for partition: {split.lower()}")
 
         if hasattr(self.task, "data_iterator"):
@@ -372,17 +370,16 @@
             gc.collect()
         return features
 
-    def _flatten_features_list(self, features: Dict[str, Any]):
+    @staticmethod
+    def _flatten_features_list(features: Dict[str, Any]):
         assert isinstance(features, list), "features must be of type list"
         is_nested = isinstance(features[0], list)
         if is_nested:
             flat_features_list = [item for sublist in features for item in sublist]
             return flat_features_list
         return features
 
-    def _get_split_features(
-        self, feat_names: List[str], cfg: AttrDict, task: ClassyTask
-    ):
+    def _get_split_features(self, feat_names: List[str], task: ClassyTask):
         task.model.eval()
         logging.info("Model set to eval mode during feature extraction...")
 
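
The _flatten_features_list helper (now a @staticmethod, since it touches no instance state) collapses one level of nesting, which can arise when a dataloader yields a list of per-input batches (e.g. multi-crop inputs). Its behavior in isolation, as a standalone sketch:

from typing import Any, List

def flatten_features_list(features: List[Any]) -> List[Any]:
    assert isinstance(features, list), "features must be of type list"
    # If batches arrive as a list of lists, flatten one level; otherwise pass through.
    if features and isinstance(features[0], list):
        return [item for sublist in features for item in sublist]
    return features

assert flatten_features_list([[1, 2], [3, 4]]) == [1, 2, 3, 4]
assert flatten_features_list([1, 2, 3]) == [1, 2, 3]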
