Skip to content

Commit

Permalink
added state acquire code when state of the server is unknown (default…
Browse files Browse the repository at this point in the history
… value)
  • Loading branch information
realDragonium committed Jul 8, 2021
1 parent 5731823 commit 0154d1b
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 69 deletions.
31 changes: 22 additions & 9 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,38 @@ func NewProxy() Proxy {
NotifyCh: make(chan struct{}),
ShouldNotifyCh: make(chan struct{}),

closedProxy: make(chan struct{}),
openedProxy: make(chan struct{}),
wg: &sync.WaitGroup{},
ProxyCh: make(chan ProxyAction),
wg: &sync.WaitGroup{},
}
}

type Proxy struct {
NotifyCh chan struct{}
ShouldNotifyCh chan struct{}

closedProxy chan struct{}
openedProxy chan struct{}
wg *sync.WaitGroup
ProxyCh chan ProxyAction
wg *sync.WaitGroup
}

func Serve(cfg config.UltravioletConfig, serverCfgs []config.ServerConfig, reqCh chan McRequest) (chan struct{}, chan struct{}) {
p := NewProxy()
go p.manageConnections()
// go p.backend()

defaultStatus := cfg.DefaultStatus.Marshal()
workerServerCfgs := make(map[string]WorkerServerConfig)
for _, serverCfg := range serverCfgs {
workerServerCfg := FileToWorkerConfig(serverCfg)
workerServerCfgs[serverCfg.MainDomain] = workerServerCfg
for _, extraDomains := range serverCfg.ExtraDomains {
workerServerCfgs[extraDomains] = workerServerCfg
}
}

workerCfg := NewWorkerConfig(reqCh, workerServerCfgs, defaultStatus)
worker := NewWorker(workerCfg)
go worker.Work()

return p.ShouldNotifyCh, p.NotifyCh
}

Expand Down Expand Up @@ -88,10 +100,11 @@ func (p *Proxy) manageConnections() {
}()

for {
select {
case <-p.openedProxy:
action := <-p.ProxyCh
switch action {
case PROXY_OPEN:
p.wg.Add(1)
case <-p.closedProxy:
case PROXY_CLOSE:
p.wg.Done()
}
}
Expand Down
151 changes: 91 additions & 60 deletions proxy/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ var (
type ServerState byte

const (
ONLINE ServerState = iota
UNKNOWN ServerState = iota
ONLINE
OFFLINE
)

Expand Down Expand Up @@ -93,17 +94,16 @@ type WorkerServerConfig struct {

func NewWorker(cfg WorkerConfig) BasicWorker {
stateCh := make(chan StateRequest)
stateWorker := NewStateWorker(stateCh, cfg.Servers)
go stateWorker.Work()

statusCh := make(chan StatusRequest)
statusWorker := NewStatusWorker(statusCh, cfg.Servers)
go statusWorker.Work()

// stateWorker := NewStateWorker(stateCh, cfg.Servers)
// go stateWorker.Work()
connCh := make(chan ConnRequest)
connWorker := NewConnWorker(connCh, cfg.Servers)
go connWorker.Work()

statusCh := make(chan StatusRequest)
statusWorker := NewStatusWorker(statusCh, stateCh, connCh, cfg.Servers)
go statusWorker.Work()

return BasicWorker{
reqCh: cfg.ReqCh,
defaultStatus: cfg.DefaultStatus,
Expand Down Expand Up @@ -147,20 +147,20 @@ func (w BasicWorker) Work() {
}
return
}
stateAnswerCh := make(chan StateServerData)
stateAnswerCh := make(chan ServerState)
w.stateCh <- StateRequest{
serverId: request.ServerAddr,
answerCh: stateAnswerCh,
}
stateData := <-stateAnswerCh
state := <-stateAnswerCh

if stateData.state == OFFLINE {
if state == OFFLINE {
// This need to be modified later when online status cache is being added
if request.Type == STATUS {
statusAnswerCh := make(chan mc.Packet)
w.statusCh <- StatusRequest{
serverId: request.ServerAddr,
state: stateData.state,
state: state,
answerCh: statusAnswerCh,
}
statusPk := <-statusAnswerCh
Expand All @@ -178,9 +178,8 @@ func (w BasicWorker) Work() {
}
connAnswerCh := make(chan func() (net.Conn, error))
w.connCh <- ConnRequest{
clientAddr: request.Addr,
serverId: request.ServerAddr,
answerCh: connAnswerCh,
serverId: request.ServerAddr,
answerCh: connAnswerCh,
}
connFunc := <-connAnswerCh
netConn, err := connFunc()
Expand Down Expand Up @@ -232,9 +231,8 @@ func NewConnWorker(reqCh chan ConnRequest, proxies map[string]WorkerServerConfig
}

type ConnRequest struct {
answerCh chan func() (net.Conn, error)
clientAddr net.Addr
serverId string
answerCh chan func() (net.Conn, error)
serverId string
}

type ConnectionData struct {
Expand Down Expand Up @@ -286,18 +284,23 @@ func (w ConnWorker) Work() {
}
}

func NewStatusWorker(reqCh chan StatusRequest, proxies map[string]WorkerServerConfig) StatusWorker {
func NewStatusWorker(reqCh chan StatusRequest, stateCh chan StateRequest, connCh chan ConnRequest, proxies map[string]WorkerServerConfig) StatusWorker {
servers := make(map[string]StatusServerData)
servers2 := make(map[string]ServerState)
for id, proxy := range proxies {
servers[id] = StatusServerData{
OfflineStatus: proxy.OfflineStatus,
OnlineStatus: mc.Packet{},
}

servers2[id] = proxy.State
}

return StatusWorker{
reqCh: reqCh,
servers: servers,
reqConnCh: connCh,
reqStateCh: stateCh,
reqStatusCh: reqCh,
statusServers: servers,
stateServers: servers2,
}
}

Expand All @@ -315,59 +318,87 @@ type StatusServerData struct {
// TODO:
// - add online status caching when doesnt proxy client requests to actual server
type StatusWorker struct {
reqCh chan StatusRequest
servers map[string]StatusServerData
reqConnCh chan ConnRequest

reqStatusCh chan StatusRequest
reqStateCh chan StateRequest
statusServers map[string]StatusServerData
stateServers map[string]ServerState
}

func (w StatusWorker) Work() {
for {
request := <-w.reqCh
cfg := w.servers[request.serverId]
var statusPk mc.Packet
switch request.state {
case ONLINE:
statusPk = cfg.OnlineStatus
case OFFLINE:
statusPk = cfg.OfflineStatus
select {
case request := <-w.reqStatusCh:
cfg := w.statusServers[request.serverId]
var statusPk mc.Packet
switch request.state {
case ONLINE:
statusPk = cfg.OnlineStatus
case OFFLINE:
statusPk = cfg.OfflineStatus
}
request.answerCh <- statusPk
case request := <-w.reqStateCh:
state := w.stateServers[request.serverId]
if state == UNKNOWN {
connAnswerCh := make(chan func() (net.Conn, error))
w.reqConnCh <- ConnRequest{
serverId: request.serverId,
answerCh: connAnswerCh,
}
connFunc := <-connAnswerCh
_, err := connFunc()
if err != nil {
state = OFFLINE
} else {
state = ONLINE
}
w.stateServers[request.serverId] = state
}
request.answerCh <- state
}
request.answerCh <- statusPk
}
}

func NewStateWorker(reqCh chan StateRequest, proxies map[string]WorkerServerConfig) StateWorker {
servers := make(map[string]StateServerData)
for id, proxy := range proxies {
servers[id] = StateServerData{
state: proxy.State,
}
// func NewStateWorker(reqCh chan StateRequest, proxies map[string]WorkerServerConfig) StateWorker {
// servers := make(map[string]StateServerData)
// for id, proxy := range proxies {
// servers[id] = StateServerData{
// state: proxy.State,
// }

}
return StateWorker{
reqCh: reqCh,
servers: servers,
}
}
// }
// return StateWorker{
// reqCh: reqCh,
// servers: servers,
// }
// }

type StateRequest struct {
serverId string
answerCh chan StateServerData
answerCh chan ServerState
}

type StateServerData struct {
state ServerState
}

// TODO:
// - automatically update state every x amount of time
// - check or state is still valid or expired before replying
type StateWorker struct {
reqCh chan StateRequest
servers map[string]StateServerData
}

func (w StateWorker) Work() {
for {
request := <-w.reqCh
request.answerCh <- w.servers[request.serverId]
}
}
// // TODO:
// // - automatically update state every x amount of time
// // - check or state is still valid or expired before replying
// type StateWorker struct {
// reqCh chan StateRequest
// servers map[string]StateServerData
// }

// func (w StateWorker) Work() {
// for {
// request := <-w.reqCh
// data := w.servers[request.serverId]
// if data.state == UNKNOWN {

// }
// request.answerCh <- data.state
// }
// }
Loading

0 comments on commit 0154d1b

Please sign in to comment.