Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Back port - Openstack provider allowing more than one service port for lbaas v2 #32001

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
265 changes: 159 additions & 106 deletions pkg/cloudprovider/providers/openstack/openstack_loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type LbaasV2 struct {
LoadBalancer
}

type empty struct{}

func getPortIDByIP(client *gophercloud.ServiceClient, ipAddress string) (string, error) {
var portID string

Expand Down Expand Up @@ -219,22 +221,23 @@ func getLoadbalancerByName(client *gophercloud.ServiceClient, name string) (*loa
return &loadbalancerList[0], nil
}

func waitLoadbalancerActiveProvisioningStatus(client *gophercloud.ServiceClient, loadbalancerID string) error {
func waitLoadbalancerActiveProvisioningStatus(client *gophercloud.ServiceClient, loadbalancerID string) (string, error) {
start := time.Now().Second()
for {
loadbalancer, err := loadbalancers.Get(client, loadbalancerID).Extract()
if err != nil {
return err
return "", err
}
if loadbalancer.ProvisioningStatus == "ACTIVE" {
return nil
return "ACTIVE", nil
} else if loadbalancer.ProvisioningStatus == "ERROR" {
return "ERROR", fmt.Errorf("Loadbalancer has gone into ERROR state")
}

time.Sleep(1 * time.Second)

if time.Now().Second()-start >= loadbalancerActiveTimeoutSeconds {
return fmt.Errorf("Loadbalancer failed to go into ACTIVE provisioning status within alloted time")

return loadbalancer.ProvisioningStatus, fmt.Errorf("Loadbalancer failed to go into ACTIVE provisioning status within alloted time")
}
}
}
Expand Down Expand Up @@ -285,16 +288,16 @@ func (lbaas *LbaasV2) EnsureLoadBalancer(apiService *api.Service, hosts []string
glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", apiService.Namespace, apiService.Name, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, hosts, apiService.Annotations)

ports := apiService.Spec.Ports
if len(ports) > 1 {
return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers")
} else if len(ports) == 0 {
if len(ports) == 0 {
return nil, fmt.Errorf("no ports provided to openstack load balancer")
}

// The service controller verified all the protocols match on the ports, just check and use the first one
// Check for TCP protocol on each port
// TODO: Convert all error messages to use an event recorder
if ports[0].Protocol != api.ProtocolTCP {
return nil, fmt.Errorf("Only TCP LoadBalancer is supported for openstack load balancers")
for _, port := range ports {
if port.Protocol != api.ProtocolTCP {
return nil, fmt.Errorf("Only TCP LoadBalancer is supported for openstack load balancers")
}
}

affinity := api.ServiceAffinityNone //apiService.Spec.SessionAffinity
Expand Down Expand Up @@ -358,67 +361,72 @@ func (lbaas *LbaasV2) EnsureLoadBalancer(apiService *api.Service, hosts []string

waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)

listener, err := listeners.Create(lbaas.network, listeners.CreateOpts{
Name: name,
Protocol: listeners.ProtocolTCP,
ProtocolPort: (int)(ports[0].Port), //TODO: need to handle multi-port
LoadbalancerID: loadbalancer.ID,
}).Extract()
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(apiService)
return nil, err
}

waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)

pool, err := v2_pools.Create(lbaas.network, v2_pools.CreateOpts{
Name: name,
Protocol: v2_pools.ProtocolTCP,
LBMethod: lbmethod,
ListenerID: listener.ID,
Persistence: persistence,
}).Extract()
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(apiService)
return nil, err
}

for _, host := range hosts {
addr, err := getAddressByName(lbaas.compute, host)
for portIndex, port := range ports {
listener, err := listeners.Create(lbaas.network, listeners.CreateOpts{
Name: fmt.Sprintf("listener_%s_%d", name, portIndex),
Protocol: listeners.Protocol(port.Protocol),
ProtocolPort: int(port.Port),
LoadbalancerID: loadbalancer.ID,
}).Extract()
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(apiService)
return nil, err
}

waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)

_, err = v2_pools.CreateAssociateMember(lbaas.network, pool.ID, v2_pools.MemberCreateOpts{
ProtocolPort: int(ports[0].NodePort), //TODO: need to handle multi-port
Address: addr,
SubnetID: lbaas.opts.SubnetId,
pool, err := v2_pools.Create(lbaas.network, v2_pools.CreateOpts{
Name: fmt.Sprintf("pool_%s_%d", name, portIndex),
Protocol: v2_pools.Protocol(port.Protocol),
LBMethod: lbmethod,
ListenerID: listener.ID,
Persistence: persistence,
}).Extract()
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(apiService)
return nil, err
}
}

if lbaas.opts.CreateMonitor {
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)

_, err = v2_monitors.Create(lbaas.network, v2_monitors.CreateOpts{
PoolID: pool.ID,
Type: monitors.TypeTCP,
Delay: int(lbaas.opts.MonitorDelay.Duration.Seconds()),
Timeout: int(lbaas.opts.MonitorTimeout.Duration.Seconds()),
MaxRetries: int(lbaas.opts.MonitorMaxRetries),
}).Extract()
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(apiService)
return nil, err
for _, host := range hosts {
addr, err := getAddressByName(lbaas.compute, host)
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(apiService)
return nil, err
}

_, err = v2_pools.CreateAssociateMember(lbaas.network, pool.ID, v2_pools.MemberCreateOpts{
ProtocolPort: int(port.NodePort),
Address: addr,
SubnetID: lbaas.opts.SubnetId,
}).Extract()
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(apiService)
return nil, err
}

waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
}

if lbaas.opts.CreateMonitor {
_, err = v2_monitors.Create(lbaas.network, v2_monitors.CreateOpts{
PoolID: pool.ID,
Type: string(port.Protocol),
Delay: int(lbaas.opts.MonitorDelay.Duration.Seconds()),
Timeout: int(lbaas.opts.MonitorTimeout.Duration.Seconds()),
MaxRetries: int(lbaas.opts.MonitorMaxRetries),
}).Extract()
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(apiService)
return nil, err
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
}
}

Expand Down Expand Up @@ -456,9 +464,7 @@ func (lbaas *LbaasV2) UpdateLoadBalancer(service *api.Service, hosts []string) e
glog.V(4).Infof("UpdateLoadBalancer(%v, %v)", loadBalancerName, hosts)

ports := service.Spec.Ports
if len(ports) > 1 {
return fmt.Errorf("multiple ports are not yet supported in openstack load balancers")
} else if len(ports) == 0 {
if len(ports) == 0 {
return fmt.Errorf("no ports provided to openstack load balancer")
}

Expand All @@ -470,50 +476,37 @@ func (lbaas *LbaasV2) UpdateLoadBalancer(service *api.Service, hosts []string) e
return fmt.Errorf("Loadbalancer %s does not exist", loadBalancerName)
}

// Set of member (addresses) that _should_ exist
addrs := map[string]bool{}
for _, host := range hosts {
addr, err := getAddressByName(lbaas.compute, host)
// Get all listeners for this loadbalancer, by "port key".
type portKey struct {
Protocol string
Port int
}
lbListeners := make(map[portKey]listeners.Listener)
err = listeners.List(lbaas.network, listeners.ListOpts{LoadbalancerID: loadbalancer.ID}).EachPage(func(page pagination.Page) (bool, error) {
listenersList, err := listeners.ExtractListeners(page)
if err != nil {
return err
return false, err
}
addrs[addr] = true
for _, l := range listenersList {
key := portKey{Protocol: l.Protocol, Port: l.ProtocolPort}
lbListeners[key] = l
}
return true, nil
})
if err != nil {
return err
}

// Iterate over members in each pool that _do_ exist
var poolID string
// Get all pools for this loadbalancer, by listener ID.
lbPools := make(map[string]v2_pools.Pool)
err = v2_pools.List(lbaas.network, v2_pools.ListOpts{LoadbalancerID: loadbalancer.ID}).EachPage(func(page pagination.Page) (bool, error) {
poolsList, err := v2_pools.ExtractPools(page)
if err != nil {
return false, err
}

for _, pool := range poolsList {
poolID = pool.ID
err := v2_pools.ListAssociateMembers(lbaas.network, poolID, v2_pools.MemberListOpts{}).EachPage(func(page pagination.Page) (bool, error) {
membersList, err := v2_pools.ExtractMembers(page)
if err != nil {
return false, err
}

for _, member := range membersList {
if _, found := addrs[member.Address]; found {
// Member already exists, remove from update list
delete(addrs, member.Address)
} else {
// Member needs to be deleted
err = v2_pools.DeleteMember(lbaas.network, poolID, member.ID).ExtractErr()
if err != nil {
return false, err
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
}
}

return true, nil
})
if err != nil {
return false, err
for _, p := range poolsList {
for _, l := range p.Listeners {
lbPools[l.ID] = p
}
}
return true, nil
Expand All @@ -522,19 +515,79 @@ func (lbaas *LbaasV2) UpdateLoadBalancer(service *api.Service, hosts []string) e
return err
}

// Anything left in addrs is a new member that needs to be added to a pool
for addr := range addrs {
_, err := v2_pools.CreateAssociateMember(lbaas.network, poolID, v2_pools.MemberCreateOpts{
Address: addr,
ProtocolPort: int(ports[0].NodePort),
SubnetID: lbaas.opts.SubnetId,
}).Extract()
// Compose Set of member (addresses) that _should_ exist
addrs := map[string]empty{}
for _, host := range hosts {
addr, err := getAddressByName(lbaas.compute, host)
if err != nil {
return err
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
}
addrs[addr] = empty{}
}

// Check for adding/removing members associated with each port
for _, port := range ports {
// Get listener associated with this port
listener, ok := lbListeners[portKey{
Protocol: string(port.Protocol),
Port: int(port.Port),
}]
if !ok {
return fmt.Errorf("Loadbalancer %s does not contain required listener for port %d and protocol %s", loadBalancerName, port.Port, port.Protocol)
}

// Get pool associated with this listener
pool, ok := lbPools[listener.ID]
if !ok {
return fmt.Errorf("Loadbalancer %s does not contain required pool for listener %s", loadBalancerName, listener.ID)
}

// Find existing pool members (by address) for this port
members := make(map[string]v2_pools.Member)
err := v2_pools.ListAssociateMembers(lbaas.network, pool.ID, v2_pools.MemberListOpts{}).EachPage(func(page pagination.Page) (bool, error) {
membersList, err := v2_pools.ExtractMembers(page)
if err != nil {
return false, err
}
for _, member := range membersList {
members[member.Address] = member
}
return true, nil
})
if err != nil {
return err
}

// Add any new members for this port
for addr := range addrs {
if _, ok := members[addr]; ok {
// Already exists, do not create member
continue
}
_, err := v2_pools.CreateAssociateMember(lbaas.network, pool.ID, v2_pools.MemberCreateOpts{
Address: addr,
ProtocolPort: int(port.NodePort),
SubnetID: lbaas.opts.SubnetId,
}).Extract()
if err != nil {
return err
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
}

// Remove any old members for this port
for _, member := range members {
if _, ok := addrs[member.Address]; ok {
// Still present, do not delete member
continue
}
err = v2_pools.DeleteMember(lbaas.network, pool.ID, member.ID).ExtractErr()
if err != nil && !isNotFound(err) {
return err
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
}
}
return nil
}

Expand Down Expand Up @@ -716,7 +769,7 @@ func (lb *LbaasV1) EnsureLoadBalancer(apiService *api.Service, hosts []string) (

ports := apiService.Spec.Ports
if len(ports) > 1 {
return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers")
return nil, fmt.Errorf("multiple ports are not supported in openstack v1 load balancers")
} else if len(ports) == 0 {
return nil, fmt.Errorf("no ports provided to openstack load balancer")
}
Expand Down Expand Up @@ -785,7 +838,7 @@ func (lb *LbaasV1) EnsureLoadBalancer(apiService *api.Service, hosts []string) (

_, err = members.Create(lb.network, members.CreateOpts{
PoolID: pool.ID,
ProtocolPort: int(ports[0].NodePort), //TODO: need to handle multi-port
ProtocolPort: int(ports[0].NodePort), //Note: only handles single port
Address: addr,
}).Extract()
if err != nil {
Expand Down