From 1ed15800dc3bb68c2b9dfcea262b19bb7f473f2a Mon Sep 17 00:00:00 2001 From: Davide Agnello Date: Thu, 11 Aug 2016 16:51:13 -0700 Subject: [PATCH] Openstack provider allowing more than one service port for lbaas v2 --- .../openstack/openstack_loadbalancer.go | 265 +++++++++++------- 1 file changed, 159 insertions(+), 106 deletions(-) diff --git a/pkg/cloudprovider/providers/openstack/openstack_loadbalancer.go b/pkg/cloudprovider/providers/openstack/openstack_loadbalancer.go index 90f345a8aecbb..1dc5d9d2104a4 100644 --- a/pkg/cloudprovider/providers/openstack/openstack_loadbalancer.go +++ b/pkg/cloudprovider/providers/openstack/openstack_loadbalancer.go @@ -55,6 +55,8 @@ type LbaasV2 struct { LoadBalancer } +type empty struct{} + func getPortIDByIP(client *gophercloud.ServiceClient, ipAddress string) (string, error) { var portID string @@ -219,22 +221,23 @@ func getLoadbalancerByName(client *gophercloud.ServiceClient, name string) (*loa return &loadbalancerList[0], nil } -func waitLoadbalancerActiveProvisioningStatus(client *gophercloud.ServiceClient, loadbalancerID string) error { +func waitLoadbalancerActiveProvisioningStatus(client *gophercloud.ServiceClient, loadbalancerID string) (string, error) { start := time.Now().Second() for { loadbalancer, err := loadbalancers.Get(client, loadbalancerID).Extract() if err != nil { - return err + return "", err } if loadbalancer.ProvisioningStatus == "ACTIVE" { - return nil + return "ACTIVE", nil + } else if loadbalancer.ProvisioningStatus == "ERROR" { + return "ERROR", fmt.Errorf("Loadbalancer has gone into ERROR state") } time.Sleep(1 * time.Second) if time.Now().Second()-start >= loadbalancerActiveTimeoutSeconds { - return fmt.Errorf("Loadbalancer failed to go into ACTIVE provisioning status within alloted time") - + return loadbalancer.ProvisioningStatus, fmt.Errorf("Loadbalancer failed to go into ACTIVE provisioning status within alloted time") } } } @@ -285,16 +288,16 @@ func (lbaas *LbaasV2) EnsureLoadBalancer(apiService *api.Service, hosts []string glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", apiService.Namespace, apiService.Name, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, hosts, apiService.Annotations) ports := apiService.Spec.Ports - if len(ports) > 1 { - return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers") - } else if len(ports) == 0 { + if len(ports) == 0 { return nil, fmt.Errorf("no ports provided to openstack load balancer") } - // The service controller verified all the protocols match on the ports, just check and use the first one + // Check for TCP protocol on each port // TODO: Convert all error messages to use an event recorder - if ports[0].Protocol != api.ProtocolTCP { - return nil, fmt.Errorf("Only TCP LoadBalancer is supported for openstack load balancers") + for _, port := range ports { + if port.Protocol != api.ProtocolTCP { + return nil, fmt.Errorf("Only TCP LoadBalancer is supported for openstack load balancers") + } } affinity := api.ServiceAffinityNone //apiService.Spec.SessionAffinity @@ -358,67 +361,72 @@ func (lbaas *LbaasV2) EnsureLoadBalancer(apiService *api.Service, hosts []string waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) - listener, err := listeners.Create(lbaas.network, listeners.CreateOpts{ - Name: name, - Protocol: listeners.ProtocolTCP, - ProtocolPort: (int)(ports[0].Port), //TODO: need to handle multi-port - LoadbalancerID: loadbalancer.ID, - }).Extract() - if err != nil { - // cleanup what was created so far - _ = lbaas.EnsureLoadBalancerDeleted(apiService) - return nil, err - } - - waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) - - pool, err := v2_pools.Create(lbaas.network, v2_pools.CreateOpts{ - Name: name, - Protocol: v2_pools.ProtocolTCP, - LBMethod: lbmethod, - ListenerID: listener.ID, - Persistence: persistence, - }).Extract() - if err != nil { - // cleanup what was created so far - _ = lbaas.EnsureLoadBalancerDeleted(apiService) - return nil, err - } - - for _, host := range hosts { - addr, err := getAddressByName(lbaas.compute, host) + for portIndex, port := range ports { + listener, err := listeners.Create(lbaas.network, listeners.CreateOpts{ + Name: fmt.Sprintf("listener_%s_%d", name, portIndex), + Protocol: listeners.Protocol(port.Protocol), + ProtocolPort: int(port.Port), + LoadbalancerID: loadbalancer.ID, + }).Extract() if err != nil { + // cleanup what was created so far + _ = lbaas.EnsureLoadBalancerDeleted(apiService) return nil, err } waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) - _, err = v2_pools.CreateAssociateMember(lbaas.network, pool.ID, v2_pools.MemberCreateOpts{ - ProtocolPort: int(ports[0].NodePort), //TODO: need to handle multi-port - Address: addr, - SubnetID: lbaas.opts.SubnetId, + pool, err := v2_pools.Create(lbaas.network, v2_pools.CreateOpts{ + Name: fmt.Sprintf("pool_%s_%d", name, portIndex), + Protocol: v2_pools.Protocol(port.Protocol), + LBMethod: lbmethod, + ListenerID: listener.ID, + Persistence: persistence, }).Extract() if err != nil { // cleanup what was created so far _ = lbaas.EnsureLoadBalancerDeleted(apiService) return nil, err } - } - if lbaas.opts.CreateMonitor { waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) - _, err = v2_monitors.Create(lbaas.network, v2_monitors.CreateOpts{ - PoolID: pool.ID, - Type: monitors.TypeTCP, - Delay: int(lbaas.opts.MonitorDelay.Duration.Seconds()), - Timeout: int(lbaas.opts.MonitorTimeout.Duration.Seconds()), - MaxRetries: int(lbaas.opts.MonitorMaxRetries), - }).Extract() - if err != nil { - // cleanup what was created so far - _ = lbaas.EnsureLoadBalancerDeleted(apiService) - return nil, err + for _, host := range hosts { + addr, err := getAddressByName(lbaas.compute, host) + if err != nil { + // cleanup what was created so far + _ = lbaas.EnsureLoadBalancerDeleted(apiService) + return nil, err + } + + _, err = v2_pools.CreateAssociateMember(lbaas.network, pool.ID, v2_pools.MemberCreateOpts{ + ProtocolPort: int(port.NodePort), + Address: addr, + SubnetID: lbaas.opts.SubnetId, + }).Extract() + if err != nil { + // cleanup what was created so far + _ = lbaas.EnsureLoadBalancerDeleted(apiService) + return nil, err + } + + waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) + } + + if lbaas.opts.CreateMonitor { + _, err = v2_monitors.Create(lbaas.network, v2_monitors.CreateOpts{ + PoolID: pool.ID, + Type: string(port.Protocol), + Delay: int(lbaas.opts.MonitorDelay.Duration.Seconds()), + Timeout: int(lbaas.opts.MonitorTimeout.Duration.Seconds()), + MaxRetries: int(lbaas.opts.MonitorMaxRetries), + }).Extract() + if err != nil { + // cleanup what was created so far + _ = lbaas.EnsureLoadBalancerDeleted(apiService) + return nil, err + } + waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) } } @@ -456,9 +464,7 @@ func (lbaas *LbaasV2) UpdateLoadBalancer(service *api.Service, hosts []string) e glog.V(4).Infof("UpdateLoadBalancer(%v, %v)", loadBalancerName, hosts) ports := service.Spec.Ports - if len(ports) > 1 { - return fmt.Errorf("multiple ports are not yet supported in openstack load balancers") - } else if len(ports) == 0 { + if len(ports) == 0 { return fmt.Errorf("no ports provided to openstack load balancer") } @@ -470,50 +476,37 @@ func (lbaas *LbaasV2) UpdateLoadBalancer(service *api.Service, hosts []string) e return fmt.Errorf("Loadbalancer %s does not exist", loadBalancerName) } - // Set of member (addresses) that _should_ exist - addrs := map[string]bool{} - for _, host := range hosts { - addr, err := getAddressByName(lbaas.compute, host) + // Get all listeners for this loadbalancer, by "port key". + type portKey struct { + Protocol string + Port int + } + lbListeners := make(map[portKey]listeners.Listener) + err = listeners.List(lbaas.network, listeners.ListOpts{LoadbalancerID: loadbalancer.ID}).EachPage(func(page pagination.Page) (bool, error) { + listenersList, err := listeners.ExtractListeners(page) if err != nil { - return err + return false, err } - addrs[addr] = true + for _, l := range listenersList { + key := portKey{Protocol: l.Protocol, Port: l.ProtocolPort} + lbListeners[key] = l + } + return true, nil + }) + if err != nil { + return err } - // Iterate over members in each pool that _do_ exist - var poolID string + // Get all pools for this loadbalancer, by listener ID. + lbPools := make(map[string]v2_pools.Pool) err = v2_pools.List(lbaas.network, v2_pools.ListOpts{LoadbalancerID: loadbalancer.ID}).EachPage(func(page pagination.Page) (bool, error) { poolsList, err := v2_pools.ExtractPools(page) if err != nil { return false, err } - - for _, pool := range poolsList { - poolID = pool.ID - err := v2_pools.ListAssociateMembers(lbaas.network, poolID, v2_pools.MemberListOpts{}).EachPage(func(page pagination.Page) (bool, error) { - membersList, err := v2_pools.ExtractMembers(page) - if err != nil { - return false, err - } - - for _, member := range membersList { - if _, found := addrs[member.Address]; found { - // Member already exists, remove from update list - delete(addrs, member.Address) - } else { - // Member needs to be deleted - err = v2_pools.DeleteMember(lbaas.network, poolID, member.ID).ExtractErr() - if err != nil { - return false, err - } - waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) - } - } - - return true, nil - }) - if err != nil { - return false, err + for _, p := range poolsList { + for _, l := range p.Listeners { + lbPools[l.ID] = p } } return true, nil @@ -522,19 +515,79 @@ func (lbaas *LbaasV2) UpdateLoadBalancer(service *api.Service, hosts []string) e return err } - // Anything left in addrs is a new member that needs to be added to a pool - for addr := range addrs { - _, err := v2_pools.CreateAssociateMember(lbaas.network, poolID, v2_pools.MemberCreateOpts{ - Address: addr, - ProtocolPort: int(ports[0].NodePort), - SubnetID: lbaas.opts.SubnetId, - }).Extract() + // Compose Set of member (addresses) that _should_ exist + addrs := map[string]empty{} + for _, host := range hosts { + addr, err := getAddressByName(lbaas.compute, host) if err != nil { return err } - waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) - } + addrs[addr] = empty{} + } + + // Check for adding/removing members associated with each port + for _, port := range ports { + // Get listener associated with this port + listener, ok := lbListeners[portKey{ + Protocol: string(port.Protocol), + Port: int(port.Port), + }] + if !ok { + return fmt.Errorf("Loadbalancer %s does not contain required listener for port %d and protocol %s", loadBalancerName, port.Port, port.Protocol) + } + // Get pool associated with this listener + pool, ok := lbPools[listener.ID] + if !ok { + return fmt.Errorf("Loadbalancer %s does not contain required pool for listener %s", loadBalancerName, listener.ID) + } + + // Find existing pool members (by address) for this port + members := make(map[string]v2_pools.Member) + err := v2_pools.ListAssociateMembers(lbaas.network, pool.ID, v2_pools.MemberListOpts{}).EachPage(func(page pagination.Page) (bool, error) { + membersList, err := v2_pools.ExtractMembers(page) + if err != nil { + return false, err + } + for _, member := range membersList { + members[member.Address] = member + } + return true, nil + }) + if err != nil { + return err + } + + // Add any new members for this port + for addr := range addrs { + if _, ok := members[addr]; ok { + // Already exists, do not create member + continue + } + _, err := v2_pools.CreateAssociateMember(lbaas.network, pool.ID, v2_pools.MemberCreateOpts{ + Address: addr, + ProtocolPort: int(port.NodePort), + SubnetID: lbaas.opts.SubnetId, + }).Extract() + if err != nil { + return err + } + waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) + } + + // Remove any old members for this port + for _, member := range members { + if _, ok := addrs[member.Address]; ok { + // Still present, do not delete member + continue + } + err = v2_pools.DeleteMember(lbaas.network, pool.ID, member.ID).ExtractErr() + if err != nil && !isNotFound(err) { + return err + } + waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID) + } + } return nil } @@ -716,7 +769,7 @@ func (lb *LbaasV1) EnsureLoadBalancer(apiService *api.Service, hosts []string) ( ports := apiService.Spec.Ports if len(ports) > 1 { - return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers") + return nil, fmt.Errorf("multiple ports are not supported in openstack v1 load balancers") } else if len(ports) == 0 { return nil, fmt.Errorf("no ports provided to openstack load balancer") } @@ -785,7 +838,7 @@ func (lb *LbaasV1) EnsureLoadBalancer(apiService *api.Service, hosts []string) ( _, err = members.Create(lb.network, members.CreateOpts{ PoolID: pool.ID, - ProtocolPort: int(ports[0].NodePort), //TODO: need to handle multi-port + ProtocolPort: int(ports[0].NodePort), //Note: only handles single port Address: addr, }).Extract() if err != nil {