Skip to content

Commit

Permalink
chanfitness: switch to query by channel outpoint
Browse files Browse the repository at this point in the history
In this commit, the channelEventStore in the channel
fitness subsystem is changed to identify channels
by their outpoint rather than short channel id. This
change is made made becuase outpoints are the preferred
way to expose references over rpc, and easier to perform
queries within lnd.
  • Loading branch information
carlaKC committed Dec 17, 2019
1 parent 47e700b commit 4f9795e
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 72 deletions.
17 changes: 10 additions & 7 deletions chanfitness/chanevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"time"

"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/routing/route"
)

Expand Down Expand Up @@ -36,8 +37,8 @@ type channelEvent struct {

// chanEventLog stores all events that have occurred over a channel's lifetime.
type chanEventLog struct {
// id is the uint64 of the short channel ID.
id uint64
// channelPoint is the outpoint for the channel's funding transaction.
channelPoint wire.OutPoint

// peer is the compressed public key of the peer being monitored.
peer route.Vertex
Expand All @@ -59,11 +60,13 @@ type chanEventLog struct {
closedAt time.Time
}

func newEventLog(id uint64, peer route.Vertex, now func() time.Time) *chanEventLog {
func newEventLog(outpoint wire.OutPoint, peer route.Vertex,
now func() time.Time) *chanEventLog {

return &chanEventLog{
id: id,
peer: peer,
now: now,
channelPoint: outpoint,
peer: peer,
now: now,
}
}

Expand Down Expand Up @@ -95,7 +98,7 @@ func (e *chanEventLog) add(eventType eventType) {
e.openedAt = event.timestamp
}

log.Debugf("Channel %v recording event: %v", e.id, eventType)
log.Debugf("Channel %v recording event: %v", e.channelPoint, eventType)
}

// onlinePeriod represents a period of time over which a peer was online.
Expand Down
51 changes: 27 additions & 24 deletions chanfitness/chaneventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"sync"
"time"

"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/peernotifier"
Expand All @@ -37,8 +38,8 @@ var (
type ChannelEventStore struct {
cfg *Config

// channels maps short channel IDs to event logs.
channels map[uint64]*chanEventLog
// channels maps channel points to event logs.
channels map[wire.OutPoint]*chanEventLog

// peers tracks the current online status of peers based on online/offline
// events.
Expand Down Expand Up @@ -76,7 +77,7 @@ type Config struct {
// channel's lifespan and a blocking response channel on which the result is
// sent.
type lifespanRequest struct {
channelID uint64
channelPoint wire.OutPoint
responseChan chan lifespanResponse
}

Expand All @@ -91,7 +92,7 @@ type lifespanResponse struct {
// uptimeRequest contains the parameters required to query the store for a
// channel's uptime and a blocking response channel on which the result is sent.
type uptimeRequest struct {
channelID uint64
channelPoint wire.OutPoint
startTime time.Time
endTime time.Time
responseChan chan uptimeResponse
Expand All @@ -110,7 +111,7 @@ type uptimeResponse struct {
func NewChannelEventStore(config *Config) *ChannelEventStore {
store := &ChannelEventStore{
cfg: config,
channels: make(map[uint64]*chanEventLog),
channels: make(map[wire.OutPoint]*chanEventLog),
peers: make(map[route.Vertex]bool),
lifespanRequests: make(chan lifespanRequest),
uptimeRequests: make(chan uptimeRequest),
Expand Down Expand Up @@ -167,7 +168,7 @@ func (c *ChannelEventStore) Start() error {

// Add existing channels to the channel store with an initial peer
// online or offline event.
c.addChannel(ch.ShortChanID().ToUint64(), peerKey)
c.addChannel(ch.FundingOutpoint, peerKey)
}

// Start a goroutine that consumes events from all subscriptions.
Expand Down Expand Up @@ -196,15 +197,17 @@ func (c *ChannelEventStore) Stop() {
// already present in the map, the function returns early. This function should
// be called to add existing channels on startup and when open channel events
// are observed.
func (c *ChannelEventStore) addChannel(channelID uint64, peer route.Vertex) {
func (c *ChannelEventStore) addChannel(channelPoint wire.OutPoint,
peer route.Vertex) {

// Check for the unexpected case where the channel is already in the store.
_, ok := c.channels[channelID]
_, ok := c.channels[channelPoint]
if ok {
log.Errorf("Channel %v duplicated in channel store", channelID)
log.Errorf("Channel %v duplicated in channel store", channelPoint)
return
}

eventLog := newEventLog(channelID, peer, time.Now)
eventLog := newEventLog(channelPoint, peer, time.Now)

// If the peer is online, add a peer online event to indicate its starting
// state.
Expand All @@ -213,16 +216,16 @@ func (c *ChannelEventStore) addChannel(channelID uint64, peer route.Vertex) {
eventLog.add(peerOnlineEvent)
}

c.channels[channelID] = eventLog
c.channels[channelPoint] = eventLog
}

// closeChannel records a closed time for a channel, and returns early is the
// channel is not known to the event store.
func (c *ChannelEventStore) closeChannel(channelID uint64) {
func (c *ChannelEventStore) closeChannel(channelPoint wire.OutPoint) {
// Check for the unexpected case where the channel is unknown to the store.
eventLog, ok := c.channels[channelID]
eventLog, ok := c.channels[channelPoint]
if !ok {
log.Errorf("Close channel %v unknown to store", channelID)
log.Errorf("Close channel %v unknown to store", channelPoint)
return
}

Expand Down Expand Up @@ -265,8 +268,6 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
// A new channel has been opened, we must add the channel to the
// store and record a channel open event.
case channelnotifier.OpenChannelEvent:
chanID := event.Channel.ShortChanID().ToUint64()

peerKey, err := route.NewVertexFromBytes(
event.Channel.IdentityPub.SerializeCompressed(),
)
Expand All @@ -275,12 +276,12 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
event.Channel.IdentityPub.SerializeCompressed())
}

c.addChannel(chanID, peerKey)
c.addChannel(event.Channel.FundingOutpoint, peerKey)

// A channel has been closed, we must remove the channel from the
// store and record a channel closed event.
case channelnotifier.ClosedChannelEvent:
c.closeChannel(event.CloseSummary.ShortChanID.ToUint64())
c.closeChannel(event.CloseSummary.ChanPoint)
}

// Process peer online and offline events.
Expand All @@ -301,7 +302,7 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
case req := <-c.lifespanRequests:
var resp lifespanResponse

channel, ok := c.channels[req.channelID]
channel, ok := c.channels[req.channelPoint]
if !ok {
resp.err = ErrChannelNotFound
} else {
Expand All @@ -315,7 +316,7 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
case req := <-c.uptimeRequests:
var resp uptimeResponse

channel, ok := c.channels[req.channelID]
channel, ok := c.channels[req.channelPoint]
if !ok {
resp.err = ErrChannelNotFound
} else {
Expand All @@ -336,9 +337,11 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
// GetLifespan returns the opening and closing time observed for a channel and
// a boolean to indicate whether the channel is known the the event store. If
// the channel is still open, a zero close time is returned.
func (c *ChannelEventStore) GetLifespan(chanID uint64) (time.Time, time.Time, error) {
func (c *ChannelEventStore) GetLifespan(
channelPoint wire.OutPoint) (time.Time, time.Time, error) {

request := lifespanRequest{
channelID: chanID,
channelPoint: channelPoint,
responseChan: make(chan lifespanResponse),
}

Expand All @@ -364,11 +367,11 @@ func (c *ChannelEventStore) GetLifespan(chanID uint64) (time.Time, time.Time, er

// GetUptime returns the uptime of a channel over a period and an error if the
// channel cannot be found or the uptime calculation fails.
func (c *ChannelEventStore) GetUptime(chanID uint64, startTime,
func (c *ChannelEventStore) GetUptime(channelPoint wire.OutPoint, startTime,
endTime time.Time) (time.Duration, error) {

request := uptimeRequest{
channelID: chanID,
channelPoint: channelPoint,
startTime: startTime,
endTime: endTime,
responseChan: make(chan uptimeResponse),
Expand Down
Loading

0 comments on commit 4f9795e

Please sign in to comment.