-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathutils.R
245 lines (208 loc) · 6.71 KB
/
utils.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# assertthat helpers ------------------------------------------------------
is_scalar_character <- function(x) {
rlang::is_scalar_character(x)
}
assertthat::on_failure(is_scalar_character) <- function(call, env) {
paste0(deparse(call$x), " is not a character scalar (a length one character vector).")
}
is_scalar_integerish <- function(x) {
rlang::is_scalar_integerish(x)
}
assertthat::on_failure(is_scalar_integerish) <- function(call, env) {
paste0(deparse(call$x), " is not an integer scalar (a length one integer vector).")
}
is_user_port <- function(x) {
x >= 1024 && x <= 49151
}
assertthat::on_failure(is_user_port) <- function(call, env) {
paste0(deparse(call$x), " is not an allowed port number (it must be in the range 1024-49151).")
}
is_available_port <- function(x) {
tryCatch({
srv <- httpuv::startServer("127.0.0.1", x, list())
on.exit(srv$stop())
TRUE
},
error = function(e) FALSE
)
}
assertthat::on_failure(is_available_port) <- function(call, env) {
paste0("Port ", deparse(call$x), " already in use. Maybe is headless Chrome already running?")
}
is_function <- function(x) {
rlang::is_function(x)
}
assertthat::on_failure(is_function) <- function(call, env) {
paste0(deparse(call$x), "must be a function.")
}
is_single_param_fun <- function(x) {
assertthat::assert_that(is_function(x))
length(rlang::fn_fmls(x)) == 1L
}
assertthat::on_failure(is_single_param_fun) <- function(call, env) {
paste0("Function ", deparse(call$x), " must have one parameter and only one.")
}
check_is_single_param_fun <- function(x) {
assertthat::assert_that(is_single_param_fun(x))
}
is_list <- function(x) {
rlang::is_list(x)
}
assertthat::on_failure(is_list) <- function(call, env) {
paste0(deparse(call$x), " must be a list.")
}
# http helpers ------------------------------------------------------------
is_remote_reachable <- function(host, port, secure, retry_delay = 0.2, max_attempts = 15L) {
url <- build_http_url(host = host, port = port, secure = secure)
remote_reached <- function(url) {
check_url <- purrr::safely(httr::GET, otherwise = list())
response <- check_url(url, httr::use_proxy(""))
isTRUE(response$result$status_code == 200)
}
succeeded <- FALSE
"!DEBUG Trying to find `url`"
for (i in 1:max_attempts) {
"!DEBUG attempt `i`..."
succeeded <- remote_reached(url)
if (isTRUE(succeeded)) break
Sys.sleep(retry_delay)
}
"!DEBUG `if(succeeded) paste(url, 'found') else paste('...cannot find', url)`"
succeeded
}
build_http_url <- function(host, port, secure, path = NULL, query = NULL) {
scheme <- if(isTRUE(secure)) "https" else "http"
httr::modify_url("", scheme = scheme, hostname = host, port = port, path = path, query = query)
}
parse_ws_url <- function(ws_url) {
# NOTE: ws_url must be a character scalar
ws_url <- httr::parse_url(ws_url)
# ws_url scheme must be ws or wss:
if(!identical(ws_url$scheme, "ws") && !identical(ws_url$scheme, "wss")) {
return(NULL)
}
# ws_url must contain a hostname:
if(is.null(ws_url$hostname)) {
return(NULL)
}
# ws_url must contain a port:
if(is.null(ws_url$port)) {
return(NULL)
}
# ws_url path must be of the form devtools/page/xxxx or devtools/browser/xxx-yyy
path <- strsplit(ws_url$path, "/")[[1]]
if(length(path) != 3L) {
return(NULL)
}
if(!identical(path[1:2], c("devtools", "page")) &&
!identical(path[1:2], c("devtools", "browser"))
) {
return(NULL)
}
structure(
list(
host = ws_url$hostname,
port = ws_url$port,
secure = identical(ws_url$scheme, "wss"),
type = path[2],
id = path[3]
),
class = "cdp_ws_url"
)
}
build_ws_url <- function(ws_url) {
stopifnot(inherits(ws_url, "cdp_ws_url"))
scheme <- if(ws_url$secure) "wss" else "ws"
path <- c("devtools", ws_url$type, ws_url$id)
httr::modify_url(
"",
scheme = scheme,
hostname = ws_url$host,
port = ws_url$port,
path = path
)
}
# miscellaneous -----------------------------------------------------------
stop_or_reject <- function(message, async = FALSE) {
err <- simpleError(message)
if(isTRUE(async)) {
return(promises::promise_reject(err))
}
stop(err)
}
#' create a predicate from various forms
#'
#' @param arg a function, a formula or a value that will be tested as identical
#' @param env see env from `rlang::as_function`
#'
#' @return a function that will apply the predicate and return TRUE or FALSE
#' @noRd
as_predicate <- function(arg, env = rlang::caller_env()) {
if(rlang::is_formula(arg) || rlang::is_function(arg)) {
fun <- rlang::as_function(arg, env = env)
} else {
fun <- function(x) identical(x, arg)
}
function(...) {
res <- fun(...)
if(!rlang::is_true(res) && !rlang::is_false(res)) {
stop("Predicate functions must return a single `TRUE` or `FALSE`.")
}
res
}
}
#' Combine predicates
#'
#' @param list_of_predicates A named list of predicates.
#'
#' @return A function that take a single parameter. The argument of the
#' returned function is expected to be a named list. The predicates
#' function are applied to the objects of the result
#' @noRd
combine_predicates <- function(list_of_predicates) {
if(length(list_of_predicates) == 0) return(function(...) TRUE)
function(result) {
# if a name of a predicate is missing in the result object, return FALSE early
if(length(setdiff(names(list_of_predicates), names(result))) > 0) {
return(FALSE)
}
bool <- purrr::imap_lgl(list_of_predicates, ~ .x(result[[.y]]))
all(bool)
}
}
# callbacks wrappers ------------------------------------------------------
dewrap <- function(x, ...) {
UseMethod("dewrap", x)
}
dewrap.default <- function(x, ...) {
x
}
dewrap.crrri_callback_wrapper <- function(x, ...) {
attr(x, "callback", exact = TRUE)
}
format.crrri_callback_wrapper <- function(x, ...) {
format_object <- paste(collapse = "\n", format(dewrap(x)))
paste("=== wrapper over function ===", format_object, sep = "\n")
}
print.crrri_callback_wrapper <- function(x, ...) {
cat(format(x), "\n")
}
new_callback_wrapper <- function(wrapper_fn, callback) {
stopifnot(rlang::is_function(wrapper_fn), rlang::is_function(callback))
attr(wrapper_fn, "callback") <- dewrap(callback)
if(!inherits(wrapper_fn, "crrri_callback_wrapper")) {
class(wrapper_fn) <- c("crrri_callback_wrapper", class(wrapper_fn))
}
wrapper_fn
}
# kill a zombie Chrome ----------------------------------------------------
# this is because sometimes my R session crashes and I get a zombie Chrome
kill_zombie <- function(port = 9222) {
client <- hold(CDPSession(port = port))
hold(client$Browser$close())
if(client$readyState() == 3L) {
message("zombie killed!")
} else {
message("zombie is still alive!") # nocov
}
}