resize datasets and sharded transforms
Ubuntu committed Nov 29, 2020
1 parent de4667f commit 9f5ef6b
Showing 9 changed files with 201 additions and 68 deletions.
5 changes: 3 additions & 2 deletions examples/upload_tfds.py
@@ -3,5 +3,6 @@

if __name__ == "__main__":
with Timer("Eurosat TFDS"):
out_ds = hub.Dataset.from_tfds('eurosat')
res_ds = out_ds.store("./data/test/tfds_new/eurosat")
out_ds = hub.Dataset.from_tfds("coco", num=100)
res_ds = out_ds.store("./data/test/tfds_new/eurosat")
print(res_ds.shape)
29 changes: 23 additions & 6 deletions hub/api/dataset.py
@@ -156,12 +156,7 @@ def __init__(
raise SchemaArgumentNotFoundException()
self.schema: HubFeature = featurify(schema)
self.shape = tuple(shape)
self.meta = {
"shape": shape,
"schema": hub.features.serialize.serialize(self.schema),
"version": 1,
}
fs_map["meta.json"] = bytes(json.dumps(self.meta), "utf-8")
self.meta = self._store_meta()
self._flat_tensors = tuple(flatten(self.schema))
self._tensors = dict(self._generate_storage_tensors())
self.flush()
@@ -188,6 +183,15 @@ def __init__(
self.username, self.dataset_name, self.meta
)

def _store_meta(self) -> dict:
meta = {
"shape": self.shape,
"schema": hub.features.serialize.serialize(self.schema),
"version": 1,
}
self._fs_map["meta.json"] = bytes(json.dumps(meta), "utf-8")
return meta

def _check_and_prepare_dir(self):
"""
Checks if input data is ok.
@@ -341,6 +345,19 @@ def __setitem__(self, slice_, value):
else:
self._tensors[subpath][slice_list] = value

def resize_shape(self, size: int) -> None:
""" Resize the shape of the dataset by resizing each tensor first dimension"""
self.shape = (size,)
for t in self._tensors.values():
t.resize_shape(size)
self.meta = self._store_meta()
self._update_dataset_state()

def append_shape(self, size: int):
""" Append the shape """
size += self.shape[0]
self.resize_shape(size)

def delete(self):
fs, path = self._fs, self._path
exist_meta = fs.exists(posixpath.join(path, "meta.json"))
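
The new `resize_shape` and `append_shape` methods grow or shrink a dataset in place: they resize the first dimension of every underlying tensor, rewrite `meta.json` through `_store_meta`, and refresh the dataset state. A minimal usage sketch mirroring the `test_append_dataset` test added further below; the local path is illustrative and the import locations are my assumption, not taken from this diff:

import numpy as np
from hub import Dataset
from hub.features import Tensor  # import paths assumed, not shown in this commit

# Create a 100-sample dataset, then grow it by 20 samples.
dt = {"first": Tensor(shape=(250, 300)), "second": "float"}
ds = Dataset(schema=dt, shape=(100,), url="./data/test/resize_demo", mode="w")
ds.append_shape(20)                      # equivalent to ds.resize_shape(120)
assert len(ds) == 120
ds["first", 119] = np.ones((250, 300))   # newly appended slots are writable
ds.flush()
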
8 changes: 8 additions & 0 deletions hub/api/datasetview.py
@@ -194,3 +194,11 @@ def to_pytorch(self, Transform=None):
return self.dataset.to_pytorch(
Transform=Transform, num_samples=self.num_samples, offset=self.offset
)

def resize_shape(self, size: int) -> None:
"""Resize dataset shape, not DatasetView"""
self.dataset.reisze_shape(size)

def commit(self) -> None:
"""Commit dataset"""
self.dataset.commit()
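
`DatasetView` now forwards `resize_shape` and `commit` to its underlying dataset, which the sharded `store` path later in transform.py relies on when it hands `ds_out[offset : offset + n_results]` views to `upload`. A short sketch of that delegation, assuming `ds` is the dataset from the previous example and that slicing a `Dataset` yields a `DatasetView` (the slice bounds are illustrative):

view = ds[0:10]          # a DatasetView over the first ten samples
view.resize_shape(150)   # resizes the underlying Dataset, not just the view
view.commit()            # likewise commits the whole dataset
assert ds.shape[0] == 150
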
21 changes: 8 additions & 13 deletions hub/api/tests/test_converters.py
@@ -138,13 +138,8 @@ def __getitem__(self, idx):
landmarks = 7 * np.ones((10, 10, 10))
named = "testing text labels"
sample = {
"data": {
'image': image,
'landmarks': landmarks
},
"labels": {
"named": named
}
"data": {"image": image, "landmarks": landmarks},
"labels": {"named": named},
}

if self.transform:
@@ -154,9 +149,9 @@ def __getitem__(self, idx):
tds = TestDataset()
ds = hub.Dataset.from_pytorch(tds)
ds = ds.store("./data/test_from_pytorch/test1")
assert(ds["data", "image", 3].numpy() == 5 * np.ones((50, 50))).all()
assert(ds["data", "landmarks", 2].numpy() == 7 * np.ones((10, 10, 10))).all()
assert(ds["labels", "named", 5].numpy() == "testing text labels")
assert (ds["data", "image", 3].numpy() == 5 * np.ones((50, 50))).all()
assert (ds["data", "landmarks", 2].numpy() == 7 * np.ones((10, 10, 10))).all()
assert ds["labels", "named", 5].numpy() == "testing text labels"


@pytest.mark.skipif(not pytorch_loaded(), reason="requires pytorch to be loaded")
@@ -174,14 +169,14 @@ def test_to_from_pytorch():
)
for i in range(10):
ds["label", "d", "e", i] = i * np.ones((5, 3))

ds = ds.to_pytorch()
out_ds = hub.Dataset.from_pytorch(ds)
res_ds = out_ds.store(
"./data/test_from_pytorch/test3"
)
res_ds = out_ds.store("./data/test_from_pytorch/test3")
for i in range(10):
assert (res_ds["label", "d", "e", i].numpy() == i * np.ones((5, 3))).all()


if __name__ == "__main__":
# test_from_tensorflow()
test_to_from_pytorch()
12 changes: 12 additions & 0 deletions hub/api/tests/test_dataset.py
@@ -267,8 +267,20 @@ def test_tensorview_slicing():
assert (tv4.shape == np.array([[12], [12]])).all()


def test_append_dataset():
dt = {"first": Tensor(shape=(250, 300)), "second": "float"}
ds = Dataset(schema=dt, shape=(100,), url="./data/test/model", mode="w")
ds.append_shape(20)
assert len(ds) == 120
assert ds["first"].shape[0] == 120
assert ds["first", 5:10].shape[0] == 5
assert ds["second"].shape[0] == 120


if __name__ == "__main__":
# test_tensorview_slicing()
# test_datasetview_slicing()
# test_dataset()
test_append_dataset()
exit()
test_dataset2()
173 changes: 127 additions & 46 deletions hub/compute/transform.py
@@ -1,5 +1,9 @@
import os
import posix
import itertools
import zarr
import numpy as np
import math
from psutil import virtual_memory
from typing import Dict, Iterable
from hub.api.dataset import Dataset
from tqdm import tqdm
@@ -10,12 +14,27 @@
from hub.api.datasetview import DatasetView
from pathos.pools import ProcessPool, ThreadPool
from hub.features import Primitive
from hub.features.features import featurify
import posixpath


try:
from ray.util.multiprocessing import Pool as RayPool
except Exception:
pass
def get_sample_size_in_memory(schema):
"""Given Schema, looks into memory how many samples can fit and returns it"""
schema = featurify(schema)
mem = virtual_memory()
sample_size = 0
for feature in schema._flatten():
shp = list(feature.max_shape)
if len(shp) == 0:
shp = [1]

sz = np.dtype(feature.dtype).itemsize
sample_size += math.prod(shp) * sz

if sample_size > mem.total:
return 1

return mem.total // sample_size


class Transform:
@@ -52,6 +71,10 @@ def __init__(
elif scheduler == "single":
self.map = map
elif scheduler == "ray":
try:
from ray.util.multiprocessing import Pool as RayPool
except Exception:
raise Exception("scheduler='ray' requires the ray package to be installed")
self.map = RayPool().map
else:
raise Exception(
@@ -136,7 +159,21 @@ def _split_list_to_dicts(self, xs):
xs_new[key] = [value]
return xs_new

def upload(self, results, url: str, token: dict, progressbar: bool = True):
def create_dataset(self, url, length=None, token=None):
"""Helper function to creat a dataset"""
shape = (length,)
ds = Dataset(
url,
mode="w",
shape=shape,
schema=self.schema,
token=token,
fs=zarr.storage.MemoryStore() if "tmp" in url else None,
cache=False,
)
return ds

def upload(self, results, ds: Dataset, token: dict, progressbar: bool = True):
"""Batchified upload of results
For each tensor batchify based on its chunk and upload
If tensor is dynamic then still upload element by element
@@ -154,18 +191,7 @@ def upload(self, results, url: str, token: dict, progressbar: bool = True):
ds: hub.Dataset
Uploaded dataset
"""

shape = (len(list(results.values())[0]),)
ds = Dataset(
url,
mode="w",
shape=shape,
schema=self.schema,
token=token,
fs=zarr.storage.MemoryStore() if "tmp" in url else None,
cache=False,
)

for key, value in results.items():

length = ds[key].chunksize[0]
@@ -178,32 +204,32 @@ def upload_chunk(i_batch):
def upload_chunk(i_batch):
i, batch = i_batch
if not ds[key].is_dynamic:
if len(batch) != 1:
ds[key, i * length : (i + 1) * length] = batch
batch_length = len(batch)
if batch_length != 1:
ds[key, i * length : i * length + batch_length] = batch
else:
ds[key, i * length] = batch[0]
else:
for k, el in enumerate(batch):
ds[key, i * length + k] = el

index_batched_values = list(
zip(list(range(len(batched_values))), batched_values)
)
index_batched_values = self._pbar(progressbar)(
index_batched_values,
desc=f"Storing {key} tensor",
total=len(value) // length,
)

# Disable dynamic arrays
ds._tensors[f"/{key}"].disable_dynamicness()
ds.dataset._tensors[f"/{key}"].disable_dynamicness()

list(self.map(upload_chunk, index_batched_values))

# Enable and rewrite shapes
if ds._tensors[f"/{key}"].is_dynamic:
ds._tensors[f"/{key}"].enable_dynamicness()
[ds._tensors[f"/{key}"].set_shape([i], v) for i, v in enumerate(value)]
if ds.dataset._tensors[f"/{key}"].is_dynamic:
ds.dataset._tensors[f"/{key}"].enable_dynamicness()
[
ds.dataset._tensors[f"/{key}"].set_shape([i], v)
for i, v in enumerate(value)
]

ds.commit()
return ds
@@ -232,6 +258,44 @@ def _unwrap(cls, results):
items.extend(r)
return items

def store_shard(self, ds_in: Iterable, ds_out: Dataset, offset: int, token=None):
"""
Takes a shard of the iterable ds_in, computes the transform, and stores the results in ds_out starting at the given offset
"""

def _func_argd(item):
return self._func(item, **self.kwargs)

ds_in = list(ds_in)
results = self.map(
_func_argd,
ds_in,
)

results = self._unwrap(results)
results = self.map(lambda x: self._flatten_dict(x, schema=self.schema), results)
results = list(results)

results = self._split_list_to_dicts(results)
results_values = list(results.values())
if len(results_values) == 0:
return 0

n_results = len(results_values[0])
if n_results == 0:
return 0

additional = max(offset + n_results - ds_out.shape[0], 0)
ds_out.append_shape(additional)

self.upload(
results,
ds_out[offset : offset + n_results],
token=token,
)
return n_results

def store(
self,
url: str,
@@ -261,26 +325,43 @@ def store(
uploaded dataset
"""

_ds = ds or self._ds
if isinstance(_ds, Transform):
_ds = _ds.store(
"{}_{}".format(url, _ds._func.__name__),
ds_in = ds or self._ds
if isinstance(ds_in, Transform):
ds_in = ds_in.store(
"{}_{}".format(url, ds_in._func.__name__),
token=token,
progressbar=progressbar,
)

def _func_argd(item):
return self._func(item, **self.kwargs)

results = self.map(
_func_argd, self._pbar(progressbar)(_ds, desc="Computing the transormation")
)
results = self._unwrap(results)
results = self.map(lambda x: self._flatten_dict(x, schema=self.schema), results)
results = self._split_list_to_dicts(results)

ds = self.upload(results, url=url, token=token, progressbar=progressbar)
return ds
# compute sample size
n_samples = get_sample_size_in_memory(self.schema)
n_samples = min(10000, n_samples)
length = len(ds_in) if hasattr(ds_in, "__len__") else n_samples
if length < n_samples:
n_samples = length

ds_out = self.create_dataset(url, length=length, token=token)

start = 0
total = 0
with tqdm(
total=total,
unit_scale=True,
unit=" items",
desc="Computing the transormation",
) as pbar:
while True:
ds_in_shard = itertools.islice(ds_in, start, start + n_samples)
n_results = self.store_shard(ds_in_shard, ds_out, start, token=token)
total += n_results
if n_results < n_samples or n_results == 0:
break
start += n_samples
pbar.update(n_samples)

ds_out.resize_shape(total)
ds_out.commit()
return ds_out

def __len__(self):
return self.shape[0]
@@ -311,7 +392,7 @@ def __getitem__(self, slice_):
squeeze_dim=isinstance(slice_list[0], int),
)

path = os.path.expanduser("~/.activeloop/tmparray")
path = posixpath.expanduser("~/.activeloop/tmparray")
new_ds = self.store(path, length=num, ds=ds_view, progressbar=False)

index = 1 if len(slice_) > 1 else 0
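
Taken together, the reworked `Transform.store` no longer materializes every transformed sample before uploading: it estimates how many samples fit in RAM from the schema (`get_sample_size_in_memory` sums `itemsize * prod(max_shape)` over the flattened features and divides total memory by that), caps the shard size at 10,000, walks the input with `itertools.islice`, computes and uploads each shard through `store_shard` at its offset (growing the output with `append_shape` when needed), and finally trims the output with `resize_shape(total)`. A simplified, framework-free sketch of the same pattern; `estimate_shard_size` and `compute_and_upload` are placeholder stand-ins rather than anything in this commit, and the input is assumed to be a re-iterable sequence such as a list (a one-shot iterator would be advanced by each `islice` call):

import itertools
import math
from typing import Callable, Dict, Sequence, Tuple

import numpy as np


def estimate_shard_size(features: Dict[str, Tuple[Tuple[int, ...], str]],
                        total_memory: int, cap: int = 10000) -> int:
    """Rough analogue of get_sample_size_in_memory: bytes per sample from
    max_shape and dtype, then how many samples fit into total_memory."""
    sample_size = 0
    for max_shape, dtype in features.values():
        shp = list(max_shape) or [1]
        sample_size += math.prod(shp) * np.dtype(dtype).itemsize
    if sample_size > total_memory:
        return 1
    return min(cap, total_memory // sample_size)


def sharded_store(items: Sequence, shard_size: int,
                  compute_and_upload: Callable[[list, int], int]) -> int:
    """Walk items in shards, upload each shard at its running offset (as
    store_shard does), and return the total number of results written."""
    start = 0
    total = 0
    while True:
        shard = list(itertools.islice(items, start, start + shard_size))
        n_results = compute_and_upload(shard, start)
        total += n_results
        if n_results < shard_size or n_results == 0:
            break
        start += shard_size
    return total  # the caller then resizes the output dataset down to total


# A (512, 512, 3) uint8 image plus a float64 label is ~786 KB per sample, so
# 16 GB of RAM admits roughly 21k samples and the cap keeps shards at 10,000.
assert estimate_shard_size({"image": ((512, 512, 3), "uint8"),
                            "label": ((1,), "float64")}, 16 * 1024 ** 3) == 10000
stored = sharded_store(list(range(25)), shard_size=10,
                       compute_and_upload=lambda batch, offset: len(batch))
assert stored == 25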