-
-
Notifications
You must be signed in to change notification settings - Fork 5.5k
/
asyncmap.jl
76 lines (62 loc) · 2.44 KB
/
asyncmap.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# This file is a part of Julia. License is MIT: https://julialang.org/license
using Random
# Test asyncmap
@test allunique(asyncmap(x->(sleep(1.0);objectid(current_task())), 1:10))
# num tasks
@test length(unique(asyncmap(x->(yield();objectid(current_task())), 1:20; ntasks=5))) == 5
# default num tasks
@test length(unique(asyncmap(x->(yield();objectid(current_task())), 1:200))) == 100
# ntasks as a function
let nt=0
global nt_func
nt_func() = (v=div(nt, 25); nt+=1; v) # increment number of tasks by 1 for every 25th call.
# nt_func() will be called initially once and then for every
# iteration
end
@test length(unique(asyncmap(x->(yield();objectid(current_task())), 1:200; ntasks=nt_func))) == 7
# batch mode tests
let ctr=0
global next_ctr
next_ctr() = (ctr+=1; ctr)
end
resp = asyncmap(x->(v=next_ctr(); map(_->v, x)), 1:22; ntasks=5, batch_size=5)
@test length(resp) == 22
@test length(unique(resp)) == 5
input = rand(1:1000, 100)
@test asyncmap(x->map(args->identity(args...), x), input; ntasks=5, batch_size=5) == input
# check whether shape is retained
a=rand(2,2)
b=asyncmap(identity, a)
@test a == b
@test size(a) == size(b)
# check with an iterator that does not implement size()
c=Channel(32); foreach(i->put!(c,i), 1:10); close(c)
b=asyncmap(identity, c)
@test Int[1:10...] == b
@test size(b) == (10,)
# check with an iterator that has only implements length()
len_only_iterable = (1,2,3,4,5)
@test Base.IteratorSize(len_only_iterable) == Base.HasLength()
@test asyncmap(identity, len_only_iterable) == map(identity, len_only_iterable)
# Error conditions
@test_throws ArgumentError asyncmap(identity, 1:10; batch_size=0)
@test_throws ArgumentError asyncmap(identity, 1:10; batch_size="10")
@test_throws ArgumentError asyncmap(identity, 1:10; ntasks="10")
# Check that we throw a `CapturedException` holding the stacktrace if `f` throws
f42105(i) = i == 5 ? error("captured") : i
let
e = try
asyncmap(f42105, 1:5)
catch e
e
end
@test e isa CapturedException
@test e.ex == ErrorException("captured")
@test e.processed_bt[2][1].func === :f42105
end
include("generic_map_tests.jl")
generic_map_tests(asyncmap, asyncmap!)
# asyncmap with various types. Test for equivalence with map
run_map_equivalence_tests(asyncmap)
using Base.Unicode: uppercase
@test asyncmap(uppercase, "Hello World!") == map(uppercase, "Hello World!")