Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Composable ACKer #19632

Merged
merged 8 commits into from
Jul 6, 2020
Merged

Composable ACKer #19632

merged 8 commits into from
Jul 6, 2020

Conversation

urso
Copy link

@urso urso commented Jul 3, 2020

  • Refactoring

What does this PR do?

This change replaces the ACK handler functions with a single interface
that makes it easier to combine ACK handlers.
The global ACK handler is removed from the pipeline, requiring Beats to
wrap and compose per input ACK handlers with their own ones.

Review Notes

Although the PR is quite big, the main difference is that the ACKCount, ACKEvents, and ACKLastEvents handlers have been replaced by a single interface (beat.ACKer). The original ACKer implementations from libbeat/publisher/pipeline/acker.go and libbeat/publisher/pipeline/client_acker.go have been moved libbeat/common/acker. The former private implementation is now exposed as Helpers for writing and combining ACK handlers. Support for global ACK handlers has been removed. The acker.Combine and acker.ConnectionOnly are the only new additions to the code base.

Why is it important?

tl;dr This change is required to integrate the v2 input API.

The global ACK handler support was introduced for filebeat, that did require some support for combine events from multiple inputs before applying state updates. With the introduction of the v2 input API this requirement will go away, as per input type managers are responsible for handling state update and ACKs.

In order to run old and new architecture in parallel, we need to combine ACK handling from input managers, existing input, custom registrar ACKer in filebeat, and event counting support (also via ACK handling) for shutdown. Exposing the interface and providing combinators (acker.Combine) for merging ACK handlers into one helps with the integration.

The v2 Input API gives implementors more flexibility in how to handle event publishing, coordination, and state handling shall be implemented. With the original ACK support the callbacks have been deregistered the moment inputs are stopped automatically. But for cursor based inputs we need to continue handling ACKs, even after the input is gone. The interface and helpers provide greater control over ACK handling after shutdown, which is required for the journald, winlog, and file/log inputs.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
    - [ ] I have made corresponding changes to the documentation
    - [ ] I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.

Author's Checklist

  • [ ]

How to test this PR locally

ACKer implementations should be handled mostly by unit tests. Still, this change can have great impact and some Beats should be tested to double check that we have not introduced any regressions here.

  • Functionbeat:
    • check lambda function does not hang after publishing events (any kind of input will work)
    • check lambda function correctly stops in presence of processors that filter out events. E.g. setup kinesis and filter out events using drop_event with conditional. Check non-filtered events are published
    • check lambda function correctly stops if all events are filtered out
  • Choose one of the end-to-end ACKing inputs in filebeat (they all register similar handlers) and verify that end-to-end ACK works correctly. (e.g. Kafka input by monitoring the consumer group).

Related issues

Dev Docs:

Calls to pipeline.ConnectWith that used to setup an ACK callback need to use the acker helpers. Previously the callbacks have been ignored if the input is shutdown, but not all events have been ACKed yet. For end-to-end ACKers that loose the connection to the source-system on shutdown this behavior can be preserved by using ack.ConnectionOnly(<acker>).

Instead of directly passing callbacks, the callbacks should be wrapped using some of the utility functions in the libbeat/common/acker package:

  • Replace ACKCount: func(n int) { ... } with ACKHandler: acker.Counting(func(n int) { ... }).
  • Replace ACKEvents: func(private []interface{}) { ... } with ACKHandler: acker.EventPrivateReporter(func(_ int, private []interface{}) { ... }).
  • Replace ACKLastEvent: func(private interface{}) { ... } with ACKHandler: acker.LastEventPrivateReporter(func(_ int, interface{}) { ... })

The (beat.Pipeline).SetACKHandler method has been removed. libbeat/common/acker and libbeat/publisher/pipetool provide some helpers to modify and combine ACKers for all new beat.Client connections. For example this will use the global and local ACK handler for each event published.

pipeline = pipetool.WithACKer(pipeline, globalACKHandler)

...

client, err := pipeline.ConnectWith(beat.ClientConfig{
  ACKHandler: localACKHandler,
})

The WithACKer helper can be used arbitrarily often. ACKers are combined level by level via acker.Combine.

@urso urso added needs_backport PR is waiting to be backported to other branches. Project:Filebeat-Input-v2 Team:Services (Deprecated) Label for the former Integrations-Services team v7.9.0 labels Jul 3, 2020
@botelastic botelastic bot added needs_team Indicates that the issue/PR needs a Team:* label and removed needs_team Indicates that the issue/PR needs a Team:* label labels Jul 3, 2020
@elasticmachine
Copy link
Collaborator

elasticmachine commented Jul 3, 2020

💔 Tests Failed

Pipeline View Test View Changes Artifacts preview

Expand to view the summary

Build stats

  • Build Cause: [Pull request #19632 updated]

  • Start Time: 2020-07-06T12:11:15.853+0000

  • Duration: 79 min 9 sec

Test stats 🧪

Test Results
Failed 5
Passed 9719
Skipped 1573
Total 11297

Test errors

Expand to view the tests failures

  • Name: Build and Test / Auditbeat oss Mac OS X / TestEventReader – file_integrity

    • Age: 1
    • Duration: 1.38
    • Error Details: Failed
  • Name: Build and Test / Auditbeat oss Mac OS X / TestEventReader/created – file_integrity

    • Age: 1
    • Duration: 1.15
    • Error Details: Failed
  • Name: Build and Test / Auditbeat oss Mac OS X / TestRaces – file_integrity

    • Age: 1
    • Duration: 10.8
    • Error Details: Failed
  • Name: Build and Test / Auditbeat oss Mac OS X / TestExcludedFiles – file_integrity

    • Age: 1
    • Duration: 10.11
    • Error Details: Failed
  • Name: Build and Test / Auditbeat oss Mac OS X / TestIncludedExcludedFiles – file_integrity

    • Age: 1
    • Duration: 10.48
    • Error Details: Failed

Steps errors

Expand to view the steps failures

  • Name: Mage build unitTest

    • Description: mage build unitTest

    • Duration: 8 min 40 sec

    • Start Time: 2020-07-06T12:34:31.020+0000

    • log

  • Name: Recursively delete the current directory from the workspace

    • Description: script returned exit code 1

    • Duration: 0 min 18 sec

    • Start Time: 2020-07-06T12:42:21.474+0000

    • log

  • Name: Report to Codecov

    • Description: curl -sSLo codecov https://codecov.io/bash for i in auditbeat filebeat heartbeat libbeat metricbeat packetbeat winlogbeat journalbeat do FILE="${i}/build/coverage/full.cov" if [ -f "${FILE}" ]; then bash codecov -f "${FILE}" fi done

    • Duration: 2 min 22 sec

    • Start Time: 2020-07-06T12:45:32.591+0000

    • log

Log output

Expand to view the last 100 lines of log output

[2020-07-06T13:29:56.754Z] + FILE=packetbeat/build/coverage/full.cov
[2020-07-06T13:29:56.754Z] + [ -f packetbeat/build/coverage/full.cov ]
[2020-07-06T13:29:56.754Z] + FILE=winlogbeat/build/coverage/full.cov
[2020-07-06T13:29:56.754Z] + [ -f winlogbeat/build/coverage/full.cov ]
[2020-07-06T13:29:56.754Z] + FILE=journalbeat/build/coverage/full.cov
[2020-07-06T13:29:56.754Z] + [ -f journalbeat/build/coverage/full.cov ]
[2020-07-06T13:29:57.270Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats
[2020-07-06T13:29:57.587Z] + find . -type f -name TEST*.xml -path */build/* -delete
[2020-07-06T13:29:57.600Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Lint
[2020-07-06T13:29:57.701Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Elastic-Agent-Mac-OS-X
[2020-07-06T13:29:57.791Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Winlogbeat-oss
[2020-07-06T13:29:57.888Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Elastic-Agent-x-pack
[2020-07-06T13:29:57.968Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Auditbeat-x-pack-Mac-OS-X
[2020-07-06T13:29:58.042Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Auditbeat-crosscompile
[2020-07-06T13:29:58.128Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Dockerlogbeat
[2020-07-06T13:29:58.203Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Journalbeat-oss
[2020-07-06T13:29:58.277Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Generators-Metricbeat-Linux
[2020-07-06T13:29:58.361Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Functionbeat-x-pack
[2020-07-06T13:29:58.435Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Elastic-Agent-x-pack-Windows
[2020-07-06T13:29:58.511Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Filebeat-Mac-OS-X
[2020-07-06T13:29:58.584Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Auditbeat-oss-Mac-OS-X
[2020-07-06T13:29:58.660Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Metricbeat-x-pack-Mac-OS-X
[2020-07-06T13:29:58.733Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Metricbeat-OSS-Unit-tests
[2020-07-06T13:29:58.807Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Filebeat-x-pack-Mac-OS-X
[2020-07-06T13:29:58.882Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Heartbeat-oss
[2020-07-06T13:29:58.957Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Auditbeat-oss-Windows
[2020-07-06T13:29:59.029Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Auditbeat-x-pack
[2020-07-06T13:29:59.099Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Metricbeat-crosscompile
[2020-07-06T13:29:59.170Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Auditbeat-x-pack-Windows
[2020-07-06T13:29:59.242Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Winlogbeat-Windows-x-pack
[2020-07-06T13:29:59.313Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Filebeat-x-pack-Windows
[2020-07-06T13:29:59.395Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Libbeat-x-pack
[2020-07-06T13:29:59.467Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Filebeat-Windows
[2020-07-06T13:29:59.539Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Auditbeat-oss-Linux
[2020-07-06T13:29:59.611Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Metricbeat-Windows
[2020-07-06T13:29:59.693Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Packetbeat-oss
[2020-07-06T13:29:59.775Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Winlogbeat-Windows
[2020-07-06T13:29:59.855Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Filebeat-x-pack
[2020-07-06T13:29:59.940Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Metricbeat-x-pack-Windows
[2020-07-06T13:30:00.018Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Generators-Beat-Linux
[2020-07-06T13:30:00.093Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Functionbeat-Mac-OS-X-x-pack
[2020-07-06T13:30:00.187Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Filebeat-oss
[2020-07-06T13:30:00.294Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Heartbeat-Mac-OS-X
[2020-07-06T13:30:00.399Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Metricbeat-Mac-OS-X
[2020-07-06T13:30:00.496Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Metricbeat-OSS-Integration-tests
[2020-07-06T13:30:00.585Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Metricbeat-Python-integration-tests
[2020-07-06T13:30:00.678Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Generators-Metricbeat-Mac-OS-X
[2020-07-06T13:30:00.767Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Heartbeat-Windows
[2020-07-06T13:30:00.855Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Libbeat-oss
[2020-07-06T13:30:00.929Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Functionbeat-Windows
[2020-07-06T13:30:01.005Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Generators-Beat-Mac-OS-X
[2020-07-06T13:30:01.078Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Libbeat-crosscompile
[2020-07-06T13:30:01.152Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Libbeat-stress-tests
[2020-07-06T13:30:01.234Z] Running in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Metricbeat-x-pack
[2020-07-06T13:30:01.608Z] + cat
[2020-07-06T13:30:01.608Z] + /usr/local/bin/runbld ./runbld-script
[2020-07-06T13:30:01.608Z] Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF8
[2020-07-06T13:30:09.770Z] runbld>>> runbld started
[2020-07-06T13:30:09.770Z] runbld>>> 1.6.12/f45d832f2ba0aa2722ab4ec1fda8ad140f027f8b
[2020-07-06T13:30:10.345Z] runbld>>> The following profiles matched the job 'Beats/beats-beats-mbp/PR-19632' in order of occurrence in the config (last value wins).
[2020-07-06T13:30:11.729Z] runbld>>> Debug logging enabled.
[2020-07-06T13:30:11.729Z] runbld>>> Storing result
[2020-07-06T13:30:11.992Z] runbld>>> Store result: created {:total 2, :successful 2, :failed 0} 1
[2020-07-06T13:30:11.992Z] runbld>>> BUILD: https://c150076387b5421f9154dfbf536e5c60.us-west1.gcp.cloud.es.io:9243/build-1587637540455/t/20200706133011-D49F1CC6
[2020-07-06T13:30:11.992Z] runbld>>> Adding system facts.
[2020-07-06T13:30:12.937Z] runbld>>> Adding vcs info for the latest commit:  90b6a33cd623d74fa204fc67773a852e13036733
[2020-07-06T13:30:12.937Z] runbld>>> >>>>>>>>>>>> SCRIPT EXECUTION BEGIN >>>>>>>>>>>>
[2020-07-06T13:30:12.937Z] runbld>>> Adding /usr/lib/jvm/java-8-openjdk-amd64/bin to the path.
[2020-07-06T13:30:12.937Z] + echo 'Processing JUnit reports with runbld...'
[2020-07-06T13:30:12.937Z] Processing JUnit reports with runbld...
[2020-07-06T13:30:13.510Z] runbld>>> <<<<<<<<<<<< SCRIPT EXECUTION END <<<<<<<<<<<<
[2020-07-06T13:30:13.510Z] runbld>>> DURATION: 26ms
[2020-07-06T13:30:13.510Z] runbld>>> STDOUT: 40 bytes
[2020-07-06T13:30:13.510Z] runbld>>> STDERR: 49 bytes
[2020-07-06T13:30:13.510Z] runbld>>> WRAPPED PROCESS: SUCCESS (0)
[2020-07-06T13:30:13.510Z] runbld>>> Searching for build metadata in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats
[2020-07-06T13:30:14.455Z] runbld>>> Storing build metadata: 
[2020-07-06T13:30:14.455Z] runbld>>> Adding test report.
[2020-07-06T13:30:14.455Z] runbld>>> Searching for junit test output files with the pattern: TEST-.*\.xml$ in: /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats
[2020-07-06T13:30:15.398Z] runbld>>> Found 114 test output files
[2020-07-06T13:30:15.972Z] runbld>>> No testsuite node found in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Metricbeat-x-pack/x-pack/metricbeat/build/TEST-go-integration-istio.xml
[2020-07-06T13:30:15.972Z] runbld>>> No testsuite node found in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Metricbeat-x-pack/x-pack/metricbeat/build/TEST-go-integration-tomcat.xml
[2020-07-06T13:30:15.972Z] runbld>>> No testsuite node found in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Metricbeat-x-pack/x-pack/metricbeat/build/TEST-go-integration-iis.xml
[2020-07-06T13:30:15.972Z] runbld>>> No testsuite node found in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Metricbeat-x-pack/x-pack/metricbeat/build/TEST-go-integration-openmetrics.xml
[2020-07-06T13:30:15.972Z] runbld>>> No testsuite node found in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Metricbeat-x-pack/x-pack/metricbeat/build/TEST-go-integration-activemq.xml
[2020-07-06T13:30:17.362Z] runbld>>> No testsuite node found in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Metricbeat-OSS-Integration-tests/metricbeat/build/TEST-go-integration-graphite.xml
[2020-07-06T13:30:17.362Z] runbld>>> No testsuite node found in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632/src/github.com/elastic/beats/Metricbeat-OSS-Integration-tests/metricbeat/build/TEST-go-integration-windows.xml
[2020-07-06T13:30:17.625Z] runbld>>> Test output logs contained: Errors: 0 Failures: 5 Tests: 11151 Skipped: 1334
[2020-07-06T13:30:17.625Z] runbld>>> Storing result
[2020-07-06T13:30:17.625Z] runbld>>> FAILURES: 5
[2020-07-06T13:30:19.010Z] runbld>>> Store result: updated {:total 2, :successful 2, :failed 0} 2
[2020-07-06T13:30:19.010Z] runbld>>> BUILD: https://c150076387b5421f9154dfbf536e5c60.us-west1.gcp.cloud.es.io:9243/build-1587637540455/t/20200706133011-D49F1CC6
[2020-07-06T13:30:19.010Z] runbld>>> Email notification disabled by environment variable.
[2020-07-06T13:30:19.010Z] runbld>>> Slack notification disabled by environment variable.
[2020-07-06T13:30:24.550Z] Running on Jenkins in /var/lib/jenkins/workspace/Beats_beats-beats-mbp_PR-19632
[2020-07-06T13:30:24.650Z] [INFO] getVaultSecret: Getting secrets
[2020-07-06T13:30:24.711Z] Masking supported pattern matches of $VAULT_ADDR or $VAULT_ROLE_ID or $VAULT_SECRET_ID
[2020-07-06T13:30:25.436Z] + chmod 755 generate-build-data.sh
[2020-07-06T13:30:25.436Z] + ./generate-build-data.sh https://beats-ci.elastic.co/blue/rest/organizations/jenkins/pipelines/Beats/beats-beats-mbp/PR-19632/ https://beats-ci.elastic.co/blue/rest/organizations/jenkins/pipelines/Beats/beats-beats-mbp/PR-19632/runs/8 FAILURE 4749322
[2020-07-06T13:30:25.436Z] INFO: curl https://beats-ci.elastic.co/blue/rest/organizations/jenkins/pipelines/Beats/beats-beats-mbp/PR-19632/runs/8/steps/?limit=10000 -o steps-info.json

@urso urso added the release-note:dev_docs Contains a Dev Docs section label Jul 3, 2020
@urso
Copy link
Author

urso commented Jul 3, 2020

jenkins run the tests please

@urso urso marked this pull request as ready for review July 3, 2020 14:59
@elasticmachine
Copy link
Collaborator

Pinging @elastic/integrations-services (Team:Services)

urso added 7 commits July 4, 2020 01:35
This change replaces the ACK handler functions with a single interface
that makes it easier to combine ACK handlers.
The global ACK handler is removed from the pipeline, requiring Beats to
wrap and compose per input ACK handlers with their own ones.
@urso urso force-pushed the simplified-acker branch from 758a0b2 to 0cdab12 Compare July 3, 2020 23:35

// ConnectionOnly ensures that the given ACKer is only used for as long as the
// pipeline Client is active. Once the Client is closed, the ACKer will drop
// it's internal state and no more ACK events will be processed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// it's internal state and no more ACK events will be processed.
// its internal state and no more ACK events will be processed.

Copy link
Contributor

@kvch kvch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have done basic Functionbeat tests, but it would be nice if someone else would test it during the "testing weeks" after the FF.

@urso urso merged commit bb89344 into elastic:master Jul 6, 2020
@urso urso deleted the simplified-acker branch July 6, 2020 17:43
urso pushed a commit to urso/beats that referenced this pull request Jul 8, 2020
This change replaces the ACK handler functions with a single interface
that makes it easier to combine ACK handlers.
The global ACK handler is removed from the pipeline, requiring Beats to
wrap and compose per input ACK handlers with their own ones.

Although the PR is quite big, the main difference is that the `ACKCount`, `ACKEvents`, and `ACKLastEvents` handlers have been replaced by a single interface (`beat.ACKer`). The original ACKer implementations from `libbeat/publisher/pipeline/acker.go` and `libbeat/publisher/pipeline/client_acker.go` have been moved `libbeat/common/acker`. The former private implementation is now exposed as Helpers for writing and combining ACK handlers. Support for global ACK handlers has been removed. The `acker.Combine` and `acker.ConnectionOnly` are the only new additions to the code base.

The global ACK handler support was introduced for filebeat, that did require some support for combine events from multiple inputs before applying state updates. With the introduction of the v2 input API this requirement will go away, as per input type managers are responsible for handling state update and ACKs.

In order to run old and new architecture in parallel, we need to combine ACK handling from input managers, existing input, custom registrar ACKer in filebeat, and event counting support (also via ACK handling) for shutdown. Exposing the interface and providing combinators (acker.Combine) for merging ACK handlers into one helps with the integration.

The v2 Input API gives implementors more flexibility in how to handle event publishing, coordination, and state handling shall be implemented. With the original ACK support the callbacks have been deregistered the moment inputs are stopped automatically. But for cursor based inputs we need to continue handling ACKs, even after the input is gone. The interface and helpers provide greater control over ACK handling after shutdown, which is required for the journald, winlog, and file/log inputs.

(cherry picked from commit bb89344)
@urso urso removed the needs_backport PR is waiting to be backported to other branches. label Jul 8, 2020
urso pushed a commit that referenced this pull request Jul 8, 2020
This change replaces the ACK handler functions with a single interface
that makes it easier to combine ACK handlers.
The global ACK handler is removed from the pipeline, requiring Beats to
wrap and compose per input ACK handlers with their own ones.

Although the PR is quite big, the main difference is that the `ACKCount`, `ACKEvents`, and `ACKLastEvents` handlers have been replaced by a single interface (`beat.ACKer`). The original ACKer implementations from `libbeat/publisher/pipeline/acker.go` and `libbeat/publisher/pipeline/client_acker.go` have been moved `libbeat/common/acker`. The former private implementation is now exposed as Helpers for writing and combining ACK handlers. Support for global ACK handlers has been removed. The `acker.Combine` and `acker.ConnectionOnly` are the only new additions to the code base.

The global ACK handler support was introduced for filebeat, that did require some support for combine events from multiple inputs before applying state updates. With the introduction of the v2 input API this requirement will go away, as per input type managers are responsible for handling state update and ACKs.

In order to run old and new architecture in parallel, we need to combine ACK handling from input managers, existing input, custom registrar ACKer in filebeat, and event counting support (also via ACK handling) for shutdown. Exposing the interface and providing combinators (acker.Combine) for merging ACK handlers into one helps with the integration.

The v2 Input API gives implementors more flexibility in how to handle event publishing, coordination, and state handling shall be implemented. With the original ACK support the callbacks have been deregistered the moment inputs are stopped automatically. But for cursor based inputs we need to continue handling ACKs, even after the input is gone. The interface and helpers provide greater control over ACK handling after shutdown, which is required for the journald, winlog, and file/log inputs.

(cherry picked from commit bb89344)
@urso urso added the test-plan Add this PR to be manual test plan label Jul 14, 2020
@andresrc andresrc added the test-plan-added This PR has been added to the test plan label Jul 21, 2020
melchiormoulin pushed a commit to melchiormoulin/beats that referenced this pull request Oct 14, 2020
This change replaces the ACK handler functions with a single interface
that makes it easier to combine ACK handlers.
The global ACK handler is removed from the pipeline, requiring Beats to
wrap and compose per input ACK handlers with their own ones.

Although the PR is quite big, the main difference is that the `ACKCount`, `ACKEvents`, and `ACKLastEvents` handlers have been replaced by a single interface (`beat.ACKer`). The original ACKer implementations from `libbeat/publisher/pipeline/acker.go` and `libbeat/publisher/pipeline/client_acker.go` have been moved `libbeat/common/acker`. The former private implementation is now exposed as Helpers for writing and combining ACK handlers. Support for global ACK handlers has been removed. The `acker.Combine` and `acker.ConnectionOnly` are the only new additions to the code base.

The global ACK handler support was introduced for filebeat, that did require some support for combine events from multiple inputs before applying state updates. With the introduction of the v2 input API this requirement will go away, as per input type managers are responsible for handling state update and ACKs.

In order to run old and new architecture in parallel, we need to combine ACK handling from input managers, existing input, custom registrar ACKer in filebeat, and event counting support (also via ACK handling) for shutdown. Exposing the interface and providing combinators (acker.Combine) for merging ACK handlers into one helps with the integration.

The v2 Input API gives implementors more flexibility in how to handle event publishing, coordination, and state handling shall be implemented. With the original ACK support the callbacks have been deregistered the moment inputs are stopped automatically. But for cursor based inputs we need to continue handling ACKs, even after the input is gone. The interface and helpers provide greater control over ACK handling after shutdown, which is required for the journald, winlog, and file/log inputs.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Project:Filebeat-Input-v2 release-note:dev_docs Contains a Dev Docs section review Team:Services (Deprecated) Label for the former Integrations-Services team test-plan Add this PR to be manual test plan test-plan-added This PR has been added to the test plan v7.9.0
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants