From bbd1dcfe290540cb3a46c730967870c4e6e087c1 Mon Sep 17 00:00:00 2001 From: John Howard Date: Thu, 13 Oct 2022 10:19:22 -0700 Subject: [PATCH] echo: add UDP support (#41387) * echo: add UDP support While we don't support UDP today, it is still useful to have in our standard client/server to test various UDP things (in the future, Istio sidecars proxying the UDP, of course). * lint --- pkg/test/echo/cmd/server/main.go | 12 +- pkg/test/echo/common/scheme/scheme.go | 1 + pkg/test/echo/server/endpoint/instance.go | 2 + pkg/test/echo/server/endpoint/udp.go | 142 +++++++++++++++++++++ pkg/test/echo/server/endpoint/util.go | 19 +++ pkg/test/echo/server/forwarder/instance.go | 1 + pkg/test/echo/server/forwarder/udp.go | 123 ++++++++++++++++++ pkg/test/echo/server/instance.go | 1 + 8 files changed, 300 insertions(+), 1 deletion(-) create mode 100644 pkg/test/echo/server/endpoint/udp.go create mode 100644 pkg/test/echo/server/forwarder/udp.go diff --git a/pkg/test/echo/cmd/server/main.go b/pkg/test/echo/cmd/server/main.go index 1c28e0d028b3..3d541672baae 100644 --- a/pkg/test/echo/cmd/server/main.go +++ b/pkg/test/echo/cmd/server/main.go @@ -35,6 +35,7 @@ var ( httpPorts []int grpcPorts []int tcpPorts []int + udpPorts []int tlsPorts []int hbonePorts []int instanceIPPorts []int @@ -59,7 +60,7 @@ var ( Long: `Echo application for testing Istio E2E`, PersistentPreRunE: configureLogging, Run: func(cmd *cobra.Command, args []string) { - ports := make(common.PortList, len(httpPorts)+len(grpcPorts)+len(tcpPorts)+len(hbonePorts)) + ports := make(common.PortList, len(httpPorts)+len(grpcPorts)+len(tcpPorts)+len(udpPorts)+len(hbonePorts)) tlsByPort := map[int]bool{} for _, p := range tlsPorts { tlsByPort[p] = true @@ -104,6 +105,14 @@ var ( } portIndex++ } + for i, p := range udpPorts { + ports[portIndex] = &common.Port{ + Name: "udp-" + strconv.Itoa(i), + Protocol: protocol.UDP, + Port: p, + } + portIndex++ + } for i, p := range hbonePorts { ports[portIndex] = &common.Port{ Name: "hbone-" + strconv.Itoa(i), @@ -163,6 +172,7 @@ func init() { rootCmd.PersistentFlags().IntSliceVar(&httpPorts, "port", []int{8080}, "HTTP/1.1 ports") rootCmd.PersistentFlags().IntSliceVar(&grpcPorts, "grpc", []int{7070}, "GRPC ports") rootCmd.PersistentFlags().IntSliceVar(&tcpPorts, "tcp", []int{9090}, "TCP ports") + rootCmd.PersistentFlags().IntSliceVar(&udpPorts, "udp", []int{}, "UDP ports") rootCmd.PersistentFlags().IntSliceVar(&hbonePorts, "hbone", []int{}, "HBONE ports") rootCmd.PersistentFlags().IntSliceVar(&tlsPorts, "tls", []int{}, "Ports that are using TLS. These must be defined as http/grpc/tcp.") rootCmd.PersistentFlags().IntSliceVar(&instanceIPPorts, "bind-ip", []int{}, "Ports that are bound to INSTANCE_IP rather than wildcard IP.") diff --git a/pkg/test/echo/common/scheme/scheme.go b/pkg/test/echo/common/scheme/scheme.go index 1b9baa2dbd4f..ea1d3b372945 100644 --- a/pkg/test/echo/common/scheme/scheme.go +++ b/pkg/test/echo/common/scheme/scheme.go @@ -24,6 +24,7 @@ const ( XDS Instance = "xds" WebSocket Instance = "ws" TCP Instance = "tcp" + UDP Instance = "udp" // TLS sends a TLS connection and reports back the properties of the TLS connection // This is similar to `openssl s_client` // Response data is not returned; only information about the TLS handshake. diff --git a/pkg/test/echo/server/endpoint/instance.go b/pkg/test/echo/server/endpoint/instance.go index 80876e3d9ae2..c653e7460cce 100644 --- a/pkg/test/echo/server/endpoint/instance.go +++ b/pkg/test/echo/server/endpoint/instance.go @@ -62,6 +62,8 @@ func New(cfg Config) (Instance, error) { return newGRPC(cfg), nil case protocol.TCP: return newTCP(cfg), nil + case protocol.UDP: + return newUDP(cfg), nil default: return nil, fmt.Errorf("unsupported protocol: %s", cfg.Port.Protocol) } diff --git a/pkg/test/echo/server/endpoint/udp.go b/pkg/test/echo/server/endpoint/udp.go new file mode 100644 index 000000000000..2325f9f4adff --- /dev/null +++ b/pkg/test/echo/server/endpoint/udp.go @@ -0,0 +1,142 @@ +// Copyright Istio Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package endpoint + +import ( + "fmt" + "net" + "net/http" + "os" + "strconv" + "strings" + + "github.com/google/uuid" + + "istio.io/istio/pkg/test/echo" + "istio.io/istio/pkg/test/util/retry" +) + +var _ Instance = &udpInstance{} + +type udpInstance struct { + Config + l net.PacketConn +} + +func newUDP(config Config) Instance { + return &udpInstance{ + Config: config, + } +} + +func (s *udpInstance) GetConfig() Config { + return s.Config +} + +func (s *udpInstance) Start(onReady OnReadyFunc) error { + var listener net.PacketConn + var port int + var err error + if s.Port.TLS { + return fmt.Errorf("TLS not supported for UDP") + } + // Listen on the given port and update the port if it changed from what was passed in. + listener, port, err = listenUDPAddress(s.ListenerIP, s.Port.Port) + // Store the actual listening port back to the argument. + s.Port.Port = port + if err != nil { + return err + } + + s.l = listener + epLog.Infof("Listening UDP on %v\n", port) + + // Start serving UDP traffic. + go func() { + buf := make([]byte, 2048) + for { + _, remote, err := listener.ReadFrom(buf) + if err != nil { + epLog.Warn("UDP read failed: " + err.Error()) + return + } + + id := uuid.New() + epLog.WithLabels("remote", remote, "id", id).Infof("UDP Request") + + responseFields := s.getResponseFields(remote) + if _, err := listener.WriteTo([]byte(responseFields), remote); err != nil { + epLog.WithLabels("id", id).Warnf("UDP failed writing echo response: %v", err) + } + } + }() + + // Notify the WaitGroup once the port has transitioned to ready. + go s.awaitReady(onReady, listener.LocalAddr().String()) + return nil +} + +func (s *udpInstance) getResponseFields(conn net.Addr) string { + ip, _, _ := net.SplitHostPort(conn.String()) + // Write non-request fields specific to the instance + respFields := map[echo.Field]string{ + echo.StatusCodeField: strconv.Itoa(http.StatusOK), + echo.ClusterField: s.Cluster, + echo.IstioVersionField: s.IstioVersion, + echo.ServiceVersionField: s.Version, + echo.ServicePortField: strconv.Itoa(s.Port.Port), + echo.IPField: ip, + echo.ProtocolField: "UDP", + } + + if hostname, err := os.Hostname(); err == nil { + respFields[echo.HostnameField] = hostname + } + + var out strings.Builder + for field, val := range respFields { + val := fmt.Sprintf("%s=%s\n", string(field), val) + _, _ = out.WriteString(val) + } + return out.String() +} + +func (s *udpInstance) Close() error { + if s.l != nil { + _ = s.l.Close() + } + return nil +} + +func (s *udpInstance) awaitReady(onReady OnReadyFunc, address string) { + defer onReady() + + err := retry.UntilSuccess(func() error { + conn, err := net.Dial("udp", address) + if err != nil { + return err + } + defer func() { _ = conn.Close() }() + + // Server is up now, we're ready. + return nil + }, retry.Timeout(readyTimeout), retry.Delay(readyInterval)) + + if err != nil { + epLog.Errorf("readiness failed for endpoint %s: %v", address, err) + } else { + epLog.Infof("ready for UDP endpoint %s", address) + } +} diff --git a/pkg/test/echo/server/endpoint/util.go b/pkg/test/echo/server/endpoint/util.go index 01baac66066e..7e57d826548e 100644 --- a/pkg/test/echo/server/endpoint/util.go +++ b/pkg/test/echo/server/endpoint/util.go @@ -78,6 +78,25 @@ func listenOnUDS(uds string) (net.Listener, error) { return ln, nil } +func listenUDPAddress(ip string, port int) (net.PacketConn, int, error) { + parsedIP := net.ParseIP(ip) + ipBind := "udp" + if parsedIP != nil { + if parsedIP.To4() == nil && parsedIP.To16() != nil { + ipBind = "udp6" + } else if parsedIP.To4() != nil { + ipBind = "udp4" + } + } + ln, err := net.ListenPacket(ipBind, net.JoinHostPort(ip, strconv.Itoa(port))) + if err != nil { + return nil, 0, err + } + + port = ln.LocalAddr().(*net.UDPAddr).Port + return ln, port, nil +} + // forceClose the given socket. func forceClose(conn net.Conn) error { // Close may be called more than once. diff --git a/pkg/test/echo/server/forwarder/instance.go b/pkg/test/echo/server/forwarder/instance.go index 9d78309949d0..8b81d5072dc0 100644 --- a/pkg/test/echo/server/forwarder/instance.go +++ b/pkg/test/echo/server/forwarder/instance.go @@ -52,6 +52,7 @@ func New() *Instance { protocolMap[scheme.TLS] = add(newTLSProtocol(e)) protocolMap[scheme.XDS] = add(newXDSProtocol(e)) protocolMap[scheme.TCP] = add(newTCPProtocol(e)) + protocolMap[scheme.UDP] = add(newUDPProtocol(e)) return &Instance{ e: e, diff --git a/pkg/test/echo/server/forwarder/udp.go b/pkg/test/echo/server/forwarder/udp.go new file mode 100644 index 000000000000..7046c4a8dc7b --- /dev/null +++ b/pkg/test/echo/server/forwarder/udp.go @@ -0,0 +1,123 @@ +// Copyright Istio Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package forwarder + +import ( + "bytes" + "context" + "fmt" + "io" + "net" + "net/http" + "strings" + + "istio.io/istio/pkg/test/echo" + "istio.io/istio/pkg/test/echo/common" + "istio.io/istio/pkg/test/echo/proto" +) + +var _ protocol = &udpProtocol{} + +type udpProtocol struct { + e *executor +} + +func newUDPProtocol(e *executor) protocol { + return &udpProtocol{e: e} +} + +func (c *udpProtocol) ForwardEcho(ctx context.Context, cfg *Config) (*proto.ForwardEchoResponse, error) { + return doForward(ctx, cfg, c.e, c.makeRequest) +} + +func (c *udpProtocol) makeRequest(ctx context.Context, cfg *Config, requestID int) (string, error) { + conn, err := newUDPConnection(cfg) + if err != nil { + return "", err + } + defer func() { _ = conn.Close() }() + + msgBuilder := strings.Builder{} + echo.ForwarderURLField.WriteForRequest(&msgBuilder, requestID, cfg.Request.Url) + + if cfg.Request.Message != "" { + echo.ForwarderMessageField.WriteForRequest(&msgBuilder, requestID, cfg.Request.Message) + } + + // Apply per-request timeout to calculate deadline for reads/writes. + ctx, cancel := context.WithTimeout(ctx, cfg.timeout) + defer cancel() + + // Apply the deadline to the connection. + deadline, _ := ctx.Deadline() + if err := conn.SetWriteDeadline(deadline); err != nil { + return msgBuilder.String(), err + } + if err := conn.SetReadDeadline(deadline); err != nil { + return msgBuilder.String(), err + } + + // Make sure the client writes something to the buffer + message := "HelloWorld" + if cfg.Request.Message != "" { + message = cfg.Request.Message + } + + if _, err := conn.Write([]byte(message + "\n")); err != nil { + fwLog.Warnf("UDP write failed: %v", err) + return msgBuilder.String(), err + } + var resBuffer bytes.Buffer + buf := make([]byte, 1024+len(message)) + n, err := conn.Read(buf) + if err != nil && err != io.EOF { + fwLog.Warnf("UDP read failed (already read %d bytes): %v", len(resBuffer.String()), err) + return msgBuilder.String(), err + } + resBuffer.Write(buf[:n]) + + // format the output for forwarder response + for _, line := range strings.Split(string(buf[:n]), "\n") { + if line != "" { + echo.WriteBodyLine(&msgBuilder, requestID, line) + } + } + + msg := msgBuilder.String() + expected := fmt.Sprintf("%s=%d", string(echo.StatusCodeField), http.StatusOK) + if cfg.Request.ExpectedResponse != nil { + expected = cfg.Request.ExpectedResponse.GetValue() + } + if !strings.Contains(msg, expected) { + return msg, fmt.Errorf("expect to recv message with %s, got %s. Return EOF", expected, msg) + } + return msg, nil +} + +func (c *udpProtocol) Close() error { + return nil +} + +func newUDPConnection(cfg *Config) (net.Conn, error) { + address := cfg.Request.Url[len(cfg.scheme+"://"):] + + if cfg.secure { + return nil, fmt.Errorf("TLS not available") + } + + ctx, cancel := context.WithTimeout(context.Background(), common.ConnectionTimeout) + defer cancel() + return newDialer(cfg).DialContext(ctx, "udp", address) +} diff --git a/pkg/test/echo/server/instance.go b/pkg/test/echo/server/instance.go index 8d713358a31f..415f37fd5a5f 100644 --- a/pkg/test/echo/server/instance.go +++ b/pkg/test/echo/server/instance.go @@ -254,6 +254,7 @@ func (s *Instance) validate() error { for _, port := range s.Ports { switch port.Protocol { case protocol.TCP: + case protocol.UDP: case protocol.HTTP: case protocol.HTTPS: case protocol.HTTP2: