Skip to content

Commit

Permalink
Refactor storage to support shallow types.
Browse files Browse the repository at this point in the history
Add Client to storage.
Fix client resizing issues.
  • Loading branch information
xetorthio committed Sep 1, 2017
1 parent 4b00a9c commit 954c524
Show file tree
Hide file tree
Showing 25 changed files with 819 additions and 377 deletions.
2 changes: 1 addition & 1 deletion api.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func main() {
s := initStorage()
f := initFactory(s)

ipf := provisioner.NewInstanceProvisionerFactory(provisioner.NewWindowsASG(f, s), provisioner.NewDinD(f))
ipf := provisioner.NewInstanceProvisionerFactory(provisioner.NewWindowsASG(f, s), provisioner.NewDinD(f, s))
sp := provisioner.NewOverlaySessionProvisioner(f)

core := pwd.NewPWD(f, e, s, sp, ipf)
Expand Down
21 changes: 19 additions & 2 deletions handlers/get_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,38 @@ package handlers

import (
"encoding/json"
"log"
"net/http"

"github.com/gorilla/mux"
"github.com/play-with-docker/play-with-docker/pwd/types"
)

type SessionInfo struct {
*types.Session
Instances map[string]*types.Instance `json:"instances"`
}

func GetSession(rw http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
sessionId := vars["sessionId"]

session := core.SessionGet(sessionId)

if session == nil {
rw.WriteHeader(http.StatusNotFound)
return
}

json.NewEncoder(rw).Encode(session)
instances, err := core.InstanceFindBySession(session)
if err != nil {
log.Println(err)
rw.WriteHeader(http.StatusInternalServerError)
return
}
is := map[string]*types.Instance{}
for _, i := range instances {
is[i.Name] = i
}

json.NewEncoder(rw).Encode(SessionInfo{session, is})
}
10 changes: 5 additions & 5 deletions handlers/new_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"

"github.com/gorilla/mux"
"github.com/play-with-docker/play-with-docker/pwd"
"github.com/play-with-docker/play-with-docker/pwd/types"
)

Expand All @@ -19,13 +20,12 @@ func NewInstance(rw http.ResponseWriter, req *http.Request) {

s := core.SessionGet(sessionId)

if len(s.Instances) >= 5 {
rw.WriteHeader(http.StatusConflict)
return
}

i, err := core.InstanceNew(s, body)
if err != nil {
if pwd.SessionComplete(err) {
rw.WriteHeader(http.StatusConflict)
return
}
log.Println(err)
rw.WriteHeader(http.StatusInternalServerError)
return
Expand Down
13 changes: 6 additions & 7 deletions handlers/session_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@ func SessionSetup(rw http.ResponseWriter, req *http.Request) {

s := core.SessionGet(sessionId)

if len(s.Instances) > 0 {
log.Println("Cannot setup a session that contains instances")
rw.WriteHeader(http.StatusConflict)
rw.Write([]byte("Cannot setup a session that contains instances"))
return
}

s.Host = req.Host
err := core.SessionSetup(s, body)
if err != nil {
if pwd.SessionNotEmpty(err) {
log.Println("Cannot setup a session that contains instances")
rw.WriteHeader(http.StatusConflict)
rw.Write([]byte("Cannot setup a session that contains instances"))
return
}
log.Println(err)
rw.WriteHeader(http.StatusInternalServerError)
return
Expand Down
9 changes: 7 additions & 2 deletions handlers/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,13 @@ func WS(so socketio.Socket) {

so.Join(session.Id)

instances, err := core.InstanceFindBySession(session)
if err != nil {
log.Printf("Couldn't find instances for session with id [%s]. Got: %v\n", sessionId, err)
return
}
var rw sync.Mutex
trackedTerminals := make(map[string]net.Conn, len(session.Instances))
trackedTerminals := make(map[string]net.Conn, len(instances))

attachTerminalToSocket := func(instance *types.Instance, ws socketio.Socket) {
rw.Lock()
Expand Down Expand Up @@ -73,7 +78,7 @@ func WS(so socketio.Socket) {
}(instance.Name, conn, ws)
}
// since this is a new connection, get all terminals of the session and attach
for _, instance := range session.Instances {
for _, instance := range instances {
attachTerminalToSocket(instance, so)
}

Expand Down
19 changes: 12 additions & 7 deletions provisioner/dind.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,22 @@ import (
"github.com/play-with-docker/play-with-docker/docker"
"github.com/play-with-docker/play-with-docker/pwd/types"
"github.com/play-with-docker/play-with-docker/router"
"github.com/play-with-docker/play-with-docker/storage"
)

type DinD struct {
factory docker.FactoryApi
storage storage.StorageApi
}

func NewDinD(f docker.FactoryApi) *DinD {
return &DinD{factory: f}
func NewDinD(f docker.FactoryApi, s storage.StorageApi) *DinD {
return &DinD{factory: f, storage: s}
}

func checkHostnameExists(session *types.Session, hostname string) bool {
containerName := fmt.Sprintf("%s_%s", session.Id[:8], hostname)
func checkHostnameExists(sessionId, hostname string, instances []*types.Instance) bool {
containerName := fmt.Sprintf("%s_%s", sessionId[:8], hostname)
exists := false
for _, instance := range session.Instances {
for _, instance := range instances {
if instance.Name == containerName {
exists = true
break
Expand All @@ -42,10 +44,14 @@ func (d *DinD) InstanceNew(session *types.Session, conf types.InstanceConfig) (*
}
log.Printf("NewInstance - using image: [%s]\n", conf.ImageName)
if conf.Hostname == "" {
instances, err := d.storage.InstanceFindBySessionId(session.Id)
if err != nil {
return nil, err
}
var nodeName string
for i := 1; ; i++ {
nodeName = fmt.Sprintf("node%d", i)
exists := checkHostnameExists(session, nodeName)
exists := checkHostnameExists(session.Id, nodeName, instances)
if !exists {
break
}
Expand Down Expand Up @@ -92,7 +98,6 @@ func (d *DinD) InstanceNew(session *types.Session, conf types.InstanceConfig) (*
instance.ServerCert = conf.ServerCert
instance.ServerKey = conf.ServerKey
instance.CACert = conf.CACert
instance.Session = session
instance.ProxyHost = router.EncodeHost(session.Id, instance.RoutableIP, router.HostOpts{})
instance.SessionHost = session.Host

Expand Down
25 changes: 14 additions & 11 deletions provisioner/windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,14 @@ func (d *windows) InstanceNew(session *types.Session, conf types.InstanceConfig)
}

if conf.Hostname == "" {
instances, err := d.storage.InstanceFindBySessionId(session.Id)
if err != nil {
return nil, err
}
var nodeName string
for i := 1; ; i++ {
nodeName = fmt.Sprintf("node%d", i)
exists := checkHostnameExists(session, nodeName)
exists := checkHostnameExists(session.Id, nodeName, instances)
if !exists {
break
}
Expand All @@ -87,11 +91,11 @@ func (d *windows) InstanceNew(session *types.Session, conf types.InstanceConfig)

dockerClient, err := d.factory.GetForSession(session.Id)
if err != nil {
d.releaseInstance(session.Id, winfo.id)
d.releaseInstance(winfo.id)
return nil, err
}
if err = dockerClient.CreateContainer(opts); err != nil {
d.releaseInstance(session.Id, winfo.id)
d.releaseInstance(winfo.id)
return nil, err
}

Expand All @@ -108,7 +112,6 @@ func (d *windows) InstanceNew(session *types.Session, conf types.InstanceConfig)
instance.ServerCert = conf.ServerCert
instance.ServerKey = conf.ServerKey
instance.CACert = conf.CACert
instance.Session = session
instance.ProxyHost = router.EncodeHost(session.Id, instance.RoutableIP, router.HostOpts{})
instance.SessionHost = session.Host

Expand Down Expand Up @@ -167,11 +170,11 @@ func (d *windows) InstanceDelete(session *types.Session, instance *types.Instanc
return err
}

return d.releaseInstance(session.Id, instance.WindowsId)
return d.releaseInstance(instance.WindowsId)
}

func (d *windows) releaseInstance(sessionId, instanceId string) error {
return d.storage.InstanceDeleteWindows(sessionId, instanceId)
func (d *windows) releaseInstance(instanceId string) error {
return d.storage.WindowsInstanceDelete(instanceId)
}

func (d *windows) InstanceResizeTerminal(instance *types.Instance, rows, cols uint) error {
Expand Down Expand Up @@ -222,10 +225,10 @@ func (d *windows) getWindowsInstanceInfo(sessionId string) (*instanceInfo, error
}
}

assignedInstances, err := d.storage.InstanceGetAllWindows()
assignedInstances, err := d.storage.WindowsInstanceGetAll()
assignedInstancesIds := []string{}
for _, ai := range assignedInstances {
assignedInstancesIds = append(assignedInstancesIds, ai.ID)
assignedInstancesIds = append(assignedInstancesIds, ai.Id)
}

if err != nil {
Expand All @@ -243,7 +246,7 @@ func (d *windows) getWindowsInstanceInfo(sessionId string) (*instanceInfo, error
})
if err != nil {
// TODO retry x times and free the instance that was picked?
d.releaseInstance(sessionId, avInstanceId)
d.releaseInstance(avInstanceId)
return nil, err
}

Expand Down Expand Up @@ -273,7 +276,7 @@ func (d *windows) pickFreeInstance(sessionId string, availInstances, assignedIns
}

if !found {
err := d.storage.InstanceCreateWindows(&types.WindowsInstance{SessionId: sessionId, ID: av})
err := d.storage.WindowsInstancePut(&types.WindowsInstance{SessionId: sessionId, Id: av})
if err != nil {
// TODO either storage error or instance is already assigned (race condition)
}
Expand Down
50 changes: 29 additions & 21 deletions pwd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package pwd

import (
"log"
"sync/atomic"
"time"

"github.com/play-with-docker/play-with-docker/event"
Expand All @@ -11,9 +10,10 @@ import (

func (p *pwd) ClientNew(id string, session *types.Session) *types.Client {
defer observeAction("ClientNew", time.Now())
c := &types.Client{Id: id, Session: session}
session.Clients = append(session.Clients, c)
p.clientCount = atomic.AddInt32(&p.clientCount, 1)
c := &types.Client{Id: id, SessionId: session.Id}
if err := p.storage.ClientPut(c); err != nil {
log.Println("Error saving client", err)
}
return c
}

Expand All @@ -22,38 +22,46 @@ func (p *pwd) ClientResizeViewPort(c *types.Client, cols, rows uint) {
c.ViewPort.Rows = rows
c.ViewPort.Cols = cols

p.notifyClientSmallestViewPort(c.Session)
if err := p.storage.ClientPut(c); err != nil {
log.Println("Error saving client", err)
return
}
p.notifyClientSmallestViewPort(c.SessionId)
}

func (p *pwd) ClientClose(client *types.Client) {
defer observeAction("ClientClose", time.Now())
// Client has disconnected. Remove from session and recheck terminal sizes.
session := client.Session
for i, cl := range session.Clients {
if cl.Id == client.Id {
session.Clients = append(session.Clients[:i], session.Clients[i+1:]...)
p.clientCount = atomic.AddInt32(&p.clientCount, -1)
break
}
}
if len(session.Clients) > 0 {
p.notifyClientSmallestViewPort(session)
if err := p.storage.ClientDelete(client.Id); err != nil {
log.Println("Error deleting client", err)
return
}
p.setGauges()
p.notifyClientSmallestViewPort(client.SessionId)
}

func (p *pwd) ClientCount() int {
return int(atomic.LoadInt32(&p.clientCount))
count, err := p.storage.ClientCount()
if err != nil {
log.Println("Error counting clients", err)
return 0
}
return count
}

func (p *pwd) notifyClientSmallestViewPort(session *types.Session) {
vp := p.SessionGetSmallestViewPort(session)
func (p *pwd) notifyClientSmallestViewPort(sessionId string) {
instances, err := p.storage.InstanceFindBySessionId(sessionId)
if err != nil {
log.Printf("Error finding instances for session [%s]. Got: %v\n", sessionId, err)
return
}

vp := p.SessionGetSmallestViewPort(sessionId)
// Resize all terminals in the session
p.event.Emit(event.INSTANCE_VIEWPORT_RESIZE, session.Id, vp.Cols, vp.Rows)
for _, instance := range session.Instances {
for _, instance := range instances {
err := p.InstanceResizeTerminal(instance, vp.Rows, vp.Cols)
if err != nil {
log.Println("Error resizing terminal", err)
}
}
p.event.Emit(event.INSTANCE_VIEWPORT_RESIZE, sessionId, vp.Cols, vp.Rows)
}
Loading

0 comments on commit 954c524

Please sign in to comment.