Aug 9, 2018

Transactional saga in Scala II

Part II : Future Imperfect

.. in which Future-based signatures are introduced but the Futures are orchestrated naively 

Now that the problem is understood, it's time to consider the second iteration. This time all transactions return Futures, which is very convenient when building a reactive service. Compensation actions still return Try, partly for the sake of simplicity and partly to reduce the potential for additional errors in the error-handling path. The whole saga is wrapped in a Promise so that the client can wait for its completion.
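import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global // assumption: any implicit ExecutionContext would do
import scala.util.{Failure, Success, Try}

/** A saga transaction whose action is represented by a Future */
trait ObjectStoreTx {
  def execute(ctx: TxContext): Future[Unit]
  def rollback(): Try[Unit]
}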
/** A saga comprised of a sequence of ObjectStoreTxs, each to be executed one by one but potentially on different threads. */
class ImperativeFutureSaga(txs: Seq[ObjectStoreTx]) extends Function0[Future[TxContext]] {
  private val txIndex = new AtomicInteger(0)

  def apply(): Future[TxContext] = {
    val endOfSagaSignal = Promise[TxContext]()
    Future {
      run(txs.head, TxContext(), endOfSagaSignal)
    }
    endOfSagaSignal.future
  }

  /**
   * Run a saga tx. If successful, either trigger next tx execution or signal the end of saga.
   * In case of a failure, trigger rollback of the executed txs.
   * @param tx the transaction to execute
   * @param ctx the state to hand off to the next transaction
   * @param endOfSagaSignal the promise to complete at the end of the saga
   */
  private def run(tx: ObjectStoreTx, ctx: TxContext, endOfSagaSignal: Promise[TxContext]): Unit = {
    tx.execute(ctx).onComplete {
      case Success(_) =>
        val nextIndex = txIndex.incrementAndGet()
        if (nextIndex < txs.length) {
          run(txs(nextIndex), ctx, endOfSagaSignal)
        } else {
          endOfSagaSignal.success(ctx)
        }
      case Failure(e) =>
        rollback(txIndex.get())
        endOfSagaSignal.failure(e)
    }
  }

  /** Revert previously executed stages by applying the corresponding compensation actions (in reverse order) */
  private def rollback(failedTxIndex: Int): Unit = {
    for (i <- failedTxIndex - 1 to 0 by -1) {
      val tx = txs(i)
      tx.rollback() match {
        case Success(_) => // compensation succeeded, nothing to report
        case Failure(e) => logger.error(s"Failed to roll back $tx", e)
      }
    }
  }
}
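Because the saga is a Function0[Future[TxContext]], a client can wait on it like on any other Future. Here is a minimal sketch of that, assuming a blocking caller such as a test or a command-line tool. `SagaClient`, `runAndWait` and the `TxContext` stand-in are hypothetical names made up for this illustration; the real TxContext is not shown in this post.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical stand-in for the real TxContext, whose fields are not shown in this post.
final case class TxContext(notes: Map[String, String] = Map.empty)

object SagaClient {
  /**
   * Starts the saga and blocks until it finishes or the timeout expires.
   * A failed saga (or a timeout) is reported as a Failure instead of being thrown.
   */
  def runAndWait(saga: () => Future[TxContext], timeout: FiniteDuration): Try[TxContext] =
    Try(Await.result(saga(), timeout))
}
```

With the class above this could be called as `SagaClient.runAndWait(new ImperativeFutureSaga(txs), 5.seconds)`, where the timeout is an arbitrary choice. A reactive service would more likely compose the returned Future than block on it.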
As you can see, the conversion to Futures was done rather literally. On the positive side, it's so imperative that one can easily see how it works. On the other hand, it looks more like Java than Scala. Stay tuned for what I learned during the code review.
