-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement Array.stable_sort_segment. #13343
base: trunk
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find the feature request very reasonable, and the implementation is not invasive as the underlying code is already able to do this, it just needs to be exposed in the API.
I left various inline comments (testing and variable naming).
I wonder if people want to suggest other names. I am personally fine with the _segment
prefix. (I could see suggestions for a Subarray
submodule that could over time grow with other capabilities. This could work for iterators and map*_inplace
.)
I wondered why sort
does not get the same treatment. A quick look suggests that its implementation is more obscure, so it looks less easy to expose a segment version of it.
|
||
(* The main loop. *) | ||
let number_of_tests = 5000 | ||
let max_length = 128 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could also decide to exhaustively test all segments on a size-10 arrays, which may provide similar test coverage (it is not robust to algorithms using larger cutoffs, but luckily the sort code uses cutoff = 5
) at a fraction of the compute cost -- there are 45 such segments, total length 210.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. I have reduced number_of_tests
from 5000 to 1000 and have added tests of the kind that you suggest.
stdlib/array.ml
Outdated
@@ -385,7 +385,7 @@ let sort cmp a = | |||
|
|||
|
|||
let cutoff = 5 | |||
let stable_sort cmp a = | |||
let unsafe_stable_sort_segment cmp a ofs len = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer loud names for those variables such as init_ofs
and init_len
, to avoid using them by mistake instead of one of the more local variables in the auxiliary functions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, will do.
(In fact, I have written a version of the code where the auxiliary functions are moved to the top level and commented; but that's a more invasive change.)
Why not use |
So |
You rather want to plan a convention here, so that given an operation it's easy to check if you have a corresponding function working on a sub-array without having to guess how the heck it could have been called. For example it could be prefix the operation with
Didn't think much about it but I think I would rather go with the prefix. |
Perform just 1000 random tests, instead of 5000. Add a second kind of tests, where we test all segments of a short array containing random data.
I think I would prefer
Indeed, it is slightly more obscure. I think I could do it if desired. It is a matter of adding an offset in every call to |
For the record, here is my commented version of the code. I have added several assertions to check that my comments are correct. I was surprised and delighted to discover that I have modified the code to use Furthermore, I have added several optimizations (see If there is interest, I could propose a PR with this more invasive change. (* [disjoint a1 ofs1 len1 a2 ofs2 len2] tests whether the array segments
described by [a1], [ofs1], [len1] and [a2], [ofs2], [len2] are disjoint.
It is used only in assertions. *)
let disjoint a1 ofs1 len1 a2 ofs2 len2 =
a1 != a2 ||
ofs1 + len1 <= ofs2 ||
ofs2 + len2 <= ofs1
(* [suffix a1 ofs1 len1 a2 ofs2 len2] tests whether the array segment
described by [a1], [ofs1], [len1] is a suffix of the array segment
described by [a2], [ofs2], [len2]. It is used only in assertions. *)
let suffix a1 ofs1 len1 a2 ofs2 len2 =
a1 == a2 &&
ofs1 + len1 = ofs2 + len2 &&
len1 <= len2
(* If the input data is already sorted, or close to sorted, it may be the
case that the calls from [merge] (below) to [blit] copy an array segment
to itself. We recognize this situation, where there is nothing to do. *)
let[@inline] terminal_blit src srcofs dst dstofs len =
if src == dst && srcofs = dstofs then
()
else
blit src srcofs dst dstofs len
(* [merge cmp src1 src1ofs src1len src2 src2ofs src2len dst dstofs] merges
the sorted array segments described by [src1], [src1ofs], [src1len] and
[src2], [src2ofs], [src2len]. The resulting data is written into the
array segment described by [dst], [dstofs], and [src1len + src2len].
One of the source segments (say, the first one) must be disjoint with
the destination segment. The other source segment (say, the second one)
can be either disjoint with the destination segment or a suffix of the
destination segment. (This works because the data in the second source
segment is then moved left: it is read before it is overwritten.) *)
(* [src1len] and [src2len] must be nonzero. *)
(* This is a stable merge: when [s1] and [s2] are equal, [s1] is favored. *)
let merge cmp src1 src1ofs src1len src2 src2ofs src2len dst dstofs =
assert (0 < src1len && 0 < src2len);
assert (
let dstlen = src1len + src2len in
(* Either both source segments are disjoint with the destination segment, *)
disjoint src1 src1ofs src1len dst dstofs dstlen &&
disjoint src2 src2ofs src2len dst dstofs dstlen ||
(* or the first source segment is a suffix of the destination segment, *)
suffix src1 src1ofs src1len dst dstofs dstlen &&
disjoint src2 src2ofs src2len dst dstofs dstlen ||
(* or the second source segment is a suffix of the destination segment. *)
disjoint src1 src1ofs src1len dst dstofs dstlen &&
suffix src2 src2ofs src2len dst dstofs dstlen
);
let src1r = src1ofs + src1len
and src2r = src2ofs + src2len in
let rec loop i1 s1 i2 s2 d =
if cmp s1 s2 <= 0 then begin
unsafe_set dst d s1;
let i1 = i1 + 1 in
if i1 < src1r then
loop i1 (unsafe_get src1 i1) i2 s2 (d + 1)
else
terminal_blit src2 i2 dst (d + 1) (src2r - i2)
end else begin
unsafe_set dst d s2;
let i2 = i2 + 1 in
if i2 < src2r then
loop i1 s1 i2 (unsafe_get src2 i2) (d + 1)
else
terminal_blit src1 i1 dst (d + 1) (src1r - i1)
end
in
loop src1ofs (unsafe_get src1 src1ofs) src2ofs (unsafe_get src2 src2ofs) dstofs
(* Although [merge] (above) works in all situations, we can make it much
faster in the special case where the data in the first source segment is
less than or equal to the data in the second source segment. Indeed, in
that case, two calls to [blit] suffice. The cost of recognizing this
special case is two reads, a comparison, and a conditional. *)
let[@inline] optimistic_merge
cmp src1 src1ofs src1len src2 src2ofs src2len dst dstofs =
assert (0 < src1len && 0 < src2len);
let last1 = unsafe_get src1 (src1ofs + src1len - 1)
and first2 = unsafe_get src2 src2ofs in
if cmp last1 first2 <= 0 then begin
blit src1 src1ofs dst dstofs src1len;
blit src2 src2ofs dst (dstofs + src1len) src2len
end
else
merge cmp src1 src1ofs src1len src2 src2ofs src2len dst dstofs
(* Even better: in the special case where the second source segment is a
suffix of the destination segment, [optimistic_merge] is simplified: the
second call to [blit] has no effect, as it copies the second source
segment to itself. Thus, it can be removed. *)
let[@inline] magic_optimistic_merge
cmp src1 src1ofs src1len src2 src2ofs src2len dst dstofs =
(* Check that the second source segment
is a suffix of the destination segment. *)
assert (src2 == dst && dstofs + src1len = src2ofs);
assert (0 < src1len && 0 < src2len);
let last1 = unsafe_get src1 (src1ofs + src1len - 1)
and first2 = unsafe_get src2 src2ofs in
if cmp last1 first2 <= 0 then
blit src1 src1ofs dst dstofs src1len
else
merge cmp src1 src1ofs src1len src2 src2ofs src2len dst dstofs
(* [isortto cmp src srcofs dst dstofs len] sorts the array segment described
by [src], [srcofs], [len]. The resulting data is written into the array
segment described by [dst], [dstofs], [len]. The source and destination
segments must either coincide or be disjoint. This is an insertion sort. *)
let isortto cmp src srcofs dst dstofs len =
assert (
src == dst && srcofs = dstofs ||
disjoint src srcofs len dst dstofs len
);
for i = 0 to len - 1 do
let e = unsafe_get src (srcofs + i) in
let j = ref (dstofs + i - 1) in
while !j >= dstofs && cmp (unsafe_get dst !j) e > 0 do
unsafe_set dst (!j + 1) (unsafe_get dst !j);
decr j
done;
unsafe_set dst (!j + 1) e
done
(* The cutoff determines where we switch from merge sort to insertion sort. *)
let cutoff = 5
(* [sortto cmp src srcofs dst dstofs len] sorts the array segment described
by [src], [srcofs], [len]. The resulting data is written into the array
segment described by [dst], [dstofs], [len]. The destination segment must
be disjoint from the source segment. This is a merge sort, with an
insertion sort at the leaves. It is a stable sort. *)
let rec sortto cmp src srcofs dst dstofs len =
assert (disjoint src srcofs len dst dstofs len);
if len <= cutoff then
isortto cmp src srcofs dst dstofs len
else begin
let len1 = len / 2 in
let len2 = len - len1 in
(* The second half of [src] can be larger by one slot. *)
assert (len1 <= len2 && len2 <= len1 + 1);
(* Sort the second half of [src] and move it to the second half of [dst]. *)
sortto cmp src (srcofs + len1) dst (dstofs + len1) len2;
(* Sort the first half of [src] and move it to the second half of [src]. *)
(* This requires [len1 <= len2]. *)
sortto cmp src srcofs src (srcofs + len2) len1;
(* Merge the two sorted halves, moving the data to [dst]. *)
(* This is an in-place merge: the second source segment is contained
within the destination segment! *)
(* This is a stable sort, because the first half of the original
data (now moved and sorted) is the first argument to [merge]. *)
magic_optimistic_merge cmp
src (srcofs + len2) len1 dst (dstofs + len1) len2 dst dstofs
end
(* [unsafe_stable_sort_segment cmp a ofs len] sorts the array segment
described by [a], [ofs], [len]. This array segment is sorted in place.
This function is named [unsafe] because it does not validate [ofs] and
[len]. *)
let unsafe_stable_sort cmp a ofs len =
let base = ofs in
if len <= cutoff then
isortto cmp a base a base len
else begin
let len1 = len / 2 in
let len2 = len - len1 in
(* The second half of [a] can be larger by one slot. *)
assert (len1 <= len2 && len2 <= len1 + 1);
(* Allocate a temporary array that fits the second half of [a]. *)
let t = make len2 (unsafe_get a base) in
(* Sort the second half of [a] and move it to [t]. *)
sortto cmp a (base + len1) t 0 len2;
(* Sort the first half of [a] and move it to the second half of [a]. *)
(* This requires [len1 <= len2]. *)
sortto cmp a base a (base + len2) len1;
(* Merge the two sorted halves, moving the data to [a]. *)
(* This is an in-place merge: the first source segment is contained
within the destination segment! *)
(* This is a stable sort, because the first half of the original
data (now moved and sorted) is the first argument to [merge]. *)
optimistic_merge cmp a (base + len2) len1 t 0 len2 a base
end |
I think that if you want to have every If this PR had been called "Implement There are quite a few slicing operations in the Could you perhaps clarify why you think it's necessary to introduce new terminology in the |
I don't have a strong case, and am happy with keeping I think I would prefer to say |
I'm a little bit agnostic to this choice, so as far as I'm concerned you get to choose. I don't think there's a strong case justifying one over the other. The way I read it, Also about collecting them in a submodule I think I'd rather not. A submodule would make more sense if a new kind of value holding indices on the subarray was being defined. I'm not sure it's worth adding hierarchy for a bunch of operations. |
(We could have added a type This is good news, because it is easy to incrementally add |
@@ -339,6 +339,19 @@ val stable_sort : cmp:('a -> 'a -> int) -> 'a array -> unit | |||
than the current implementation of {!sort}. | |||
*) | |||
|
|||
val stable_sort_segment : ('a -> 'a -> int) -> 'a array -> int -> int -> unit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry if this is a dumb question, but shouldn't we use labels for ofs
and len
since we're in the Labels
version of this module?
This PR adds a new standard library function
Array.stable_sort_segment
.Motivation: this is needed in order to (efficiently) implement sorting for dynamic arrays (e.g.,
Dynarray.stable_sort
).Only a few lines of code are changed. The core sorting algorithm is unchanged.
Note: one might wish to also implement
Array.sort_segment
(a variant ofArray.sort
). I have not done so here.Note: I am surprised to see that the sorting algorithms use
get
andset
instead ofunsafe_get
andunsafe_set
. Some benchmarks suggest that this can make a 5% difference in performance.