Client-side stream processing

Solution

Given a bus and store:

import Foundation  // needed for components(separatedBy:)
struct Post {
  let id: String
  var text: String
  var likeState: Bool
}
protocol State {}
struct RootState : State {
  var userId: String? = nil
  var posts: [String:Post] = [:]
}
protocol Renderable {
  func render(_ state: State)
}
struct PostsImpression: Event {}
struct LikeRequested: Event {
  let postId: String
  let likeState: Bool
}
class Reducer : Subscriber {
  let store: Store
  let controller: Renderable
  var state: RootState
  init(store: Store, controller: Renderable, state: RootState){
    self.store = store
    self.controller = controller
    self.state = state
  }
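  func onEvent(event: Event){
    switch event {
    case _ as PostsImpression:
      // Nothing to fetch until a user is signed in.
      guard let userId = state.userId else { return }
      store.get("posts/\(userId)")
      store.get("likes/\(userId)")
    case let event as LikeRequested:
      guard let userId = state.userId else { return }
      store.set("likes/\(userId)/\(event.postId)", event.likeState)
    case let event as Value where event.key.hasPrefix("likes"):
      // Skip missing or non-Bool values instead of crashing on a forced cast.
      guard let postId = event.key.components(separatedBy: "/").last,
            let likeState = event.val as? Bool else { return }
      state.posts[postId]?.likeState = likeState
      controller.render(state)
    case let event as Value where event.key.hasPrefix("posts"):
      guard let id = event.key.components(separatedBy: "/").last,
            let text = event.val as? String else { return }
      // Keep a like state that is already known for this post.
      let post = Post(
        id: id,
        text: text,
        likeState: state.posts[id]?.likeState ?? false)
      state.posts[post.id] = post
      controller.render(state)
    default:
      break
    }
  }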
}

Context

Redux’s reducer inspired me to think about this. Kleppmann’s blog post on turning the database inside out inspired me to think about stream processing in general.
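
To make the Redux connection a bit more concrete: a Redux-style reducer is a pure function from the current state and an event to the next state, with side effects handled elsewhere. What follows is only a sketch under assumptions. `PostLoaded`, `LikeChanged`, and `reduceState` are hypothetical names, and the types are simplified copies so the snippet runs on its own.

```swift
// Simplified, hypothetical types for illustration only.
struct Post: Equatable {
  let id: String
  var text: String
  var likeState: Bool
}

struct RootState: Equatable {
  var posts: [String: Post] = [:]
}

protocol Event {}
struct PostLoaded: Event { let id: String; let text: String }
struct LikeChanged: Event { let postId: String; let likeState: Bool }

// Pure reducer: no store access and no rendering, just (state, event) -> next state.
func reduceState(_ state: RootState, _ event: Event) -> RootState {
  var next = state
  switch event {
  case let e as PostLoaded:
    // Keep any like state already known for this post.
    let liked = state.posts[e.id]?.likeState ?? false
    next.posts[e.id] = Post(id: e.id, text: e.text, likeState: liked)
  case let e as LikeChanged:
    // No-op if the post has not been loaded yet.
    next.posts[e.postId]?.likeState = e.likeState
  default:
    break
  }
  return next
}

let events: [Event] = [
  PostLoaded(id: "p1", text: "hello"),
  LikeChanged(postId: "p1", likeState: true),
]
// Folding the event stream over an empty state yields the current state.
let finalState = events.reduce(RootState(), reduceState)
```

Because this version never touches a store or a controller, it can be exercised by folding a plain list of events, which is roughly what makes the Redux shape appealing to me.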

Problem

Consolidate event processing from UI and data streams.

The weird world of observable keys 🔑👀

The get that keeps on getting.

Solution

Given a bus:

import Foundation
struct Value : Event {
  let key: String
  let val: Any?
}
protocol Store {
  func get(_ key: String)
  func set(_ key: String, _ val: Any?)
}
class LocalStore : Store {
  let db: UserDefaults
  let bus: Bus
  init(db: UserDefaults, bus: Bus){
    self.db = db
    self.bus = bus
  }
  func get(_ key: String) {
    let val = db.object(forKey: key)
    bus.pub(Value(key: key, val: val))
  }
  func set(_ key: String, _ val: Any?){
    db.set(val, forKey: key)
    bus.pub(Value(key: key, val: val))
  }
}
let bus = Bus()
bus.sub(StdoutSubscriber())
let local = LocalStore(db: UserDefaults.standard, bus: bus)
local.set("foo", "bar")
local.get("foo")

A couple features I like:

  • The bus provides a consistent interface for adding and removing subscriptions
  • There’s a straightforward get method to flush a value into the bus
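
As a rough illustration of "observing" a key through the bus, here is a sketch of a subscriber that only reacts to values under a given key prefix. It rests on assumptions: `MemoryStore` and `KeyObserver` are hypothetical names, the store is an in-memory stand-in for LocalStore so the snippet runs without UserDefaults, and the bus is reduced to a plain array of subscribers.

```swift
protocol Event {}
protocol Subscriber: AnyObject {
  func onEvent(event: Event)
}

struct Value: Event {
  let key: String
  let val: Any?
}

// Reduced bus for this sketch: delivers every event to every subscriber.
final class Bus {
  private var subscribers: [Subscriber] = []
  func sub(_ subscriber: Subscriber) { subscribers.append(subscriber) }
  func pub(_ event: Event) {
    for subscriber in subscribers { subscriber.onEvent(event: event) }
  }
}

// Hypothetical in-memory stand-in for LocalStore.
final class MemoryStore {
  private var db: [String: Any] = [:]
  private let bus: Bus
  init(bus: Bus) { self.bus = bus }
  // get does not return a value; it flushes the current value into the bus.
  func get(_ key: String) { bus.pub(Value(key: key, val: db[key])) }
  func set(_ key: String, _ val: Any?) {
    db[key] = val
    bus.pub(Value(key: key, val: val))
  }
}

// Hypothetical subscriber that records every Value whose key starts with a prefix.
final class KeyObserver: Subscriber {
  let prefix: String
  private(set) var seen: [String] = []
  init(prefix: String) { self.prefix = prefix }
  func onEvent(event: Event) {
    guard let value = event as? Value, value.key.hasPrefix(prefix) else { return }
    seen.append(value.key)
  }
}

let bus = Bus()
let likes = KeyObserver(prefix: "likes/")
bus.sub(likes)
let store = MemoryStore(bus: bus)
store.set("likes/u1/p1", true)  // observed: a write under the prefix
store.set("posts/u1/p1", "hi")  // ignored: different prefix
store.get("likes/u1/p1")        // observed again: get re-publishes the value
```

The observer sees the key twice, once for the write and once for the get, which is the "get that keeps on getting" behavior in miniature.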

Problem

If rationalizing events bubbling up from UI and disparate data sources is challenging, normalizing to a single bus interface may be helpful.

Related

Praise for the humble bus 🚌

Context

This is a stream-of-consciousness gush about a pattern I like. I start by stating some things I like, followed by a pattern that produces these things, and then attempt to state the problem being solved (in case other folks, like me, appreciate a problem statement).

I’m a fan of the unidirectional event flow first brought to my attention by React/Redux. Prakhar mentioned this is also called the yo-yo pattern (events bubble up, views render down). yo-yo.js provides a delightfully simple implementation. choo completes the yo-yo pattern by building on yo-yo.js and injecting an event bus into the view renderer.

Slightly related, I’m also enamored of the notion of an append-only log, reverently described by Jay Kreps and Martin Kleppmann in The Log and Turning the database inside-out with Apache Samza, respectively. Kleppmann provides additional, wonderful context in Designing Data-Intensive Applications.

In my experience, event logging from a client can be tricky to maintain. A couple helpful patterns: enable stdout-logging close to the event source, and explicitly enumerate events.
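
A minimal sketch of those two patterns, under assumptions: `ClientEvent` and `emit` are hypothetical names, and the log format is just one I find readable. The enum is the explicit enumeration of events, and the helper logs at the point of emission before handing the event on.

```swift
// Hypothetical event catalog: one enum lists every event the client can emit.
enum ClientEvent: CustomStringConvertible {
  case postsImpression
  case likeRequested(postId: String, likeState: Bool)

  var description: String {
    switch self {
    case .postsImpression:
      return "posts_impression"
    case let .likeRequested(postId, likeState):
      return "like_requested postId=\(postId) likeState=\(likeState)"
    }
  }
}

// Hypothetical helper: log right at the source, then pass the event to a sink.
// The log parameter defaults to stdout and can be swapped out in tests.
func emit(_ event: ClientEvent,
          to sink: (ClientEvent) -> Void,
          log: (String) -> Void = { print($0) }) {
  log("event=\(event)")
  sink(event)
}

var logged: [String] = []
var delivered = 0
emit(.likeRequested(postId: "p1", likeState: true),
     to: { _ in delivered += 1 },
     log: { logged.append($0) })
emit(.postsImpression, to: { _ in delivered += 1 })  // logs to stdout by default
```

Adding a new event then means adding a case, and the compiler flags every switch that has not been updated to handle it.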

Solution

In this context, I’ve developed a deep appreciation for the simple pubsub pattern, and the notion of an "event bus" through which published events flow to subscribers. Although buses and logs (and indices) frequently appear together, the bus seems the most primitive of the three.

This pattern is nothing new, but here’s a simplistic implementation I find easy to reason about:

protocol Event {}
struct LikeEvent : Event {}
protocol Subscriber {
  func onEvent(event: Event)
}
class StdoutSubscriber : Subscriber {
  func onEvent(event: Event) {
    print(event)
  }
}
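class Bus {
  // Subscribers are keyed by type name, so there is at most one subscriber per type;
  // subscribing a second instance of the same type replaces the first.
  var subscribers: [String:Subscriber] = [:]
  func sub(_ subscriber: Subscriber){
    self.subscribers[key(subscriber)] = subscriber
  }
  func unsub(_ subscriber: Subscriber){
    self.subscribers[key(subscriber)] = nil
  }
  // Delivers the event synchronously to every subscriber, in no guaranteed order.
  func pub(_ event: Event){
    for subscriber in subscribers.values {
      subscriber.onEvent(event: event)
    }
  }
  func key(_ subscriber: Subscriber) -> String {
    return String(describing: type(of: subscriber))
  }
}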
let bus = Bus()
bus.sub(StdoutSubscriber())
// ... on "like" button tap
bus.pub(LikeEvent())
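
One consequence of keying subscribers by type name is that two instances of the same type cannot both be subscribed. If that matters, a variation is to key by object identity instead. This is a sketch under assumptions, not a replacement for the implementation above: it requires subscribers to be classes, and `CountingSubscriber` is a hypothetical name used only to make the behavior observable.

```swift
protocol Event {}
struct LikeEvent: Event {}

// Class-bound so that every subscriber instance has an identity.
protocol Subscriber: AnyObject {
  func onEvent(event: Event)
}

// Hypothetical subscriber that counts the events it receives.
final class CountingSubscriber: Subscriber {
  private(set) var count = 0
  func onEvent(event: Event) { count += 1 }
}

final class Bus {
  // Keyed by instance identity rather than by type name.
  private var subscribers: [ObjectIdentifier: Subscriber] = [:]

  func sub(_ subscriber: Subscriber) {
    subscribers[ObjectIdentifier(subscriber)] = subscriber
  }
  func unsub(_ subscriber: Subscriber) {
    subscribers[ObjectIdentifier(subscriber)] = nil
  }
  func pub(_ event: Event) {
    for subscriber in subscribers.values {
      subscriber.onEvent(event: event)
    }
  }
}

let bus = Bus()
let a = CountingSubscriber()
let b = CountingSubscriber()
bus.sub(a)
bus.sub(b)            // same type as `a`, but registered separately
bus.pub(LikeEvent())  // delivered to both
bus.unsub(a)
bus.pub(LikeEvent())  // delivered to `b` only
```

The bus holds its subscribers strongly, so each identifier stays valid for as long as the subscription exists.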

Events are first-class in Node, so an easy equivalent to the above would be:

var EventEmitter = require('events')
var bus = new EventEmitter()
function stdoutSubscriber(event){
  console.log(`event=${event}`)
}
bus.on('event', stdoutSubscriber)
bus.emit('event', 'like')

Problem

Given all the above, I think the problem the bus solves is: reduce complexity in a distributed system by letting event sources publish, and event processors subscribe, as plainly as possible.

Caveat

I think decoupling event production from processing does have a cost: we lose locality, which complicates reasoning. In cases where production and consumption can be colocated, e.g. async operations on a thread that’s safe to block (Finagle’s use of Scala’s composable futures is a great example), I think keeping them together is worth considering.

Related

Node’s event emitter supports the notion of a "channel". Kafka calls them "topics". This concept reminds me of Objective-C’s KVO, and Firebase’s realtime database, which allow me to subscribe to the stream of changes for a given "key" (or "path").
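
To sketch what a channel or topic might look like on the Swift bus, here is one possible shape in which handlers subscribe to a key prefix. Everything here is an assumption for illustration: `TopicBus` is a hypothetical name, handlers are closures rather than Subscriber objects, and prefix matching is just one way to define a topic.

```swift
protocol Event {}
struct Value: Event {
  let key: String
  let val: Any?
}

// Hypothetical topic-aware bus: handlers subscribe to a key prefix ("topic").
final class TopicBus {
  typealias Handler = (Event) -> Void
  private var handlers: [(prefix: String, handler: Handler)] = []

  func sub(_ prefix: String, _ handler: @escaping Handler) {
    handlers.append((prefix: prefix, handler: handler))
  }

  // Delivers the event to every handler whose prefix matches the topic.
  func pub(_ topic: String, _ event: Event) {
    for entry in handlers where topic.hasPrefix(entry.prefix) {
      entry.handler(event)
    }
  }
}

let topics = TopicBus()
var likeEvents = 0
var postEvents = 0
topics.sub("likes/") { _ in likeEvents += 1 }
topics.sub("posts/") { _ in postEvents += 1 }
topics.pub("likes/u1/p1", Value(key: "likes/u1/p1", val: true))
topics.pub("likes/u1/p2", Value(key: "likes/u1/p2", val: false))
topics.pub("posts/u1/p1", Value(key: "posts/u1/p1", val: "hi"))
```

With this shape the filtering moves out of each subscriber and into the bus, at the cost of the bus knowing something about keys.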