The default interpretation of Eff values is “monadic”, meaning that effectful values are evaluated in order. This becomes clear when traversing a list of values with the FutureEffect:
import org.atnos.eff._, all._, future._, syntax.all._
import cats.Eval
import cats.data.Writer
import cats.syntax.traverse._
import cats.instances.list._
import scala.concurrent._, duration._, ExecutionContext.Implicits.global
import org.atnos.eff.concurrent.Scheduler
import org.atnos.eff.syntax.future._
type WriterString[A] = Writer[String, A]
type _writerString[R] = WriterString |= R
type S = Fx.fx3[Eval, TimedFuture, WriterString]
implicit val scheduler: Scheduler = ExecutorServices.schedulerFromGlobalExecutionContext
def execute[E: _eval: _writerString: _future](i: Int): Eff[E, Int] =
  for {
    i1 <- delay(i)
    i2 <- futureDelay(i1)
    _  <- tell(i2.toString)
  } yield i2

val action: Eff[S, List[Int]] =
  List(1000, 500, 50).traverse(execute[S])
Await.result(action.runEval.runWriterLog.runSequential, 2.seconds)
> List(1000, 500, 50)
We can however run all those computations concurrently using the applicative execution for Eff:
val action: Eff[S, List[Int]] =
  List(1000, 500, 50).traverseA(execute[S])

Await.result(
  Eff.detachA(action.runEval.runWriterLog[String])(TimedFuture.MonadTimedFuture, TimedFuture.ApplicativeTimedFuture).runNow(scheduler, global),
  2.seconds
)
> List(1000, 500, 50)
This now uses traverseA (instead of traverse) to do an applicative traversal: the futures are executed concurrently, and the fastest actions finish first.
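To make the difference observable, here is a hypothetical variation of execute (not part of the example above) where the future actually sleeps for i milliseconds; with the monadic traverse the three actions take roughly the sum of the sleeps, while with traverseA they run concurrently and take roughly as long as the slowest one:

def executeSleeping[E: _eval: _writerString: _future](i: Int): Eff[E, Int] =
  for {
    i1 <- delay(i)
    // the sleep happens inside the future, so concurrent execution becomes visible
    i2 <- futureDelay { Thread.sleep(i1.toLong); i1 }
    _  <- tell(i2.toString)
  } yield i2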
Another advantage of applicative effects is that we can intercept individual requests and “batch” them into one single request. For example:
import org.atnos.eff._, all._, syntax.all._
// An effect to get users from a database
// calls can be individual or batched
case class User(i: Int)
sealed trait UserDsl[+A]
case class GetUser(i: Int) extends UserDsl[User]
case class GetUsers(is: List[Int]) extends UserDsl[List[User]]
type _userDsl[R] = UserDsl /= R
def getUser[R: _userDsl](i: Int): Eff[R, User] =
  send[UserDsl, R, User](GetUser(i))
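A getUsers constructor could be added in the same way; it is not needed for the batching optimisation below, but is shown here as a hypothetical complement to getUser:

// hypothetical smart constructor for the batched call
def getUsers[R: _userDsl](is: List[Int]): Eff[R, List[User]] =
  send[UserDsl, R, List[User]](GetUsers(is))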
Let’s create an interpreter for this DSL:
// the real method calls to a webservice
def getWebUser(i: Int): User = User(i)
def getWebUsers(is: List[Int]): List[User] = is.map(i => User(i))

import scala.annotation.tailrec

// the interpreter simply calls the webservice
// and returns a trace of the executed calls
def runDsl[A](eff: Eff[Fx1[UserDsl], A]): (A, Vector[String]) = {
  @tailrec
  def go(e: Eff[Fx1[UserDsl], A], trace: Vector[String]): (A, Vector[String]) =
    e match {
      case Pure(a, _)                                  => (a, trace)
      case Impure(UnionTagged(GetUser(i), _), c, _)    => go(c(getWebUser(i)), trace :+ "getWebUser")
      case Impure(UnionTagged(GetUsers(is), _), c, _)  => go(c(getWebUsers(is)), trace :+ "getWebUsers")
      case ap @ ImpureAp(_, _, _)                      => go(ap.toMonadic, trace)
      case Impure(_, _, _)                             => sys.error("this should not happen with just one effect")
    }

  go(eff, Vector())
}
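As a quick, hypothetical sanity check (not part of the original example), running a single getUser call through this interpreter should return the user together with a one-element trace:

runDsl(getUser[Fx1[UserDsl]](1))
// expected: (User(1), Vector("getWebUser"))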
We can also optimise a UserDsl program by providing a Batchable instance describing how to “batch” 2 calls into 1:
implicit def BatchableUserDsl: Batchable[UserDsl] = new Batchable[UserDsl] {
  type Z = List[User]
  type E = User

  def distribute(z: List[User]) = z

  def batch[X, Y](tx: UserDsl[X], ty: UserDsl[Y]): Option[UserDsl[Z]] = Option {
    (tx, ty) match {
      case (GetUser(i),   GetUser(j))   => GetUsers(List(i, j))
      case (GetUser(i),   GetUsers(is)) => GetUsers(i :: is)
      case (GetUsers(is), GetUser(i))   => GetUsers(is :+ i)
      case (GetUsers(is), GetUsers(js)) => GetUsers(is ++ js)
    }
  }
}
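As an illustration (a hypothetical check, not in the original), batching two single calls produces one GetUsers call:

BatchableUserDsl.batch(GetUser(1), GetUser(2))
// expected: Some(GetUsers(List(1, 2)))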
Now let’s create a program using the UserDsl with applicative calls which can be optimised:
def program[R: _userDsl]: Eff[R, List[User]] =
  Eff.traverseA(List(1, 2, 3))(i => getUser(i))
And its optimised version:
def optimised[R: _userDsl]: Eff[R, List[User]] =
  program.batch
Running the optimised and non-optimised versions of the program must yield the same result:
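The show helper used below is not defined in the snippets above; a minimal sketch, assuming it simply formats both results and their traces, could be:

// assumed helper, not part of the library: formats both runs for display
def show(original: (List[User], Vector[String]), optimised: (List[User], Vector[String])): String =
  s"""original: ${original._1.mkString(", ")}
     |trace: ${original._2.mkString(", ")}
     |optimised: ${optimised._1.mkString(", ")}
     |trace: ${optimised._2.mkString(", ")}""".stripMargin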
show(runDsl(program[Fx1[UserDsl]]), runDsl(optimised[Fx1[UserDsl]]))
original: User(1), User(2), User(3)
trace: getWebUser, getWebUser, getWebUser
optimised: User(1), User(2), User(3)
trace: getWebUsers