8
Async Redis key mutation notifications in Rails
I am a huge fan of Kredis. It allows Rails developers to see Redis as far more than just a fragment cache and "where jobs are".
Working with Kredis made me want to be able to run arbitrary operations in my Rails app when specific keys are modified via specific Redis commands. Redis has an excellent pub/sub infrastructure, and all Redis commands publish messages.
While it's true that changes to data that occur within a typical Rails app are already well covered by model callbacks, state machines and other standard tooling, an entire world of real-time stream processing, ETL and multi-application use cases open up when you can run redis-cli set leastbad rules
on your terminal and pick it up in your app.
Problem #1: Listening for messages blocks execution.
Solution #1: Spin up a thread!
Problem #2: Every dyno/server is going to receive the same messages, causing mayhem as developers respond to those messages with database updates. Side-effect chaos!
Solution #2: A standalone process that can be registered as a worker
in Procfile
... sort of like Sidekiq.
At first, I was just planning on borrowing 95% of Mike Perham's battle-hardened code. Then I realized that the Venn diagram of "people who want a Redis changeset firehose" and "Sidekiq users" is close to 100%... so I just bolted what I needed onto Sidekiq.
What follows is the MVP of my new gem. In fact, it's not a gem, yet: it's an initializer! It has no tests and is hours old. My janky code would make poor Mike bleed out. The goal is to see if folks actually need/want this to exist. I'm looking for feedback on what the ideal Rails-side API would actually look like.
Your Rails app needs to be up and running with Sidekiq. Just stick this in config/initializers/sidekiq.rb
:
module Sidekiq
class Subscriber
include ::Sidekiq::Util
def initialize
@done = false
@thread = nil
end
def start
@thread ||= safe_thread("subscriber") {
until @done
Sidekiq.redis do |conn|
# https://redis.io/topics/notifications#configuration
conn.config(:set, "notify-keyspace-events", "E$lshz")
# https://redis.io/topics/notifications#events-generated-by-different-commands
conn.psubscribe("__key*__:*") do |on|
on.psubscribe do
@firehose = Firehose.new
end
on.pmessage do |pattern, command, key|
@firehose.process(command.split(":").last.to_sym, key)
end
on.punsubscribe do
@firehose = nil
end
end
end
end
Sidekiq.logger.info("Subscriber exiting...")
}
end
def terminate
@done = true
if @thread
t = @thread
Thread.kill(@thread)
@thread = nil
t.value
end
end
end
end
module CoreExtensions
module Sidekiq
module Launcher
attr_accessor :subscriber
def initialize(options)
@subscriber = ::Sidekiq::Subscriber.new
super(options)
end
def run
super
subscriber.start
end
def quiet
subscriber.terminate
super
end
def stop
subscriber.terminate
super
end
end
end
end
Sidekiq.configure_server do
require "sidekiq/launcher"
::Sidekiq::Launcher.prepend(CoreExtensions::Sidekiq::Launcher)
end
I'm using CableReady to send console log notifications to the Console Inspector whenever a key is updated with the Redis SET
command. I have a simple AllUsers
ActionCable Channel in play for testing. This lives in app/lib/firehose.rb
:
class Firehose
include CableReady::Broadcaster
attr_reader :redis
def initialize
@redis = ::ActionCable.server.pubsub.redis_connection_for_subscriptions
end
def process(command, key)
case command # https://github.com/rails/kredis#examples
when :set # string, integer, json
cable_ready["all_users"].console_log(message: "#{key} was just updated to #{redis.get(key)}").broadcast
when :rpush # list
when :lrem # unique_list
when :sadd # set
when :incr
when :decr
when :incrby
when :decrby
when :exists
when :del
cable_ready["all_users"].console_log(message: "#{key} was deleted").broadcast
else
end
end
end
As Seinfeld would say, is this anything?
8