Added the scx_ml_collect scheduler, as well as the task_sched_data.h file. These will attempt to collect all the data that Yeonju is interested in.
AlejandroByrne committed Nov 15, 2024
1 parent db46e27 commit 03e20df
Showing 4 changed files with 366 additions and 1 deletion.
2 changes: 1 addition & 1 deletion scheds/c/meson.build
@@ -1,5 +1,5 @@
c_scheds = ['scx_simple', 'scx_qmap', 'scx_central', 'scx_userland', 'scx_nest',
-	    'scx_flatcg', 'scx_pair']
+	    'scx_flatcg', 'scx_pair', 'scx_ml_collect']

foreach sched: c_scheds
thread_dep = dependency('threads')
179 changes: 179 additions & 0 deletions scheds/c/scx_ml_collect.bpf.c
@@ -0,0 +1,179 @@
/* SPDX-License-Identifier: GPL-2.0 */
/*
 * A simple data-collection scheduler based on scx_simple. In addition to
 * scheduling, it records per-task data (see task_sched_data.h) into a BPF
 * hash map that the userspace loader periodically dumps.
*
* By default, it operates as a simple global weighted vtime scheduler and can
* be switched to FIFO scheduling. It also demonstrates the following niceties.
*
* - Statistics tracking how many tasks are queued to local and global dsq's.
* - Termination notification for userspace.
*
* While very simple, this scheduler should work reasonably well on CPUs with a
* uniform L3 cache topology. While preemption is not implemented, the fact that
* the scheduling queue is shared across all CPUs means that whatever is at the
 * front of the queue is likely to be executed fairly quickly given a
 * sufficient number of CPUs. The FIFO scheduling mode may be beneficial to
 * some workloads but comes with the usual problems of FIFO scheduling, where
 * saturating threads can easily drown out interactive ones.
*
* Copyright (c) 2022 Meta Platforms, Inc. and affiliates.
* Copyright (c) 2022 Tejun Heo <tj@kernel.org>
* Copyright (c) 2022 David Vernet <dvernet@meta.com>
*/
#include <scx/common.bpf.h>
#include "task_sched_data.h"
#include <string.h>

char _license[] SEC("license") = "GPL";

const volatile bool fifo_sched;

static u64 vtime_now;
UEI_DEFINE(uei);

/*
* Built-in DSQs such as SCX_DSQ_GLOBAL cannot be used as priority queues
* (meaning, cannot be dispatched to with scx_bpf_dispatch_vtime()). We
* therefore create a separate DSQ with ID 0 that we dispatch to and consume
 * from. If this scheduler only supported global FIFO scheduling, then we could
* just use SCX_DSQ_GLOBAL.
*/
#define SHARED_DSQ 0

struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__uint(key_size, sizeof(u32));
__uint(value_size, sizeof(u64));
__uint(max_entries, 2); /* [local, global] */
} stats SEC(".maps");

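/*
 * Per-task scheduling data keyed by PID. Entries are created in
 * ml_collect_enqueue() and read by the userspace loader in print_stats().
 */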
struct {
__uint(type, BPF_MAP_TYPE_HASH);
__uint(key_size, sizeof(pid_t));
__uint(value_size, sizeof(struct task_sched_data));
__uint(max_entries, 256);
} task_data SEC(".maps");

static void stat_inc(u32 idx)
{
u64 *cnt_p = bpf_map_lookup_elem(&stats, &idx);
if (cnt_p)
(*cnt_p)++;
}

static inline bool vtime_before(u64 a, u64 b)
{
return (s64)(a - b) < 0;
}

s32 BPF_STRUCT_OPS(ml_collect_select_cpu, struct task_struct *p, s32 prev_cpu, u64 wake_flags)
{
bool is_idle = false;
s32 cpu;

cpu = scx_bpf_select_cpu_dfl(p, prev_cpu, wake_flags, &is_idle);
if (is_idle) {
stat_inc(0); /* count local queueing */
scx_bpf_dispatch(p, SCX_DSQ_LOCAL, SCX_SLICE_DFL, 0);
}

return cpu;
}

void BPF_STRUCT_OPS(ml_collect_enqueue, struct task_struct *p, u64 enq_flags)
{
pid_t pid = p->pid;
struct task_sched_data * tsk_ptr = bpf_map_lookup_elem(&task_data, &pid);
if (tsk_ptr != NULL) { // already aware of this task (pid)
bpf_printk("Found a pid that is already accounted for\n");
} else { // new task, create and insert new data struct for it
struct task_sched_data tsk_data = {.pid = p->pid, /*.min_flt = p->min_flt*/};
// strncpy(tsk_data.name, p->comm, TASK_COMM_LEN);
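		/*
		 * strncpy() from <string.h> is not usable from BPF programs,
		 * which is why the copy above is commented out. One possible
		 * (untested) alternative, assuming the verifier allows direct
		 * reads of p->comm and p->min_flt in this context, would be:
		 *
		 *	__builtin_memcpy(tsk_data.name, p->comm, TASK_COMM_LEN);
		 *	tsk_data.min_flt = p->min_flt;
		 */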
if (bpf_map_update_elem(&task_data, &pid, &tsk_data, BPF_NOEXIST) == 0) {
bpf_printk("Successfully added a struct to the task_data hash map\n");
} else {
bpf_printk("Adding a struct to the task_data hash map failed\n");
}
}

stat_inc(1); /* count global queueing */

if (fifo_sched) {
scx_bpf_dispatch(p, SHARED_DSQ, SCX_SLICE_DFL, enq_flags);
} else {
u64 vtime = p->scx.dsq_vtime;

/*
* Limit the amount of budget that an idling task can accumulate
* to one slice.
*/
if (vtime_before(vtime, vtime_now - SCX_SLICE_DFL))
vtime = vtime_now - SCX_SLICE_DFL;

scx_bpf_dispatch_vtime(p, SHARED_DSQ, SCX_SLICE_DFL, vtime,
enq_flags);
}
}

void BPF_STRUCT_OPS(ml_collect_dispatch, s32 cpu, struct task_struct *prev)
{
scx_bpf_consume(SHARED_DSQ);
}

void BPF_STRUCT_OPS(ml_collect_running, struct task_struct *p)
{
if (fifo_sched)
return;

/*
* Global vtime always progresses forward as tasks start executing. The
* test and update can be performed concurrently from multiple CPUs and
* thus racy. Any error should be contained and temporary. Let's just
* live with it.
*/
if (vtime_before(vtime_now, p->scx.dsq_vtime))
vtime_now = p->scx.dsq_vtime;
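
	/*
	 * Possible (untested) data-collection hook, not part of the original
	 * scheduler: stamp start_time here so that ml_collect_stopping() could
	 * derive execution_time (see the sketch there). Assumes the task was
	 * added to task_data by ml_collect_enqueue().
	 *
	 *	pid_t pid = p->pid;
	 *	struct task_sched_data *tsk = bpf_map_lookup_elem(&task_data, &pid);
	 *
	 *	if (tsk)
	 *		tsk->start_time = bpf_ktime_get_ns();
	 */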
}

void BPF_STRUCT_OPS(ml_collect_stopping, struct task_struct *p, bool runnable)
{
if (fifo_sched)
return;

/*
* Scale the execution time by the inverse of the weight and charge.
*
* Note that the default yield implementation yields by setting
* @p->scx.slice to zero and the following would treat the yielding task
* as if it has consumed all its slice. If this penalizes yielding tasks
* too much, determine the execution time by taking explicit timestamps
* instead of depending on @p->scx.slice.
*/
p->scx.dsq_vtime += (SCX_SLICE_DFL - p->scx.slice) * 100 / p->scx.weight;
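
	/*
	 * Possible (untested) companion sketch for the hook described in
	 * ml_collect_running(): accumulate per-task runtime into the
	 * start_time/end_time/execution_time fields of task_sched_data.h.
	 *
	 *	pid_t pid = p->pid;
	 *	struct task_sched_data *tsk = bpf_map_lookup_elem(&task_data, &pid);
	 *
	 *	if (tsk && tsk->start_time) {
	 *		tsk->end_time = bpf_ktime_get_ns();
	 *		tsk->execution_time += tsk->end_time - tsk->start_time;
	 *	}
	 */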
}

void BPF_STRUCT_OPS(ml_collect_enable, struct task_struct *p)
{
p->scx.dsq_vtime = vtime_now;
}

s32 BPF_STRUCT_OPS_SLEEPABLE(ml_collect_init)
{
return scx_bpf_create_dsq(SHARED_DSQ, -1);
}

void BPF_STRUCT_OPS(ml_collect_exit, struct scx_exit_info *ei)
{
UEI_RECORD(uei, ei);
}

SCX_OPS_DEFINE(ml_collect_ops,
.select_cpu = (void *)ml_collect_select_cpu,
.enqueue = (void *)ml_collect_enqueue,
.dispatch = (void *)ml_collect_dispatch,
.running = (void *)ml_collect_running,
.stopping = (void *)ml_collect_stopping,
.enable = (void *)ml_collect_enable,
.init = (void *)ml_collect_init,
.exit = (void *)ml_collect_exit,
.name = "ml_collect");
124 changes: 124 additions & 0 deletions scheds/c/scx_ml_collect.c
@@ -0,0 +1,124 @@
/* SPDX-License-Identifier: GPL-2.0 */
/*
* Copyright (c) 2022 Meta Platforms, Inc. and affiliates.
* Copyright (c) 2022 Tejun Heo <tj@kernel.org>
* Copyright (c) 2022 David Vernet <dvernet@meta.com>
*/
#include <stdio.h>
#include <unistd.h>
#include <signal.h>
#include <libgen.h>
#include <bpf/bpf.h>
#include <bpf/libbpf.h>
#include <scx/common.h>
#include "task_sched_data.h"
#include "scx_ml_collect.bpf.skel.h"

const char help_fmt[] =
"A simple sched_ext scheduler.\n"
"\n"
"See the top-level comment in .bpf.c for more details.\n"
"\n"
"Usage: %s [-f] [-v]\n"
"\n"
" -f Use FIFO scheduling instead of weighted vtime scheduling\n"
" -v Print libbpf debug messages\n"
" -h Display this help and exit\n";

static bool verbose;
static volatile int exit_req;

static int libbpf_print_fn(enum libbpf_print_level level, const char *format, va_list args)
{
if (level == LIBBPF_DEBUG && !verbose)
return 0;
return vfprintf(stderr, format, args);
}

static void sigint_handler(int sig)
{
exit_req = 1;
}

static void read_stats(struct scx_ml_collect *skel, __u64 *stats)
{
int nr_cpus = libbpf_num_possible_cpus();
__u64 cnts[2][nr_cpus];
__u32 idx;

memset(stats, 0, sizeof(stats[0]) * 2);

for (idx = 0; idx < 2; idx++) {
int ret, cpu;

ret = bpf_map_lookup_elem(bpf_map__fd(skel->maps.stats),
&idx, cnts[idx]);
if (ret < 0)
continue;
for (cpu = 0; cpu < nr_cpus; cpu++)
stats[idx] += cnts[idx][cpu];
}
}

static void print_stats(struct scx_ml_collect *skel)
{
	int fd = bpf_map__fd(skel->maps.task_data);
	pid_t *cur_pid = NULL;
	pid_t next_pid;
	struct task_sched_data tsk;

	/* Walk every entry in the task_data hash map and dump it. */
	while (!bpf_map_get_next_key(fd, cur_pid, &next_pid)) {
		if (bpf_map_lookup_elem(fd, &next_pid, &tsk) == 0)
			printf("NAME: %s, PID: %d, MIN_FLT: %u\n",
			       tsk.name, tsk.pid, tsk.min_flt);
		cur_pid = &next_pid;
	}
}

int main(int argc, char **argv)
{
struct scx_ml_collect *skel;
struct bpf_link *link;
__u32 opt;
__u64 ecode;

libbpf_set_print(libbpf_print_fn);
signal(SIGINT, sigint_handler);
signal(SIGTERM, sigint_handler);
restart:
skel = SCX_OPS_OPEN(ml_collect_ops, scx_ml_collect);

while ((opt = getopt(argc, argv, "fvh")) != -1) {
switch (opt) {
case 'f':
skel->rodata->fifo_sched = true;
break;
case 'v':
verbose = true;
break;
default:
fprintf(stderr, help_fmt, basename(argv[0]));
return opt != 'h';
}
}

SCX_OPS_LOAD(skel, ml_collect_ops, scx_ml_collect, uei);
link = SCX_OPS_ATTACH(skel, ml_collect_ops, scx_ml_collect);

while (!exit_req && !UEI_EXITED(skel, uei)) {
// __u64 stats[2];
print_stats(skel);
// read_stats(skel, stats);
// printf("local=%llu global=%llu\n", stats[0], stats[1]);
fflush(stdout);
sleep(1);
}

bpf_link__destroy(link);
ecode = UEI_REPORT(skel, uei);
scx_ml_collect__destroy(skel);

if (UEI_ECODE_RESTART(ecode))
goto restart;
return 0;
}
62 changes: 62 additions & 0 deletions scheds/c/task_sched_data.h
@@ -0,0 +1,62 @@
#ifndef TASK_SCHED_DATA_H
#define TASK_SCHED_DATA_H

// #include <sched.h>

/*
 * This header relies on the including file to provide the u64/u32 typedefs
 * (vmlinux.h via common.bpf.h on the BPF side, scx/common.h in the userspace
 * loader).
 */

#define TASK_COMM_LEN 16

struct task_sched_data {
// Task attributes
// Identification
char name[TASK_COMM_LEN];
int pid;
int rq_idx;
u64 last_sum_exec_runtime;
u64 total_numa_faults;
	// Delay accounting attributes
	u64 blkio_start;
u64 blkio_delay;
u64 swapin_delay;
u32 blkio_count;
u32 swapin_count;
u64 freepages_start;
u64 freepages_delay;
u64 thrashing_start;
u64 thrashing_delay;
u32 freepages_count;
u32 thrashing_count;
int stack_refcount;

// Sched entity counters
long unsigned int weight;
u32 inv_weight;
//u64 deadline;
u64 vruntime;
u64 sum_exec_runtime;
u64 prev_sum_exec_runtime;
u64 nr_migrations;

// Sched stats counters
u64 wait_start;
u64 wait_max;
u64 wait_count;
u64 wait_sum;
u64 iowait_count;
u64 iowait_sum;
u64 sleep_start;
u64 sleep_max;
u64 sum_sleep_runtime;
u64 block_start;
u64 block_max;
u64 start_time; // added by Ale
u64 end_time; // added by Ale
u64 execution_time; // added by Ale

u64 run_delay;
u64 last_arrival;
u64 last_queued;

// Memory counters
u32 min_flt;
u32 maj_flt;
u32 total_vm;
u32 hiwater_rss;
int map_count;

};

#endif /* TASK_SCHED_DATA_H */