Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Jetstream API #98

Merged
merged 11 commits into from
Nov 8, 2023
Merged

feat: Jetstream API #98

merged 11 commits into from
Nov 8, 2023

Conversation

cv65kr
Copy link
Member

@cv65kr cv65kr commented Nov 7, 2023

Reason for This PR

closes: roadrunner-server/roadrunner#1574

Description of Changes

  • Replacing the old NATS API with a new one.

License Acceptance

By submitting this pull request, I confirm that my contribution is made under
the terms of the MIT license.

PR Checklist

[Author TODO: Meet these criteria.]
[Reviewer TODO: Verify that these criteria are met. Request changes if not]

  • All commits in this PR are signed (git commit -s).
  • The reason for this PR is clearly provided (issue no. or explanation).
  • The description of changes is clear and encompassing.
  • Any required documentation changes (code and docs) are included in this PR.
  • Any user-facing changes are mentioned in CHANGELOG.md.
  • All added/changed functionality is tested.

Summary by CodeRabbit

Summary by CodeRabbit

  • New Features

    • Introduced a new Search class with methods for fetching and displaying results.
    • Added a search bar to the Hero component and a Search component to the App component.
    • Added styles for the search bar in the styles.css file.
  • Refactor

    • Renamed and updated variable names and struct fields in various files.
  • Chores

    • Renamed and updated variable names and struct fields in various files.
  • Tests

    • No visible changes to end-users.
  • Documentation

    • No visible changes to end-users.

Copy link

coderabbitai bot commented Nov 7, 2023

Walkthrough

The changes primarily involve refactoring the natsjobs package and test configurations to use the jetstream package instead of nats. This includes renaming variables, updating function calls, and modifying struct fields. The changes also involve updating test configurations to use wildcard characters in the subject field and renaming the stream field to stream_id.

Changes

File(s) Summary
natsjobs/config.go Renamed Stream to StreamID in the config struct and updated default value assignment.
natsjobs/driver.go Replaced nats with jetstream in various places, introduced a new consumer struct, and updated several functions to use jetstream.
natsjobs/item.go, natsjobs/listener.go, natsjobs/unpack.go Made changes related to the switch from nats to jetstream, including updating import statements and modifying function calls.
tests/configs/... Updated job configurations to use wildcard characters in the subject field and renamed the stream field to stream_id.
tests/jobs_nats_test.go Updated test functions to use jetstream instead of nats, corrected a typo, and modified stream subjects and queue names.

Poem

🐇 In the code's dance, changes take flight, 🎉

Jetstream leads the way, shining bright. 🚀

Wildcards in subjects, stream IDs anew, 🌟

The code grows stronger, bugs are few. 🐞

As November's leaves gently fall, 🍂

We celebrate these changes, big and small. 🎊

Here's to the code, evolving with might, 💡

Guiding us through the coder's night. 🌙


Tips

Chat with CodeRabbit Bot (@coderabbitai)

  • If you reply to a review comment from CodeRabbit, the bot will automatically respond.
  • To engage with CodeRabbit bot directly around the specific lines of code in the PR, mention @coderabbitai in your review comment
  • Note: Review comments are made on code diffs or files, not on the PR overview.
  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.

CodeRabbit Commands (invoked as PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger a review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai help to get help.
  • @coderabbitai resolve to resolve all the CodeRabbit review comments.

Note: For conversation with the bot, please use the review comments on code diffs or files.

CodeRabbit Configration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • The JSON schema for the configuration file is available here.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/coderabbit-overrides.v2.json

rustatian and others added 4 commits November 7, 2023 13:56
Signed-off-by: Valery Piashchynski <piashchynski.valery@gmail.com>
@rustatian rustatian self-requested a review November 7, 2023 13:05
@rustatian rustatian added the enhancement New feature or request label Nov 7, 2023
@cv65kr cv65kr marked this pull request as ready for review November 7, 2023 13:56
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review Status

Actionable comments generated: 4

Configuration used: CodeRabbit UI

Commits Files that changed from the base of the PR and between 15c8d2e and 5b1be35.
Files ignored due to filter (5)
  • go.mod
  • go.sum
  • go.work.sum
  • tests/go.sum
  • tests/php_test_files/composer.lock
Files selected for processing (14)
  • natsjobs/config.go (3 hunks)
  • natsjobs/driver.go (13 hunks)
  • natsjobs/item.go (5 hunks)
  • natsjobs/listener.go (6 hunks)
  • natsjobs/unpack.go (1 hunks)
  • tests/configs/.rr-nats-durability-redial.yaml (2 hunks)
  • tests/configs/.rr-nats-init-v27-br.yaml (2 hunks)
  • tests/configs/.rr-nats-init-v27.yaml (1 hunks)
  • tests/configs/.rr-nats-init.yaml (2 hunks)
  • tests/configs/.rr-nats-otel.yaml (1 hunks)
  • tests/configs/.rr-nats-pq.yaml (2 hunks)
  • tests/configs/.rr-nats-raw.yaml (1 hunks)
  • tests/configs/.rr-no-global.yaml (1 hunks)
  • tests/jobs_nats_test.go (11 hunks)
Additional comments: 46
natsjobs/unpack.go (1)
  • 22-26: The Queue field of the Options struct has been changed from c.stream to c.streamID. Ensure that the streamID is correctly initialized and used throughout the codebase.
tests/configs/.rr-nats-otel.yaml (1)
  • 40-47: Ensure that the wildcard pattern in the subject field and the new stream_id field are correctly configured and compatible with the rest of your NATS setup. Also, verify that the deliver_new field is expected to be a string and not a boolean.
tests/configs/.rr-nats-init.yaml (2)
  • 31-38: The subject field has been updated to use a wildcard pattern, and the stream field has been replaced with stream_id. Ensure that these changes are compatible with the rest of the system and that the wildcard pattern in the subject field matches the expected subjects. Also, verify that the stream_id is correctly set.

  • 41-48: Similar to the previous comment, ensure that the changes to the subject and stream_id fields are compatible with the rest of the system. Also, verify that the wildcard pattern in the subject field matches the expected subjects and that the stream_id is correctly set.

tests/configs/.rr-nats-raw.yaml (1)
  • 31-38: The configuration for the NATS driver has been updated. Ensure that the new wildcard pattern in the subject field and the renamed stream_id field are compatible with the rest of the system. Also, verify that the new configuration values are correctly used in the codebase.
tests/configs/.rr-nats-pq.yaml (2)
  • 29-36: The subject field has been updated to include a wildcard character, and the stream field has been replaced with stream_id. Ensure that these changes are reflected in the corresponding code and that the wildcard character in the subject field does not introduce any unexpected behavior. Also, verify that the stream_id "foo-pq" is correctly configured in your Jetstream setup.

  • 39-46: Similar to the previous comment, ensure that the changes in the subject and stream_id fields are reflected in the corresponding code and that the wildcard character in the subject field does not introduce any unexpected behavior. Also, verify that the stream_id "foo-2-pq" is correctly configured in your Jetstream setup.

tests/configs/.rr-nats-init-v27-br.yaml (2)
  • 31-38: The subject field now includes a wildcard character, and the stream field has been replaced with stream_id. Ensure that these changes are compatible with the rest of the codebase and that the wildcard character in the subject field does not unintentionally match unwanted subjects.

  • 41-48: The subject field now includes a wildcard character, and the stream field has been replaced with stream_id. The priority value has been updated from 1 to 2. Ensure that these changes are compatible with the rest of the codebase and that the wildcard character in the subject field does not unintentionally match unwanted subjects. Also, verify that the change in priority value does not affect the order of job execution in a way that could cause issues.

tests/configs/.rr-nats-durability-redial.yaml (2)
  • 31-38: The subject field has been updated to use wildcard characters, and the stream field has been replaced with stream_id. Ensure that these changes are compatible with the rest of the system and that the wildcard characters in the subject field do not unintentionally match unwanted subjects.

  • 42-49: The same changes as in the previous hunk have been made here. Again, verify that these changes are compatible with the rest of the system and that the wildcard characters in the subject field do not unintentionally match unwanted subjects.

tests/configs/.rr-nats-init-v27.yaml (2)
  • 31-37: The subject field has been updated to use wildcard characters, and the stream field has been renamed to stream_id. Ensure that these changes are compatible with the rest of the codebase and that the wildcard characters in the subject field do not unintentionally match unwanted subjects.

  • 39-46: Similar to the previous comment, ensure that the changes to the subject and stream_id fields are compatible with the rest of the codebase and that the wildcard characters in the subject field do not unintentionally match unwanted subjects.

natsjobs/config.go (3)
  • 8-12: The renaming of pipeStream to pipeStreamID is consistent with the changes in the config struct. This change should be reflected wherever pipeStream was used.

  • 24-24: The renaming of Stream to StreamID in the config struct is a breaking change. Ensure that all references to Stream have been updated to StreamID throughout the codebase.

  • 45-47: The default value for StreamID has been updated. Ensure that this new default value doesn't conflict with any existing stream IDs in your NATS setup.

tests/configs/.rr-no-global.yaml (2)
  • 30-36: The subject field has been updated to use a wildcard character, and the stream field has been replaced with stream_id. Ensure that these changes are compatible with the rest of the codebase and the NATS Jetstream API.

  • 38-45: The subject field has been updated to use a wildcard character, and the stream field has been replaced with stream_id. The priority value has been updated from 1 to 2. Ensure that these changes are compatible with the rest of the codebase and the NATS Jetstream API.

natsjobs/listener.go (6)
  • 14-40: The listenerInit function has been significantly modified to handle JetStream messages. It now generates a unique ID using uuid.NewString(), creates a JetStream consumer, and sets up a message consumption context. The function also introduces a new consumer struct to manage the JetStream consumer. Ensure that the new logic aligns with the intended message consumption and handling behavior.

  • 46-55: The listenerStart function now handles JetStream messages. It checks if the message consumption channel is closed and handles errors in retrieving message metadata. Ensure that the error handling logic is correct and that the function behaves as expected when the channel is closed or when an error occurs in retrieving message metadata.

  • 75-85: The function unpacks the message data into an Item struct and sets up a tracing span. It also sets the queue and pipeline options for the item. Ensure that the unpacking and tracing logic is correct and that the queue and pipeline options are set as intended.

  • 92-98: The function checks if the deleteAfterAck option is enabled and sets the relevant options for the item. Ensure that the logic for handling the deleteAfterAck option is correct.

  • 113-119: The function handles the deletion of messages after acknowledgement. It checks if the deleteAfterAck option is enabled and deletes the message if it is. Ensure that the message deletion logic is correct and that messages are deleted as intended when the deleteAfterAck option is enabled.

  • 134-144: The function injects the tracing context into the item headers and inserts the item into the queue. It also handles the stopping of the consumer. Ensure that the context injection, item insertion, and consumer stopping logic are correct.

natsjobs/item.go (3)
  • 1-12: The import statements have been updated to reflect the new Jetstream API. Ensure that the new import paths are correct and that the old NATS API is no longer being used anywhere in the codebase.

  • 34-49: The type of the sub field in the Options struct has been changed from nats.JetStreamContext to jetstream.Stream. This change is consistent with the switch to the new Jetstream API. Make sure that all uses of this field in the codebase have been updated to match the new type.

  • 144-150: The error handling logic here seems to be correct. If the requeueFn function returns an error and the message was not auto acknowledged, the nak function is called. If the nak function also returns an error, both errors are joined and returned. This ensures that all errors are properly handled and reported.

natsjobs/driver.go (11)
  • 2-14: Imports have been updated to include the new Jetstream API and remove the old NATS API. Ensure that all dependencies are correctly managed and updated.

  • 46-65: The Driver struct has been updated to include new fields related to the Jetstream API. Ensure that these fields are correctly initialized and used throughout the code.

  • 67-71: A new consumer struct has been introduced. Ensure that this struct is correctly used and managed throughout the code.

  • 116-132: > Note: This review was outside of the patch, so it was mapped to the patch with the greatest overlap. Original lines [73-130]

The FromConfig function has been updated to use the new Jetstream API. Ensure that the function correctly initializes the Driver struct and handles errors.

  • 194-211: The FromPipeline function has been updated to use the new Jetstream API. Ensure that the function correctly initializes the Driver struct and handles errors.

  • 256-265: The Push function has been updated to use the new Jetstream API. Ensure that the function correctly publishes messages and handles errors.

  • 319-329: The Pause function has been updated to use the new Jetstream API. Ensure that the function correctly stops the consumer and handles errors.

  • 375-391: The State function has been updated to use the new Jetstream API. Ensure that the function correctly retrieves the state of the consumer and handles errors.

  • 405-419: The Stop function has been updated to use the new Jetstream API. Ensure that the function correctly stops the consumer, deletes the stream if necessary, and handles errors.

  • 443-458: The requeue function has been updated to use the new Jetstream API. Ensure that the function correctly requeues messages and handles errors.

  • 467-476: The disconnectHandler function has been updated to handle disconnections from the Jetstream API. Ensure that the function correctly logs disconnections and handles errors.

tests/jobs_nats_test.go (8)
  • 4-4: The context package is imported but not used in the hunks provided. Ensure it's used elsewhere in the file.

  • 23-23: The github.com/nats-io/nats.go/jetstream package is imported but not used in the hunks provided. Ensure it's used elsewhere in the file.

  • 516-516: The use of wildcard characters in pipeline and stream names can lead to unexpected behavior if not handled correctly. Ensure that the wildcard character is being used appropriately in the context of your application.

  • 596-596: The use of wildcard characters in pipeline and stream names can lead to unexpected behavior if not handled correctly. Ensure that the wildcard character is being used appropriately in the context of your application.

  • 686-688: The JetStream context is created but not used in the hunks provided. Ensure it's used elsewhere in the file.

  • 827-827: The use of wildcard characters in pipeline and stream names can lead to unexpected behavior if not handled correctly. Ensure that the wildcard character is being used appropriately in the context of your application.

  • 856-856: The queue name is asserted with a wildcard character. This can lead to unexpected behavior if not handled correctly. Ensure that the wildcard character is being used appropriately in the context of your application.

  • 991-991: The stream field has been replaced with stream_id. Ensure that all references to this field in the codebase have been updated to match the new field name.

natsjobs/item.go Show resolved Hide resolved
natsjobs/item.go Show resolved Hide resolved
natsjobs/config.go Outdated Show resolved Hide resolved
natsjobs/config.go Outdated Show resolved Hide resolved
natsjobs/config.go Show resolved Hide resolved
natsjobs/driver.go Outdated Show resolved Hide resolved
natsjobs/driver.go Outdated Show resolved Hide resolved
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review Status

Actionable comments generated: 8

Configuration used: CodeRabbit UI

Commits Files that changed from the base of the PR and between 5b1be35 and 7342f6a.
Files selected for processing (11)
  • natsjobs/config.go (2 hunks)
  • natsjobs/driver.go (13 hunks)
  • tests/configs/.rr-nats-durability-redial.yaml (2 hunks)
  • tests/configs/.rr-nats-init-v27-br.yaml (2 hunks)
  • tests/configs/.rr-nats-init-v27.yaml (2 hunks)
  • tests/configs/.rr-nats-init.yaml (2 hunks)
  • tests/configs/.rr-nats-otel.yaml (1 hunks)
  • tests/configs/.rr-nats-pq.yaml (2 hunks)
  • tests/configs/.rr-nats-raw.yaml (1 hunks)
  • tests/configs/.rr-no-global.yaml (2 hunks)
  • tests/jobs_nats_test.go (11 hunks)
Files skipped from review due to trivial changes (1)
  • tests/configs/.rr-nats-durability-redial.yaml
Additional comments: 25
natsjobs/config.go (2)
  • 21-27: The Stream field has been renamed to StreamID. Ensure that all references to this field in the codebase have been updated accordingly. Also, verify that the external systems that interact with this configuration are compatible with this change.

  • 45-47: The default value for StreamID has been set to "default-stream". This change aligns with the previous comment made by rustatian. Ensure that this default value is acceptable in all use cases.

tests/configs/.rr-nats-raw.yaml (1)
  • 31-37: The change in the subject field from "default-raw" to "default-raw." will allow the job to subscribe to subjects matching the pattern "default-raw." instead of only "default-raw". This change will likely affect the subscription behavior of the job. Ensure that this change is intended and that it does not cause any unintended side effects in the subscription behavior of the job. Also, the stream field has been replaced with stream_id. Make sure that all references to the stream field in the codebase have been updated to stream_id.
tests/configs/.rr-no-global.yaml (2)
  • 33-33: The subject field has been changed from "default" to "default.*". This change likely affects the message routing or filtering behavior within the system. Ensure that this change is intentional and that it does not break any existing functionality.

  • 42-42: The subject field has been changed from "default" to "default.*". This change likely affects the message routing or filtering behavior within the system. Ensure that this change is intentional and that it does not break any existing functionality.

tests/configs/.rr-nats-init-v27.yaml (2)
  • 31-37: The subject field has been changed to include a wildcard character. This change will affect the subscription behavior of the job. Ensure that this change is intended and that the wildcard character is correctly handled in the message subscription logic.
-        subject: "default-3"
+        subject: "default-3.*"
  • 40-46: The subject field has been changed to include a wildcard character. This change will affect the subscription behavior of the job. Ensure that this change is intended and that the wildcard character is correctly handled in the message subscription logic.
-        subject: "default-4"
+        subject: "default-4.*"
tests/configs/.rr-nats-init.yaml (2)
  • 31-37: The subject field has been changed to "xxxxxxx.*". This indicates that the job is now subscribed to all subjects that start with "xxxxxxx". Ensure that this is the intended behavior and that the wildcard character is being used correctly.

  • 41-47: The subject field has been changed to "default-2.*". This indicates that the job is now subscribed to all subjects that start with "default-2". Ensure that this is the intended behavior and that the wildcard character is being used correctly.

tests/configs/.rr-nats-otel.yaml (1)
  • 40-46: The change in the subject field from "default-otel" to "default-otel." will allow the NATS driver to subscribe to subjects matching "default-otel." instead of just "default-otel". Ensure that this change is intended and that the subjects of the messages being published match this new pattern.
tests/configs/.rr-nats-init-v27-br.yaml (2)
  • 31-37: The subject field has been updated to include a wildcard character, which will allow the job to match any subjects that start with "default-5.". Ensure that this change is intentional and that it won't lead to unexpected behavior due to the broader scope of subject matching. The stream field has been moved in the configuration. Verify that this reordering doesn't affect the functionality.

  • 41-47: The subject field has been updated to include a wildcard character, which will allow the job to match any subjects that start with "default-6.". Ensure that this change is intentional and that it won't lead to unexpected behavior due to the broader scope of subject matching.

tests/configs/.rr-nats-pq.yaml (2)
  • 29-35: The subject has been changed to "default-pq.*" which means it will match any subject that starts with "default-pq". Ensure that this is the intended behavior. Also, the "stream" field has been removed and replaced with "stream_id". Make sure that all references to the "stream" field in the codebase have been updated accordingly. The "delete_after_ack" field has been set to true, which means messages will be deleted after acknowledgment. Verify that this is the desired behavior.

  • 39-45: The subject has been changed to "default-2-pq.*" which means it will match any subject that starts with "default-2-pq". Ensure that this is the intended behavior. Also, the "stream" field has been removed and replaced with "stream_id". Make sure that all references to the "stream" field in the codebase have been updated accordingly. The "delete_after_ack" field has been set to true, which means messages will be deleted after acknowledgment. Verify that this is the desired behavior.

tests/jobs_nats_test.go (11)
  • 1-7: The context package is imported but not used in this hunk. Ensure it's used in the code not shown in this hunk.

  • 20-26: The github.com/nats-io/nats.go/jetstream package is imported but not used in this hunk. Ensure it's used in the code not shown in this hunk.

  • 207-207: The assertion message "nats disconnected" is now corrected. Good job.

  • 516-519: Ensure that the wildcard character in the subject "default-10.*" is handled correctly in the declareNATSPipe function.

  • 596-599: Ensure that the wildcard character in the subject "default-11.*" is handled correctly in the declareNATSPipe function.

  • 683-695: The jetstream.New function is used to create a JetStream context. Ensure that the conn object passed to this function is correctly initialized and connected to the NATS server.

  • 702-715: The jetstream.StreamConfig is used instead of nats.StreamConfig in stream creation. Also, js.CreateOrUpdateStream is used instead of js.AddStream to create or update a stream. Ensure that these changes are compatible with the rest of the codebase.

  • 827-830: Ensure that the wildcard character in the subject "default-13.*" is handled correctly in the declareNATSPipe function.

  • 840-843: Ensure that the queue name "default-13.*" is correctly handled in the out object.

  • 856-859: Ensure that the queue name "default-13.*" is correctly handled in the out object.

  • 988-994: The stream field is replaced with stream_id. Ensure that this change is compatible with the rest of the codebase.

tests/configs/.rr-nats-otel.yaml Show resolved Hide resolved
natsjobs/driver.go Show resolved Hide resolved
natsjobs/driver.go Show resolved Hide resolved
natsjobs/driver.go Show resolved Hide resolved
natsjobs/driver.go Outdated Show resolved Hide resolved
natsjobs/driver.go Show resolved Hide resolved
natsjobs/driver.go Show resolved Hide resolved
natsjobs/driver.go Show resolved Hide resolved
@cv65kr
Copy link
Member Author

cv65kr commented Nov 7, 2023

@coderabbitai pause

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review Status

Actionable comments generated: 0

Configuration used: CodeRabbit UI

Commits Files that changed from the base of the PR and between 7342f6a and 14f7aa2.
Files selected for processing (1)
  • natsjobs/driver.go (13 hunks)
Additional comments: 6
natsjobs/driver.go (6)
  • 30-35: The Configurer interface is still not being used anywhere else in the code. If it's not used elsewhere, consider removing it.

  • 73-75: The function FromConfig still lacks error handling for when cfg.Has(configKey) and cfg.Has(pluginName) return false. Consider adding error handling for these cases.

  • 119-130: The function FromConfig still lacks error handling for when js.CreateOrUpdateStream fails. Consider adding error handling for this case.

  • 319-329: The function Pause still lacks error handling for when c.consumer.context.Stop() fails. Consider adding error handling for this case.

  • 405-419: The function Stop still lacks error handling for when c.stream.Purge(ctx) and c.jetstream.DeleteStream(ctx, c.streamID) fail. Consider adding error handling for these cases.

  • 443-458: The function requeue still lacks error handling for when c.jetstream.PublishMsg and c.stream.DeleteMsg fail. Consider adding error handling for these cases.

@cv65kr cv65kr requested a review from rustatian November 7, 2023 16:01
Signed-off-by: Valery Piashchynski <piashchynski.valery@gmail.com>
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review Status

Actionable comments generated: 4

Configuration used: CodeRabbit UI

Commits Files that changed from the base of the PR and between 14f7aa2 and 81b1fe8.
Files ignored due to filter (5)
  • go.mod
  • go.sum
  • go.work.sum
  • tests/go.mod
  • tests/go.sum
Files selected for processing (3)
  • natsjobs/listener.go (6 hunks)
  • tests/jobs_nats_durability_test.go (1 hunks)
  • tests/jobs_nats_test.go (20 hunks)
Additional comments: 9
natsjobs/listener.go (6)
  • 14-43: The listenerInit function has been significantly updated to use the new Jetstream API. Ensure that the new consumer configuration and the use of the Consume method align with your application's requirements. Also, consider adding error handling for the uuid.NewString() function in case it fails to generate a unique ID.

  • 49-58: The error handling logic for message consumption has been updated. Ensure that the new logic aligns with your application's requirements. Also, consider adding a log statement or some form of notification if the ok check fails, as this could indicate a serious issue with the message channel.

  • 76-88: The context extraction and setting of queue and pipeline options have been updated. Ensure that these changes align with your application's requirements. Also, consider adding error handling for the c.unpack(m.Data(), item) function in case it fails to unpack the message data.

  • 96-101: The handling of the deleteAfterAck option has been updated. Ensure that these changes align with your application's requirements. Also, consider adding a log statement or some form of notification if the deleteAfterAck option is set to true, as this could indicate a significant change in message handling behavior.

  • 116-122: The handling of the deleteAfterAck option within the AutoAck block has been updated. Ensure that these changes align with your application's requirements. Also, consider adding a log statement or some form of notification if the deleteAfterAck option is set to true and the message deletion fails, as this could indicate a serious issue with the message handling logic.

  • 137-147: The injection of context into the headers and the handling of the stopCh channel have been updated. Ensure that these changes align with your application's requirements. Also, consider adding a log statement or some form of notification if the stopCh channel is closed, as this could indicate a significant change in the listener's state.

tests/jobs_nats_test.go (3)
  • 4-4: The context package is imported but not used in this hunk. Ensure it's used in the complete file.

  • 23-23: The jetstream package is imported but not used in this hunk. Ensure it's used in the complete file.

  • 1049-1074: The cleanupNats function is well implemented. It creates a new connection, initializes a new JetStream context, and deletes the specified streams. The error handling is also properly done.

Comment on lines 121 to 129

stopCh <- struct{}{}
wg.Wait()

t.Cleanup(func() {
errc := cleanupNats("nats://127.0.0.1:4222", "foo", "foo2")
t.Log(errc)
})
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cleanup function is correctly added using t.Cleanup. However, it's important to check if the error returned by cleanupNats is not nil and handle it appropriately. Ignoring errors can lead to unexpected behavior or difficult-to-diagnose bugs.

	t.Cleanup(func() {
		errc := cleanupNats("nats://127.0.0.1:4222", "foo", "foo2")
-		t.Log(errc)
+		if errc != nil {
+			t.Errorf("cleanupNats failed: %v", errc)
+		}
	})

Commitable suggestion

[!IMPORTANT]
Carefully review the code before committing. Make sure it correctly replaces the highlighted code, has no missing lines and indentaion issues.

Suggested change
stopCh <- struct{}{}
wg.Wait()
t.Cleanup(func() {
errc := cleanupNats("nats://127.0.0.1:4222", "foo", "foo2")
t.Log(errc)
})
}
stopCh <- struct{}{}
wg.Wait()
t.Cleanup(func() {
errc := cleanupNats("nats://127.0.0.1:4222", "foo", "foo2")
if errc != nil {
t.Errorf("cleanupNats failed: %v", errc)
}
})
}

Comment on lines 123 to 126
t.Cleanup(func() {
errc := cleanupNats("nats://127.0.0.1:4222", "foo", "foo-2")
t.Log(errc)
})
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cleanup function is a good practice to ensure resources are released after the test. However, the error returned by cleanupNats is only logged and not asserted. Consider asserting the error to ensure the cleanup was successful.

- t.Log(errc)
+ require.NoError(t, errc)

Commitable suggestion

[!IMPORTANT]
Carefully review the code before committing. Make sure it correctly replaces the highlighted code, has no missing lines and indentaion issues.

Suggested change
t.Cleanup(func() {
errc := cleanupNats("nats://127.0.0.1:4222", "foo", "foo-2")
t.Log(errc)
})
t.Cleanup(func() {
errc := cleanupNats("nats://127.0.0.1:4222", "foo", "foo-2")
require.NoError(t, errc)
})

@@ -202,7 +209,12 @@ func TestNATSRemoveAllPQ(t *testing.T) {
assert.Equal(t, 2, oLogger.FilterMessageSnippet("pipeline was stopped").Len())
assert.Equal(t, 200, oLogger.FilterMessageSnippet("job was pushed successfully").Len())
assert.Equal(t, 2, oLogger.FilterMessageSnippet("job processing was started").Len())
assert.Equal(t, 2, oLogger.FilterMessageSnippet("nast disconnected").Len())
assert.Equal(t, 2, oLogger.FilterMessageSnippet("nats disconnected").Len())
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The assertion message "nats disconnected" is not clear. Consider providing more context in the assertion message.

Comment on lines +736 to +748
_, err = js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
Name: "foo-raw",
Subjects: []string{"default-raw"},
Subjects: []string{"default-raw.*"},
})
if err != nil {
t.Fatal(err)
}
}

_, err = js.Publish("default-raw", []byte("foo-barrrrrr-bazzzzz"))
_, err = js.PublishMsg(ctx, &nats.Msg{
Data: []byte("foo-barrrrrr-bazzzzz"),
Subject: "default-raw.*",
})
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The stream is created or updated and a message is published, but there is no error handling for the case where the stream creation or update fails. Consider adding error handling for these cases.

+	if si == nil {
+		_, err = js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
+			Name:     "foo-raw",
+			Subjects: []string{"default-raw.*"},
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	_, err = js.PublishMsg(ctx, &nats.Msg{
+		Data:    []byte("foo-barrrrrr-bazzzzz"),
+		Subject: "default-raw.*",
+	})
+	require.NoError(t, err)

Commitable suggestion

[!IMPORTANT]
Carefully review the code before committing. Make sure it correctly replaces the highlighted code, has no missing lines and indentaion issues.

Suggested change
_, err = js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
Name: "foo-raw",
Subjects: []string{"default-raw"},
Subjects: []string{"default-raw.*"},
})
if err != nil {
t.Fatal(err)
}
}
_, err = js.Publish("default-raw", []byte("foo-barrrrrr-bazzzzz"))
_, err = js.PublishMsg(ctx, &nats.Msg{
Data: []byte("foo-barrrrrr-bazzzzz"),
Subject: "default-raw.*",
})
if si == nil {
_, err = js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
Name: "foo-raw",
Subjects: []string{"default-raw.*"},
})
if err != nil {
t.Fatal(err)
}
}
_, err = js.PublishMsg(ctx, &nats.Msg{
Data: []byte("foo-barrrrrr-bazzzzz"),
Subject: "default-raw.*",
})
require.NoError(t, err)

Signed-off-by: Valery Piashchynski <piashchynski.valery@gmail.com>
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review Status

Actionable comments generated: 2

Configuration used: CodeRabbit UI

Commits Files that changed from the base of the PR and between 81b1fe8 and 5b1cb01.
Files selected for processing (3)
  • tests/helpers/helpers.go (2 hunks)
  • tests/jobs_nats_durability_test.go (1 hunks)
  • tests/jobs_nats_test.go (19 hunks)
Additional comments: 6
tests/helpers/helpers.go (2)
  • 2-17: The new imports are correctly added and are necessary for the new function CleanupNats.

  • 199-224: The new function CleanupNats is correctly implemented. It connects to a NATS server, creates a JetStream context, and deletes specified streams. However, it's important to ensure that the streams to be deleted are not being used elsewhere in the codebase or by other services.

func CleanupNats(address string, stream ...string) error {
	conn, err := nats.Connect(address,
		nats.NoEcho(),
		nats.Timeout(time.Minute),
		nats.MaxReconnects(-1),
		nats.PingInterval(time.Second*10),
		nats.ReconnectWait(time.Second),
	)
	if err != nil {
		return err
	}

	js, err := jetstream.New(conn)
	if err != nil {
		return err
	}

	for _, s := range stream {
		err = js.DeleteStream(context.Background(), s)
		if err != nil {
			return err
		}
	}

	return nil
}
tests/jobs_nats_test.go (4)
  • 4-4: The context package is imported but not used in the hunks provided. Ensure it's used elsewhere in the code.

  • 23-23: The jetstream package is imported but not used in the hunks provided. Ensure it's used elsewhere in the code.

  • 212-212: The assertion message "nats disconnected" is not clear. Consider providing more context in the assertion message.

  • 736-748: The stream is created or updated and a message is published, but there is no error handling for the case where the stream creation or update fails. Consider adding error handling for these cases.

Comment on lines 121 to 129

stopCh <- struct{}{}
wg.Wait()

t.Cleanup(func() {
errc := helpers.CleanupNats("nats://127.0.0.1:4222", "foo", "foo2")
t.Log(errc)
})
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cleanup function is correctly added using t.Cleanup. However, the error returned by helpers.CleanupNats is logged without checking if it's nil. This could lead to logging nil errors which might be confusing. Consider checking if errc is not nil before logging it.

	t.Cleanup(func() {
		errc := helpers.CleanupNats("nats://127.0.0.1:4222", "foo", "foo2")
-		t.Log(errc)
+		if errc != nil {
+			t.Log(errc)
+		}
	})

Commitable suggestion

[!IMPORTANT]
Carefully review the code before committing. Make sure it correctly replaces the highlighted code, has no missing lines and indentaion issues.

Suggested change
stopCh <- struct{}{}
wg.Wait()
t.Cleanup(func() {
errc := helpers.CleanupNats("nats://127.0.0.1:4222", "foo", "foo2")
t.Log(errc)
})
}
stopCh <- struct{}{}
wg.Wait()
t.Cleanup(func() {
errc := helpers.CleanupNats("nats://127.0.0.1:4222", "foo", "foo2")
if errc != nil {
t.Log(errc)
}
})
}

Comment on lines +1022 to +1023
errc := helpers.CleanupNats("nats://127.0.0.1:4222", "foo-otel")
t.Log(errc)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cleanup function is a good practice to ensure resources are released after the test. However, the error returned by helpers.CleanupNats is logged but not asserted. Consider asserting the error to ensure the cleanup is successful.

- t.Log(errc)
+ require.NoError(t, errc)

Commitable suggestion

[!IMPORTANT]
Carefully review the code before committing. Make sure it correctly replaces the highlighted code, has no missing lines and indentaion issues.

Suggested change
errc := helpers.CleanupNats("nats://127.0.0.1:4222", "foo-otel")
t.Log(errc)
errc := helpers.CleanupNats("nats://127.0.0.1:4222", "foo-otel")
require.NoError(t, errc)

Copy link
Member

@rustatian rustatian left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks @cv65kr 👍

@rustatian rustatian merged commit a9abca3 into master Nov 8, 2023
8 checks passed
@rustatian rustatian deleted the feat/new-api branch November 8, 2023 17:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[🧹 CHORE]: Update NATS JetStream client
2 participants