-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Jetstream API #98
Conversation
WalkthroughThe changes primarily involve refactoring the Changes
Poem
TipsChat with CodeRabbit Bot (
|
Signed-off-by: Valery Piashchynski <piashchynski.valery@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 4
Configuration used: CodeRabbit UI
Files ignored due to filter (5)
- go.mod
- go.sum
- go.work.sum
- tests/go.sum
- tests/php_test_files/composer.lock
Files selected for processing (14)
- natsjobs/config.go (3 hunks)
- natsjobs/driver.go (13 hunks)
- natsjobs/item.go (5 hunks)
- natsjobs/listener.go (6 hunks)
- natsjobs/unpack.go (1 hunks)
- tests/configs/.rr-nats-durability-redial.yaml (2 hunks)
- tests/configs/.rr-nats-init-v27-br.yaml (2 hunks)
- tests/configs/.rr-nats-init-v27.yaml (1 hunks)
- tests/configs/.rr-nats-init.yaml (2 hunks)
- tests/configs/.rr-nats-otel.yaml (1 hunks)
- tests/configs/.rr-nats-pq.yaml (2 hunks)
- tests/configs/.rr-nats-raw.yaml (1 hunks)
- tests/configs/.rr-no-global.yaml (1 hunks)
- tests/jobs_nats_test.go (11 hunks)
Additional comments: 46
natsjobs/unpack.go (1)
- 22-26: The
Queue
field of theOptions
struct has been changed fromc.stream
toc.streamID
. Ensure that thestreamID
is correctly initialized and used throughout the codebase.tests/configs/.rr-nats-otel.yaml (1)
- 40-47: Ensure that the wildcard pattern in the
subject
field and the newstream_id
field are correctly configured and compatible with the rest of your NATS setup. Also, verify that thedeliver_new
field is expected to be a string and not a boolean.tests/configs/.rr-nats-init.yaml (2)
31-38: The
subject
field has been updated to use a wildcard pattern, and thestream
field has been replaced withstream_id
. Ensure that these changes are compatible with the rest of the system and that the wildcard pattern in thesubject
field matches the expected subjects. Also, verify that thestream_id
is correctly set.41-48: Similar to the previous comment, ensure that the changes to the
subject
andstream_id
fields are compatible with the rest of the system. Also, verify that the wildcard pattern in thesubject
field matches the expected subjects and that thestream_id
is correctly set.tests/configs/.rr-nats-raw.yaml (1)
- 31-38: The configuration for the NATS driver has been updated. Ensure that the new wildcard pattern in the
subject
field and the renamedstream_id
field are compatible with the rest of the system. Also, verify that the new configuration values are correctly used in the codebase.tests/configs/.rr-nats-pq.yaml (2)
29-36: The
subject
field has been updated to include a wildcard character, and thestream
field has been replaced withstream_id
. Ensure that these changes are reflected in the corresponding code and that the wildcard character in thesubject
field does not introduce any unexpected behavior. Also, verify that thestream_id
"foo-pq" is correctly configured in your Jetstream setup.39-46: Similar to the previous comment, ensure that the changes in the
subject
andstream_id
fields are reflected in the corresponding code and that the wildcard character in thesubject
field does not introduce any unexpected behavior. Also, verify that thestream_id
"foo-2-pq" is correctly configured in your Jetstream setup.tests/configs/.rr-nats-init-v27-br.yaml (2)
31-38: The
subject
field now includes a wildcard character, and thestream
field has been replaced withstream_id
. Ensure that these changes are compatible with the rest of the codebase and that the wildcard character in thesubject
field does not unintentionally match unwanted subjects.41-48: The
subject
field now includes a wildcard character, and thestream
field has been replaced withstream_id
. Thepriority
value has been updated from 1 to 2. Ensure that these changes are compatible with the rest of the codebase and that the wildcard character in thesubject
field does not unintentionally match unwanted subjects. Also, verify that the change inpriority
value does not affect the order of job execution in a way that could cause issues.tests/configs/.rr-nats-durability-redial.yaml (2)
31-38: The
subject
field has been updated to use wildcard characters, and thestream
field has been replaced withstream_id
. Ensure that these changes are compatible with the rest of the system and that the wildcard characters in thesubject
field do not unintentionally match unwanted subjects.42-49: The same changes as in the previous hunk have been made here. Again, verify that these changes are compatible with the rest of the system and that the wildcard characters in the
subject
field do not unintentionally match unwanted subjects.tests/configs/.rr-nats-init-v27.yaml (2)
31-37: The subject field has been updated to use wildcard characters, and the stream field has been renamed to stream_id. Ensure that these changes are compatible with the rest of the codebase and that the wildcard characters in the subject field do not unintentionally match unwanted subjects.
39-46: Similar to the previous comment, ensure that the changes to the subject and stream_id fields are compatible with the rest of the codebase and that the wildcard characters in the subject field do not unintentionally match unwanted subjects.
natsjobs/config.go (3)
8-12: The renaming of
pipeStream
topipeStreamID
is consistent with the changes in theconfig
struct. This change should be reflected whereverpipeStream
was used.24-24: The renaming of
Stream
toStreamID
in theconfig
struct is a breaking change. Ensure that all references toStream
have been updated toStreamID
throughout the codebase.45-47: The default value for
StreamID
has been updated. Ensure that this new default value doesn't conflict with any existing stream IDs in your NATS setup.tests/configs/.rr-no-global.yaml (2)
30-36: The
subject
field has been updated to use a wildcard character, and thestream
field has been replaced withstream_id
. Ensure that these changes are compatible with the rest of the codebase and the NATS Jetstream API.38-45: The
subject
field has been updated to use a wildcard character, and thestream
field has been replaced withstream_id
. Thepriority
value has been updated from 1 to 2. Ensure that these changes are compatible with the rest of the codebase and the NATS Jetstream API.natsjobs/listener.go (6)
14-40: The
listenerInit
function has been significantly modified to handle JetStream messages. It now generates a unique ID usinguuid.NewString()
, creates a JetStream consumer, and sets up a message consumption context. The function also introduces a newconsumer
struct to manage the JetStream consumer. Ensure that the new logic aligns with the intended message consumption and handling behavior.46-55: The
listenerStart
function now handles JetStream messages. It checks if the message consumption channel is closed and handles errors in retrieving message metadata. Ensure that the error handling logic is correct and that the function behaves as expected when the channel is closed or when an error occurs in retrieving message metadata.75-85: The function unpacks the message data into an
Item
struct and sets up a tracing span. It also sets the queue and pipeline options for the item. Ensure that the unpacking and tracing logic is correct and that the queue and pipeline options are set as intended.92-98: The function checks if the
deleteAfterAck
option is enabled and sets the relevant options for the item. Ensure that the logic for handling thedeleteAfterAck
option is correct.113-119: The function handles the deletion of messages after acknowledgement. It checks if the
deleteAfterAck
option is enabled and deletes the message if it is. Ensure that the message deletion logic is correct and that messages are deleted as intended when thedeleteAfterAck
option is enabled.134-144: The function injects the tracing context into the item headers and inserts the item into the queue. It also handles the stopping of the consumer. Ensure that the context injection, item insertion, and consumer stopping logic are correct.
natsjobs/item.go (3)
1-12: The import statements have been updated to reflect the new Jetstream API. Ensure that the new import paths are correct and that the old NATS API is no longer being used anywhere in the codebase.
34-49: The type of the
sub
field in theOptions
struct has been changed fromnats.JetStreamContext
tojetstream.Stream
. This change is consistent with the switch to the new Jetstream API. Make sure that all uses of this field in the codebase have been updated to match the new type.144-150: The error handling logic here seems to be correct. If the
requeueFn
function returns an error and the message was not auto acknowledged, thenak
function is called. If thenak
function also returns an error, both errors are joined and returned. This ensures that all errors are properly handled and reported.natsjobs/driver.go (11)
2-14: Imports have been updated to include the new Jetstream API and remove the old NATS API. Ensure that all dependencies are correctly managed and updated.
46-65: The
Driver
struct has been updated to include new fields related to the Jetstream API. Ensure that these fields are correctly initialized and used throughout the code.67-71: A new
consumer
struct has been introduced. Ensure that this struct is correctly used and managed throughout the code.116-132: > Note: This review was outside of the patch, so it was mapped to the patch with the greatest overlap. Original lines [73-130]
The
FromConfig
function has been updated to use the new Jetstream API. Ensure that the function correctly initializes theDriver
struct and handles errors.
194-211: The
FromPipeline
function has been updated to use the new Jetstream API. Ensure that the function correctly initializes theDriver
struct and handles errors.256-265: The
Push
function has been updated to use the new Jetstream API. Ensure that the function correctly publishes messages and handles errors.319-329: The
Pause
function has been updated to use the new Jetstream API. Ensure that the function correctly stops the consumer and handles errors.375-391: The
State
function has been updated to use the new Jetstream API. Ensure that the function correctly retrieves the state of the consumer and handles errors.405-419: The
Stop
function has been updated to use the new Jetstream API. Ensure that the function correctly stops the consumer, deletes the stream if necessary, and handles errors.443-458: The
requeue
function has been updated to use the new Jetstream API. Ensure that the function correctly requeues messages and handles errors.467-476: The
disconnectHandler
function has been updated to handle disconnections from the Jetstream API. Ensure that the function correctly logs disconnections and handles errors.tests/jobs_nats_test.go (8)
4-4: The
context
package is imported but not used in the hunks provided. Ensure it's used elsewhere in the file.23-23: The
github.com/nats-io/nats.go/jetstream
package is imported but not used in the hunks provided. Ensure it's used elsewhere in the file.516-516: The use of wildcard characters in pipeline and stream names can lead to unexpected behavior if not handled correctly. Ensure that the wildcard character is being used appropriately in the context of your application.
596-596: The use of wildcard characters in pipeline and stream names can lead to unexpected behavior if not handled correctly. Ensure that the wildcard character is being used appropriately in the context of your application.
686-688: The JetStream context is created but not used in the hunks provided. Ensure it's used elsewhere in the file.
827-827: The use of wildcard characters in pipeline and stream names can lead to unexpected behavior if not handled correctly. Ensure that the wildcard character is being used appropriately in the context of your application.
856-856: The queue name is asserted with a wildcard character. This can lead to unexpected behavior if not handled correctly. Ensure that the wildcard character is being used appropriately in the context of your application.
991-991: The
stream
field has been replaced withstream_id
. Ensure that all references to this field in the codebase have been updated to match the new field name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 8
Configuration used: CodeRabbit UI
Files selected for processing (11)
- natsjobs/config.go (2 hunks)
- natsjobs/driver.go (13 hunks)
- tests/configs/.rr-nats-durability-redial.yaml (2 hunks)
- tests/configs/.rr-nats-init-v27-br.yaml (2 hunks)
- tests/configs/.rr-nats-init-v27.yaml (2 hunks)
- tests/configs/.rr-nats-init.yaml (2 hunks)
- tests/configs/.rr-nats-otel.yaml (1 hunks)
- tests/configs/.rr-nats-pq.yaml (2 hunks)
- tests/configs/.rr-nats-raw.yaml (1 hunks)
- tests/configs/.rr-no-global.yaml (2 hunks)
- tests/jobs_nats_test.go (11 hunks)
Files skipped from review due to trivial changes (1)
- tests/configs/.rr-nats-durability-redial.yaml
Additional comments: 25
natsjobs/config.go (2)
21-27: The
Stream
field has been renamed toStreamID
. Ensure that all references to this field in the codebase have been updated accordingly. Also, verify that the external systems that interact with this configuration are compatible with this change.45-47: The default value for
StreamID
has been set to "default-stream". This change aligns with the previous comment made byrustatian
. Ensure that this default value is acceptable in all use cases.tests/configs/.rr-nats-raw.yaml (1)
- 31-37: The change in the
subject
field from "default-raw" to "default-raw." will allow the job to subscribe to subjects matching the pattern "default-raw." instead of only "default-raw". This change will likely affect the subscription behavior of the job. Ensure that this change is intended and that it does not cause any unintended side effects in the subscription behavior of the job. Also, thestream
field has been replaced withstream_id
. Make sure that all references to thestream
field in the codebase have been updated tostream_id
.tests/configs/.rr-no-global.yaml (2)
33-33: The subject field has been changed from "default" to "default.*". This change likely affects the message routing or filtering behavior within the system. Ensure that this change is intentional and that it does not break any existing functionality.
42-42: The subject field has been changed from "default" to "default.*". This change likely affects the message routing or filtering behavior within the system. Ensure that this change is intentional and that it does not break any existing functionality.
tests/configs/.rr-nats-init-v27.yaml (2)
- 31-37: The subject field has been changed to include a wildcard character. This change will affect the subscription behavior of the job. Ensure that this change is intended and that the wildcard character is correctly handled in the message subscription logic.
- subject: "default-3" + subject: "default-3.*"
- 40-46: The subject field has been changed to include a wildcard character. This change will affect the subscription behavior of the job. Ensure that this change is intended and that the wildcard character is correctly handled in the message subscription logic.
- subject: "default-4" + subject: "default-4.*"tests/configs/.rr-nats-init.yaml (2)
31-37: The
subject
field has been changed to "xxxxxxx.*". This indicates that the job is now subscribed to all subjects that start with "xxxxxxx". Ensure that this is the intended behavior and that the wildcard character is being used correctly.41-47: The
subject
field has been changed to "default-2.*". This indicates that the job is now subscribed to all subjects that start with "default-2". Ensure that this is the intended behavior and that the wildcard character is being used correctly.tests/configs/.rr-nats-otel.yaml (1)
- 40-46: The change in the
subject
field from "default-otel" to "default-otel." will allow the NATS driver to subscribe to subjects matching "default-otel." instead of just "default-otel". Ensure that this change is intended and that the subjects of the messages being published match this new pattern.tests/configs/.rr-nats-init-v27-br.yaml (2)
31-37: The
subject
field has been updated to include a wildcard character, which will allow the job to match any subjects that start with "default-5.". Ensure that this change is intentional and that it won't lead to unexpected behavior due to the broader scope of subject matching. Thestream
field has been moved in the configuration. Verify that this reordering doesn't affect the functionality.41-47: The
subject
field has been updated to include a wildcard character, which will allow the job to match any subjects that start with "default-6.". Ensure that this change is intentional and that it won't lead to unexpected behavior due to the broader scope of subject matching.tests/configs/.rr-nats-pq.yaml (2)
29-35: The subject has been changed to "default-pq.*" which means it will match any subject that starts with "default-pq". Ensure that this is the intended behavior. Also, the "stream" field has been removed and replaced with "stream_id". Make sure that all references to the "stream" field in the codebase have been updated accordingly. The "delete_after_ack" field has been set to true, which means messages will be deleted after acknowledgment. Verify that this is the desired behavior.
39-45: The subject has been changed to "default-2-pq.*" which means it will match any subject that starts with "default-2-pq". Ensure that this is the intended behavior. Also, the "stream" field has been removed and replaced with "stream_id". Make sure that all references to the "stream" field in the codebase have been updated accordingly. The "delete_after_ack" field has been set to true, which means messages will be deleted after acknowledgment. Verify that this is the desired behavior.
tests/jobs_nats_test.go (11)
1-7: The
context
package is imported but not used in this hunk. Ensure it's used in the code not shown in this hunk.20-26: The
github.com/nats-io/nats.go/jetstream
package is imported but not used in this hunk. Ensure it's used in the code not shown in this hunk.207-207: The assertion message "nats disconnected" is now corrected. Good job.
516-519: Ensure that the wildcard character in the subject "default-10.*" is handled correctly in the
declareNATSPipe
function.596-599: Ensure that the wildcard character in the subject "default-11.*" is handled correctly in the
declareNATSPipe
function.683-695: The
jetstream.New
function is used to create a JetStream context. Ensure that theconn
object passed to this function is correctly initialized and connected to the NATS server.702-715: The
jetstream.StreamConfig
is used instead ofnats.StreamConfig
in stream creation. Also,js.CreateOrUpdateStream
is used instead ofjs.AddStream
to create or update a stream. Ensure that these changes are compatible with the rest of the codebase.827-830: Ensure that the wildcard character in the subject "default-13.*" is handled correctly in the
declareNATSPipe
function.840-843: Ensure that the queue name "default-13.*" is correctly handled in the
out
object.856-859: Ensure that the queue name "default-13.*" is correctly handled in the
out
object.988-994: The
stream
field is replaced withstream_id
. Ensure that this change is compatible with the rest of the codebase.
@coderabbitai pause |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 0
Configuration used: CodeRabbit UI
Files selected for processing (1)
- natsjobs/driver.go (13 hunks)
Additional comments: 6
natsjobs/driver.go (6)
30-35: The
Configurer
interface is still not being used anywhere else in the code. If it's not used elsewhere, consider removing it.73-75: The function
FromConfig
still lacks error handling for whencfg.Has(configKey)
andcfg.Has(pluginName)
return false. Consider adding error handling for these cases.119-130: The function
FromConfig
still lacks error handling for whenjs.CreateOrUpdateStream
fails. Consider adding error handling for this case.319-329: The function
Pause
still lacks error handling for whenc.consumer.context.Stop()
fails. Consider adding error handling for this case.405-419: The function
Stop
still lacks error handling for whenc.stream.Purge(ctx)
andc.jetstream.DeleteStream(ctx, c.streamID)
fail. Consider adding error handling for these cases.443-458: The function
requeue
still lacks error handling for whenc.jetstream.PublishMsg
andc.stream.DeleteMsg
fail. Consider adding error handling for these cases.
Signed-off-by: Valery Piashchynski <piashchynski.valery@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 4
Configuration used: CodeRabbit UI
Files ignored due to filter (5)
- go.mod
- go.sum
- go.work.sum
- tests/go.mod
- tests/go.sum
Files selected for processing (3)
- natsjobs/listener.go (6 hunks)
- tests/jobs_nats_durability_test.go (1 hunks)
- tests/jobs_nats_test.go (20 hunks)
Additional comments: 9
natsjobs/listener.go (6)
14-43: The
listenerInit
function has been significantly updated to use the new Jetstream API. Ensure that the new consumer configuration and the use of theConsume
method align with your application's requirements. Also, consider adding error handling for theuuid.NewString()
function in case it fails to generate a unique ID.49-58: The error handling logic for message consumption has been updated. Ensure that the new logic aligns with your application's requirements. Also, consider adding a log statement or some form of notification if the
ok
check fails, as this could indicate a serious issue with the message channel.76-88: The context extraction and setting of queue and pipeline options have been updated. Ensure that these changes align with your application's requirements. Also, consider adding error handling for the
c.unpack(m.Data(), item)
function in case it fails to unpack the message data.96-101: The handling of the
deleteAfterAck
option has been updated. Ensure that these changes align with your application's requirements. Also, consider adding a log statement or some form of notification if thedeleteAfterAck
option is set to true, as this could indicate a significant change in message handling behavior.116-122: The handling of the
deleteAfterAck
option within theAutoAck
block has been updated. Ensure that these changes align with your application's requirements. Also, consider adding a log statement or some form of notification if thedeleteAfterAck
option is set to true and the message deletion fails, as this could indicate a serious issue with the message handling logic.137-147: The injection of context into the headers and the handling of the
stopCh
channel have been updated. Ensure that these changes align with your application's requirements. Also, consider adding a log statement or some form of notification if thestopCh
channel is closed, as this could indicate a significant change in the listener's state.tests/jobs_nats_test.go (3)
4-4: The
context
package is imported but not used in this hunk. Ensure it's used in the complete file.23-23: The
jetstream
package is imported but not used in this hunk. Ensure it's used in the complete file.1049-1074: The
cleanupNats
function is well implemented. It creates a new connection, initializes a new JetStream context, and deletes the specified streams. The error handling is also properly done.
tests/jobs_nats_durability_test.go
Outdated
|
||
stopCh <- struct{}{} | ||
wg.Wait() | ||
|
||
t.Cleanup(func() { | ||
errc := cleanupNats("nats://127.0.0.1:4222", "foo", "foo2") | ||
t.Log(errc) | ||
}) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cleanup function is correctly added using t.Cleanup
. However, it's important to check if the error returned by cleanupNats
is not nil and handle it appropriately. Ignoring errors can lead to unexpected behavior or difficult-to-diagnose bugs.
t.Cleanup(func() {
errc := cleanupNats("nats://127.0.0.1:4222", "foo", "foo2")
- t.Log(errc)
+ if errc != nil {
+ t.Errorf("cleanupNats failed: %v", errc)
+ }
})
Commitable suggestion
[!IMPORTANT]
Carefully review the code before committing. Make sure it correctly replaces the highlighted code, has no missing lines and indentaion issues.
stopCh <- struct{}{} | |
wg.Wait() | |
t.Cleanup(func() { | |
errc := cleanupNats("nats://127.0.0.1:4222", "foo", "foo2") | |
t.Log(errc) | |
}) | |
} | |
stopCh <- struct{}{} | |
wg.Wait() | |
t.Cleanup(func() { | |
errc := cleanupNats("nats://127.0.0.1:4222", "foo", "foo2") | |
if errc != nil { | |
t.Errorf("cleanupNats failed: %v", errc) | |
} | |
}) | |
} |
tests/jobs_nats_test.go
Outdated
t.Cleanup(func() { | ||
errc := cleanupNats("nats://127.0.0.1:4222", "foo", "foo-2") | ||
t.Log(errc) | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cleanup function is a good practice to ensure resources are released after the test. However, the error returned by cleanupNats
is only logged and not asserted. Consider asserting the error to ensure the cleanup was successful.
- t.Log(errc)
+ require.NoError(t, errc)
Commitable suggestion
[!IMPORTANT]
Carefully review the code before committing. Make sure it correctly replaces the highlighted code, has no missing lines and indentaion issues.
t.Cleanup(func() { | |
errc := cleanupNats("nats://127.0.0.1:4222", "foo", "foo-2") | |
t.Log(errc) | |
}) | |
t.Cleanup(func() { | |
errc := cleanupNats("nats://127.0.0.1:4222", "foo", "foo-2") | |
require.NoError(t, errc) | |
}) |
@@ -202,7 +209,12 @@ func TestNATSRemoveAllPQ(t *testing.T) { | |||
assert.Equal(t, 2, oLogger.FilterMessageSnippet("pipeline was stopped").Len()) | |||
assert.Equal(t, 200, oLogger.FilterMessageSnippet("job was pushed successfully").Len()) | |||
assert.Equal(t, 2, oLogger.FilterMessageSnippet("job processing was started").Len()) | |||
assert.Equal(t, 2, oLogger.FilterMessageSnippet("nast disconnected").Len()) | |||
assert.Equal(t, 2, oLogger.FilterMessageSnippet("nats disconnected").Len()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The assertion message "nats disconnected" is not clear. Consider providing more context in the assertion message.
_, err = js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{ | ||
Name: "foo-raw", | ||
Subjects: []string{"default-raw"}, | ||
Subjects: []string{"default-raw.*"}, | ||
}) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
} | ||
|
||
_, err = js.Publish("default-raw", []byte("foo-barrrrrr-bazzzzz")) | ||
_, err = js.PublishMsg(ctx, &nats.Msg{ | ||
Data: []byte("foo-barrrrrr-bazzzzz"), | ||
Subject: "default-raw.*", | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The stream is created or updated and a message is published, but there is no error handling for the case where the stream creation or update fails. Consider adding error handling for these cases.
+ if si == nil {
+ _, err = js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
+ Name: "foo-raw",
+ Subjects: []string{"default-raw.*"},
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ _, err = js.PublishMsg(ctx, &nats.Msg{
+ Data: []byte("foo-barrrrrr-bazzzzz"),
+ Subject: "default-raw.*",
+ })
+ require.NoError(t, err)
Commitable suggestion
[!IMPORTANT]
Carefully review the code before committing. Make sure it correctly replaces the highlighted code, has no missing lines and indentaion issues.
_, err = js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{ | |
Name: "foo-raw", | |
Subjects: []string{"default-raw"}, | |
Subjects: []string{"default-raw.*"}, | |
}) | |
if err != nil { | |
t.Fatal(err) | |
} | |
} | |
_, err = js.Publish("default-raw", []byte("foo-barrrrrr-bazzzzz")) | |
_, err = js.PublishMsg(ctx, &nats.Msg{ | |
Data: []byte("foo-barrrrrr-bazzzzz"), | |
Subject: "default-raw.*", | |
}) | |
if si == nil { | |
_, err = js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{ | |
Name: "foo-raw", | |
Subjects: []string{"default-raw.*"}, | |
}) | |
if err != nil { | |
t.Fatal(err) | |
} | |
} | |
_, err = js.PublishMsg(ctx, &nats.Msg{ | |
Data: []byte("foo-barrrrrr-bazzzzz"), | |
Subject: "default-raw.*", | |
}) | |
require.NoError(t, err) |
Signed-off-by: Valery Piashchynski <piashchynski.valery@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 2
Configuration used: CodeRabbit UI
Files selected for processing (3)
- tests/helpers/helpers.go (2 hunks)
- tests/jobs_nats_durability_test.go (1 hunks)
- tests/jobs_nats_test.go (19 hunks)
Additional comments: 6
tests/helpers/helpers.go (2)
2-17: The new imports are correctly added and are necessary for the new function
CleanupNats
.199-224: The new function
CleanupNats
is correctly implemented. It connects to a NATS server, creates a JetStream context, and deletes specified streams. However, it's important to ensure that the streams to be deleted are not being used elsewhere in the codebase or by other services.func CleanupNats(address string, stream ...string) error { conn, err := nats.Connect(address, nats.NoEcho(), nats.Timeout(time.Minute), nats.MaxReconnects(-1), nats.PingInterval(time.Second*10), nats.ReconnectWait(time.Second), ) if err != nil { return err } js, err := jetstream.New(conn) if err != nil { return err } for _, s := range stream { err = js.DeleteStream(context.Background(), s) if err != nil { return err } } return nil }tests/jobs_nats_test.go (4)
4-4: The context package is imported but not used in the hunks provided. Ensure it's used elsewhere in the code.
23-23: The jetstream package is imported but not used in the hunks provided. Ensure it's used elsewhere in the code.
212-212: The assertion message "nats disconnected" is not clear. Consider providing more context in the assertion message.
736-748: The stream is created or updated and a message is published, but there is no error handling for the case where the stream creation or update fails. Consider adding error handling for these cases.
|
||
stopCh <- struct{}{} | ||
wg.Wait() | ||
|
||
t.Cleanup(func() { | ||
errc := helpers.CleanupNats("nats://127.0.0.1:4222", "foo", "foo2") | ||
t.Log(errc) | ||
}) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cleanup function is correctly added using t.Cleanup
. However, the error returned by helpers.CleanupNats
is logged without checking if it's nil
. This could lead to logging nil
errors which might be confusing. Consider checking if errc
is not nil
before logging it.
t.Cleanup(func() {
errc := helpers.CleanupNats("nats://127.0.0.1:4222", "foo", "foo2")
- t.Log(errc)
+ if errc != nil {
+ t.Log(errc)
+ }
})
Commitable suggestion
[!IMPORTANT]
Carefully review the code before committing. Make sure it correctly replaces the highlighted code, has no missing lines and indentaion issues.
stopCh <- struct{}{} | |
wg.Wait() | |
t.Cleanup(func() { | |
errc := helpers.CleanupNats("nats://127.0.0.1:4222", "foo", "foo2") | |
t.Log(errc) | |
}) | |
} | |
stopCh <- struct{}{} | |
wg.Wait() | |
t.Cleanup(func() { | |
errc := helpers.CleanupNats("nats://127.0.0.1:4222", "foo", "foo2") | |
if errc != nil { | |
t.Log(errc) | |
} | |
}) | |
} |
errc := helpers.CleanupNats("nats://127.0.0.1:4222", "foo-otel") | ||
t.Log(errc) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cleanup function is a good practice to ensure resources are released after the test. However, the error returned by helpers.CleanupNats
is logged but not asserted. Consider asserting the error to ensure the cleanup is successful.
- t.Log(errc)
+ require.NoError(t, errc)
Commitable suggestion
[!IMPORTANT]
Carefully review the code before committing. Make sure it correctly replaces the highlighted code, has no missing lines and indentaion issues.
errc := helpers.CleanupNats("nats://127.0.0.1:4222", "foo-otel") | |
t.Log(errc) | |
errc := helpers.CleanupNats("nats://127.0.0.1:4222", "foo-otel") | |
require.NoError(t, errc) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks @cv65kr 👍
Reason for This PR
closes: roadrunner-server/roadrunner#1574
Description of Changes
License Acceptance
By submitting this pull request, I confirm that my contribution is made under
the terms of the MIT license.
PR Checklist
[Author TODO: Meet these criteria.]
[Reviewer TODO: Verify that these criteria are met. Request changes if not]
git commit -s
).CHANGELOG.md
.Summary by CodeRabbit
Summary by CodeRabbit
New Features
Search
class with methods for fetching and displaying results.Hero
component and aSearch
component to theApp
component.styles.css
file.Refactor
Chores
Tests
Documentation