accept: (State, Input) => State
Any mutable, side-effect free code can be rewritten into this form and be reused on any iterator with a `scanLeft` method.
Any mutable, side-effecting code can be rewritten into this form, followed by your side-effecting code.
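As a minimal sketch of that rewrite, take a running total kept in a `var`. The names `RunningTotal`, `mutableTotals` and `pureTotals` are made up for illustration and do not come from any library; only `scanLeft` is standard.

```scala
object RunningTotal {
  // Hypothetical types for illustration: the state is simply the total so far.
  type State = Int
  type Input = Int

  // Mutable original: the state lives in a var and is updated in place.
  def mutableTotals(inputs: List[Input]): List[State] = {
    var total = 0
    val seen = List.newBuilder[State]
    for (i <- inputs) {
      total += i
      seen += total
    }
    seen.result()
  }

  // The same logic in the accept form: old state and input in, new state out.
  def accept(state: State, input: Input): State = state + input

  // Reused on anything with scanLeft; drop(1) skips the initial state.
  def pureTotals(inputs: List[Input]): List[State] =
    inputs.scanLeft(0)(accept).drop(1)
}
```

Both versions produce the same totals for the same inputs, but only `accept` can be handed unchanged to an `Iterator`, a stream, or a test.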
Because this function is not tied to any particular streaming implementation, you can plug it into an akka-streams Flow (and thus akka-streams-kafka), Scala's FS2 functional streams, a scanLeft/scanRight, a foldLeft/foldRight, a LazyList, a mutable while loop, a pure test, a tail-recursive function, whatever. You are never locked into any one particular orchestration.
I shall give you some examples of this:
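// 1. A mutable while loop
var state: State = initialState
while (true) {
  dumpState(state)
  state = accept(state, fetchInput())
}

// 2. Any Iterator, via scanLeft (the iterator is lazy, so consume it)
iteratorInput()
  .scanLeft(initialState)(accept)
  .foreach(dumpState)

// 3. An akka-streams source, via scan
flowInput
  .scan(initialState)(accept)
  .runForeach(dumpState)

// 4. A tail-recursive function (needs import scala.annotation.tailrec)
@tailrec
def go(state: State): Unit = {
  dumpState(state)
  go(accept(state, fetchInput()))
}
go(initialState)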
Now suppose you don't want to output the full state, but just want to emit an `Output` derived from it with a function `extract: State => Output`. Your `Output` can be an `Iterator[Stuff]`, an `Either[Bad, Good]`, a `Try[Good]`, `SomeTrait`, or anything you like.
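A minimal sketch of one such `Output`, assuming a hypothetical `extract: State => Output` where the output is an `Either[String, Int]`. The `State` shape, the error message, and the object name `ExtractSketch` are all invented for illustration.

```scala
import scala.util.Try

object ExtractSketch {
  // Hypothetical state: a running sum plus the error for the latest input, if any.
  final case class State(sum: Int, lastError: Option[String])

  val initialState: State = State(0, None)

  // accept: (State, Input) => State, with Input = String here.
  def accept(state: State, input: String): State =
    Try(input.toInt).toOption match {
      case Some(n) => State(state.sum + n, None)
      case None    => state.copy(lastError = Some(s"not a number: $input"))
    }

  // extract: State => Output, with Output = Either[String, Int] here.
  def extract(state: State): Either[String, Int] =
    state.lastError.toLeft(state.sum)

  // Yields Right(0), Right(1), Left("not a number: x"), Right(3).
  val outputs: List[Either[String, Int]] =
    List("1", "x", "2").scanLeft(initialState)(accept).map(extract)
}
```

The state keeps whatever it needs internally, while consumers only ever see what `extract` chooses to expose.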
// 1. A mutable while loop
var state: State = initialState
while (true) {
  dump(extract(state))
  state = accept(state, fetchInput())
}

// 2. Any Iterator, via scanLeft
iteratorInput()
  .scanLeft(initialState)(accept)
  .map(extract)
  .foreach(dump)

// 3. An akka-streams source, via scan
flowInput
  .scan(initialState)(accept)
  .map(extract)
  .runForeach(dump)

// 4. A tail-recursive function
@tailrec
def go(state: State): Unit = {
  dump(extract(state))
  go(accept(state, fetchInput()))
}
go(initialState)
We'll deduplicate consecutive items here. Sample usage:
Iterator.apply[String]("X", "Y", "Y").scanLeft(Deduplicate.initial[String])(_.accept(_)).flatMap(_.emit)
/**
 * accept: (Deduplicate[T], T) => Deduplicate[T]
 * extract (here called emit): Deduplicate[T] => Option[T]
 */
case class Deduplicate[T](lastSeen: Option[T], emit: Option[T]) {
  def accept(input: T): Deduplicate[T] = {
    // Same as the previous item: keep the state, emit nothing.
    if (lastSeen.contains(input)) copy(emit = None)
    // New item: remember it and emit it.
    else Deduplicate(Some(input), Some(input))
  }
}
object Deduplicate {
  def initial[T]: Deduplicate[T] = Deduplicate(None, None)
}
Look at just how easy it is!
// Assumes ScalaTest 3.0-style imports; shouldBe needs the Matchers trait.
import org.scalatest.{FunSuite, Matchers}

class DeduplicationSample$Test extends FunSuite with Matchers {
  import DeduplicationSample.Deduplicate.initial
  // The type parameter is given explicitly; otherwise T is inferred as Nothing.
  test("Empty is empty") {
    initial[String].emit shouldBe empty
  }
  test("Single is emitted") {
    initial[String].accept("Stuff").emit shouldBe Some("Stuff")
  }
  test("Two in a row makes one emit") {
    initial[String].accept("Stuff").accept("Stuff").emit shouldBe empty
  }
  test("Three in a row makes one emit") {
    initial[String].accept("Stuff").accept("Stuff").accept("Stuff").emit shouldBe empty
  }
  test("X and then Y gives Y at the end") {
    initial[String].accept("X").accept("Y").emit shouldBe Some("Y")
  }
  test("X and then Y and then Y gives Y") {
    initial[String].accept("X").accept("Y").accept("Y").emit shouldBe empty
  }
}
The input type can be a `List[I]`, so now you can support batching. Your lists can have size 0, size 1 or size n.
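One way to get batching without touching the per-item logic is to lift an existing `accept` over a list with `foldLeft`. This is only a sketch: `batched`, `sum` and `BatchingSketch` are hypothetical names chosen for illustration.

```scala
object BatchingSketch {
  // Lifts a per-item accept into a per-batch accept.
  // An empty batch leaves the state unchanged; a batch of n applies accept n times.
  def batched[S, I](accept: (S, I) => S): (S, List[I]) => S =
    (state, batch) => batch.foldLeft(state)(accept)

  // Hypothetical per-item accept: a running sum.
  val sum: (Int, Int) => Int = _ + _

  // Batches of size 2, 0 and 1 yield the states 0, 3, 3, 6.
  val states: List[Int] =
    Iterator(List(1, 2), Nil, List(3)).scanLeft(0)(batched(sum)).toList
}
```

Note that this variant only exposes the state after each whole batch; if the intermediate states within a batch matter, flattening the batches before `scanLeft` may be the better fit.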
With this pure, immutable approach, your code can be used from multiple places and tested very easily. Overcomplicate it, and your wiring becomes your "domain logic".
Note that you can reuse this code from now on, in any project, in any sort of implementation. You can even plonk it inside a Free Monad if you wish, because it is simple and is the minimal abstraction necessary to get most work done.
You can test the whole thing step-by-step with simple pure functions.
These things can also be called State Machines. Separate pure logic from side effects, that's all.
Just wrote this: https://t.co/TXQbzeKL2y
— William Narmontas (@ScalaWilliam) December 14, 2016
It's a draft but important enough to go out :) #streaming #kafka #reactive #akka #scala #jvm