Skip to content

Commit

Permalink
Merge pull request kubernetes#21431 from freehan/sourcerange
Browse files Browse the repository at this point in the history
Auto commit by PR queue bot
  • Loading branch information
k8s-merge-robot committed Feb 22, 2016
2 parents 1ce188e + 7ffb123 commit 2e3053a
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 24 deletions.
9 changes: 8 additions & 1 deletion pkg/cloudprovider/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type LoadBalancer interface {
// if so, what its status is.
GetLoadBalancer(name, region string) (status *api.LoadBalancerStatus, exists bool, err error)
// EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer
EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error)
EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity, annotations ServiceAnnotation) (*api.LoadBalancerStatus, error)
// UpdateLoadBalancer updates hosts under the specified load balancer.
UpdateLoadBalancer(name, region string, hosts []string) error
// EnsureLoadBalancerDeleted deletes the specified load balancer if it
Expand Down Expand Up @@ -160,3 +160,10 @@ type Zones interface {
// GetZone returns the Zone containing the current failure zone and locality region that the program is running in
GetZone() (Zone, error)
}

type ServiceAnnotation map[string]string

func (s ServiceAnnotation) GetValue(key string) (string, bool) {
val, ok := s[key]
return val, ok
}
4 changes: 2 additions & 2 deletions pkg/cloudprovider/providers/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -1820,8 +1820,8 @@ func (s *AWSCloud) listSubnetIDsinVPC(vpcId string) ([]string, error) {

// EnsureLoadBalancer implements LoadBalancer.EnsureLoadBalancer
// TODO(justinsb) It is weird that these take a region. I suspect it won't work cross-region anyway.
func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, publicIP, ports, hosts, serviceName)
func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinity api.ServiceAffinity, annotations cloudprovider.ServiceAnnotation) (*api.LoadBalancerStatus, error) {
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", name, region, publicIP, ports, hosts, serviceName, annotations)

if region != s.region {
return nil, fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region)
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloudprovider/providers/aws/aws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ func TestLoadBalancerMatchesClusterRegion(t *testing.T) {

serviceName := types.NamespacedName{Namespace: "foo", Name: "bar"}

_, err = c.EnsureLoadBalancer("elb-name", badELBRegion, nil, nil, nil, serviceName, api.ServiceAffinityNone)
_, err = c.EnsureLoadBalancer("elb-name", badELBRegion, nil, nil, nil, serviceName, api.ServiceAffinityNone, nil)
if err == nil || err.Error() != errorMessage {
t.Errorf("Expected EnsureLoadBalancer region mismatch error.")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloudprovider/providers/fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (f *FakeCloud) GetLoadBalancer(name, region string) (*api.LoadBalancerStatu

// EnsureLoadBalancer is a test-spy implementation of LoadBalancer.EnsureLoadBalancer.
// It adds an entry "create" into the internal method call record.
func (f *FakeCloud) EnsureLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
func (f *FakeCloud) EnsureLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity, annotations cloudprovider.ServiceAnnotation) (*api.LoadBalancerStatus, error) {
f.addCall("create")
if f.Balancers == nil {
f.Balancers = make(map[string]FakeBalancer)
Expand Down
67 changes: 51 additions & 16 deletions pkg/cloudprovider/providers/gce/gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,30 @@ const (

operationPollInterval = 3 * time.Second
operationPollTimeoutDuration = 30 * time.Minute

defaultLBSourceRange = "0.0.0.0/0"

//Expected annotations for GCE
gceLBAllowSourceRange = "net.beta.kubernetes.io/gce-source-ranges"
)

//validateAllowSourceRange validates annotation of allow source ranges
func validateSourceRangeAnnotation(annotation cloudprovider.ServiceAnnotation) error {
val := annotation[gceLBAllowSourceRange]
errMsg := fmt.Errorf("Service annotation %s:%s is not valid. Expecting source IP ranges. Comma Seperated. For example, 0.0.0.0/0,192.168.2.0/24", gceLBAllowSourceRange, val)
ranges := strings.Split(val, ",")
if len(ranges) <= 0 {
return errMsg
}
for _, subnet := range ranges {
_, _, err := net.ParseCIDR(subnet)
if err != nil {
return errMsg
}
}
return nil
}

// GCECloud is an implementation of Interface, LoadBalancer and Instances for Google Compute Engine.
type GCECloud struct {
service *compute.Service
Expand Down Expand Up @@ -432,12 +454,12 @@ func isHTTPErrorCode(err error, code int) bool {
// Due to an interesting series of design decisions, this handles both creating
// new load balancers and updating existing load balancers, recognizing when
// each is needed.
func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hostNames []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hostNames []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity, annotations cloudprovider.ServiceAnnotation) (*api.LoadBalancerStatus, error) {
portStr := []string{}
for _, p := range ports {
portStr = append(portStr, fmt.Sprintf("%s/%d", p.Protocol, p.Port))
}
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, requestedIP, portStr, hostNames, serviceName)
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", name, region, requestedIP, portStr, hostNames, serviceName, annotations)

if len(hostNames) == 0 {
return nil, fmt.Errorf("Cannot EnsureLoadBalancer() with no hosts")
Expand Down Expand Up @@ -555,7 +577,17 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
// Deal with the firewall next. The reason we do this here rather than last
// is because the forwarding rule is used as the indicator that the load
// balancer is fully created - it's what getLoadBalancer checks for.
firewallExists, firewallNeedsUpdate, err := gce.firewallNeedsUpdate(name, region, ipAddress, ports)
// Check if user specified the allow source range
sourceRanges := []string{defaultLBSourceRange}
val, ok := annotations.GetValue(gceLBAllowSourceRange)
if ok {
if err := validateSourceRangeAnnotation(annotations); err != nil {
return nil, err
}
sourceRanges = strings.Split(val, ",")
}

firewallExists, firewallNeedsUpdate, err := gce.firewallNeedsUpdate(name, region, ipAddress, ports, sourceRanges)
if err != nil {
return nil, err
}
Expand All @@ -565,12 +597,12 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
// Unlike forwarding rules and target pools, firewalls can be updated
// without needing to be deleted and recreated.
if firewallExists {
if err := gce.updateFirewall(name, region, desc, "0.0.0.0/0", ports, hosts); err != nil {
if err := gce.updateFirewall(name, region, desc, sourceRanges, ports, hosts); err != nil {
return nil, err
}
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): updated firewall", name, serviceName)
} else {
if err := gce.createFirewall(name, region, desc, "0.0.0.0/0", ports, hosts); err != nil {
if err := gce.createFirewall(name, region, desc, sourceRanges, ports, hosts); err != nil {
return nil, err
}
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created firewall", name, serviceName)
Expand Down Expand Up @@ -713,7 +745,7 @@ func translateAffinityType(affinityType api.ServiceAffinity) string {
}
}

func (gce *GCECloud) firewallNeedsUpdate(name, region, ipAddress string, ports []*api.ServicePort) (exists bool, needsUpdate bool, err error) {
func (gce *GCECloud) firewallNeedsUpdate(name, region, ipAddress string, ports []*api.ServicePort, sourceRanges []string) (exists bool, needsUpdate bool, err error) {
fw, err := gce.service.Firewalls.Get(gce.projectID, makeFirewallName(name)).Do()
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
Expand All @@ -737,6 +769,9 @@ func (gce *GCECloud) firewallNeedsUpdate(name, region, ipAddress string, ports [
}
// The service controller already verified that the protocol matches on all ports, no need to check.

if !slicesEqual(fw.SourceRanges, sourceRanges) {
return true, true, nil
}
return true, false, nil
}

Expand Down Expand Up @@ -810,8 +845,8 @@ func (gce *GCECloud) createTargetPool(name, region string, hosts []*gceInstance,
return nil
}

func (gce *GCECloud) createFirewall(name, region, desc, srcRange string, ports []*api.ServicePort, hosts []*gceInstance) error {
firewall, err := gce.firewallObject(name, region, desc, srcRange, ports, hosts)
func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges []string, ports []*api.ServicePort, hosts []*gceInstance) error {
firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts)
if err != nil {
return err
}
Expand All @@ -828,8 +863,8 @@ func (gce *GCECloud) createFirewall(name, region, desc, srcRange string, ports [
return nil
}

func (gce *GCECloud) updateFirewall(name, region, desc, srcRange string, ports []*api.ServicePort, hosts []*gceInstance) error {
firewall, err := gce.firewallObject(name, region, desc, srcRange, ports, hosts)
func (gce *GCECloud) updateFirewall(name, region, desc string, sourceRanges []string, ports []*api.ServicePort, hosts []*gceInstance) error {
firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts)
if err != nil {
return err
}
Expand All @@ -846,7 +881,7 @@ func (gce *GCECloud) updateFirewall(name, region, desc, srcRange string, ports [
return nil
}

func (gce *GCECloud) firewallObject(name, region, desc, srcRange string, ports []*api.ServicePort, hosts []*gceInstance) (*compute.Firewall, error) {
func (gce *GCECloud) firewallObject(name, region, desc string, sourceRanges []string, ports []*api.ServicePort, hosts []*gceInstance) (*compute.Firewall, error) {
allowedPorts := make([]string, len(ports))
for ix := range ports {
allowedPorts[ix] = strconv.Itoa(ports[ix].Port)
Expand All @@ -859,7 +894,7 @@ func (gce *GCECloud) firewallObject(name, region, desc, srcRange string, ports [
Name: makeFirewallName(name),
Description: desc,
Network: gce.networkURL,
SourceRanges: []string{srcRange},
SourceRanges: sourceRanges,
TargetTags: hostTags,
Allowed: []*compute.FirewallAllowed{
{
Expand Down Expand Up @@ -1138,7 +1173,7 @@ func (gce *GCECloud) GetFirewall(name string) (*compute.Firewall, error) {
}

// CreateFirewall creates the given firewall rule.
func (gce *GCECloud) CreateFirewall(name, desc, srcRange string, ports []int64, hostNames []string) error {
func (gce *GCECloud) CreateFirewall(name, desc string, sourceRanges []string, ports []int64, hostNames []string) error {
region, err := GetGCERegion(gce.localZone)
if err != nil {
return err
Expand All @@ -1153,7 +1188,7 @@ func (gce *GCECloud) CreateFirewall(name, desc, srcRange string, ports []int64,
if err != nil {
return err
}
return gce.createFirewall(name, region, desc, srcRange, svcPorts, hosts)
return gce.createFirewall(name, region, desc, sourceRanges, svcPorts, hosts)
}

// DeleteFirewall deletes the given firewall rule.
Expand All @@ -1167,7 +1202,7 @@ func (gce *GCECloud) DeleteFirewall(name string) error {

// UpdateFirewall applies the given firewall rule as an update to an existing
// firewall rule with the same name.
func (gce *GCECloud) UpdateFirewall(name, desc, srcRange string, ports []int64, hostNames []string) error {
func (gce *GCECloud) UpdateFirewall(name, desc string, sourceRanges []string, ports []int64, hostNames []string) error {
region, err := GetGCERegion(gce.localZone)
if err != nil {
return err
Expand All @@ -1182,7 +1217,7 @@ func (gce *GCECloud) UpdateFirewall(name, desc, srcRange string, ports []int64,
if err != nil {
return err
}
return gce.updateFirewall(name, region, desc, srcRange, svcPorts, hosts)
return gce.updateFirewall(name, region, desc, sourceRanges, svcPorts, hosts)
}

// Global static IP management
Expand Down
4 changes: 2 additions & 2 deletions pkg/cloudprovider/providers/openstack/openstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,8 +660,8 @@ func (lb *LoadBalancer) GetLoadBalancer(name, region string) (*api.LoadBalancerS
// a list of regions (from config) and query/create loadbalancers in
// each region.

func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, loadBalancerIP, ports, hosts, serviceName)
func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinity api.ServiceAffinity, annotations cloudprovider.ServiceAnnotation) (*api.LoadBalancerStatus, error) {
glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", name, region, loadBalancerIP, ports, hosts, serviceName, annotations)

if len(ports) > 1 {
return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers")
Expand Down
9 changes: 8 additions & 1 deletion pkg/controller/service/servicecontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/runtime"
"reflect"
)

const (
Expand Down Expand Up @@ -247,6 +248,7 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
}
message += err.Error()
s.eventRecorder.Event(service, api.EventTypeWarning, "CreatingLoadBalancerFailed", message)

return err, retry
}
// Always update the cache upon success.
Expand Down Expand Up @@ -321,6 +323,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name

// The load balancer doesn't exist yet, so create it.
s.eventRecorder.Event(service, api.EventTypeNormal, "CreatingLoadBalancer", "Creating load balancer")

err := s.createLoadBalancer(service, namespacedName)
if err != nil {
return fmt.Errorf("Failed to create load balancer for service %s: %v", namespacedName, err), retryable
Expand Down Expand Up @@ -385,7 +388,7 @@ func (s *ServiceController) createLoadBalancer(service *api.Service, serviceName
// - Not all cloud providers support all protocols and the next step is expected to return
// an error for unsupported protocols
status, err := s.balancer.EnsureLoadBalancer(name, s.zone.Region, net.ParseIP(service.Spec.LoadBalancerIP),
ports, hostsFromNodeList(&nodes), serviceName, service.Spec.SessionAffinity)
ports, hostsFromNodeList(&nodes), serviceName, service.Spec.SessionAffinity, service.ObjectMeta.Annotations)
if err != nil {
return err
} else {
Expand Down Expand Up @@ -488,6 +491,10 @@ func (s *ServiceController) needsUpdate(oldService *api.Service, newService *api
return true
}
}
if !reflect.DeepEqual(oldService.Annotations, newService.Annotations) {
return true
}

return false
}

Expand Down

0 comments on commit 2e3053a

Please sign in to comment.