Skip to content
This repository has been archived by the owner on May 11, 2023. It is now read-only.

Commit

Permalink
Better logging
Browse files Browse the repository at this point in the history
  • Loading branch information
zhulik committed Feb 9, 2023
1 parent b8dd946 commit 9531eb0
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 16 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ RUN bundle install

FROM base

COPY --from=builder /mnt/vendor .
COPY --from=builder /mnt .

ADD . .

USER app

CMD ["bundle", "exec", "./exe/nats_streamer", "/config.yaml", "$PATH"]
CMD ["bundle", "exec", "./exe/nats_streamer", "/config.yaml"]
5 changes: 2 additions & 3 deletions exe/nats_streamer
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@

require_relative "../lib/nats_streamer"

CONFIG = YAML.load(File.read(ARGV[0]), symbolize_names: true)

Sync do
NatsStreamer::Application.new(url: "nats://127.0.0.1:4222", config: NatsStreamer::Config.new(CONFIG)).run
config = NatsStreamer::Config.new(YAML.load(File.read(ARGV[0]), symbolize_names: true))
NatsStreamer::Application.new(config:).run
end
9 changes: 5 additions & 4 deletions lib/nats_streamer/application.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ class NatsStreamer::Application
include NatsStreamer::Logger
include Memery

option :url
option :config

def run
Expand All @@ -19,7 +18,9 @@ def run

private

memoize def client = NATS.connect(url)

def jsm = client.jsm
memoize def jsm
NATS.connect(config.server_url).tap do
info { "Connected to #{config.server_url}" }
end.jsm
end
end
2 changes: 2 additions & 0 deletions lib/nats_streamer/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ class Subscriber < Dry::Struct
attribute :params, Types::Hash.map(Types::Coercible::String, Types::String).default({}.freeze)
end

attribute :server_url, Types::Coercible::String

attribute :streams, Types::Hash.map(
Types::Coercible::String,
Types::Hash.map(
Expand Down
10 changes: 10 additions & 0 deletions lib/nats_streamer/connection.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# frozen_string_literal: true

class NatsStreamer::Connection
def self.build(url)
Faraday.new(url) do |f|
f.request :json
f.response :raise_error
end
end
end
14 changes: 7 additions & 7 deletions lib/nats_streamer/deliverer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ class NatsStreamer::Deliverer
# TODO: how to propely unsubscribe?
option :subscriber

def deliver(**) = connection.post(".", **)

memoize def connection
Faraday.new(subscriber.url) do |f|
f.request :json
f.response :raise_error
end
def deliver(**)
info_measure(-> { connection.post(".", **) }) { "Event delivered to #{subscriber.name}: #{_1.round(2)}s" }
rescue StandardError => e
warn { "Event failed to be delivered to #{subscriber.name}" }
warn { e }
end

memoize def connection = NatsStreamer::Connection.build(subscriber.url)
end
2 changes: 2 additions & 0 deletions lib/nats_streamer/listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ class NatsStreamer::Listener
option :subscriber

def run
info { "Subscribing to #{subject}" }

pull do |msg|
Async { handle_message(msg) }
end
Expand Down
11 changes: 11 additions & 0 deletions lib/nats_streamer/logger.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,16 @@ module NatsStreamer::Logger
define_method(name) do |msg = nil, &block|
Console.logger.public_send(name, self, msg, &block)
end

define_method("#{name}_measure") do |lambda, msg = nil, &block|
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC).then do |start|
lambda.call
Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
end

return Console.logger.public_send(name, self, msg) { block.call(elapsed) } unless block.nil?

Console.logger.public_send(name, self, msg)
end
end
end
4 changes: 4 additions & 0 deletions lib/nats_streamer/stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@ def run
end

def create_stream!
info { "Creating stream '#{name}' with subjects #{subject_names}" }

jsm.add_stream(name:, subjects: subject_names)
rescue NATS::JetStream::Error::BadRequest
warn { "Configuration changed, attempting deleting and recreating stream #{name}" }

jsm.delete_stream(name)
retry
end
Expand Down

0 comments on commit 9531eb0

Please sign in to comment.