From de200f0f586af0739932e01eba64d86b5e316b4c Mon Sep 17 00:00:00 2001 From: Gleb Sinyavskiy Date: Thu, 9 Feb 2023 01:27:06 +0100 Subject: [PATCH] Basic implementation --- .gitignore | 2 ++ .rubocop.yml | 1 - Gemfile | 5 ++++ Gemfile.lock | 33 +++++++++++++++++++++ bin/console | 2 +- exe/nats_streamer | 8 +++++- lib/nats_streamer.rb | 10 +++++++ lib/nats_streamer/application.rb | 25 ++++++++++++++++ lib/nats_streamer/config.rb | 22 ++++++++++++++ lib/nats_streamer/deliverer.rb | 20 +++++++++++++ lib/nats_streamer/listener.rb | 43 ++++++++++++++++++++++++++++ lib/nats_streamer/logger.rb | 9 ++++++ lib/nats_streamer/params_renderer.rb | 18 ++++++++++++ lib/nats_streamer/stream.rb | 41 ++++++++++++++++++++++++++ 14 files changed, 236 insertions(+), 3 deletions(-) create mode 100644 lib/nats_streamer/application.rb create mode 100644 lib/nats_streamer/config.rb create mode 100644 lib/nats_streamer/deliverer.rb create mode 100644 lib/nats_streamer/listener.rb create mode 100644 lib/nats_streamer/logger.rb create mode 100644 lib/nats_streamer/params_renderer.rb create mode 100644 lib/nats_streamer/stream.rb diff --git a/.gitignore b/.gitignore index b04a8c8..f21b5fa 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,5 @@ # rspec failure tracking .rspec_status + +/config.yaml \ No newline at end of file diff --git a/.rubocop.yml b/.rubocop.yml index a302480..febbcc4 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -15,7 +15,6 @@ Layout/LineLength: Metrics/BlockLength: Exclude: - spec/**/*_spec.rb - - "*.gemspec" Metrics/MethodLength: Max: 10 diff --git a/Gemfile b/Gemfile index 5b0c481..419b309 100644 --- a/Gemfile +++ b/Gemfile @@ -5,8 +5,13 @@ source "https://rubygems.org" gem "async" gem "nats-pure" +gem "faraday" +gem "memery" gem "zeitwerk" +gem "dry-initializer" +gem "dry-struct" + group :development, :test do gem "rake" diff --git a/Gemfile.lock b/Gemfile.lock index 374454d..536c699 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -15,15 +15,41 @@ GEM backport (1.2.0) benchmark (0.2.1) childprocess (4.1.0) + concurrent-ruby (1.2.0) console (1.16.2) fiber-local crack (0.4.5) rexml diff-lcs (1.5.0) docile (1.4.0) + dry-core (1.0.0) + concurrent-ruby (~> 1.0) + zeitwerk (~> 2.6) + dry-inflector (1.0.0) + dry-initializer (3.1.1) + dry-logic (1.5.0) + concurrent-ruby (~> 1.0) + dry-core (~> 1.0, < 2) + zeitwerk (~> 2.6) + dry-struct (1.6.0) + dry-core (~> 1.0, < 2) + dry-types (>= 1.7, < 2) + ice_nine (~> 0.11) + zeitwerk (~> 2.6) + dry-types (1.7.0) + concurrent-ruby (~> 1.0) + dry-core (~> 1.0, < 2) + dry-inflector (~> 1.0, < 2) + dry-logic (>= 1.4, < 2) + zeitwerk (~> 2.6) e2mmap (0.1.0) + faraday (2.7.4) + faraday-net_http (>= 2.0, < 3.1) + ruby2_keywords (>= 0.0.4) + faraday-net_http (3.0.2) fiber-local (1.0.0) hashdiff (1.0.1) + ice_nine (0.11.2) iniparse (1.5.0) io-event (1.1.6) jaro_winkler (1.5.4) @@ -32,6 +58,8 @@ GEM rexml kramdown-parser-gfm (1.1.0) kramdown (~> 2.0) + memery (1.4.1) + ruby2_keywords (~> 0.0.2) nats-pure (2.2.1) nokogiri (1.14.1-x86_64-linux) racc (~> 1.4) @@ -88,6 +116,7 @@ GEM rubocop (~> 1.33) rubocop-capybara (~> 2.17) ruby-progressbar (1.11.0) + ruby2_keywords (0.0.5) simplecov (0.22.0) docile (~> 1.1) simplecov-html (~> 0.11) @@ -128,6 +157,10 @@ PLATFORMS DEPENDENCIES async async-rspec + dry-initializer + dry-struct + faraday + memery nats-pure overcommit rake diff --git a/bin/console b/bin/console index cc5ff8d..44598ba 100755 --- a/bin/console +++ b/bin/console @@ -2,7 +2,7 @@ # frozen_string_literal: true require "bundler/setup" -require "nats_streamer" +require_relative "../lib/nats_streamer" # You can add fixtures and/or initialization code here to make experimenting # with your gem easier. You can also use a different console, if you like. diff --git a/exe/nats_streamer b/exe/nats_streamer index e80823d..168727c 100755 --- a/exe/nats_streamer +++ b/exe/nats_streamer @@ -1,4 +1,10 @@ #!/usr/bin/env ruby # frozen_string_literal: true -require "nats_streamer" +require_relative "../lib/nats_streamer" + +CONFIG = YAML.load(File.read(ARGV[0]), symbolize_names: true) + +Sync do + NatsStreamer::Application.new(url: "nats://127.0.0.1:4222", config: NatsStreamer::Config.new(CONFIG)).run +end diff --git a/lib/nats_streamer.rb b/lib/nats_streamer.rb index 2bae014..5082a30 100644 --- a/lib/nats_streamer.rb +++ b/lib/nats_streamer.rb @@ -1,9 +1,19 @@ # frozen_string_literal: true +require "erb" +require "yaml" + require "zeitwerk" require "async" +require "async/notification" require "nats/client" +require "memery" +require "faraday" + +require "dry-initializer" +require "dry/struct" +require "dry/types" loader = Zeitwerk::Loader.for_gem loader.setup diff --git a/lib/nats_streamer/application.rb b/lib/nats_streamer/application.rb new file mode 100644 index 0000000..883a366 --- /dev/null +++ b/lib/nats_streamer/application.rb @@ -0,0 +1,25 @@ +# frozen_string_literal: true + +class NatsStreamer::Application + extend Dry::Initializer + + include NatsStreamer::Logger + include Memery + + option :url + option :config + + def run + config.streams.each do |name, subjects| + NatsStreamer::Stream.new(jsm:, name:, subjects:).run + end + + Async::Notification.new.wait # wait forever + end + + private + + memoize def client = NATS.connect(url) + + def jsm = client.jsm +end diff --git a/lib/nats_streamer/config.rb b/lib/nats_streamer/config.rb new file mode 100644 index 0000000..0410857 --- /dev/null +++ b/lib/nats_streamer/config.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +class NatsStreamer::Config < Dry::Struct + Types = Dry.Types + + class Subscriber < Dry::Struct + attribute :name, Types::String # Must be uniq + attribute :url, Types::String + attribute :params, Types::Hash.map(Types::Coercible::String, Types::String).default({}.freeze) + end + + attribute :streams, Types::Hash.map( + Types::Coercible::String, + Types::Hash.map( + Types::Coercible::String, + Types::Hash.map( + Types::Coercible::String, + Types::Array.of(Subscriber) + ) + ) + ) +end diff --git a/lib/nats_streamer/deliverer.rb b/lib/nats_streamer/deliverer.rb new file mode 100644 index 0000000..3fac0a9 --- /dev/null +++ b/lib/nats_streamer/deliverer.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +class NatsStreamer::Deliverer + extend Dry::Initializer + + include NatsStreamer::Logger + include Memery + + # TODO: how to propely unsubscribe? + option :subscriber + + def deliver(**) = connection.post(".", **) + + memoize def connection + Faraday.new(subscriber.url) do |f| + f.request :json + f.response :raise_error + end + end +end diff --git a/lib/nats_streamer/listener.rb b/lib/nats_streamer/listener.rb new file mode 100644 index 0000000..ea73928 --- /dev/null +++ b/lib/nats_streamer/listener.rb @@ -0,0 +1,43 @@ +# frozen_string_literal: true + +class NatsStreamer::Listener + extend Dry::Initializer + + include NatsStreamer::Logger + include Memery + + # TODO: how to propely unsubscribe? + option :jsm + option :subject + option :subscriber + + def run + pull do |msg| + Async { handle_message(msg) } + end + end + + private + + memoize def params_renderer = NatsStreamer::ParamsRenderer.new(subject:, subscriber:) + memoize def deliverer(subscriber) = NatsStreamer::Deliverer.new(subscriber:) + + def pull(&) + psub = @jsm.pull_subscribe(subject, "nats-streamer-subscriber-#{subscriber.name}") + + loop do + psub.fetch(1).each(&) + rescue NATS::IO::Timeout + debug { "Pulling timeout, retrying..." } + end + end + + def handle_message(msg) + event = JSON.parse(msg.data, symbolize_names: true) + debug { "subject=#{subject}, event=#{event}" } + + deliverer(subscriber).deliver(**params_renderer.render(event)) + + msg.ack + end +end diff --git a/lib/nats_streamer/logger.rb b/lib/nats_streamer/logger.rb new file mode 100644 index 0000000..1d34c98 --- /dev/null +++ b/lib/nats_streamer/logger.rb @@ -0,0 +1,9 @@ +# frozen_string_literal: true + +module NatsStreamer::Logger + [:debug, :info, :warn, :error, :fatal].each do |name| + define_method(name) do |msg = nil, &block| + Console.logger.public_send(name, self, msg, &block) + end + end +end diff --git a/lib/nats_streamer/params_renderer.rb b/lib/nats_streamer/params_renderer.rb new file mode 100644 index 0000000..b88481b --- /dev/null +++ b/lib/nats_streamer/params_renderer.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +class NatsStreamer::ParamsRenderer + extend Dry::Initializer + + include NatsStreamer::Logger + include Memery + + option :subject + option :subscriber + + def render(event) + subscriber.params.transform_values do |v| + binding.local_variable_set(:event, event) + ERB.new(v).result(binding) + end.merge(event:, subject:) + end +end diff --git a/lib/nats_streamer/stream.rb b/lib/nats_streamer/stream.rb new file mode 100644 index 0000000..b866f86 --- /dev/null +++ b/lib/nats_streamer/stream.rb @@ -0,0 +1,41 @@ +# frozen_string_literal: true + +class NatsStreamer::Stream + extend Dry::Initializer + + include NatsStreamer::Logger + include Memery + + option :jsm + option :name + option :subjects + + def run + create_stream! + + subjects.each do |subject, events| + events.each do |event_name, subscribers| + subscribers.each do |subscriber| + subject = [subject, event_name].join(".") + NatsStreamer::Listener.new(jsm:, subject:, subscriber:).run + end + end + end + end + + private + + # ["{subject_name}.{event_name}"] + memoize def subject_names + subjects.flat_map do |subject, events| + [subject].product(events.keys).map { _1.join(".") } + end + end + + def create_stream! + jsm.add_stream(name:, subjects: subject_names) + rescue NATS::JetStream::Error::BadRequest + jsm.delete_stream(name) + retry + end +end