Skip to content
This repository has been archived by the owner on May 11, 2023. It is now read-only.

Commit

Permalink
Integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
zhulik committed Feb 10, 2023
1 parent 616ecaa commit 160a2ea
Show file tree
Hide file tree
Showing 20 changed files with 314 additions and 84 deletions.
3 changes: 2 additions & 1 deletion .rspec
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
--format documentation
--format progress
--order random
--color
--require spec_helper
3 changes: 3 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,6 @@ Style/WordArray:

Style/ClassAndModuleChildren:
EnforcedStyle: compact

RSpec/MultipleMemoizedHelpers:
Max: 10
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
source "https://rubygems.org"

gem "async"
gem "async-http"
gem "nats-pure"

gem "faraday"
Expand Down
21 changes: 21 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,18 @@ GEM
console (~> 1.10)
io-event (~> 1.1)
timers (~> 4.1)
async-http (0.60.1)
async (>= 1.25)
async-io (>= 1.28)
async-pool (>= 0.2)
protocol-http (~> 0.24.0)
protocol-http1 (~> 0.15.0)
protocol-http2 (~> 0.15.0)
traces (>= 0.8.0)
async-io (1.34.3)
async
async-pool (0.3.12)
async (>= 1.25)
async-rspec (1.16.1)
rspec (~> 3.0)
rspec-files (~> 1.0)
Expand Down Expand Up @@ -70,6 +82,13 @@ GEM
parallel (1.22.1)
parser (3.2.0.0)
ast (~> 2.4.1)
protocol-hpack (1.4.2)
protocol-http (0.24.0)
protocol-http1 (0.15.0)
protocol-http (~> 0.22)
protocol-http2 (0.15.0)
protocol-hpack (~> 1.4)
protocol-http (~> 0.18)
public_suffix (5.0.1)
racc (1.6.2)
rainbow (3.1.1)
Expand Down Expand Up @@ -141,6 +160,7 @@ GEM
thor (1.2.1)
tilt (2.0.11)
timers (4.3.5)
traces (0.8.0)
unicode-display_width (2.4.2)
webmock (3.18.1)
addressable (>= 2.8.0)
Expand All @@ -156,6 +176,7 @@ PLATFORMS

DEPENDENCIES
async
async-http
async-rspec
dry-initializer
dry-struct
Expand Down
3 changes: 3 additions & 0 deletions lib/nats_streamer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
require "zeitwerk"

require "async"
require "async/barrier"
require "async/http"
require "async/notification"

require "nats/client"
require "memery"
require "faraday"
Expand Down
19 changes: 19 additions & 0 deletions lib/nats_streamer/active_tasks.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# frozen_string_literal: true

class NatsStreamer::ActiveTasks
include NatsStreamer::Helpers

def async
tasks << Async do |task|
task.yield
yield(task)
tasks.delete(task)
end
end

def wait = tasks.each(&:wait)

private

memoize def tasks = []
end
46 changes: 35 additions & 11 deletions lib/nats_streamer/application.rb
Original file line number Diff line number Diff line change
@@ -1,26 +1,50 @@
# frozen_string_literal: true

class NatsStreamer::Application
extend Dry::Initializer
include NatsStreamer::Helpers

include NatsStreamer::Logger
include Memery

option :config
option :config, type: T.Instance(NatsStreamer::Config)

def run
config.streams.each do |name, subjects|
NatsStreamer::Stream.new(jsm:, name:, subjects:).run
end
metrics_store.run
metrics_server.run
wait
ensure
stop
wait

client.close
end

def stop
metrics_server.stop
streams.each(&:stop)
metrics_store.stop
end

Async::Notification.new.wait # wait forever
def wait
metrics_server.wait
streams.each(&:wait)
metrics_store.wait
end

private

memoize def jsm
memoize def jsm = client.jsm

# TODO: move port to config
memoize def metrics_server = NatsStreamer::Metrics::Server.new(port: 9294, metrics_store:)
memoize def metrics_store = NatsStreamer::Metrics::Store.new

memoize def streams
config.streams.map do |name, subjects|
NatsStreamer::Stream.new(jsm:, name:, subjects:, metrics_store:).tap(&:run)
end
end

memoize def client
NATS.connect(config.server_url).tap do
info { "Connected to #{config.server_url}" }
end.jsm
end
end
end
23 changes: 8 additions & 15 deletions lib/nats_streamer/config.rb
Original file line number Diff line number Diff line change
@@ -1,24 +1,17 @@
# frozen_string_literal: true

class NatsStreamer::Config < Dry::Struct
Types = Dry.Types
T = Dry.Types

class Subscriber < Dry::Struct
attribute :name, Types::String # Must be uniq
attribute :url, Types::String
attribute :params, Types::Hash.map(Types::Coercible::String, Types::String).default({}.freeze)
attribute :name, T::String # Must be uniq
attribute :url, T::String
attribute :params, T::Hash.map(T::Coercible::String, T::String).default({}.freeze)
end

attribute :server_url, Types::Coercible::String
Events = T::Hash.map(T::Coercible::String, T::Array.of(Subscriber))
Subjects = T::Hash.map(T::Coercible::String, Events)

attribute :streams, Types::Hash.map(
Types::Coercible::String,
Types::Hash.map(
Types::Coercible::String,
Types::Hash.map(
Types::Coercible::String,
Types::Array.of(Subscriber)
)
)
)
attribute :server_url, T::Coercible::String
attribute :streams, T::Hash.map(T::Coercible::String, Subjects)
end
18 changes: 11 additions & 7 deletions lib/nats_streamer/deliverer.rb
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
# frozen_string_literal: true

class NatsStreamer::Deliverer
extend Dry::Initializer
include NatsStreamer::Helpers

include NatsStreamer::Logger
include Memery
option :subscriber, type: T.Instance(NatsStreamer::Config::Subscriber)
option :metrics_store, type: T.Instance(NatsStreamer::Metrics::Store)

option :subscriber

def deliver(**)
info_measure(-> { connection.post(".", **) }) { "Event delivered to #{subscriber.name}: #{_1.round(2)}s" }
def deliver(**params)
name = params.dig(:event, :name)
info_measure(-> { "Event #{name.inspect} delivered to #{subscriber.name.inspect}: #{_1.round(2)}s" }) do
connection.post(".", **params)
end
metrics_store.inc(:delivered, subscriber: subscriber.name)
end

private

memoize def connection = NatsStreamer::Connection.build(subscriber.url)
end
11 changes: 11 additions & 0 deletions lib/nats_streamer/helpers.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# frozen_string_literal: true

module NatsStreamer::Helpers
T = Dry.Types

def self.included(base)
base.extend(Dry::Initializer)
base.include(NatsStreamer::Logger)
base.include(Memery)
end
end
43 changes: 18 additions & 25 deletions lib/nats_streamer/listener.rb
Original file line number Diff line number Diff line change
@@ -1,45 +1,38 @@
# frozen_string_literal: true

class NatsStreamer::Listener
extend Dry::Initializer
include NatsStreamer::Helpers

include NatsStreamer::Logger
include Memery

# TODO: how to propely unsubscribe?
option :jsm
option :subject
option :subscriber
option :jsm, type: T.Instance(NATS::JetStream)
option :subject, type: T::Coercible::String
option :subscriber, type: T.Instance(NatsStreamer::Config::Subscriber)
option :metrics_store, type: T.Instance(NatsStreamer::Metrics::Store)

def run
info { "Subscribing to #{subject}" }

pull do |msg|
Async { handle_message(msg) }
end
puller.pull { handle_message(_1) }
wait
end

private

memoize def durable = "nats-streamer-subscriber-#{subscriber.name}"
memoize def params_renderer = NatsStreamer::ParamsRenderer.new(subject:, params: subscriber.params)
memoize def deliverer(subscriber) = NatsStreamer::Deliverer.new(subscriber:)

def pull(&)
psub = @jsm.pull_subscribe(subject, "nats-streamer-subscriber-#{subscriber.name}")
memoize def active_tasks = NatsStreamer::ActiveTasks.new
memoize def puller = NatsStreamer::Puller.new(jsm:, subject:, durable:)
memoize def deliverer = NatsStreamer::Deliverer.new(subscriber:, metrics_store:)

loop do
psub.fetch(1).each(&)
rescue NATS::IO::Timeout
debug { "Pulling timeout, retrying..." }
end
end
def wait = active_tasks.wait

def handle_message(msg)
event = JSON.parse(msg.data, symbolize_names: true)
debug { "subject=#{subject}, event=#{event}" }
active_tasks.async do
event = JSON.parse(msg.data, symbolize_names: true)
debug { "subject=#{subject}, event=#{event}" }

deliverer(subscriber).deliver(**params_renderer.render(event))
deliverer.deliver(**params_renderer.render(event))

msg.ack
msg.ack
end
end
end
12 changes: 3 additions & 9 deletions lib/nats_streamer/logger.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,9 @@ module NatsStreamer::Logger
Console.logger.public_send(name, self, msg, &block)
end

define_method("#{name}_measure") do |lambda, msg = nil, &block|
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC).then do |start|
lambda.call
Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
end

return public_send(name, msg) { block.call(elapsed) } unless block.nil?

public_send(name, msg)
define_method("#{name}_measure") do |lambda, &block|
Async::Clock.measure { block.call }
.then { |elapsed| public_send(name) { lambda.call(elapsed) } }
end
end
end
39 changes: 39 additions & 0 deletions lib/nats_streamer/metrics/server.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# frozen_string_literal: true

class NatsStreamer::Metrics::Server
include NatsStreamer::Helpers

PATHS = ["/metrics", "/metrics/"].freeze

NOT_FOUND = Protocol::HTTP::Response[404, {}, ["Not found"]].freeze

option :port, type: T::Integer
option :metrics_store, type: T.Instance(NatsStreamer::Metrics::Store)

def run = @task = Async { Async::HTTP::Server.new(method(:app), endpoint).run }

def stop = @task.stop
def wait = @task.wait

private

memoize def endpoint = Async::HTTP::Endpoint.parse("http://127.0.0.1:#{port}")

def app(request)
return NOT_FOUND unless PATHS.include?(request.path)

Protocol::HTTP::Response[200, {}, serialize_metrics]
end

def serialize_metrics
metrics_store.metrics.map do |value|
"#{metric_name(value)}{#{metric_tags(value)}} #{value[:value]}"
end.join("\n")
end

def metric_name(value, unit = "total")
"nats_streamer_#{value[:name]}_#{unit}"
end

def metric_tags(value) = value[:tags].map { |tag, tag_value| "#{tag}=#{tag_value.to_s.inspect}" }.join(",")
end
Loading

0 comments on commit 160a2ea

Please sign in to comment.