Skip to content

Commit

Permalink
Add a resource fit scheduler predicate. Set sensible defaults.
Browse files Browse the repository at this point in the history
  • Loading branch information
brendandburns committed Oct 3, 2014
1 parent 90800bd commit 1551b48
Show file tree
Hide file tree
Showing 25 changed files with 280 additions and 103 deletions.
11 changes: 11 additions & 0 deletions cmd/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ import (
"strings"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
"github.com/GoogleCloudPlatform/kubernetes/pkg/resources"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag"
"github.com/golang/glog"
Expand All @@ -52,6 +54,9 @@ var (
machineList util.StringList
corsAllowedOriginList util.StringList
allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow privileged containers.")
// TODO: Discover these by pinging the host machines, and rip out these flags.
nodeMilliCPU = flag.Int("node_milli_cpu", 1000, "The amount of MilliCPU provisioned on each node")
nodeMemory = flag.Int("node_memory", 3*1024*1024*1024, "The amount of memory (in bytes) provisioned on each node")
)

func init() {
Expand Down Expand Up @@ -150,6 +155,12 @@ func main() {
MinionCacheTTL: *minionCacheTTL,
MinionRegexp: *minionRegexp,
PodInfoGetter: podInfoGetter,
NodeResources: api.NodeResources{
Capacity: api.ResourceList{
resources.CPU: util.NewIntOrStringFromInt(*nodeMilliCPU),
resources.Memory: util.NewIntOrStringFromInt(*nodeMemory),
},
},
})

mux := http.NewServeMux()
Expand Down
1 change: 1 addition & 0 deletions examples/guestbook/frontend-controller.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"containers": [{
"name": "php-redis",
"image": "brendanburns/php-redis",
"memory": 10000000,
"ports": [{"containerPort": 80, "hostPort": 8000}]
}]
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/cloudprovider/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/mitchellh/goamz/aws"
"github.com/mitchellh/goamz/ec2"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
)

Expand Down Expand Up @@ -179,3 +180,7 @@ func (aws *AWSCloud) List(filter string) ([]string, error) {
// TODO: Should really use tag query. No need to go regexp.
return aws.getInstancesByRegex(filter)
}

func (v *AWSCloud) GetNodeResources(name string) (*api.NodeResources, error) {
return nil, nil
}
4 changes: 4 additions & 0 deletions pkg/cloudprovider/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package cloudprovider

import (
"net"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)

// Interface is an abstract, pluggable interface for cloud providers.
Expand Down Expand Up @@ -49,6 +51,8 @@ type Instances interface {
IPAddress(name string) (net.IP, error)
// List lists instances that match 'filter' which is a regular expression which must match the entire instance name (fqdn)
List(filter string) ([]string, error)
// GetNodeResources gets the resources for a particular node
GetNodeResources(name string) (*api.NodeResources, error)
}

// Zone represents the location of a particular machine.
Expand Down
18 changes: 13 additions & 5 deletions pkg/cloudprovider/fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@ import (
"net"
"regexp"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
)

// FakeCloud is a test-double implementation of Interface, TCPLoadBalancer and Instances. It is useful for testing.
type FakeCloud struct {
Exists bool
Err error
Calls []string
IP net.IP
Machines []string
Exists bool
Err error
Calls []string
IP net.IP
Machines []string
NodeResources *api.NodeResources

cloudprovider.Zone
}

Expand Down Expand Up @@ -110,3 +113,8 @@ func (f *FakeCloud) GetZone() (cloudprovider.Zone, error) {
f.addCall("get-zone")
return f.Zone, f.Err
}

func (f *FakeCloud) GetNodeResources(name string) (*api.NodeResources, error) {
f.addCall("get-node-resources")
return f.NodeResources, f.Err
}
45 changes: 45 additions & 0 deletions pkg/cloudprovider/gce/gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ import (

"code.google.com/p/goauth2/compute/serviceaccount"
compute "code.google.com/p/google-api-go-client/compute/v1"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/resources"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
)

Expand Down Expand Up @@ -254,6 +257,48 @@ func (gce *GCECloud) List(filter string) ([]string, error) {
return instances, nil
}

func makeResources(cpu float32, memory float32) *api.NodeResources {
return &api.NodeResources{
Capacity: api.ResourceList{
resources.CPU: util.NewIntOrStringFromInt(int(cpu * 1000)),
resources.Memory: util.NewIntOrStringFromInt(int(memory * 1024 * 1024 * 1024)),
},
}
}

func canonicalizeMachineType(machineType string) string {
ix := strings.LastIndex(machineType, "/")
return machineType[ix+1:]
}

func (gce *GCECloud) GetNodeResources(name string) (*api.NodeResources, error) {
instance := canonicalizeInstanceName(name)
instanceCall := gce.service.Instances.Get(gce.projectID, gce.zone, instance)
res, err := instanceCall.Do()
if err != nil {
return nil, err
}
switch canonicalizeMachineType(res.MachineType) {
case "f1-micro":
return makeResources(1, 0.6), nil
case "g1-small":
return makeResources(1, 1.70), nil
case "n1-standard-1":
return makeResources(1, 3.75), nil
case "n1-standard-2":
return makeResources(2, 7.5), nil
case "n1-standard-4":
return makeResources(4, 15), nil
case "n1-standard-8":
return makeResources(8, 30), nil
case "n1-standard-16":
return makeResources(16, 30), nil
default:
glog.Errorf("unknown machine: %s", res.MachineType)
return nil, nil
}
}

func (gce *GCECloud) GetZone() (cloudprovider.Zone, error) {
region, err := getGceRegion(gce.zone)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions pkg/cloudprovider/ovirt/ovirt.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"strings"

"code.google.com/p/gcfg"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
)

Expand Down Expand Up @@ -154,3 +155,7 @@ func (v *OVirtCloud) List(filter string) ([]string, error) {

return getInstancesFromXml(response.Body)
}

func (v *OVirtCloud) GetNodeResources(name string) (*api.NodeResources, error) {
return nil, nil
}
5 changes: 5 additions & 0 deletions pkg/cloudprovider/vagrant/vagrant.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
neturl "net/url"
"sort"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
)

Expand Down Expand Up @@ -209,3 +210,7 @@ func (v *VagrantCloud) List(filter string) ([]string, error) {

return instances, nil
}

func (v *VagrantCloud) GetNodeResources(name string) (*api.NodeResources, error) {
return nil, nil
}
6 changes: 4 additions & 2 deletions pkg/master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net/http"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2"
Expand Down Expand Up @@ -52,6 +53,7 @@ type Config struct {
MinionCacheTTL time.Duration
MinionRegexp string
PodInfoGetter client.PodInfoGetter
NodeResources api.NodeResources
}

// Master contains state for a Kubernetes cluster master/api server.
Expand Down Expand Up @@ -104,13 +106,13 @@ func makeMinionRegistry(c *Config) minion.Registry {
var minionRegistry minion.Registry
if c.Cloud != nil && len(c.MinionRegexp) > 0 {
var err error
minionRegistry, err = minion.NewCloudRegistry(c.Cloud, c.MinionRegexp)
minionRegistry, err = minion.NewCloudRegistry(c.Cloud, c.MinionRegexp, &c.NodeResources)
if err != nil {
glog.Errorf("Failed to initalize cloud minion registry reverting to static registry (%#v)", err)
}
}
if minionRegistry == nil {
minionRegistry = minion.NewRegistry(c.Minions)
minionRegistry = minion.NewRegistry(c.Minions, c.NodeResources)
}
if c.HealthCheckMinions {
minionRegistry = minion.NewHealthyRegistry(minionRegistry, &http.Client{})
Expand Down
20 changes: 11 additions & 9 deletions pkg/registry/minion/caching_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)

type Clock interface {
Expand All @@ -35,7 +37,7 @@ func (SystemClock) Now() time.Time {
type CachingRegistry struct {
delegate Registry
ttl time.Duration
minions []string
nodes *api.MinionList
lastUpdate int64
lock sync.RWMutex
clock Clock
Expand All @@ -49,13 +51,13 @@ func NewCachingRegistry(delegate Registry, ttl time.Duration) (Registry, error)
return &CachingRegistry{
delegate: delegate,
ttl: ttl,
minions: list,
nodes: list,
lastUpdate: time.Now().Unix(),
clock: SystemClock{},
}, nil
}

func (r *CachingRegistry) Contains(minion string) (bool, error) {
func (r *CachingRegistry) Contains(nodeID string) (bool, error) {
if r.expired() {
if err := r.refresh(false); err != nil {
return false, err
Expand All @@ -64,8 +66,8 @@ func (r *CachingRegistry) Contains(minion string) (bool, error) {
// block updates in the middle of a contains.
r.lock.RLock()
defer r.lock.RUnlock()
for _, name := range r.minions {
if name == minion {
for _, node := range r.nodes.Items {
if node.ID == nodeID {
return true, nil
}
}
Expand All @@ -86,13 +88,13 @@ func (r *CachingRegistry) Insert(minion string) error {
return r.refresh(true)
}

func (r *CachingRegistry) List() ([]string, error) {
func (r *CachingRegistry) List() (*api.MinionList, error) {
if r.expired() {
if err := r.refresh(false); err != nil {
return r.minions, err
return r.nodes, err
}
}
return r.minions, nil
return r.nodes, nil
}

func (r *CachingRegistry) expired() bool {
Expand All @@ -108,7 +110,7 @@ func (r *CachingRegistry) refresh(force bool) error {
defer r.lock.Unlock()
if force || r.expired() {
var err error
r.minions, err = r.delegate.List()
r.nodes, err = r.delegate.List()
time := r.clock.Now()
atomic.SwapInt64(&r.lastUpdate, time.Unix())
return err
Expand Down
22 changes: 11 additions & 11 deletions pkg/registry/minion/caching_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ func TestCachingHit(t *testing.T) {
now: time.Unix(0, 0),
}
fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"})
expected := []string{"m1", "m2", "m3"}
expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"})
cache := CachingRegistry{
delegate: fakeRegistry,
ttl: 1 * time.Second,
clock: &fakeClock,
lastUpdate: fakeClock.Now().Unix(),
minions: expected,
nodes: expected,
}
list, err := cache.List()
if err != nil {
Expand All @@ -59,20 +59,20 @@ func TestCachingMiss(t *testing.T) {
now: time.Unix(0, 0),
}
fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"})
expected := []string{"m1", "m2", "m3"}
expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"})
cache := CachingRegistry{
delegate: fakeRegistry,
ttl: 1 * time.Second,
clock: &fakeClock,
lastUpdate: fakeClock.Now().Unix(),
minions: expected,
nodes: expected,
}
fakeClock.now = time.Unix(3, 0)
list, err := cache.List()
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !reflect.DeepEqual(list, fakeRegistry.Minions) {
if !reflect.DeepEqual(list, &fakeRegistry.Minions) {
t.Errorf("expected: %v, got %v", fakeRegistry.Minions, list)
}
}
Expand All @@ -82,13 +82,13 @@ func TestCachingInsert(t *testing.T) {
now: time.Unix(0, 0),
}
fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"})
expected := []string{"m1", "m2", "m3"}
expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"})
cache := CachingRegistry{
delegate: fakeRegistry,
ttl: 1 * time.Second,
clock: &fakeClock,
lastUpdate: fakeClock.Now().Unix(),
minions: expected,
nodes: expected,
}
err := cache.Insert("foo")
if err != nil {
Expand All @@ -98,7 +98,7 @@ func TestCachingInsert(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !reflect.DeepEqual(list, fakeRegistry.Minions) {
if !reflect.DeepEqual(list, &fakeRegistry.Minions) {
t.Errorf("expected: %v, got %v", fakeRegistry.Minions, list)
}
}
Expand All @@ -108,13 +108,13 @@ func TestCachingDelete(t *testing.T) {
now: time.Unix(0, 0),
}
fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"})
expected := []string{"m1", "m2", "m3"}
expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"})
cache := CachingRegistry{
delegate: fakeRegistry,
ttl: 1 * time.Second,
clock: &fakeClock,
lastUpdate: fakeClock.Now().Unix(),
minions: expected,
nodes: expected,
}
err := cache.Delete("m2")
if err != nil {
Expand All @@ -124,7 +124,7 @@ func TestCachingDelete(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !reflect.DeepEqual(list, fakeRegistry.Minions) {
if !reflect.DeepEqual(list, &fakeRegistry.Minions) {
t.Errorf("expected: %v, got %v", fakeRegistry.Minions, list)
}
}
Loading

0 comments on commit 1551b48

Please sign in to comment.