Skip to content
This repository has been archived by the owner on May 11, 2023. It is now read-only.

Commit

Permalink
Basic implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
zhulik committed Feb 9, 2023
1 parent 9c3c00c commit de200f0
Show file tree
Hide file tree
Showing 14 changed files with 236 additions and 3 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@

# rspec failure tracking
.rspec_status

/config.yaml
1 change: 0 additions & 1 deletion .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ Layout/LineLength:
Metrics/BlockLength:
Exclude:
- spec/**/*_spec.rb
- "*.gemspec"

Metrics/MethodLength:
Max: 10
Expand Down
5 changes: 5 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,13 @@ source "https://rubygems.org"
gem "async"
gem "nats-pure"

gem "faraday"
gem "memery"
gem "zeitwerk"

gem "dry-initializer"
gem "dry-struct"

group :development, :test do
gem "rake"

Expand Down
33 changes: 33 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,41 @@ GEM
backport (1.2.0)
benchmark (0.2.1)
childprocess (4.1.0)
concurrent-ruby (1.2.0)
console (1.16.2)
fiber-local
crack (0.4.5)
rexml
diff-lcs (1.5.0)
docile (1.4.0)
dry-core (1.0.0)
concurrent-ruby (~> 1.0)
zeitwerk (~> 2.6)
dry-inflector (1.0.0)
dry-initializer (3.1.1)
dry-logic (1.5.0)
concurrent-ruby (~> 1.0)
dry-core (~> 1.0, < 2)
zeitwerk (~> 2.6)
dry-struct (1.6.0)
dry-core (~> 1.0, < 2)
dry-types (>= 1.7, < 2)
ice_nine (~> 0.11)
zeitwerk (~> 2.6)
dry-types (1.7.0)
concurrent-ruby (~> 1.0)
dry-core (~> 1.0, < 2)
dry-inflector (~> 1.0, < 2)
dry-logic (>= 1.4, < 2)
zeitwerk (~> 2.6)
e2mmap (0.1.0)
faraday (2.7.4)
faraday-net_http (>= 2.0, < 3.1)
ruby2_keywords (>= 0.0.4)
faraday-net_http (3.0.2)
fiber-local (1.0.0)
hashdiff (1.0.1)
ice_nine (0.11.2)
iniparse (1.5.0)
io-event (1.1.6)
jaro_winkler (1.5.4)
Expand All @@ -32,6 +58,8 @@ GEM
rexml
kramdown-parser-gfm (1.1.0)
kramdown (~> 2.0)
memery (1.4.1)
ruby2_keywords (~> 0.0.2)
nats-pure (2.2.1)
nokogiri (1.14.1-x86_64-linux)
racc (~> 1.4)
Expand Down Expand Up @@ -88,6 +116,7 @@ GEM
rubocop (~> 1.33)
rubocop-capybara (~> 2.17)
ruby-progressbar (1.11.0)
ruby2_keywords (0.0.5)
simplecov (0.22.0)
docile (~> 1.1)
simplecov-html (~> 0.11)
Expand Down Expand Up @@ -128,6 +157,10 @@ PLATFORMS
DEPENDENCIES
async
async-rspec
dry-initializer
dry-struct
faraday
memery
nats-pure
overcommit
rake
Expand Down
2 changes: 1 addition & 1 deletion bin/console
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# frozen_string_literal: true

require "bundler/setup"
require "nats_streamer"
require_relative "../lib/nats_streamer"

# You can add fixtures and/or initialization code here to make experimenting
# with your gem easier. You can also use a different console, if you like.
Expand Down
8 changes: 7 additions & 1 deletion exe/nats_streamer
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
#!/usr/bin/env ruby
# frozen_string_literal: true

require "nats_streamer"
require_relative "../lib/nats_streamer"

CONFIG = YAML.load(File.read(ARGV[0]), symbolize_names: true)

Sync do
NatsStreamer::Application.new(url: "nats://127.0.0.1:4222", config: NatsStreamer::Config.new(CONFIG)).run
end
10 changes: 10 additions & 0 deletions lib/nats_streamer.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
# frozen_string_literal: true

require "erb"
require "yaml"

require "zeitwerk"

require "async"
require "async/notification"
require "nats/client"
require "memery"
require "faraday"

require "dry-initializer"
require "dry/struct"
require "dry/types"

loader = Zeitwerk::Loader.for_gem
loader.setup
Expand Down
25 changes: 25 additions & 0 deletions lib/nats_streamer/application.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# frozen_string_literal: true

class NatsStreamer::Application
extend Dry::Initializer

include NatsStreamer::Logger
include Memery

option :url
option :config

def run
config.streams.each do |name, subjects|
NatsStreamer::Stream.new(jsm:, name:, subjects:).run
end

Async::Notification.new.wait # wait forever
end

private

memoize def client = NATS.connect(url)

def jsm = client.jsm
end
22 changes: 22 additions & 0 deletions lib/nats_streamer/config.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# frozen_string_literal: true

class NatsStreamer::Config < Dry::Struct
Types = Dry.Types

class Subscriber < Dry::Struct
attribute :name, Types::String # Must be uniq
attribute :url, Types::String
attribute :params, Types::Hash.map(Types::Coercible::String, Types::String).default({}.freeze)
end

attribute :streams, Types::Hash.map(
Types::Coercible::String,
Types::Hash.map(
Types::Coercible::String,
Types::Hash.map(
Types::Coercible::String,
Types::Array.of(Subscriber)
)
)
)
end
20 changes: 20 additions & 0 deletions lib/nats_streamer/deliverer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# frozen_string_literal: true

class NatsStreamer::Deliverer
extend Dry::Initializer

include NatsStreamer::Logger
include Memery

# TODO: how to propely unsubscribe?
option :subscriber

def deliver(**) = connection.post(".", **)

memoize def connection
Faraday.new(subscriber.url) do |f|
f.request :json
f.response :raise_error
end
end
end
43 changes: 43 additions & 0 deletions lib/nats_streamer/listener.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# frozen_string_literal: true

class NatsStreamer::Listener
extend Dry::Initializer

include NatsStreamer::Logger
include Memery

# TODO: how to propely unsubscribe?
option :jsm
option :subject
option :subscriber

def run
pull do |msg|
Async { handle_message(msg) }
end
end

private

memoize def params_renderer = NatsStreamer::ParamsRenderer.new(subject:, subscriber:)
memoize def deliverer(subscriber) = NatsStreamer::Deliverer.new(subscriber:)

def pull(&)
psub = @jsm.pull_subscribe(subject, "nats-streamer-subscriber-#{subscriber.name}")

loop do
psub.fetch(1).each(&)
rescue NATS::IO::Timeout
debug { "Pulling timeout, retrying..." }
end
end

def handle_message(msg)
event = JSON.parse(msg.data, symbolize_names: true)
debug { "subject=#{subject}, event=#{event}" }

deliverer(subscriber).deliver(**params_renderer.render(event))

msg.ack
end
end
9 changes: 9 additions & 0 deletions lib/nats_streamer/logger.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# frozen_string_literal: true

module NatsStreamer::Logger
[:debug, :info, :warn, :error, :fatal].each do |name|
define_method(name) do |msg = nil, &block|
Console.logger.public_send(name, self, msg, &block)
end
end
end
18 changes: 18 additions & 0 deletions lib/nats_streamer/params_renderer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# frozen_string_literal: true

class NatsStreamer::ParamsRenderer
extend Dry::Initializer

include NatsStreamer::Logger
include Memery

option :subject
option :subscriber

def render(event)
subscriber.params.transform_values do |v|
binding.local_variable_set(:event, event)
ERB.new(v).result(binding)
end.merge(event:, subject:)
end
end
41 changes: 41 additions & 0 deletions lib/nats_streamer/stream.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# frozen_string_literal: true

class NatsStreamer::Stream
extend Dry::Initializer

include NatsStreamer::Logger
include Memery

option :jsm
option :name
option :subjects

def run
create_stream!

subjects.each do |subject, events|
events.each do |event_name, subscribers|
subscribers.each do |subscriber|
subject = [subject, event_name].join(".")
NatsStreamer::Listener.new(jsm:, subject:, subscriber:).run
end
end
end
end

private

# ["{subject_name}.{event_name}"]
memoize def subject_names
subjects.flat_map do |subject, events|
[subject].product(events.keys).map { _1.join(".") }
end
end

def create_stream!
jsm.add_stream(name:, subjects: subject_names)
rescue NATS::JetStream::Error::BadRequest
jsm.delete_stream(name)
retry
end
end

0 comments on commit de200f0

Please sign in to comment.