This post walks step by step through Scala concurrency with Future: blocking vs non-blocking calls, and how the implicit and apply concepts work under the covers. Scala runs on the JVM and builds on the Java runtime in many ways, including concurrency. Scala’s Future[T] is more flexible than Java’s Future<T>: you can create futures directly from blocks of code, and you can attach callbacks to futures to handle completion.
Scala Future – blocking call
Scala’s Future[T] resides in the “scala.concurrent” package. It is a container type representing a computation that is expected to eventually produce a value of type T. The simple “Future” demo code below blocks until execution finishes or a timeout occurs after 5 seconds.
    import scala.concurrent.duration._ // provides 5.seconds
    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global // required to create "Future {.....}"

    object SimpleFuture extends App {

      val timeout = 5.seconds

      val f: Future[Int] = Future {
        Thread.sleep(2000) // 2000ms processing time
        123                // fake result
      }

      // blocked until execution finishes or timeout occurs in 5 seconds
      val result: Int = Await.result(f, timeout)

      println("I was blocked for 2000ms")
      println("result=" + result)
    }
Output:
    I was blocked for 2000ms
    result=123
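The demo above only exercises the case where the future finishes in time. As a minimal sketch of the other case, the code below shows what happens when the timeout expires first: “Await.result” throws a TimeoutException. The object name “AwaitTimeoutSketch” and the helper “awaitOrNone” are hypothetical names made up for this illustration; they are not part of the Scala library.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object AwaitTimeoutSketch {

  // Hypothetical helper: waits for `f` and returns None if the timeout expires first.
  def awaitOrNone[T](f: Future[T], timeout: FiniteDuration): Option[T] =
    try Some(Await.result(f, timeout))
    catch { case _: TimeoutException => None }

  def main(args: Array[String]): Unit = {
    val slow: Future[Int] = Future {
      Thread.sleep(500) // 500ms processing time
      123               // fake result
    }

    println(awaitOrNone(slow, 50.millis)) // the timeout expires before the future completes
    println(awaitOrNone(slow, 5.seconds)) // the same future completes within this timeout
  }
}
```

Note that a timeout only ends the waiting. The future itself is not cancelled and keeps running, which is why the second call can still pick up its result.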
Scala Future – non-blocking call
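The “onComplete” method registers a callback, which makes the call non-blocking. The body of “f” starts running on a worker thread as soon as the Future is created; “f.onComplete {….}” only registers what to do with the result once it is available. “Await.result” is used at the end to ensure that the main thread does not exit before “f” completes.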
    import scala.concurrent.duration._ // provides 10.seconds
    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global // required to create "Future {.....}"
    import scala.util.{Failure, Success}

    object SimpleFuture extends App {

      val f: Future[Int] = Future {
        Thread.sleep(2000) // 2000ms processing time
        123                // fake result
      }

      // register a callback; this call returns immediately
      f.onComplete {
        case Success(result) => println("result=" + result)
        case Failure(e)      => e.printStackTrace()
      }

      println("I am not blocked at all")
      println("More tasks can be done here ....")

      Await.result(f, 10.seconds) // wait till f is done

      println("All done")
    }
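The callback above has a Failure branch that the demo never reaches. As a rough sketch, the code below triggers both branches: an exception thrown inside the Future body is caught and delivered to the callback as a Failure. The object name “FailureCallbackSketch” and the helper “describe” are hypothetical, and the Promise is used here only so the demo can hand the callback’s text back to the caller.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object FailureCallbackSketch {

  // Hypothetical helper: runs `body` in a Future and reports which
  // branch of the onComplete callback fired.
  def describe(body: => Int): String = {
    val outcome = Promise[String]() // completed from inside the callback

    Future(body).onComplete {
      case Success(value) => outcome.success("result=" + value)
      case Failure(e)     => outcome.success("failed: " + e.getMessage)
    }

    // blocking here only so the demo can return the text
    Await.result(outcome.future, 5.seconds)
  }

  def main(args: Array[String]): Unit = {
    println(describe(123))
    println(describe(throw new IllegalStateException("boom")))
  }
}
```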
The default JVM thread & spawned thread
The code below runs on two threads. “main” is the default thread created by the JVM. “ForkJoinPool-1-worker-5” is a worker thread spawned by a thread pool; the exact name can differ between runs and Scala versions. “import scala.concurrent.ExecutionContext.Implicits.global” brings an implicit ExecutionContext into scope. This global context is a default thread pool.
    import scala.concurrent.duration._ // provides 10.seconds
    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global // required to create "Future {.....}"
    import scala.util.{Failure, Success}

    object SimpleFuture extends App {

      val f: Future[Int] = Future {
        Thread.sleep(2000) // 2000ms processing time
        123                // fake result
      }

      // the callback runs on a worker thread, so it prints that thread's name
      f.onComplete {
        case Success(result) => println(Thread.currentThread().getName() + " result=" + result)
        case Failure(e)      => e.printStackTrace()
      }

      val threadName = Thread.currentThread().getName() + ":"

      println(threadName + " I am not blocked at all")
      println(threadName + " More tasks can be done here ....")

      Await.result(f, 10.seconds) // wait till f is done

      println(threadName + " All done")
    }
Output:
    main: I am not blocked at all
    main: More tasks can be done here ....
    main: All done
    ForkJoinPool-1-worker-5 result=123
Create your own thread pool
    import scala.concurrent.duration._ // provides 10.seconds
    import scala.concurrent.{Await, Future}
    import scala.util.{Failure, Success}
    import scala.concurrent.ExecutionContext
    import java.util.concurrent.Executors

    object SimpleFuture extends App {

      // thread pool with 10 threads
      implicit val ec: ExecutionContext =
        ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

      // use the thread pool implicitly
      val f: Future[Int] = Future {
        Thread.sleep(2000) // 2000ms processing time
        123                // fake result
      }

      // use the thread pool implicitly
      f.onComplete {
        case Success(result) => println(Thread.currentThread().getName() + " result=" + result)
        case Failure(e)      => e.printStackTrace()
      }

      val threadName = Thread.currentThread().getName() + ":"

      println(threadName + " I am not blocked at all")
      println(threadName + " More tasks can be done here ....")

      Await.result(f, 10.seconds) // wait till f is done

      println(threadName + " All done")

      // note: the pool's threads are non-daemon, so the JVM stays alive
      // until the pool is shut down
    }
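One thing to watch with your own pool: the threads created by “Executors.newFixedThreadPool” are non-daemon, so the JVM does not exit until the pool is shut down. A minimal sketch of one way to handle this is shown below. It uses “ExecutionContext.fromExecutorService”, which returns a context that also exposes “shutdown()”. The object name “PoolShutdownSketch”, the method “workerThreadName” and the pool size of 2 are assumptions made for this illustration.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object PoolShutdownSketch {

  // Hypothetical helper: runs one task on a dedicated pool and returns
  // the name of the thread that executed it.
  def workerThreadName(): String = {
    // thread pool with 2 threads, wrapped so that it can be shut down later
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))

    try {
      val f: Future[String] = Future { Thread.currentThread().getName }
      Await.result(f, 5.seconds)
    } finally {
      ec.shutdown() // lets the pool threads finish so the JVM can exit
    }
  }

  def main(args: Array[String]): Unit =
    println("task ran on: " + workerThreadName())
}
```

The sketch waits for the result before shutting the pool down. If callbacks are registered with “onComplete”, the shutdown should only happen once those callbacks have run as well, otherwise the pool may reject them.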
Demystifying implicits & apply
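Writing Future {….} is the same as writing Future.apply {……}. The apply method, shown below, takes the block of code as a by-name parameter “body” and takes the ExecutionContext that runs it as an implicit parameter in a second parameter list.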
    def apply[T](body: => T)(implicit executor: ExecutionContext): Future[T]
So, it can be written explicitly as
    // thread pool with 10 threads
    implicit val ec: ExecutionContext =
      ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

    // pass the thread pool explicitly
    val f: Future[Int] = Future.apply {
      Thread.sleep(2000) // 2000ms processing time
      123                // fake result
    }(ec)
Similarly, the onComplete method of Future[T] is declared as follows, where T is Int for our Future[Int]:

    def onComplete[U](f: Try[T] => U)(implicit executor: ExecutionContext): Unit
So the callback can be registered with the ExecutionContext passed explicitly:

    // pass the thread pool explicitly
    f.onComplete {
      case Success(result) => println(Thread.currentThread().getName() + " result=" + result)
      case Failure(e)      => e.printStackTrace()
    }(ec)
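To see the two mechanisms in isolation, here is a toy sketch with no concurrency at all. “Ctx” and “Task” are hypothetical stand-ins invented for this example (for ExecutionContext and Future’s companion object respectively), and “Task.apply” has the same shape as “Future.apply”: a by-name body plus an implicit second parameter list.

```scala
object ImplicitApplySketch {

  // Hypothetical stand-in for ExecutionContext: just a named context.
  final case class Ctx(name: String)

  // Hypothetical stand-in for the Future companion object.
  object Task {
    // by-name body, plus an implicit parameter in a second parameter list
    def apply[T](body: => T)(implicit ctx: Ctx): String =
      s"${ctx.name} ran: $body"
  }

  implicit val defaultCtx: Ctx = Ctx("default")

  val sugared: String  = Task { 1 + 1 }                   // compiler inserts .apply and defaultCtx
  val explicit: String = Task.apply { 1 + 1 }(defaultCtx) // the same call, fully spelled out
  val other: String    = Task { 1 + 1 }(Ctx("custom"))    // overriding the implicit by hand

  def main(args: Array[String]): Unit = {
    println(sugared)
    println(explicit)
    println(other)
  }
}
```

The first two values are equal because the compiler rewrites the first call into the second. The third shows that an implicit parameter can always be supplied by hand, which is exactly what the “(ec)” in the snippets above does.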