Skip to content

Commit

Permalink
made statusworker more straight forward
Browse files Browse the repository at this point in the history
  • Loading branch information
realDragonium committed Jul 9, 2021
1 parent daaacea commit e4be29b
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 103 deletions.
8 changes: 3 additions & 5 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@ func Serve(cfg config.UltravioletConfig, serverCfgs []config.ServerConfig, reqCh
}

func SetupWorkers(cfg config.UltravioletConfig, serverCfgs []config.ServerConfig, reqCh chan McRequest, proxyCh chan ProxyAction) {
stateCh := make(chan StateRequest)
connCh := make(chan ConnRequest)
statusCh := make(chan StatusRequest)
stateUpdateCh := make(chan StateUpdate)

defaultStatus := cfg.DefaultStatus.Marshal()
workerServerCfgs := make(map[string]WorkerServerConfig)
Expand All @@ -58,9 +56,9 @@ func SetupWorkers(cfg config.UltravioletConfig, serverCfgs []config.ServerConfig

workerCfg := NewWorkerConfig(reqCh, workerServerCfgs, defaultStatus)
workerCfg.ProxyCh = proxyCh
RunBasicWorkers(cfg.NumberOfWorkers, workerCfg, statusCh, connCh, stateCh, stateUpdateCh)
RunConnWorkers(cfg.NumberOfConnWorkers, connCh, stateUpdateCh, workerServerCfgs)
RunStatusWorkers(cfg.NumberOfStatusWorkers, statusCh, stateCh, stateUpdateCh, connCh, workerServerCfgs)
RunBasicWorkers(cfg.NumberOfWorkers, workerCfg, statusCh, connCh)
RunConnWorkers(cfg.NumberOfConnWorkers, connCh, statusCh, workerServerCfgs)
RunStatusWorkers(cfg.NumberOfStatusWorkers, statusCh, connCh, workerServerCfgs)
}

func (p *Proxy) manageConnections() {
Expand Down
163 changes: 76 additions & 87 deletions proxy/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,38 +80,35 @@ type WorkerServerConfig struct {
RateLimitDuration time.Duration
}

func RunBasicWorkers(amount int, cfg WorkerConfig, statusCh chan StatusRequest, connCh chan ConnRequest, stateCh chan StateRequest, updateCh chan StateUpdate) {
worker := NewBasicWorker(cfg, statusCh, connCh, stateCh, updateCh)
func RunBasicWorkers(amount int, cfg WorkerConfig, statusCh chan StatusRequest, connCh chan ConnRequest) {
worker := NewBasicWorker(cfg, statusCh, connCh)
for i := 0; i < amount; i++ {
go func(worker BasicWorker) {
worker.Work()
}(worker)
}
}

func NewBasicWorker(cfg WorkerConfig, statusCh chan StatusRequest, connCh chan ConnRequest, stateCh chan StateRequest, updateCh chan StateUpdate) BasicWorker {
func NewBasicWorker(cfg WorkerConfig, statusCh chan StatusRequest, connCh chan ConnRequest) BasicWorker {
return BasicWorker{
reqCh: cfg.ReqCh,
defaultStatus: cfg.DefaultStatus,
servers: cfg.Servers,
ProxyCh: cfg.ProxyCh,

stateCh: stateCh,
statusCh: statusCh,
connCh: connCh,
}
}

func NewWorker(cfg WorkerConfig) BasicWorker {
stateCh := make(chan StateRequest)
connCh := make(chan ConnRequest)
statusCh := make(chan StatusRequest)
stateUpdateCh := make(chan StateUpdate)

connWorker := NewConnWorker(connCh, stateUpdateCh, cfg.Servers)
connWorker := NewConnWorker(connCh, statusCh, cfg.Servers)
go connWorker.Work()

statusWorker := NewStatusWorker(statusCh, stateCh, stateUpdateCh, connCh, cfg.Servers)
statusWorker := NewStatusWorker(statusCh, connCh, cfg.Servers)
go statusWorker.Work()

return BasicWorker{
Expand All @@ -120,7 +117,6 @@ func NewWorker(cfg WorkerConfig) BasicWorker {
servers: cfg.Servers,
ProxyCh: cfg.ProxyCh,

stateCh: stateCh,
statusCh: statusCh,
connCh: connCh,
}
Expand All @@ -132,7 +128,6 @@ type BasicWorker struct {
servers map[string]WorkerServerConfig
ProxyCh chan ProxyAction

stateCh chan StateRequest
statusCh chan StatusRequest
connCh chan ConnRequest
}
Expand All @@ -157,25 +152,27 @@ func (w BasicWorker) Work() {
}
continue
}
stateAnswerCh := make(chan ServerState)
w.stateCh <- StateRequest{
stateAnswerCh := make(chan StatusAnswer)
w.statusCh <- StatusRequest{
ServerId: request.ServerAddr,
Type: STATE_REQUEST,
AnswerCh: stateAnswerCh,
}
state := <-stateAnswerCh
answer := <-stateAnswerCh

if state == OFFLINE {
if answer.State == OFFLINE {
// This need to be modified later when online status cache is being added
if request.Type == STATUS {
statusAnswerCh := make(chan mc.Packet)
statusAnswerCh := make(chan StatusAnswer)
w.statusCh <- StatusRequest{
ServerId: request.ServerAddr,
Type: STATUS_REQUEST,
AnswerCh: statusAnswerCh,
}
statusPk := <-statusAnswerCh
answer := <-statusAnswerCh
request.Ch <- McAnswer{
Action: SEND_STATUS,
StatusPk: statusPk,
StatusPk: answer.Pk,
}
} else if request.Type == LOGIN {
request.Ch <- McAnswer{
Expand Down Expand Up @@ -219,7 +216,7 @@ func (w BasicWorker) Work() {
}
}

func RunConnWorkers(amount int, reqCh chan ConnRequest, updateCh chan StateUpdate, proxies map[string]WorkerServerConfig) {
func RunConnWorkers(amount int, reqCh chan ConnRequest, updateCh chan StatusRequest, proxies map[string]WorkerServerConfig) {
worker := NewConnWorker(reqCh, updateCh, proxies)
for i := 0; i < amount; i++ {
go func(worker ConnWorker) {
Expand All @@ -228,7 +225,7 @@ func RunConnWorkers(amount int, reqCh chan ConnRequest, updateCh chan StateUpdat
}
}

func NewConnWorker(reqCh chan ConnRequest, updateCh chan StateUpdate, proxies map[string]WorkerServerConfig) ConnWorker {
func NewConnWorker(reqCh chan ConnRequest, updateCh chan StatusRequest, proxies map[string]WorkerServerConfig) ConnWorker {
servers := make(map[string]*ConnectionData)
for id, proxy := range proxies {
servers[id] = &ConnectionData{
Expand Down Expand Up @@ -301,16 +298,16 @@ func (w ConnWorker) Work() {
}
}

func RunStatusWorkers(amount int, reqCh chan StatusRequest, stateCh chan StateRequest, updateCh chan StateUpdate, connCh chan ConnRequest, proxies map[string]WorkerServerConfig) {
stateWorker := NewStatusWorker(reqCh, stateCh, updateCh, connCh, proxies)
func RunStatusWorkers(amount int, reqCh chan StatusRequest, connCh chan ConnRequest, proxies map[string]WorkerServerConfig) {
stateWorker := NewStatusWorker(reqCh, connCh, proxies)
for i := 0; i < amount; i++ {
go func(worker StatusWorker) {
stateWorker.Work()
}(stateWorker)
}
}

func NewStatusWorker(reqStatusCh chan StatusRequest, stateCh chan StateRequest, updateCh chan StateUpdate, connCh chan ConnRequest, proxies map[string]WorkerServerConfig) StatusWorker {
func NewStatusWorker(reqStatusCh chan StatusRequest, connCh chan ConnRequest, proxies map[string]WorkerServerConfig) StatusWorker {
servers := make(map[string]*ServerData)
for id, proxy := range proxies {
cooldown := proxy.StateUpdateCooldown
Expand All @@ -326,27 +323,30 @@ func NewStatusWorker(reqStatusCh chan StatusRequest, stateCh chan StateRequest,
}

return StatusWorker{
reqConnCh: connCh,
reqStateCh: stateCh,
reqStatusCh: reqStatusCh,
updateStateCh: updateCh,
serverData: servers,
reqConnCh: connCh,
reqCh: reqStatusCh,
serverData: servers,
}
}

type StateUpdate struct {
ServerId string
State ServerState
}
type StatusReqType byte

type StateRequest struct {
ServerId string
AnswerCh chan ServerState
}
const (
STATE_UPDATE StatusReqType = iota + 1
STATE_REQUEST
STATUS_REQUEST
)

type StatusRequest struct {
ServerId string
AnswerCh chan mc.Packet
State ServerState
Type StatusReqType
AnswerCh chan StatusAnswer
}

type StatusAnswer struct {
Pk mc.Packet
State ServerState
}

type ServerData struct {
Expand All @@ -361,72 +361,61 @@ type ServerData struct {
// - check or state is still valid or expired before replying
// - add online status caching when doesnt proxy client requests to actual server
type StatusWorker struct {
reqConnCh chan ConnRequest

reqStatusCh chan StatusRequest
reqStateCh chan StateRequest
updateStateCh chan StateUpdate

reqConnCh chan ConnRequest
reqCh chan StatusRequest
serverData map[string]*ServerData
}

func (w *StatusWorker) Work() {
for {
select {
case request := <-w.reqStatusCh:
data := w.serverData[request.ServerId]
if data.State == UNKNOWN {
w.UpdateState(request.ServerId)
data = w.serverData[request.ServerId]
request := <-w.reqCh
data := w.serverData[request.ServerId]
if request.Type == STATE_UPDATE {
data.State = request.State
w.serverData[request.ServerId] = data
continue
}
if data.State == UNKNOWN {
connAnswerCh := make(chan func() (net.Conn, error))
w.reqConnCh <- ConnRequest{
serverId: request.ServerId,
answerCh: connAnswerCh,
}
connFunc := <-connAnswerCh
_, err := connFunc()
if err != nil {
data.State = OFFLINE
} else {
data.State = ONLINE
}
w.serverData[request.ServerId] = data
go func(serverId string, sleepTime time.Duration, updateCh chan StatusRequest) {
time.Sleep(sleepTime)
updateCh <- StatusRequest{
ServerId: serverId,
Type: STATE_UPDATE,
State: UNKNOWN,
}
}(request.ServerId, data.StateUpdateCooldown, w.reqCh)
}

switch request.Type {
case STATUS_REQUEST:
var statusPk mc.Packet
switch data.State {
case ONLINE:
statusPk = data.OnlineStatus
case OFFLINE:
statusPk = data.OfflineStatus
}
request.AnswerCh <- statusPk
case request := <-w.reqStateCh:
data := w.serverData[request.ServerId]
if data.State == UNKNOWN {
w.UpdateState(request.ServerId)
data = w.serverData[request.ServerId]
request.AnswerCh <- StatusAnswer{
Pk: statusPk,
}
request.AnswerCh <- data.State
case update := <-w.updateStateCh:
data := w.serverData[update.ServerId]
if update.State == UPDATE {
w.UpdateState(update.ServerId)
data = w.serverData[update.ServerId]
} else {
data.State = update.State
case STATE_REQUEST:
request.AnswerCh <- StatusAnswer{
State: data.State,
}
w.serverData[update.ServerId] = data
}
}
}

func (w *StatusWorker) UpdateState(serverId string) {
data := w.serverData[serverId]
connAnswerCh := make(chan func() (net.Conn, error))
w.reqConnCh <- ConnRequest{
serverId: serverId,
answerCh: connAnswerCh,
}
connFunc := <-connAnswerCh
_, err := connFunc()
if err != nil {
data.State = OFFLINE
} else {
data.State = ONLINE
}
w.serverData[serverId] = data
go func(serverId string, sleepTime time.Duration, updateCh chan StateUpdate) {
time.Sleep(sleepTime)
updateCh <- StateUpdate{
ServerId: serverId,
State: UNKNOWN,
}
}(serverId, data.StateUpdateCooldown, w.updateStateCh)
}
}
21 changes: 10 additions & 11 deletions proxy/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func defaultOfflineStatusPacket() mc.Packet {

var port *int16
var portLock sync.Mutex = sync.Mutex{}

// To make sure every test gets its own unique port
func testAddr() string {
portLock.Lock()
Expand Down Expand Up @@ -712,34 +713,32 @@ func TestStatusWorker_ShareServerData(t *testing.T) {
State: proxy.UNKNOWN,
StateUpdateCooldown: cooldown,
}
stateCh := make(chan proxy.StateRequest)
connCh := make(chan proxy.ConnRequest)
statusCh := make(chan proxy.StatusRequest)
stateUpdateCh := make(chan proxy.StateUpdate)
proxy.RunConnWorkers(1, connCh, stateUpdateCh, servers)
proxy.RunStatusWorkers(2, statusCh, stateCh, stateUpdateCh, connCh, servers)
proxy.RunConnWorkers(1, connCh, statusCh, servers)
proxy.RunStatusWorkers(2, statusCh, connCh, servers)

answerCh := make(chan proxy.ServerState)
stateCh <- proxy.StateRequest{
answerCh := make(chan proxy.StatusAnswer)
statusCh <- proxy.StatusRequest{
ServerId: serverAddr,
Type: proxy.STATE_REQUEST,
AnswerCh: answerCh,
}
time.Sleep(defaultChTimeout)
answerCh2 := make(chan proxy.ServerState)
stateCh <- proxy.StateRequest{
answerCh2 := make(chan proxy.StatusAnswer)
statusCh <- proxy.StatusRequest{
ServerId: serverAddr,
Type: proxy.STATE_REQUEST,
AnswerCh: answerCh2,
}

select {
case answer := <-answerCh2:
t.Log("worker has successfully responded")
if answer != proxy.OFFLINE {
if answer.State != proxy.OFFLINE {
t.Errorf("expected: %v got: %v", proxy.OFFLINE, answer)
}
case <-time.After(defaultChTimeout):
t.Error("timed out")
}
}


0 comments on commit e4be29b

Please sign in to comment.