Aug 28, 2018

Transactional saga in Scala IV

Part V : Mapping future

.. in which Japanese researchers and a mad Russian scientist continue

With a reasonable Scala implementation and its Java approximation, the original problem was solved. What kept nagging me was a feeling that something "monadic" could be possible. Sagas are not new, and a lot of functional idioms are about sequences. Surely some prior art must exist.

As luck would have it, I found an interesting paper almost immediately. It was both very relevant (see "2.1 Atomic Action and Workflow") and accompanied by Scala code. The paper goes on to some advanced features, but it starts from the key idea of applying the Continuation Monad to chaining saga transactions. The bad news is that the original code relies on scalaz and does not work with Futures.

At first I stared at the code in despair. Then I remembered that I happen to sit nine feet from a legit mad scientist who is really into FP. He got interested in my use case and graciously adapted the code to the problem at hand. Behold the monadic version:
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Success, Try}

/** A saga transaction based on the Continuation monad */
trait ObjectStoreTx[A] {
  self =>
  type Tx[B, R] = B => Future[R]
  type Continuation[B, R] = (Tx[B, R]) => Future[R]

  def run[R]: Continuation[A, R]

  final def map[B](f: A => B): ObjectStoreTx[B] = new ObjectStoreTx[B] {
    def run[R]: Continuation[B, R] = { g: Tx[B, R] => self.run[R](f andThen g) }
  }

  final def flatMap[B](f: A => ObjectStoreTx[B]): ObjectStoreTx[B] = new ObjectStoreTx[B] {
    def run[R]: Continuation[B, R] = { g: Tx[B, R] => self.run[R](a => f(a).run[R](g)) }
  }

  final def toFuture: Future[A] = run[A] { a: A => Future.successful(a) }
}

/** Helper functions to convert an action represented by a Future into a saga transaction */
trait ObjectStoreTxs {
  def toRevertableTx[A](action: => Future[A])(rollback: A => Try[Unit])(implicit ec: ExecutionContext): ObjectStoreTx[A] =
    new ObjectStoreTx[A] {
      override def run[R]: Continuation[A, R] = (restOfPipeline: Tx[A, R]) =>
        for {
          a <- action
          // if anything downstream fails, compensate this step and propagate the original error
          result <- restOfPipeline(a).transform(identity, { ex => rollback(a); ex })
        } yield result
    }
}

/** Compose three transactions into a saga monadically.
  * writeArrayToTmpLocation, deleteFile and renameTmpFile are helpers from the full code, omitted here. */
object ObjectStoreSaga extends ObjectStoreTxs {
  def apply(obj: Array[Byte], path: String, objId: ObjectId, catalog: ObjectCatalog, storage: ObjectStorage)
           (implicit ec: ExecutionContext): Future[(String, Long, String)] = {
    val saga: ObjectStoreTx[(String, Long, String)] = for {
      // a plain binding rather than a (tmpLocation, fileSize) pattern: in Scala 2 a tuple pattern
      // makes the for-comprehension call withFilter, which ObjectStoreTx does not define
      tmpFile <- toRevertableTx(writeArrayToTmpLocation(objId, obj, path, storage)) { case (filePath, _) => deleteFile(filePath, storage) }
      revision <- toRevertableTx(catalog.createRevision(objId))(rev => catalog.forgetRevision(rev))
      permanentPath <- toRevertableTx(renameTmpFile(tmpFile._1, storage.persistentPath(objId, revision), storage))(_ => Success(()))
    } yield (permanentPath, tmpFile._2, revision)
    saga.toFuture
  }
}
If you are like me, it will take you a while to meditate on the simplified code above. First the continuation machinery is defined, then a means of converting an action into a continuation is provided (notice the by-name "action: => Future[A]" parameter), and finally our favorite saga is composed.
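The by-name parameter matters because a Scala Future starts running as soon as it is constructed. Passing the action by name postpones constructing it until the continuation is actually run. Below is a minimal sketch of the difference; ByNameSketch, Deferred, Eager and the evaluation counter are made up for illustration, with the counter standing in for the work that constructing a real Future would kick off.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Future

object ByNameSketch {
  // Counts how many times the action expression has been evaluated
  val evaluations = new AtomicInteger(0)

  // Hypothetical action: evaluating it is what kicks off the work,
  // just as constructing a Future starts it running
  def action(): Future[Int] = Future.successful(evaluations.incrementAndGet())

  // By-name, like toRevertableTx: the argument is evaluated each time start() is called
  final class Deferred[A](body: => Future[A]) {
    def start(): Future[A] = body
  }

  // By-value: the argument is evaluated once, when the wrapper is constructed
  final class Eager[A](body: Future[A]) {
    def start(): Future[A] = body
  }
}
```

A consequence worth keeping in mind, as far as I can tell from the code above: since "action" is referenced inside "run", calling toFuture twice on the same saga would execute every action twice.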

Now about that continuation monad. Apparently it is a classic, but it is not really discussed even in the red book. Enough people at work were confused by it that the very same scientist wrote a lengthy explanation, which I can't recommend enough. I also learned that implementing a monad in Scala takes three methods, a unit constructor, flatMap, and map (for for-comprehensions), and that they must satisfy the monad laws.
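To get a feel for the continuation monad itself, here is a minimal sketch without Futures. It is my own simplification, not the code from the paper or from the explanation mentioned above, and Cont, pure and the tiny program are illustrative names. A Cont[R, A] is a function that takes "the rest of the program" (A => R) and produces the final result; pure is the unit constructor, while map and flatMap make it usable in a for-comprehension.

```scala
object ContSketch {
  // A computation that, given "the rest of the program" (A => R), produces the final result R
  final case class Cont[R, A](run: (A => R) => R) {
    // Transform the value before handing it to the continuation
    def map[B](f: A => B): Cont[R, B] = Cont[R, B](k => run(f andThen k))

    // Feed the value into the next computation, which then receives the continuation
    def flatMap[B](f: A => Cont[R, B]): Cont[R, B] = Cont[R, B](k => run(a => f(a).run(k)))
  }

  // unit/pure: pass the value straight to the continuation
  def pure[R, A](a: A): Cont[R, A] = Cont[R, A](k => k(a))

  // A tiny program; the final continuation decides what to do with the result
  val program: Cont[String, Int] = for {
    a <- pure[String, Int](20)
    b <- pure[String, Int](22)
  } yield a + b

  val rendered: String = program.run(n => s"result=$n")
}
```

Unlike this sketch, ObjectStoreTx picks R at each run call (def run[R]) and its continuations return Future[R] rather than R. The identity laws can be checked by running both sides with identity as the final continuation.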

This post is unusual because I don't fully understand the stuff I am writing about. I am still trying to wrap my head around it. My lame excuse is that very few people at work are comfortable with this code. It has nice type safety and composability. For me writing the monadic wiring from scratch would be a stretch. Lots of intellectual stimulation but merging something like that into a real codebase is a very open question for me.

What is worse, for those who are unprepared (empirically, the vast majority) the core of this code is unmaintainable, pretty much "write-only" incantations. Concurrency can be hard, but one can always go read JCiP one more time. For this stuff there is no bible. As a matter of fact, I know only one book that successfully tries to put FP into practical use, and it's too basic anyway.

P.S.
Speaking of monads and scientists, if you are into akka-http go look at the pull request making Directives monadic. Personally I expected the Akka guys to be much more impressed.

Aug 14, 2018

Transactional saga in Java

Part IV : Intermezzo

.. in which we try and fail exceptionally to have Futures more complete

Having developed the third Scala version I got curious about doing the same in Java.  I more or less expected the CompletableFuture to be a more verbose flavor of the Scala Future. It had also been a year since I stopped writing Java and I wanted to check how I felt about it. If you remember the last Scala iteration the following should look familiar:

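import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

interface ObjectStoreTx {
    CompletableFuture<Void> execute(TxContext ctx);
    Optional<Throwable> rollback();
}
public final class CompletableFutureSaga {
    // assuming SLF4J for logging; the original snippet leaves the logger undeclared
    private static final Logger logger = LoggerFactory.getLogger(CompletableFutureSaga.class);
    private final List<ObjectStoreTx> transactions;

    public CompletableFutureSaga(List<ObjectStoreTx> transactions) {
        this.transactions = transactions;
    }

    public CompletableFuture<TxContext> apply(TxContext ctx) {
        return executeHeadTx(transactions.iterator(), ctx);
    }

    private CompletableFuture<TxContext> executeHeadTx(Iterator<ObjectStoreTx> txs, TxContext ctx) {
        if (!txs.hasNext()) {
            return CompletableFuture.completedFuture(ctx);
        }
        final ObjectStoreTx tx = txs.next();
        return tx.
                execute(ctx).
                exceptionally(e -> {
                    // the handler cannot return a future, so roll back here and throw to stop the chain
                    rollback(tx);
                    throw new RuntimeException("Cancelling saga after tx failure", e);
                }).
                thenCompose(unit -> executeHeadTx(txs, ctx));
    }

    /** Compensate the transactions executed before the failed one, in reverse order */
    private void rollback(ObjectStoreTx failedTx) {
        for (int txIndex = transactions.indexOf(failedTx) - 1; txIndex >= 0; txIndex--) {
            final ObjectStoreTx tx = transactions.get(txIndex);
            final Optional<Throwable> e = tx.rollback();
            if (e.isPresent()) {
                logger.error("Failed to roll back " + tx, e.get());
            }
        }
    }
}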

At first glance "thenCompose" is like "flatMap" and "exceptionally" resembles "recoverWith". Close but no cigar. The function taken by CF::exceptionally must return a plain value, not a CompletableFuture. Recovering that way does not cancel the rest of the saga either: the recovered stage completes normally, so the next thenCompose still runs. None of this changed as of JDK10. So I had to throw an exception from the handler and manually roll back transactions the way it worked in the second Scala version. It's tolerable but not exactly elegant.
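For contrast, here is a small sketch of the Scala behavior I was missing. The step helper and the steps buffer are hypothetical. The handler given to recoverWith returns a Future, and once a step fails the subsequent flatMap and map are skipped, so nothing has to be thrown to stop the chain.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object RecoverWithSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Names of the steps that actually ran (hypothetical stand-ins for saga transactions)
  val steps = ListBuffer.empty[String]

  def step(name: String, fail: Boolean): Future[Unit] = Future {
    steps.synchronized { steps += name }
    if (fail) throw new IllegalStateException(s"$name failed")
  }

  def saga(): Future[String] =
    step("first", fail = true)
      .flatMap(_ => step("second", fail = false)) // never runs: the previous Future failed
      .map(_ => "done")
      .recoverWith { case e: IllegalStateException =>
        // Unlike the function given to exceptionally, this handler returns a Future
        Future.successful(s"recovered from: ${e.getMessage}")
      }

  def outcome(): String = Await.result(saga(), 5.seconds)
}
```

For the record, JDK 12 later added CompletableFuture.exceptionallyCompose, whose handler returns a CompletionStage and so comes much closer to recoverWith.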

One lesson here is that Scala Futures are at least a little more powerful than the Java ones. Once you get used to monadic composition, two dozen Java signatures do not look all that readable anymore. While typing Java code I also noticed how tedious it is in comparison with Scala. Trivial bits such as constructors, semicolons, and the lack of variable type inference add up to constant friction. I am afraid I don't feel like writing Java anymore.

Aug 10, 2018

Transactional saga in Scala III

Part III : Back to the Future

.. in which we return to a Futures-based design but this time compose them idiomatically

Now that a naive Future-based version is out, it's time to make it idiomatic in the third iteration. I was thinking about composing Futures with flatMap, but I did not know how to chain multiple transaction rollbacks. What I learned in the code review was unexpected.

A colleague surprised me by showing that in a chain of Futures where each recoverWith is attached after the flatMap into the rest of the chain, a recoverWith clause is called not just for the Future that failed but also for all the preceding ones.
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

/** Similar to ImperativeFutureSaga but composes Futures idiomatically. */
class FunctionalFuturesSaga(txs: Seq[ObjectStoreTx])(implicit ec: ExecutionContext) extends Function0[Future[TxContext]] {
  def apply(): Future[TxContext] = executeHeadTx(txs, TxContext())

  private def executeHeadTx(txs: Seq[ObjectStoreTx], ctx: TxContext): Future[TxContext] = {
    txs match {
      case Seq() => Future.successful(ctx)
      case tx +: tail =>
        tx.
          execute(ctx).
          flatMap { _ => executeHeadTx(tail, ctx) }.
          recoverWith { case e: Exception =>
            e match {
              case _: NestedTxException => // the actual failed tx has been logged
              case _ => logger.error(s"Failed to execute ${tx.toString}", e)
            }
            tx.rollback() match {
              case Success(_) =>
              case Failure(ue) => logger.error(s"Failed to roll back ${tx.toString}", ue)
            }
            Future.failed(new NestedTxException(e))
          }
    }
  }

  /** To distinguish between the actual error happening and its propagation through the chain of Futures */
  private final class NestedTxException(e: Exception) extends RuntimeException(e)
}
Another small difference is the nested exception class. The problem it solves is logging only the transaction that actually failed. Without it, every rolled-back transaction would be logged as a failure. This cosmetic improvement doubles the size of the recoverWith body, though. Without it, this version looks really slick.
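The colleague's observation is easy to check with a self-contained sketch. It mirrors the shape of executeHeadTx (run the head, flatMap into the rest, then attach recoverWith), but the attempt helper and the recovered log are hypothetical stand-ins for real transactions and rollbacks.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object RecoverChainSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Names of the steps whose recoverWith handler ran, in the order they ran
  private val recovered = ListBuffer.empty[String]

  // Hypothetical step that fails when its name equals `failing`
  private def attempt(name: String, failing: String): Future[Unit] =
    if (name == failing) Future.failed(new IllegalStateException(s"$name failed"))
    else Future.unit

  // Same shape as executeHeadTx: run the head, flatMap into the rest, then attach recoverWith
  private def chain(names: List[String], failing: String): Future[Unit] = names match {
    case Nil => Future.unit
    case name :: tail =>
      attempt(name, failing)
        .flatMap(_ => chain(tail, failing))
        .recoverWith { case e: Exception =>
          recovered.synchronized { recovered += name }
          Future.failed(e)
        }
  }

  /** Runs the chain and returns the names whose recoverWith fired */
  def recoveredOrder(names: List[String], failing: String): List[String] = {
    recovered.synchronized { recovered.clear() }
    Await.ready(chain(names, failing), 5.seconds)
    recovered.synchronized { recovered.toList }
  }
}
```

The handlers fire from the failed step back to the first one, which is the reverse order a saga needs. Note that the failed step's own handler fires too, so FunctionalFuturesSaga also calls rollback() on the transaction that failed, whereas the index-based versions start from the one before it.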

At this point I was happy with my pull request. But one thought spoiled the pleasure. We are dealing with a sequence of monadic abstractions. We use flatMap. Could there be a truly monadic approach to composing sagas? Yes, we are finally leaving common-sense territory and going to some controversial places.

Aug 9, 2018

Transactional saga in Scala II

Part II : Future Imperfect

.. in which Future-based signatures are introduced but the Futures are orchestrated naively 

Now that the problem is understood, it's time to consider the second iteration. This time all transactions return Futures, which is very convenient from the perspective of building a reactive service. Compensation actions still return Tries, partly for the sake of simplicity and partly to reduce the potential for additional errors in the error-handling path. The whole saga is wrapped into a Promise to make it possible for the client to wait for completion.
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success, Try}

/** A saga transaction whose action is represented by a Future */
trait ObjectStoreTx {
  def execute(ctx: TxContext): Future[Unit]
  def rollback(): Try[Unit]
}

/** A saga comprised of a sequence of ObjectStoreTxs, each to be executed one by one but potentially on different threads. */
class ImperativeFutureSaga(txs: Seq[ObjectStoreTx])(implicit ec: ExecutionContext) extends Function0[Future[TxContext]] {
  private val txIndex = new AtomicInteger(0)

  def apply(): Future[TxContext] = {
    val endOfSagaSignal = Promise[TxContext]()
    Future {
      run(txs.head, TxContext(), endOfSagaSignal)
    }
    endOfSagaSignal.future
  }

  /**
    * Run a saga tx. If successful, either trigger the next tx execution or signal the end of the saga.
    * In case of a failure, trigger rollback of the executed txs.
    * @param tx the transaction to execute
    * @param ctx the state to hand off to the next transaction
    * @param endOfSagaSignal the promise to complete at the end of the saga
    */
  private def run(tx: ObjectStoreTx, ctx: TxContext, endOfSagaSignal: Promise[TxContext]): Unit = {
    tx.
      execute(ctx).
      onComplete {
        case Success(_) =>
          val nextIndex = txIndex.incrementAndGet()
          if (nextIndex < txs.length) {
            run(txs(nextIndex), ctx, endOfSagaSignal)
          } else {
            endOfSagaSignal.success(ctx)
          }
        case Failure(e) =>
          rollback(txIndex.get())
          endOfSagaSignal.failure(e)
      }
  }

  /** Revert previously executed stages by applying the corresponding compensation actions (in reverse order) */
  private def rollback(failedTxIndex: Int): Unit = {
    for (i <- failedTxIndex - 1 to 0 by -1) {
      val tx = txs(i)
      tx.rollback() match {
        case Success(_) =>
        case Failure(e) => logger.error(s"Failed to roll back ${tx.toString}", e)
      }
    }
  }
}
As you can see, the conversion to Futures was done rather literally. On the positive side, it's so imperative one can easily see how it works. On the other hand, it looks more like Java. Stay tuned for what I learned during the code review.

Aug 8, 2018

Transactional saga in Scala I

Part I : Tried But Not True

.. in which we come across a real-life problem, establish its context, and try a naive approach to set the stage for the following posts

Recently I faced a practical problem at work which resulted in four iterations of a promising idea. I found the process instructive enough to share some details with the world. Individual iterations may not be particularly interesting, but it's relatively unusual to see a head-to-head comparison of this kind. In this and the following posts I am going to embed only skeleton implementations from a gist. If you are interested in the details, please see the real code with tests.

The problem I was working on was making an operation spanning HDFS and RDBMS transactional. Imagine a (micro)service dealing with file uploads. In a typical flow you first put a file into a temporary location, then register it in the DB, and finally move the file to its real location based on a version-like value returned by the DB. We've got three atomic transactions. Either they all succeed, or the ones that have been executed by the time of a failure must be rolled back.

My first and, frankly, only idea was to use sagas. It sounds epic :) but it was not supposed to be about fancy CQRS frameworks. I was just looking for a clean way to compose a few basic Scala types such as Try or Future. Also notice that the entire operation is stateful: each transaction returns some values and usually uses the values produced by previously executed transactions. We'll refer to that state as the transaction context. We can represent the basic abstractions like this:
import scala.util.Try

/** The state shared among the transactions of a saga */
trait TxContext {
  def getLong(name: String): Long
  def setLong(name: String, value: Long): Unit
}

/** An object id with a revision assigned by the storage */
case class ObjectId(name: String, revision: Option[String])

/** In real life files would be in HDFS */
trait ObjectStorage {
  def createFile(obj: Array[Byte], path: String): Try[Unit]
  def deleteFile(path: String): Option[Try[Unit]]
}

/** In real life it would invoke a DAO to modify the DB */
trait ObjectCatalog {
  def createRevision(objId: ObjectId): Try[String]
  def forgetRevision(objId: ObjectId): Try[Unit]
}


  • TxContext represents shared state with type-safe access
  • ObjectStorage represents operations with HDFS files; some of them return a Future because the input could be an akka-stream Source, while others take an array and so return a Try
  • ObjectCatalog represents operations with RDBMS rows
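The saga code in these posts calls TxContext(), which suggests a companion factory that the skeleton does not show. A minimal in-memory implementation might look like this; InMemoryTxContext and the factory are my assumptions, not the real code. A ConcurrentHashMap is used because the Future-based versions may run transactions on different threads.

```scala
import java.util.concurrent.ConcurrentHashMap

object TxContextSketch {
  /** Same shape as the TxContext trait above */
  trait TxContext {
    def getLong(name: String): Long
    def setLong(name: String, value: Long): Unit
  }

  object TxContext {
    // Hypothetical factory, matching the TxContext() calls in the saga code
    def apply(): TxContext = new InMemoryTxContext
  }

  /** Hypothetical implementation backed by a concurrent map */
  final class InMemoryTxContext extends TxContext {
    private val longs = new ConcurrentHashMap[String, java.lang.Long]()

    def getLong(name: String): Long = {
      val value = longs.get(name) // null when the key is absent
      if (value == null) throw new NoSuchElementException(s"No value named '$name' in the saga context")
      value.longValue()
    }

    def setLong(name: String, value: Long): Unit = longs.put(name, value)
  }
}
```

Throwing on a missing key is a design choice of this sketch; returning an Option would be an equally reasonable alternative.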


My first iteration was about sketching the general flow of a saga. To begin with there were individual transactions doing actual work. Then there was some looping machinery to execute them and support automatic rollback. 
import scala.util.{Failure, Success, Try}

/** A saga transaction whose action is represented by a Try */
trait ObjectStoreTx {
  def run(ctx: TxContext): Try[Unit]
  def rollback(): Try[Unit]
}

/** A saga comprised of a sequence of ObjectStoreTxs executed on the calling thread */
class TryListSaga(txs: List[ObjectStoreTx]) extends Function0[Try[TxContext]] {
  def apply(): Try[TxContext] = {
    val ctx = TxContext()
    for (txIndex <- txs.indices) {
      val tx = txs(txIndex)
      tx.run(ctx) match {
        case Failure(e) =>
          rollback(txIndex)
          return Failure(e)
        case _ =>
      }
    }
    Success(ctx)
  }

  private def rollback(failedTxIndex: Int): Unit = {
    for (i <- failedTxIndex - 1 to 0 by -1) {
      val tx = txs(i)
      tx.rollback() match {
        case Failure(e) => logger.error(s"Failed to roll back ${tx.toString}", e)
        case _ =>
      }
    }
  }
}
The ObjectStoreTx trait represents a transaction that can be executed with a context or rolled back. The TryListSaga class implements a loop that executes transactions from a list sequentially and triggers rollback in case of an error.
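To see the rollback order in action, here is a self-contained sketch. It is a compact variant of the same loop rather than the TryListSaga code itself: the transactions take no context, a lazy iterator replaces the early return, and RecordingTx and the journal are hypothetical. Failed rollbacks are silently ignored here, whereas TryListSaga logs them.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Success, Try}

object TrySagaSketch {
  // Hypothetical journal of what each transaction did
  val journal = ListBuffer.empty[String]

  // Simplified stand-in for ObjectStoreTx (no TxContext)
  trait Tx {
    def run(): Try[Unit]
    def rollback(): Try[Unit]
  }

  // Hypothetical transaction that records its execution and compensation
  final class RecordingTx(name: String, fails: Boolean) extends Tx {
    def run(): Try[Unit] = {
      journal += s"run $name"
      if (fails) Failure(new IllegalStateException(s"$name failed")) else Success(())
    }
    def rollback(): Try[Unit] = Try { journal += s"rollback $name"; () }
  }

  // Run sequentially, stop at the first failure,
  // then compensate the already-executed transactions in reverse order
  def runSaga(txs: List[Tx]): Try[Unit] = {
    // The iterator is lazy, so transactions after the first failure never run
    val firstFailure = txs.iterator.zipWithIndex
      .map { case (tx, i) => (tx.run(), i) }
      .collectFirst { case (Failure(e), i) => (e, i) }
    firstFailure match {
      case None => Success(())
      case Some((e, failedIndex)) =>
        for (i <- failedIndex - 1 to 0 by -1) txs(i).rollback()
        Failure(e)
    }
  }
}
```

As in TryListSaga, the failed transaction itself is not compensated; only the ones before it are, starting with the most recent.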

One obvious problem that the code surfaced is very common in Scala when you mix APIs based on Futures and Tries. Not only do they not compose easily, but "waiting on a Future" should also happen only at the top level. I had to find a way to rely on Futures everywhere.
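One common way to stay in Future land is to lift Try-returning calls with Future.fromTry and compose everything with flatMap, blocking (if at all) only at the very top, for example in a test or a main method. A sketch under that assumption; createFile and createRevision are hypothetical stand-ins for the storage and catalog calls.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

object MixedApisSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical Try-based step, e.g. writing an in-memory array to a file
  def createFile(bytes: Array[Byte], path: String): Try[Long] = Try {
    require(bytes.nonEmpty, s"nothing to write to $path")
    bytes.length.toLong
  }

  // Hypothetical Future-based step, e.g. a non-blocking DB call
  def createRevision(path: String): Future[String] = Future.successful(s"$path@rev1")

  // Lift the Try with Future.fromTry so both steps compose in one for-comprehension
  def upload(bytes: Array[Byte], path: String): Future[(Long, String)] =
    for {
      size     <- Future.fromTry(createFile(bytes, path))
      revision <- createRevision(path)
    } yield (size, revision)
}
```

A failed Try simply becomes a failed Future, so the rest of the chain is skipped exactly as if the step had been asynchronous all along.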