-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ClientFactory to TCP input source to add SplitFunc/NetworkFuncs per client #8543
Conversation
Since this is a community submitted pull request, a Jenkins build has not been kicked off automatically. Can an Elastic organization member please verify the contents of this patch and then kick off a build manually? |
filebeat/inputsource/tcp/client.go
Outdated
ID() string | ||
} | ||
|
||
// Client allows a callback to occur when a new client connects or disconnects to the server |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment on exported type ClientCallback should be of the form "ClientCallback ..." (with optional leading article)
de8c1e1
to
554755b
Compare
554755b
to
1817ff7
Compare
filebeat/inputsource/tcp/client.go
Outdated
|
||
"github.com/elastic/beats/filebeat/inputsource" | ||
"github.com/elastic/beats/libbeat/common/transport/tlscommon" | ||
"github.com/elastic/beats/libbeat/logp" | ||
) | ||
|
||
// Client is a remote client. | ||
// ClientInfo is a remote client. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the rename?
filebeat/input/syslog/config.go
Outdated
return tcp.New(&config.Config, splitFunc, cb) | ||
factory := func() (inputsource.NetworkFunc, bufio.SplitFunc, tcp.ClientCallback, tcp.ClientCallback) { | ||
return cb, splitFunc, nil, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's move this factory in the TCP package so we can reuse the same factory definition for both TCP input and syslog.
filebeat/inputsource/tcp/client.go
Outdated
@@ -41,15 +42,32 @@ type client struct { | |||
splitFunc bufio.SplitFunc | |||
maxMessageSize uint64 | |||
timeout time.Duration | |||
onConnect ClientCallback | |||
OnDisconnect ClientCallback | |||
id string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets keep the original type uuid.UUID
here instead of using a string.
filebeat/inputsource/tcp/client.go
Outdated
@@ -41,15 +42,32 @@ type client struct { | |||
splitFunc bufio.SplitFunc | |||
maxMessageSize uint64 | |||
timeout time.Duration | |||
onConnect ClientCallback | |||
OnDisconnect ClientCallback |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For consistency with the other Fields and callback, s/OnDisconnect/onDisconnect
filebeat/inputsource/tcp/server.go
Outdated
@@ -124,11 +123,11 @@ func (s *Server) run() { | |||
|
|||
err := client.handle() | |||
if err != nil { | |||
s.log.Debugw("Client error", "error", err) | |||
s.log.Debugw("ClientInfo error", "error", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We did not change the type, so let's keep Client
as the log selector instead of ClientInfo
filebeat/inputsource/tcp/server.go
Outdated
} | ||
|
||
defer s.log.Debugw( | ||
"Client disconnected", | ||
"ClientInfo disconnected", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as above lets keep Client.
filebeat/inputsource/tcp/client.go
Outdated
} | ||
|
||
// ClientFactory passes a connection ID as an input and gets back a NetworkFunc and a SplitFunc | ||
type ClientFactory func() (inputsource.NetworkFunc, bufio.SplitFunc, ClientCallback, ClientCallback) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is possible that we miss some code in this PR I am looking at the comments and I would expect a factory to create a new object or some new code provided by X input. Looking at this method signature I don't see any parameters but In the comment I see passes a connection ID as an input and gets back
.
From what I remember from previous discussion we want to achieve the following: create a parser unique per client, that could contain states?
I think we are adding dependencies where we could split them, we could do the following:
- Revert the changes for UUID in the TCP type, creating an ID could be implemented in the custom decoder when it get created.
- Remove any callbacks,
onConnect/onDisconnect
- Create a new interface, this interface will receive an
io.Reader
which is
type TCPDecoder interface {
func Handle(io.Reader) error
func Close() // Maybe needed for you case to do some cleanup.
}
- The factory return an instance of a TCPDecoder
- Move the current logic into a SplitDecoder, the split decoder will accepts the split func and a callback, this will make it backward compatible with what we have today.
I believe this will better encapsulate your code and you will be free to do whatever you want from the io.Reader, I think this make it lot more flexible.
filebeat/inputsource/tcp/server.go
Outdated
idx := 0 | ||
for client := range s.clients { | ||
for client, _ := range s.clients { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should omit 2nd value from range; this loop is equivalent to for client := range ...
filebeat/inputsource/tcp/client.go
Outdated
client := &client{ | ||
conn: conn, | ||
log: log.With("remote_address", conn.RemoteAddr()), | ||
) *splitClient { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported func NewSplitClient returns unexported type *tcp.splitClient, which can be annoying to use
a3308d5
to
b3193c5
Compare
filebeat/inputsource/tcp/client.go
Outdated
} | ||
|
||
// NewSplitClient allows creation of a TCP client that has splitting capabilities. | ||
func NewSplitClient( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the constructor used somewhere else? Let's not export symbols we don't need somewhere else for sure.
filebeat/inputsource/tcp/client.go
Outdated
conn net.Conn, | ||
log *logp.Logger, | ||
// ClientFactory returns a Client func | ||
type ClientFactory func(config *Config) Client |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be nice to documented more detailed how "client" and "server" interact.
Also I'm not sure I like the name Client
here. Client
suggests we have an actor connection to some remote endpoint. This is more like a Handler
or ConnectionHandler
.
The config
parameter is a shared resource between server and all clients. Maybe better pass as value, so no handler/client gets the chance to change a value under the hood.
@@ -79,24 +95,24 @@ func (c *client) handle() error { | |||
for scanner.Scan() { | |||
err := scanner.Err() | |||
if err != nil { | |||
// we are forcing a close on the socket, lets ignore any error that could happen. | |||
// we are forcing a Close on the socket, lets ignore any error that could happen. | |||
select { | |||
case <-c.done: | |||
break | |||
default: | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The channel could be closed anytime, not only if the reader fails.
The way this function is written the callback if scan could produce an 'event', but channel is closed in the meantime. Is this expected behavior or do we rather not want to publish the event on close and guarantee the handler can return immediately.
filebeat/inputsource/tcp/conn.go
Outdated
@@ -71,7 +71,7 @@ type DeadlineReader struct { | |||
} | |||
|
|||
// NewDeadlineReader returns a new DeadlineReader | |||
func NewDeadlineReader(c net.Conn, timeout time.Duration) *DeadlineReader { | |||
func NewDeadlineReader(c net.Conn, timeout time.Duration) io.Reader { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why change the return type here? Prefer to return concrete types, but accept interfaces.
looks like requested changes have been adressed
Jenkins, test this. |
This CI fail seems to be related:
|
never mind i have updated the PR with the fix |
CHANGELOG.asciidoc
Outdated
@@ -231,6 +231,7 @@ https://github.com/elastic/beats/compare/v7.0.0-beta1...v7.0.0-rc1[Check the HEA | |||
- Add ISO8601 timestamp support in syslog metricset. {issue}8716[8716] {pull}10736[10736] | |||
- Add support for loading custom NetFlow and IPFIX field definitions to netflow input. {pull}10945[10945] {pull}11223[11223] | |||
- Added categorization fields for SSH login events in the system/auth fileset. {pull}11334[11334] | |||
- Add ClientFactory to TCP input source to add SplitFunc/NetworkFuncs per client. {pull}8543[8543] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The changelog suggest it's an usr visible feature in 7.0.0-rc1. Please move the changelog entry to: CHANGELOG-developer.next.asciidoc
thanks @urso |
This PR changes the way TCP input source is created to a ConnectionFactory based model which takes a
SplitFunc
,NetworkFunc
and twoClientCallback
funcs that can be called during a client connect and disconnect.These hooks allow the TCP implementer to do better logging, stateful processing and also spin up splitters per client.