forked from kduxin/firelang
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Move parse_shape and parse_index to utils.
- Loading branch information
Showing
3 changed files
with
118 additions
and
106 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
from __future__ import annotations | ||
from typing import List, Tuple, Union, Any | ||
import numpy as np | ||
import torch | ||
from torch import Tensor | ||
|
||
# Anything accepted as an index into a multi-dimensional tensor: a single
# int / slice / list / 1-d Tensor / None (new axis), or a tuple of those.
# NOTE: the tuple member must be variadic (`Tuple[..., ...]`); the previous
# `Tuple[Union[...]]` form only described a 1-element tuple.
IndexLike = Union[
    int,
    slice,
    List,
    Tensor,
    None,
    Tuple[Union[int, slice, List, Tensor, None], ...],
]
|
||
|
||
def parse_index(index: IndexLike, shape: Tuple[int]) -> Tensor: | ||
if not isinstance(index, tuple): | ||
index = (index,) | ||
index = _complete_ellipsis(index, ndim=len(shape)) | ||
|
||
nindex = len(index) | ||
|
||
nindex_notnan = len([idx for idx in index if idx is not None]) | ||
stride = int(np.prod(shape[nindex_notnan:])) | ||
|
||
ids = torch.tensor([0], dtype=torch.long) | ||
slice_shape = [] | ||
dim = len(index) - 1 | ||
shape_dim = nindex_notnan - 1 | ||
for dim in range(nindex - 1, -1, -1): | ||
if index[dim] is None: | ||
slice_shape.append(1) | ||
continue | ||
|
||
index_at_dim = index[dim] | ||
size_at_dim = shape[shape_dim] | ||
if isinstance(index_at_dim, int): | ||
ids = ids + index_at_dim * stride | ||
elif isinstance(index_at_dim, slice): | ||
offsets = torch.arange(size_at_dim)[index_at_dim] * stride | ||
ids = (offsets[:, None] + ids[None, :]).reshape(-1) | ||
slice_shape.append(len(offsets)) | ||
elif isinstance(index_at_dim, list): | ||
offsets = torch.tensor(index_at_dim) * stride | ||
ids = (offsets[:, None] + ids[None, :]).reshape(-1) | ||
slice_shape.append(len(offsets)) | ||
elif isinstance(index_at_dim, Tensor): | ||
assert index_at_dim.ndim == 1, ( | ||
f"Index at dimension {dim} should be 1-dimensional, " | ||
f"not {index_at_dim.ndim}-d." | ||
) | ||
ids = ids.to(index_at_dim.device) | ||
offsets = index_at_dim * stride | ||
ids = (offsets[:, None] + ids[None, :]).reshape(-1) | ||
slice_shape.append(len(offsets)) | ||
else: | ||
raise TypeError( | ||
f"Index at dimension {dim} should be " | ||
f"a `int`, a `slice`, or a `Tensor`, not {type(index_at_dim)}" | ||
) | ||
|
||
stride *= size_at_dim | ||
shape_dim -= 1 | ||
|
||
slice_shape = list(reversed(slice_shape)) | ||
return ids.reshape(slice_shape) | ||
|
||
|
||
def _complete_ellipsis(index: Tuple[Any | Ellipsis], ndim: int): | ||
num_ellip = index.count(Ellipsis) | ||
assert num_ellip <= 1, f"Invalid index {index}" | ||
if num_ellip == 0: | ||
return index | ||
|
||
i = index.index(Ellipsis) | ||
completed = ( | ||
list(index[:i]) + [slice(None)] * (ndim - len(index) + 1) + list(index[i + 1 :]) | ||
) | ||
return tuple(completed) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,45 @@ | ||
from typing import Tuple | ||
from __future__ import annotations | ||
from typing import Tuple, Iterable | ||
import numpy as np | ||
|
||
|
||
def check_shape_consistency(shape1: Tuple[int], shape2: Tuple[int]):
    """Check that `shape1` and `shape2` are mutually compatible.

    Compatible means: same number of dimensions, and each pair of sizes is
    either equal or has a 1 on at least one side (broadcast-style).

    Raises:
        AssertionError: if the numbers of dimensions differ.
        ValueError: if any pair of sizes is incompatible.
    """
    assert len(shape1) == len(
        shape2
    ), f"Shape inconsistent in number of dimension between {shape1} and {shape2}."
    incompatible = any(
        d1 != d2 and d1 != 1 and d2 != 1 for d1, d2 in zip(shape1, shape2)
    )
    if incompatible:
        raise ValueError(f"Inconsistent shape: {shape1} and {shape2}")
|
||
|
||
def parse_shape(shape, num_elements): | ||
if len(shape) == 1: | ||
shape = shape[0] | ||
if isinstance(shape, int): | ||
shape = (shape,) | ||
elif isinstance(shape, Iterable): | ||
shape = tuple(shape) | ||
else: | ||
raise TypeError(f"Invalid shape {shape}") | ||
shape = _replace_minus_one(shape, num_elements=num_elements) | ||
given_num_elements = int(np.prod(shape)) | ||
assert ( | ||
given_num_elements == num_elements | ||
), f"Inconsistent shape: should have {num_elements} elements, not {given_num_elements}." | ||
return shape | ||
|
||
|
||
def _replace_minus_one(shape: Tuple[int], num_elements: int): | ||
num_minus_one = shape.count(-1) | ||
assert num_minus_one <= 1, f"Invalid shape {shape}" | ||
if num_minus_one == 0: | ||
return shape | ||
|
||
otherdim = int(np.prod([size for size in shape if size >= 1])) | ||
inferred = num_elements // otherdim | ||
assert ( | ||
inferred * otherdim == num_elements | ||
), f"Invalid new shape {shape} for {num_elements} elements" | ||
i = shape.index(-1) | ||
replaced = list(shape[:i]) + [inferred] + list(shape[i + 1 :]) | ||
return tuple(replaced) |