# test_tensor_utils.py (forked from hpcaitech/ColossalAI)
import pytest
import colossalai
from colossalai.utils.cuda import get_current_device
from colossalai.gemini.tensor_utils import (colo_tensor_mem_usage, colo_model_data_tensor_move,
colo_model_data_tensor_move_inline, colo_model_data_move_to_cpu,
colo_model_tensor_clone)
from colossalai.gemini.stateful_tensor import StatefulTensor
from colossalai.utils import free_port
from colossalai.testing import rerun_if_address_is_in_use
import torch
from functools import partial
import torch.multiprocessing as mp
def _run_colo_tensor_mem_usage():
    """Check that ``colo_tensor_mem_usage`` scales with the number of elements.

    A 4x4 tensor holds four times as many elements as a 2x2 tensor, so each of
    the two values reported for the larger tensor must be exactly four times
    the corresponding value reported for the smaller one.  The check is run
    once on plain ``torch.Tensor`` inputs and once on ``StatefulTensor``
    wrappers.

    Raises:
        AssertionError: if either reported value does not scale by four.
    """
    # Iterate over both input kinds explicitly.  A ``range(1)`` loop guarded by
    # ``i == 1`` would never reach the StatefulTensor case.
    for wrap_in_stateful in (False, True):
        small = torch.randn(2, 2)
        large = torch.randn(4, 4)
        if wrap_in_stateful:
            small = StatefulTensor(small)
            large = StatefulTensor(large)

        # Unpacking also enforces that exactly two values are returned.
        small_first, small_second = colo_tensor_mem_usage(small)
        large_first, large_second = colo_tensor_mem_usage(large)
        assert small_first * 4 == large_first
        assert small_second * 4 == large_second
def _run_colo_model_data_tensor_move_inline():
    """Check the device reported after ``colo_model_data_tensor_move_inline``.

    Runs against a ``StatefulTensor`` and a plain ``torch.Tensor``.  After the
    call, each input must report the device returned by
    ``get_current_device()``.

    Raises:
        AssertionError: if an input does not report the current device.
    """
    stateful_input = StatefulTensor(torch.randn(2, 3))
    plain_input = torch.randn(2, 3)
    for tensor in (stateful_input, plain_input):
        target_device = get_current_device()
        colo_model_data_tensor_move_inline(tensor, target_device)
        assert tensor.device == get_current_device()
def _run_colo_model_data_tensor_move():
    """Check the destination device after ``colo_model_data_tensor_move``.

    Each pair consists of a tensor of ones created without an explicit device
    and a tensor of zeros moved to ``get_current_device()``.  The pair is
    passed to ``colo_model_data_tensor_move`` and the second tensor must still
    report the current device afterwards.  Both ``StatefulTensor`` and plain
    ``torch.Tensor`` pairs are exercised.

    NOTE(review): only the device is asserted, not the contents of the second
    tensor -- presumably the call copies data from the first into the second;
    confirm against the implementation.

    Raises:
        AssertionError: if the second tensor does not report the current device.
    """
    stateful_pair = (
        StatefulTensor(torch.ones(2, 3)),
        StatefulTensor(torch.zeros(2, 3).to(get_current_device())),
    )
    plain_pair = (
        torch.ones(2, 3),
        torch.zeros(2, 3).to(get_current_device()),
    )
    for source, destination in (stateful_pair, plain_pair):
        colo_model_data_tensor_move(source, destination)
        assert destination.device == get_current_device()
def _run_colo_model_data_move_to_cpu():
    """Check that inputs report the CPU device after ``colo_model_data_move_to_cpu``.

    Runs against a ``StatefulTensor`` and a plain ``torch.Tensor``.

    NOTE(review): both inputs are created without an explicit device, so they
    presumably already live on the CPU before the call -- confirm whether the
    inputs were meant to start on the accelerator.

    Raises:
        AssertionError: if an input does not report ``torch.device("cpu")``.
    """
    samples = [StatefulTensor(torch.randn(2, 2)), torch.randn(4, 4)]
    for sample in samples:
        colo_model_data_move_to_cpu(sample)
        expected_device = torch.device("cpu")
        assert sample.device == expected_device
def _run_colo_model_tensor_clone():
    """Check that ``colo_model_tensor_clone`` returns an equal tensor on the current device.

    Runs against a 2x2 ``StatefulTensor`` and a 4x4 plain ``torch.Tensor``,
    both created on the current CUDA device.  For each input the clone must
    report the current device, share the device of the source data, and match
    the source data in shape and in every element.

    Raises:
        AssertionError: if the device or the contents of the clone differ from
            the source.
    """
    sources = [
        StatefulTensor(torch.randn(2, 2).cuda(torch.cuda.current_device())),
        torch.randn(4, 4).cuda(torch.cuda.current_device()),
    ]
    for source in sources:
        # A StatefulTensor exposes its underlying tensor as ``payload``;
        # a plain tensor is its own data.
        data = source.payload if isinstance(source, StatefulTensor) else source
        assert data.device == get_current_device()

        clone = colo_model_tensor_clone(source, get_current_device())
        assert clone.device == get_current_device()
        assert data.device == clone.device

        # Compare the whole tensor.  Indexing only [0..1][0..1] would leave
        # most of the 4x4 input unchecked.
        assert torch.equal(data, clone)
def run_dist(rank, world_size, port):
    """Per-process entry point: launch ColossalAI, then run every tensor-utility check.

    Args:
        rank: rank of this process, forwarded to ``colossalai.launch``.
        world_size: total number of processes, forwarded to ``colossalai.launch``.
        port: port number, forwarded to ``colossalai.launch``.
    """
    launch_kwargs = dict(
        config={},
        rank=rank,
        world_size=world_size,
        host='localhost',
        port=port,
        backend='nccl',
    )
    colossalai.launch(**launch_kwargs)

    # The checks run in a fixed order; the first failing assertion aborts the rest.
    checks = (
        _run_colo_tensor_mem_usage,
        _run_colo_model_data_tensor_move_inline,
        _run_colo_model_data_tensor_move,
        _run_colo_model_data_move_to_cpu,
        _run_colo_model_tensor_clone,
    )
    for check in checks:
        check()
@pytest.mark.dist
@pytest.mark.parametrize("world_size", [2, 4])
@rerun_if_address_is_in_use()
def test_zero_tensor_utils(world_size):
    """Spawn ``world_size`` processes that each execute ``run_dist``.

    Args:
        world_size: number of processes to spawn; also forwarded to
            ``run_dist`` as its ``world_size`` argument.
    """
    # Pick the port once so that every spawned process receives the same value.
    port = free_port()
    worker = partial(run_dist, world_size=world_size, port=port)
    # ``mp.spawn`` passes the process index as the first positional argument,
    # which ``run_dist`` receives as ``rank``.
    mp.spawn(worker, nprocs=world_size)
# Allow running this test directly as a script, with a world size of two.
if __name__ == '__main__':
    test_zero_tensor_utils(world_size=2)