Skip to content

Commit

Permalink
echo: add UDP support (istio#41387)
Browse files Browse the repository at this point in the history
* echo: add UDP support

While we don't support UDP today, it is still useful to have in our
standard client/server to test various UDP things (in the future, Istio
sidecars proxying the UDP, of course).

* lint
  • Loading branch information
howardjohn authored Oct 13, 2022
1 parent 82cf692 commit bbd1dcf
Show file tree
Hide file tree
Showing 8 changed files with 300 additions and 1 deletion.
12 changes: 11 additions & 1 deletion pkg/test/echo/cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var (
httpPorts []int
grpcPorts []int
tcpPorts []int
udpPorts []int
tlsPorts []int
hbonePorts []int
instanceIPPorts []int
Expand All @@ -59,7 +60,7 @@ var (
Long: `Echo application for testing Istio E2E`,
PersistentPreRunE: configureLogging,
Run: func(cmd *cobra.Command, args []string) {
ports := make(common.PortList, len(httpPorts)+len(grpcPorts)+len(tcpPorts)+len(hbonePorts))
ports := make(common.PortList, len(httpPorts)+len(grpcPorts)+len(tcpPorts)+len(udpPorts)+len(hbonePorts))
tlsByPort := map[int]bool{}
for _, p := range tlsPorts {
tlsByPort[p] = true
Expand Down Expand Up @@ -104,6 +105,14 @@ var (
}
portIndex++
}
for i, p := range udpPorts {
ports[portIndex] = &common.Port{
Name: "udp-" + strconv.Itoa(i),
Protocol: protocol.UDP,
Port: p,
}
portIndex++
}
for i, p := range hbonePorts {
ports[portIndex] = &common.Port{
Name: "hbone-" + strconv.Itoa(i),
Expand Down Expand Up @@ -163,6 +172,7 @@ func init() {
rootCmd.PersistentFlags().IntSliceVar(&httpPorts, "port", []int{8080}, "HTTP/1.1 ports")
rootCmd.PersistentFlags().IntSliceVar(&grpcPorts, "grpc", []int{7070}, "GRPC ports")
rootCmd.PersistentFlags().IntSliceVar(&tcpPorts, "tcp", []int{9090}, "TCP ports")
rootCmd.PersistentFlags().IntSliceVar(&udpPorts, "udp", []int{}, "UDP ports")
rootCmd.PersistentFlags().IntSliceVar(&hbonePorts, "hbone", []int{}, "HBONE ports")
rootCmd.PersistentFlags().IntSliceVar(&tlsPorts, "tls", []int{}, "Ports that are using TLS. These must be defined as http/grpc/tcp.")
rootCmd.PersistentFlags().IntSliceVar(&instanceIPPorts, "bind-ip", []int{}, "Ports that are bound to INSTANCE_IP rather than wildcard IP.")
Expand Down
1 change: 1 addition & 0 deletions pkg/test/echo/common/scheme/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
XDS Instance = "xds"
WebSocket Instance = "ws"
TCP Instance = "tcp"
UDP Instance = "udp"
// TLS sends a TLS connection and reports back the properties of the TLS connection
// This is similar to `openssl s_client`
// Response data is not returned; only information about the TLS handshake.
Expand Down
2 changes: 2 additions & 0 deletions pkg/test/echo/server/endpoint/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ func New(cfg Config) (Instance, error) {
return newGRPC(cfg), nil
case protocol.TCP:
return newTCP(cfg), nil
case protocol.UDP:
return newUDP(cfg), nil
default:
return nil, fmt.Errorf("unsupported protocol: %s", cfg.Port.Protocol)
}
Expand Down
142 changes: 142 additions & 0 deletions pkg/test/echo/server/endpoint/udp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package endpoint

import (
"fmt"
"net"
"net/http"
"os"
"strconv"
"strings"

"github.com/google/uuid"

"istio.io/istio/pkg/test/echo"
"istio.io/istio/pkg/test/util/retry"
)

var _ Instance = &udpInstance{}

type udpInstance struct {
Config
l net.PacketConn
}

func newUDP(config Config) Instance {
return &udpInstance{
Config: config,
}
}

func (s *udpInstance) GetConfig() Config {
return s.Config
}

func (s *udpInstance) Start(onReady OnReadyFunc) error {
var listener net.PacketConn
var port int
var err error
if s.Port.TLS {
return fmt.Errorf("TLS not supported for UDP")
}
// Listen on the given port and update the port if it changed from what was passed in.
listener, port, err = listenUDPAddress(s.ListenerIP, s.Port.Port)
// Store the actual listening port back to the argument.
s.Port.Port = port
if err != nil {
return err
}

s.l = listener
epLog.Infof("Listening UDP on %v\n", port)

// Start serving UDP traffic.
go func() {
buf := make([]byte, 2048)
for {
_, remote, err := listener.ReadFrom(buf)
if err != nil {
epLog.Warn("UDP read failed: " + err.Error())
return
}

id := uuid.New()
epLog.WithLabels("remote", remote, "id", id).Infof("UDP Request")

responseFields := s.getResponseFields(remote)
if _, err := listener.WriteTo([]byte(responseFields), remote); err != nil {
epLog.WithLabels("id", id).Warnf("UDP failed writing echo response: %v", err)
}
}
}()

// Notify the WaitGroup once the port has transitioned to ready.
go s.awaitReady(onReady, listener.LocalAddr().String())
return nil
}

func (s *udpInstance) getResponseFields(conn net.Addr) string {
ip, _, _ := net.SplitHostPort(conn.String())
// Write non-request fields specific to the instance
respFields := map[echo.Field]string{
echo.StatusCodeField: strconv.Itoa(http.StatusOK),
echo.ClusterField: s.Cluster,
echo.IstioVersionField: s.IstioVersion,
echo.ServiceVersionField: s.Version,
echo.ServicePortField: strconv.Itoa(s.Port.Port),
echo.IPField: ip,
echo.ProtocolField: "UDP",
}

if hostname, err := os.Hostname(); err == nil {
respFields[echo.HostnameField] = hostname
}

var out strings.Builder
for field, val := range respFields {
val := fmt.Sprintf("%s=%s\n", string(field), val)
_, _ = out.WriteString(val)
}
return out.String()
}

func (s *udpInstance) Close() error {
if s.l != nil {
_ = s.l.Close()
}
return nil
}

func (s *udpInstance) awaitReady(onReady OnReadyFunc, address string) {
defer onReady()

err := retry.UntilSuccess(func() error {
conn, err := net.Dial("udp", address)
if err != nil {
return err
}
defer func() { _ = conn.Close() }()

// Server is up now, we're ready.
return nil
}, retry.Timeout(readyTimeout), retry.Delay(readyInterval))

if err != nil {
epLog.Errorf("readiness failed for endpoint %s: %v", address, err)
} else {
epLog.Infof("ready for UDP endpoint %s", address)
}
}
19 changes: 19 additions & 0 deletions pkg/test/echo/server/endpoint/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,25 @@ func listenOnUDS(uds string) (net.Listener, error) {
return ln, nil
}

func listenUDPAddress(ip string, port int) (net.PacketConn, int, error) {
parsedIP := net.ParseIP(ip)
ipBind := "udp"
if parsedIP != nil {
if parsedIP.To4() == nil && parsedIP.To16() != nil {
ipBind = "udp6"
} else if parsedIP.To4() != nil {
ipBind = "udp4"
}
}
ln, err := net.ListenPacket(ipBind, net.JoinHostPort(ip, strconv.Itoa(port)))
if err != nil {
return nil, 0, err
}

port = ln.LocalAddr().(*net.UDPAddr).Port
return ln, port, nil
}

// forceClose the given socket.
func forceClose(conn net.Conn) error {
// Close may be called more than once.
Expand Down
1 change: 1 addition & 0 deletions pkg/test/echo/server/forwarder/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func New() *Instance {
protocolMap[scheme.TLS] = add(newTLSProtocol(e))
protocolMap[scheme.XDS] = add(newXDSProtocol(e))
protocolMap[scheme.TCP] = add(newTCPProtocol(e))
protocolMap[scheme.UDP] = add(newUDPProtocol(e))

return &Instance{
e: e,
Expand Down
123 changes: 123 additions & 0 deletions pkg/test/echo/server/forwarder/udp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package forwarder

import (
"bytes"
"context"
"fmt"
"io"
"net"
"net/http"
"strings"

"istio.io/istio/pkg/test/echo"
"istio.io/istio/pkg/test/echo/common"
"istio.io/istio/pkg/test/echo/proto"
)

var _ protocol = &udpProtocol{}

type udpProtocol struct {
e *executor
}

func newUDPProtocol(e *executor) protocol {
return &udpProtocol{e: e}
}

func (c *udpProtocol) ForwardEcho(ctx context.Context, cfg *Config) (*proto.ForwardEchoResponse, error) {
return doForward(ctx, cfg, c.e, c.makeRequest)
}

func (c *udpProtocol) makeRequest(ctx context.Context, cfg *Config, requestID int) (string, error) {
conn, err := newUDPConnection(cfg)
if err != nil {
return "", err
}
defer func() { _ = conn.Close() }()

msgBuilder := strings.Builder{}
echo.ForwarderURLField.WriteForRequest(&msgBuilder, requestID, cfg.Request.Url)

if cfg.Request.Message != "" {
echo.ForwarderMessageField.WriteForRequest(&msgBuilder, requestID, cfg.Request.Message)
}

// Apply per-request timeout to calculate deadline for reads/writes.
ctx, cancel := context.WithTimeout(ctx, cfg.timeout)
defer cancel()

// Apply the deadline to the connection.
deadline, _ := ctx.Deadline()
if err := conn.SetWriteDeadline(deadline); err != nil {
return msgBuilder.String(), err
}
if err := conn.SetReadDeadline(deadline); err != nil {
return msgBuilder.String(), err
}

// Make sure the client writes something to the buffer
message := "HelloWorld"
if cfg.Request.Message != "" {
message = cfg.Request.Message
}

if _, err := conn.Write([]byte(message + "\n")); err != nil {
fwLog.Warnf("UDP write failed: %v", err)
return msgBuilder.String(), err
}
var resBuffer bytes.Buffer
buf := make([]byte, 1024+len(message))
n, err := conn.Read(buf)
if err != nil && err != io.EOF {
fwLog.Warnf("UDP read failed (already read %d bytes): %v", len(resBuffer.String()), err)
return msgBuilder.String(), err
}
resBuffer.Write(buf[:n])

// format the output for forwarder response
for _, line := range strings.Split(string(buf[:n]), "\n") {
if line != "" {
echo.WriteBodyLine(&msgBuilder, requestID, line)
}
}

msg := msgBuilder.String()
expected := fmt.Sprintf("%s=%d", string(echo.StatusCodeField), http.StatusOK)
if cfg.Request.ExpectedResponse != nil {
expected = cfg.Request.ExpectedResponse.GetValue()
}
if !strings.Contains(msg, expected) {
return msg, fmt.Errorf("expect to recv message with %s, got %s. Return EOF", expected, msg)
}
return msg, nil
}

func (c *udpProtocol) Close() error {
return nil
}

func newUDPConnection(cfg *Config) (net.Conn, error) {
address := cfg.Request.Url[len(cfg.scheme+"://"):]

if cfg.secure {
return nil, fmt.Errorf("TLS not available")
}

ctx, cancel := context.WithTimeout(context.Background(), common.ConnectionTimeout)
defer cancel()
return newDialer(cfg).DialContext(ctx, "udp", address)
}
1 change: 1 addition & 0 deletions pkg/test/echo/server/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ func (s *Instance) validate() error {
for _, port := range s.Ports {
switch port.Protocol {
case protocol.TCP:
case protocol.UDP:
case protocol.HTTP:
case protocol.HTTPS:
case protocol.HTTP2:
Expand Down

0 comments on commit bbd1dcf

Please sign in to comment.