-
-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
Copy pathconcurrent.cr
145 lines (138 loc) · 3.31 KB
/
concurrent.cr
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
require "fiber"
require "channel"
require "crystal/scheduler"
require "crystal/tracing"
# Suspends the current fiber for *seconds* seconds.
#
# While this fiber is waiting, other fibers that are ready to execute
# might start running.
#
# Raises `ArgumentError` if *seconds* is negative. The value is converted
# to a `Time::Span` via `Number#seconds` before being handed to the scheduler.
@[Deprecated("Use `::sleep(Time::Span)` instead")]
def sleep(seconds : Number) : Nil
  # Guard first: only strictly negative values are rejected, zero is accepted.
  raise ArgumentError.new "Sleep seconds must be positive" if seconds < 0

  Crystal::Scheduler.sleep(seconds.seconds)
end
# Blocks the current fiber for the specified time span.
#
# While this fiber is waiting, other ready-to-execute fibers might start
# their execution. The span is passed unchanged to `Crystal::Scheduler.sleep`.
#
# NOTE(review): unlike the deprecated `sleep(seconds : Number)` overload, this
# one performs no check for a negative *time*; how the scheduler treats a
# negative span is not visible from here - confirm.
def sleep(time : Time::Span) : Nil
Crystal::Scheduler.sleep(time)
end
# Blocks the current fiber forever.
#
# Meanwhile, other ready-to-execute fibers might start their execution.
#
# This simply asks the scheduler to reschedule without arranging any wake-up
# for the current fiber. Presumably the fiber only runs again if something
# else explicitly re-enqueues it - confirm against `Crystal::Scheduler`.
def sleep : Nil
Crystal::Scheduler.reschedule
end
# Creates a new fiber that will run *block*, enqueues it, and returns it.
#
# NOTE: The newly created fiber doesn't run as soon as spawned.
#
# Arguments:
# * *name*: optional name given to the new `Fiber`.
# * *same_thread*: only consulted when compiled with the `preview_mt` flag.
#   When truthy, `Fiber#set_current_thread` is called on the new fiber before
#   it is enqueued (presumably pinning it to the spawning thread - confirm).
#   Without `preview_mt` the argument is ignored.
#
# Example:
# ```
# # Write "1" every 1 second and "2" every 2 seconds for 6 seconds.
#
# require "wait_group"
#
# wg = WaitGroup.new 2
#
# spawn do
#   6.times do
#     sleep 1.second
#     puts 1
#   end
# ensure
#   wg.done
# end
#
# spawn do
#   3.times do
#     sleep 2.seconds
#     puts 2
#   end
# ensure
#   wg.done
# end
#
# wg.wait
# ```
def spawn(*, name : String? = nil, same_thread = false, &block)
  spawned = Fiber.new(name, &block)
  Crystal.trace :sched, "spawn", fiber: spawned

  {% if flag?(:preview_mt) %}
    spawned.set_current_thread if same_thread
  {% end %}

  spawned.enqueue
  spawned
end
# Spawns a fiber by first creating a `Proc`, passing the *call*'s
# expressions to it, and letting the `Proc` finally invoke the *call*.
#
# Compare this:
#
# ```
# i = 0
# while i < 5
#   spawn { print(i) }
#   i += 1
# end
# Fiber.yield
# # Output: 55555
# ```
#
# To this:
#
# ```
# i = 0
# while i < 5
#   spawn print(i)
#   i += 1
# end
# Fiber.yield
# # Output: 01234
# ```
#
# This is because in the first case all spawned fibers refer to
# the same local variable, while in the second example copies of
# *i* are passed to a `Proc` that eventually invokes the call.
#
# The *name* and *same_thread* arguments are forwarded to the block form
# `spawn(name: ..., same_thread: ...) { ... }` in every case.
#
# If *call* is not a method call (for example a bare variable or literal),
# there are no arguments to copy, so the expression is simply wrapped in a
# spawned block.
#
# Passing a block to this macro is a compile-time error.
macro spawn(call, *, name = nil, same_thread = false, &block)
  {% if block %}
    {% raise "`spawn(call)` can't be invoked with a block, did you mean `spawn(name: ...) { ... }`?" %}
  {% end %}

  {% if call.is_a?(Call) %}
    # Build a proc literal with one typed parameter per positional and named
    # argument, so the argument values are evaluated and copied right now.
    ->(
      {% for arg, i in call.args %}
        __arg{{i}} : typeof({{arg.is_a?(Splat) ? arg.exp : arg}}),
      {% end %}
      {% if call.named_args %}
        {% for narg, i in call.named_args %}
          __narg{{i}} : typeof({{narg.value}}),
        {% end %}
      {% end %}
      ) {
      spawn(name: {{name}}, same_thread: {{same_thread}}) do
        # Re-issue the original call using the proc parameters; splats are
        # re-applied to the parameter that carries the splatted expression.
        {% if call.receiver %}{{ call.receiver }}.{% end %}{{call.name}}(
          {% for arg, i in call.args %}
            {% if arg.is_a?(Splat) %}*{% end %}__arg{{i}},
          {% end %}
          {% if call.named_args %}
            {% for narg, i in call.named_args %}
              {{narg.name}}: __narg{{i}},
            {% end %}
          {% end %}
        )
      end
      }.call(
      {% for arg in call.args %}
        {{arg.is_a?(Splat) ? arg.exp : arg}},
      {% end %}
      {% if call.named_args %}
        {{call.named_args.map(&.value).splat}}
      {% end %}
    )
  {% else %}
    # Not a call: nothing to capture by value. Still forward `name` and
    # `same_thread` so they are not silently dropped.
    spawn(name: {{name}}, same_thread: {{same_thread}}) do
      {{call}}
    end
  {% end %}
end