Skip to content

Commit

Permalink
removed old workers replaced them with the new ones
Browse files Browse the repository at this point in the history
  • Loading branch information
realDragonium committed Jul 11, 2021
1 parent b3050c6 commit 5de4705
Show file tree
Hide file tree
Showing 11 changed files with 337 additions and 591 deletions.
2 changes: 0 additions & 2 deletions config/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ func TestReadUltravioletConfigFile(t *testing.T) {
},

NumberOfWorkers: 5,
NumberOfConnWorkers: 1,
NumberOfStatusWorkers: 3,
}
file, _ := json.MarshalIndent(cfg, "", " ")
tmpfile, err := ioutil.TempFile("", "example")
Expand Down
2 changes: 0 additions & 2 deletions config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ type UltravioletConfig struct {
DefaultStatus mc.AnotherStatusResponse `json:"defaultStatus"`

NumberOfWorkers int `json:"numberOfWorkers"`
NumberOfConnWorkers int `json:"numberOfConnWorkers"`
NumberOfStatusWorkers int `json:"numberOfStatusWorkers"`

LogOutput io.Writer
}
4 changes: 1 addition & 3 deletions examples/ultraviolet.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,5 @@
"protocol": 755,
"text": "Some broken proxy"
},
"numberOfWorkers": 5,
"numberOfConnWorkers": 1,
"numberOfStatusWorkers": 3
"numberOfWorkers": 5
}
6 changes: 3 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ func main() {
log.Fatalf("Something went wrong while reading config files: %v", err)
}
reqCh := make(chan proxy.McRequest)
shouldNotifyCh, notifyCh := proxy.Serve(mainCfg, serverCfgs, reqCh)
gateway := proxy.NewGateway()
gateway.StartWorkers(mainCfg, serverCfgs, reqCh)

log.SetPrefix(fmt.Sprintf("%d ", os.Getpid()))
upg, err := tableflip.New(tableflip.Options{
Expand Down Expand Up @@ -72,8 +73,7 @@ func main() {
panic(err)
}
<-upg.Exit()
shouldNotifyCh <- struct{}{}
log.Println("Waiting for all open connections to close before shutting down")
<-notifyCh
gateway.Shutdown()
log.Println("Shutting down")
}
105 changes: 52 additions & 53 deletions proxy/proxy.go → proxy/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,58 +16,68 @@ const (
PROXY_CLOSE
)

func NewProxy() Proxy {
return Proxy{
func (action ProxyAction) String() string {
var text string
switch action {
case PROXY_CLOSE:
text = "Proxy Close"
case PROXY_OPEN:
text = "Proxy Open"
}
return text
}

func NewGateway() Gateway {
return Gateway{
NotifyCh: make(chan struct{}),
ShouldNotifyCh: make(chan struct{}),

ProxyCh: make(chan ProxyAction),
serverWorkers: make(map[int]chan gatewayRequest),

proxyCh: make(chan ProxyAction),
wg: &sync.WaitGroup{},
}
}

type Proxy struct {
NotifyCh chan struct{}
ShouldNotifyCh chan struct{}

ProxyCh chan ProxyAction
wg *sync.WaitGroup
type gatewayRequest struct {
ch chan gatewayAnswer
}

func Serve(cfg config.UltravioletConfig, serverCfgs []config.ServerConfig, reqCh chan McRequest) (chan struct{}, chan struct{}) {
p := NewProxy()
go p.manageConnections()
SetupWorkers(cfg, serverCfgs, reqCh, p.ProxyCh)
return p.ShouldNotifyCh, p.NotifyCh
type gatewayAnswer struct {
hasOpenConnections bool
}

func SetupWorkers(cfg config.UltravioletConfig, serverCfgs []config.ServerConfig, reqCh chan McRequest, proxyCh chan ProxyAction) {
connCh := make(chan ConnRequest)
statusCh := make(chan StatusRequest)
type Gateway struct {
NotifyCh chan struct{}
ShouldNotifyCh chan struct{}

if cfg.LogOutput != nil {
log.SetOutput(cfg.LogOutput)
}
serverWorkers map[int]chan gatewayRequest

defaultStatus := cfg.DefaultStatus.Marshal()
workerServerCfgs := make(map[int]WorkerServerConfig)
serverDict := make(map[string]int)
for i, serverCfg := range serverCfgs {
workerServerCfg := FileToWorkerConfig(serverCfg)
workerServerCfgs[i] = workerServerCfg
for _, domain := range serverCfg.Domains {
serverDict[domain] = i
proxyCh chan ProxyAction
wg *sync.WaitGroup
}

func (gw *Gateway) Shutdown() {
for {
activeConns := false
for _, ch := range gw.serverWorkers {
answerCh := make(chan gatewayAnswer)
ch <- gatewayRequest{
ch: answerCh,
}
answer := <-answerCh
if answer.hasOpenConnections {
activeConns = true
}
}
if !activeConns {
return
}
time.Sleep(time.Minute)
}

workerCfg := NewWorkerConfig(reqCh, serverDict, workerServerCfgs, defaultStatus)
workerCfg.ProxyCh = proxyCh
RunBasicWorkers(cfg.NumberOfWorkers, workerCfg, statusCh, connCh)
RunConnWorkers(cfg.NumberOfConnWorkers, connCh, statusCh, workerServerCfgs)
RunStatusWorkers(cfg.NumberOfStatusWorkers, statusCh, connCh, workerServerCfgs)
}

func SetupNewWorkers(cfg config.UltravioletConfig, serverCfgs []config.ServerConfig, reqCh chan McRequest) {
func (gw *Gateway) StartWorkers(cfg config.UltravioletConfig, serverCfgs []config.ServerConfig, reqCh chan McRequest) {
if cfg.LogOutput != nil {
log.SetOutput(cfg.LogOutput)
}
Expand All @@ -80,6 +90,7 @@ func SetupNewWorkers(cfg config.UltravioletConfig, serverCfgs []config.ServerCon
workerServerCfg := FileToWorkerConfig(serverCfg)
privateWorker := NewPrivateWorker(i, workerServerCfg)
privateWorker.reqCh = workerRequestCh
gw.registerPrivateWorker(&privateWorker)
go privateWorker.Work()
for _, domain := range serverCfg.Domains {
serverDict[domain] = i
Expand All @@ -95,30 +106,18 @@ func SetupNewWorkers(cfg config.UltravioletConfig, serverCfgs []config.ServerCon
serverDict: serverDict,
servers: servers,
}

for i := 0; i < cfg.NumberOfWorkers; i++ {
go func(worker PublicWorker){
go func(worker PublicWorker) {
worker.Work()
}(publicWorker)
}

}

func (p *Proxy) manageConnections() {
go func() {
<-p.ShouldNotifyCh
p.wg.Wait()
p.NotifyCh <- struct{}{}
}()

for {
action := <-p.ProxyCh
switch action {
case PROXY_OPEN:
p.wg.Add(1)
case PROXY_CLOSE:
p.wg.Done()
}
}
func (gw *Gateway) registerPrivateWorker(worker *PrivateWorker) {
gatewayCh := make(chan gatewayRequest)
gw.serverWorkers[worker.serverId] = gatewayCh
worker.gatewayCh = gatewayCh
}

func FileToWorkerConfig(cfg config.ServerConfig) WorkerServerConfig {
Expand Down
Loading

0 comments on commit 5de4705

Please sign in to comment.