Concurrent evaluation

The default interpretation of Eff values is “monadic” meaning that effectful values are being evaluated in order. This becomes clear when traversing a list of values with the FutureEffect:

import org.atnos.eff._, all._, future._, syntax.all._
import cats.Eval
import cats.syntax.traverse._
import cats.instances.list._
import scala.concurrent._, duration._,
import org.atnos.eff.syntax.future._

type WriterString[A] = Writer[String, A]
type _writerString[R] = WriterString |= R

type S = Fx.fx3[Eval, TimedFuture, WriterString]

implicit val scheduler = ExecutorServices.schedulerFromGlobalExecutionContext

def execute[E :_eval :_writerString :_future](i: Int): Eff[E, Int] =
  for {
    i1 <- delay(i)
    i2 <- futureDelay(i1)
    _  <- tell(i2.toString)
  } yield i2

val action: Eff[S, List[Int]] =
  List(1000, 500, 50).traverse(execute[S])

Await.result(action.runEval.runWriterLog.runSequential, 2.seconds)

> List(1000, 500, 50)

We can however run all those computations concurrently using the applicative execution for Eff:

val action: Eff[S, List[Int]] =
  List(1000, 500, 50).traverseA(execute[S])

Await.result(Eff.detachA(action.runEval.runWriterLog[String])(TimedFuture.MonadTimedFuture, TimedFuture.ApplicativeTimedFuture).runNow(scheduler, global), 2.seconds)

> List(1000, 500, 50)

This uses now traverseA (instead of traverse) to do an applicative traversal and execute futures concurrently and the fastest actions finish first.


Another advantage of applicative effects is that we can intercept them individual requests and “batch” them into one single request. For example:

import org.atnos.eff._, all._, syntax.all._
import cats.implicits._

// An effect to get users from a database
// calls can be individual or batched
case class User(i: Int)
sealed trait UserDsl[+A]

case class GetUser(i: Int) extends UserDsl[User]
case class GetUsers(is: List[Int]) extends UserDsl[List[User]]
type _userDsl[R] = UserDsl /= R

def getUser[R :_userDsl](i: Int): Eff[R, User] =
  send[UserDsl, R, User](GetUser(i))

Let’s create an interpreter for this DSL:

// the real method calls to a webservice
def getWebUser(i: Int): User = User(i)
def getWebUsers(is: List[Int]): List[User] = => User(i))

// the interpreter simply calls the webservice
// and return a trace of the executed call
def runDsl[A](eff: Eff[Fx1[UserDsl], A]): (A, Vector[String]) = {
  def go(e: Eff[Fx1[UserDsl], A], trace: Vector[String]): (A, Vector[String]) =
    e match {
      case Pure(a,_) => (a, trace)
      case Impure(UnionTagged(GetUser(i), _), c, _)   => go(c(getWebUser(i)), trace :+ "getWebUser")
      case Impure(UnionTagged(GetUsers(is), _), c, _) => go(c(getWebUsers(is)), trace :+ "getWebUsers")
      case ap @ ImpureAp(_, _, _)                     => go(ap.toMonadic, trace)
      case Impure(_, _, _)                            => sys.error("this should not happen with just one effect")
  go(eff, Vector())

We can also optimise a UserDsl program by providing a Batchable instance describing how to “batch” 2 calls into 1:

implicit def BatchableUserDsl: Batchable[UserDsl] = new Batchable[UserDsl] {
  type Z = List[User]
  type E = User

  def distribute(z: List[User]) = z

  def batch[X, Y](tx: UserDsl[X], ty: UserDsl[Y]): Option[UserDsl[Z]] = Option {
    (tx, ty) match {
      case (GetUser(i),   GetUser(j))   => GetUsers(List(i, j))
      case (GetUser(i),   GetUsers(is)) => GetUsers(i :: is)
      case (GetUsers(is), GetUser(i))   => GetUsers(is :+ i)
      case (GetUsers(is), GetUsers(js)) => GetUsers(is ++ js)

Now let’s create a program using the User DSL with applicative calls which can be optimised:

def program[R :_userDsl]: Eff[R, List[User]] =
  Eff.traverseA(List(1, 2, 3))(i => getUser(i))

And its optimised version:

def optimised[R :_userDsl]: Eff[R, List[User]] =

Running the optimised and non-optimised version of the program must yield the same results:

show(runDsl(program[Fx1[UserDsl]]), runDsl(optimised[Fx1[UserDsl]]))
original:  User(1), User(2), User(3)
  trace: getWebUser, getWebUser, getWebUser

optimised: User(1), User(2), User(3)
  trace: getWebUsers