Async Redis key mutation notifications in Rails

I am a huge fan of Kredis. It allows Rails developers to see Redis as far more than just a fragment cache and "where jobs are".

Working with Kredis made me want to be able to run arbitrary operations in my Rails app when specific keys are modified via specific Redis commands. Redis has an excellent pub/sub infrastructure, and all Redis commands publish messages.

Why would someone want this?

While it's true that changes to data that occur within a typical Rails app are already well covered by model callbacks, state machines and other standard tooling, an entire world of real-time stream processing, ETL and multi-application use cases open up when you can run redis-cli set leastbad rules on your terminal and pick it up in your app.

Problem #1: Listening for messages blocks execution.
Solution #1: Spin up a thread!

Problem #2: Every dyno/server is going to receive the same messages, causing mayhem as developers respond to those messages with database updates. Side-effect chaos!
Solution #2: A standalone process that can be registered as a worker in Procfile... sort of like Sidekiq.

At first, I was just planning on borrowing 95% of Mike Perham's battle-hardened code. Then I realized that the Venn diagram of "people who want a Redis changeset firehose" and "Sidekiq users" is close to 100%... so I just bolted what I needed onto Sidekiq.

Try it out!

What follows is the MVP of my new gem. In fact, it's not a gem, yet: it's an initializer! It has no tests and is hours old. My janky code would make poor Mike bleed out. The goal is to see if folks actually need/want this to exist. I'm looking for feedback on what the ideal Rails-side API would actually look like.

Your Rails app needs to be up and running with Sidekiq. Just stick this in config/initializers/sidekiq.rb:

module Sidekiq
  class Subscriber
    include ::Sidekiq::Util

    def initialize
      @done = false
      @thread = nil
    end

    def start
      @thread ||= safe_thread("subscriber") {
        until @done
          Sidekiq.redis do |conn|
            # https://redis.io/topics/notifications#configuration
            conn.config(:set, "notify-keyspace-events", "E$lshz")
            # https://redis.io/topics/notifications#events-generated-by-different-commands
            conn.psubscribe("__key*__:*") do |on|
              on.psubscribe do
                @firehose = Firehose.new
              end
              on.pmessage do |pattern, command, key|
                @firehose.process(command.split(":").last.to_sym, key)
              end
              on.punsubscribe do
                @firehose = nil
              end
            end
          end
        end
        Sidekiq.logger.info("Subscriber exiting...")
      }
    end

    def terminate
      @done = true
      if @thread
        t = @thread
        Thread.kill(@thread)
        @thread = nil
        t.value
      end
    end
  end
end

module CoreExtensions
  module Sidekiq
    module Launcher
      attr_accessor :subscriber

      def initialize(options)
        @subscriber = ::Sidekiq::Subscriber.new
        super(options)
      end

      def run
        super
        subscriber.start
      end

      def quiet
        subscriber.terminate
        super
      end

      def stop
        subscriber.terminate
        super
      end
    end
  end
end

Sidekiq.configure_server do
  require "sidekiq/launcher"
  ::Sidekiq::Launcher.prepend(CoreExtensions::Sidekiq::Launcher)
end

I'm using CableReady to send console log notifications to the Console Inspector whenever a key is updated with the Redis SET command. I have a simple AllUsers ActionCable Channel in play for testing. This lives in app/lib/firehose.rb:

class Firehose
  include CableReady::Broadcaster

  attr_reader :redis

  def initialize
    @redis = ::ActionCable.server.pubsub.redis_connection_for_subscriptions
  end

  def process(command, key)
    case command # https://github.com/rails/kredis#examples
    when :set    # string, integer, json
      cable_ready["all_users"].console_log(message: "#{key} was just updated to #{redis.get(key)}").broadcast
    when :rpush  # list
    when :lrem   # unique_list
    when :sadd   # set
    when :incr
    when :decr
    when :incrby
    when :decrby
    when :exists
    when :del
      cable_ready["all_users"].console_log(message: "#{key} was deleted").broadcast
    else
    end
  end
end

As Seinfeld would say, is this anything?

8