Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: yield processor explicitly #198

Merged
merged 1 commit into from
May 7, 2024

Conversation

zezhehh
Copy link
Contributor

@zezhehh zezhehh commented Dec 1, 2023

It will spend more time within the potentially continuous goroutine, as we anticipate the scheduler to switch contexts more frequently under high concurrency, especially in the local runner. To address it, we explicitly yield the processor in the loop block with a default statement.

@myzhan :D

@myzhan
Copy link
Owner

myzhan commented Dec 4, 2023

@zezhehh I'm not sure why we should do this explicitly.

@zezhehh
Copy link
Contributor Author

zezhehh commented Dec 4, 2023

@zezhehh I'm not sure why we should do this explicitly.

@myzhan I think it explains well 👉 https://medium.com/@genchilu/one-goroutine-trap-when-would-goroutine-context-switch-f91b9a20b8e8

Of course it will not only spawn 4 goroutines before any noticeable stuck, it is stuck for me around every 700 goroutines without runtime.Gosched, while there's no apparent stuck with runtime.Gosched (and finish spawning for 10000 goroutines much quicker).

@myzhan
Copy link
Owner

myzhan commented Dec 11, 2023

OK, now I get it. Can you provide your code of boomer task to reproduce this issue?

@zezhehh
Copy link
Contributor Author

zezhehh commented Dec 13, 2023

OK, now I get it. Can you provide your code of boomer task to reproduce this issue?

@myzhan Adding explicit runtime.Gosched() in boomer will reduce the chance of a panic. (try to run it multiple times)

package main

import (
	"sync"
	"time"
	"runtime"

	"github.com/myzhan/boomer"
)

func someSequentialActions() {
	for step := 0; step < 20; step++ {
		sum := 0
		for i := 0; i < 100000; i++ {
			sum += i
			runtime.Gosched()
		}
	}
}

const (
	interval   = 50 * time.Millisecond
	spawnCount = 1000
	spawnRate  = 50
)

type Foo struct {
	stop   chan any
	stream chan int
}

func (f *Foo) Generate() {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-f.stop:
			ticker.Stop()
			return
		case <-ticker.C:
			f.stream <- 1
		}
	}
}

func (f *Foo) Receive(stream <-chan int) {
	maxInterval := 2 * interval
	for {
		select {
		case <-f.stop:
			return
		case <-time.After(maxInterval):
			panic("timeout")
		case <-stream:
			// do something
		}
	}

}

func (f *Foo) Stop() {
	close(f.stop)
}

func main() {
	wg := sync.WaitGroup{}
	task := &boomer.Task{
		Name:   "foo",
		Weight: 10,
		Fn: func() {
			wg.Add(1)
			defer wg.Done()

			f := &Foo{
				stop:   make(chan any),
				stream: make(chan int),
			}
			defer f.Stop()

			go f.Generate()
			go f.Receive(f.stream)

			someSequentialActions()
		},
	}

	instance := boomer.NewStandaloneBoomer(spawnCount, spawnRate)
	go instance.Run(task)

	time.Sleep(100 * time.Second)
	instance.Quit()
	wg.Wait()
	println("done")
}

@myzhan myzhan merged commit edd5204 into myzhan:master May 7, 2024
@myzhan
Copy link
Owner

myzhan commented May 7, 2024

I can't reproduce the stuck issue, but I think it's harmless to add runtime.Gosched().

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants