Skip to content

Commit

Permalink
testing: set pprof labels for goroutines that use the same code for d…
Browse files Browse the repository at this point in the history
…ifferent cases (algorand#4350)
  • Loading branch information
cce authored Jun 27, 2024
1 parent d3831cd commit 63c0d5b
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 17 deletions.
2 changes: 2 additions & 0 deletions agreement/demux.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/logging/logspec"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/util"
)

const (
Expand Down Expand Up @@ -113,6 +114,7 @@ func (d *demux) tokenizeMessages(ctx context.Context, net Network, tag protocol.
defer func() {
close(decoded)
}()
util.SetGoroutineLabels("tokenizeTag", string(tag))
for {
select {
case raw, ok := <-networkMessages:
Expand Down
4 changes: 2 additions & 2 deletions network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,11 @@ func (n *P2PNetwork) Start() {
for i := 0; i < incomingThreads; i++ {
n.wg.Add(1)
// We pass the peersConnectivityCheckTicker.C here so that we don't need to syncronize the access to the ticker's data structure.
go n.handler.messageHandlerThread(&n.wg, n.wsPeersConnectivityCheckTicker.C, n)
go n.handler.messageHandlerThread(&n.wg, n.wsPeersConnectivityCheckTicker.C, n, "network", "P2PNetwork")
}

n.wg.Add(1)
go n.broadcaster.broadcastThread(&n.wg, n)
go n.broadcaster.broadcastThread(&n.wg, n, "network", "P2PNetwork")
n.service.DialPeersUntilTargetCount(n.config.GossipFanout)

n.wg.Add(1)
Expand Down
10 changes: 6 additions & 4 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,10 +747,10 @@ func (wn *WebsocketNetwork) Start() {
for i := 0; i < incomingThreads; i++ {
wn.wg.Add(1)
// We pass the peersConnectivityCheckTicker.C here so that we don't need to syncronize the access to the ticker's data structure.
go wn.handler.messageHandlerThread(&wn.wg, wn.peersConnectivityCheckTicker.C, wn)
go wn.handler.messageHandlerThread(&wn.wg, wn.peersConnectivityCheckTicker.C, wn, "network", "WebsocketNetwork")
}
wn.wg.Add(1)
go wn.broadcaster.broadcastThread(&wn.wg, wn)
go wn.broadcaster.broadcastThread(&wn.wg, wn, "network", "WebsocketNetwork")
if wn.prioScheme != nil {
wn.wg.Add(1)
go wn.prioWeightRefresh()
Expand Down Expand Up @@ -1129,8 +1129,9 @@ func (wn *WebsocketNetwork) maybeSendMessagesOfInterest(peer *wsPeer, messagesOf
}
}

func (wn *msgHandler) messageHandlerThread(wg *sync.WaitGroup, peersConnectivityCheckCh <-chan time.Time, net networkPeerManager) {
func (wn *msgHandler) messageHandlerThread(wg *sync.WaitGroup, peersConnectivityCheckCh <-chan time.Time, net networkPeerManager, profLabels ...string) {
defer wg.Done()
util.SetGoroutineLabels(append(profLabels, "func", "msgHandler.messageHandlerThread")...)

for {
select {
Expand Down Expand Up @@ -1231,8 +1232,9 @@ func (wn *msgHandler) sendFilterMessage(msg IncomingMessage, net networkPeerMana
}
}

func (wn *msgBroadcaster) broadcastThread(wg *sync.WaitGroup, net networkPeerManager) {
func (wn *msgBroadcaster) broadcastThread(wg *sync.WaitGroup, net networkPeerManager, profLabels ...string) {
defer wg.Done()
util.SetGoroutineLabels(append(profLabels, "func", "msgHandler.broadcastThread")...)

slowWritingPeerCheckTicker := time.NewTicker(wn.slowWritingPeerMonitorInterval)
defer slowWritingPeerCheckTicker.Stop()
Expand Down
7 changes: 4 additions & 3 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,9 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd
}
node.net = p2pNode

node.cryptoPool = execpool.MakePool(node)
node.lowPriorityCryptoVerificationPool = execpool.MakeBacklog(node.cryptoPool, 2*node.cryptoPool.GetParallelism(), execpool.LowPriority, node)
node.highPriorityCryptoVerificationPool = execpool.MakeBacklog(node.cryptoPool, 2*node.cryptoPool.GetParallelism(), execpool.HighPriority, node)
node.cryptoPool = execpool.MakePool(node, "worker", "cryptoPool")
node.lowPriorityCryptoVerificationPool = execpool.MakeBacklog(node.cryptoPool, 2*node.cryptoPool.GetParallelism(), execpool.LowPriority, node, "worker", "lowPriorityCryptoVerificationPool")
node.highPriorityCryptoVerificationPool = execpool.MakeBacklog(node.cryptoPool, 2*node.cryptoPool.GetParallelism(), execpool.HighPriority, node, "worker", "highPriorityCryptoVerificationPool")
ledgerPaths := ledger.DirsAndPrefix{
DBFilePrefix: config.LedgerFilenamePrefix,
ResolvedGenesisDirs: node.genesisDirs,
Expand Down Expand Up @@ -1061,6 +1061,7 @@ func (node *AlgorandFullNode) OnNewBlock(block bookkeeping.Block, delta ledgerco
// don't have to delete key for each block we received.
func (node *AlgorandFullNode) oldKeyDeletionThread(done <-chan struct{}) {
defer node.monitoringRoutinesWaitGroup.Done()

for {
select {
case <-done:
Expand Down
11 changes: 7 additions & 4 deletions util/execpool/backlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package execpool
import (
"context"
"sync"

"github.com/algorand/go-algorand/util"
)

// A backlog for an execution pool. The typical usage of this is to
Expand Down Expand Up @@ -47,7 +49,7 @@ type BacklogPool interface {
}

// MakeBacklog creates a backlog
func MakeBacklog(execPool ExecutionPool, backlogSize int, priority Priority, owner interface{}) BacklogPool {
func MakeBacklog(execPool ExecutionPool, backlogSize int, priority Priority, owner interface{}, profLabels ...string) BacklogPool {
if backlogSize < 0 {
return nil
}
Expand All @@ -59,7 +61,7 @@ func MakeBacklog(execPool ExecutionPool, backlogSize int, priority Priority, own
bl.ctx, bl.ctxCancel = context.WithCancel(context.Background())
if bl.pool == nil {
// create one internally.
bl.pool = MakePool(bl)
bl.pool = MakePool(bl, append(profLabels, "execpool", "internal")...)
}
if backlogSize == 0 {
// use the number of cpus in the system.
Expand All @@ -68,7 +70,7 @@ func MakeBacklog(execPool ExecutionPool, backlogSize int, priority Priority, own
bl.buffer = make(chan backlogItemTask, backlogSize)

bl.wg.Add(1)
go bl.worker()
go bl.worker(profLabels)
return bl
}

Expand Down Expand Up @@ -129,10 +131,11 @@ func (b *backlog) Shutdown() {
}
}

func (b *backlog) worker() {
func (b *backlog) worker(profLabels []string) {
var t backlogItemTask
var ok bool
defer b.wg.Done()
util.SetGoroutineLabels(profLabels...)

for {

Expand Down
11 changes: 7 additions & 4 deletions util/execpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"runtime"
"sync"

"github.com/algorand/go-algorand/util"
)

// The list of all valid priority values. When adding new ones, add them before numPrios.
Expand Down Expand Up @@ -68,7 +70,7 @@ type enqueuedTask struct {
}

// MakePool creates a pool.
func MakePool(owner interface{}) ExecutionPool {
func MakePool(owner interface{}, profLabels ...string) ExecutionPool {
p := &pool{
inputs: make([]chan enqueuedTask, numPrios),
numCPUs: runtime.NumCPU(),
Expand All @@ -82,9 +84,8 @@ func MakePool(owner interface{}) ExecutionPool {

p.wg.Add(p.numCPUs)
for i := 0; i < p.numCPUs; i++ {
go p.worker()
go p.worker(profLabels)
}

return p
}

Expand Down Expand Up @@ -136,12 +137,14 @@ func (p *pool) Shutdown() {

// worker function blocks until a new task is pending on any of the channels and execute the above task.
// the implementation below would give higher priority for channels that are on higher priority slot.
func (p *pool) worker() {
func (p *pool) worker(profLabels []string) {
var t enqueuedTask
var ok bool
lowPrio := p.inputs[LowPriority]
highPrio := p.inputs[HighPriority]
defer p.wg.Done()
util.SetGoroutineLabels(profLabels...)

for {

select {
Expand Down
7 changes: 7 additions & 0 deletions util/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
package util

import (
"context"
"io"
"os"
"os/exec"
"runtime/pprof"
"sync"
"time"
)
Expand Down Expand Up @@ -73,3 +75,8 @@ func ExecAndCaptureOutput(command string, args ...string) (string, string, error

return string(outputStdout), string(outputStderr), err
}

// SetGoroutineLabels sets profiler labels for identifying goroutines using the pprof package.
func SetGoroutineLabels(args ...string) {
pprof.SetGoroutineLabels(pprof.WithLabels(context.Background(), pprof.Labels(args...)))
}

0 comments on commit 63c0d5b

Please sign in to comment.