
TERSE SYSTEMS





EXECUTIONCONTEXT.PARASITIC AND FRIENDS

20 Jun 2024 • software

There's a lot of confusion around execution contexts and how they work in Scala.
The description in Futures and Promises does a good job of explaining the
concept of how Futures work, but does not explain what the difference is between
ExecutionContext.global, ExecutionContext.parasitic, and
ExecutionContext.opportunistic, and where and when you would want to use these.


EXECUTIONCONTEXT.GLOBAL

Let's start off by explaining what execution contexts and executors actually do,
starting with the global execution context. Here's the simplest possible
program:

package example

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

object Exercise1 {
  private val logger = org.slf4j.LoggerFactory.getLogger(getClass)

  def main(args: Array[String]): Unit = {
    val f = Future {
      Thread.sleep(1000) // simulate a blocking operation.
      logger.info("Executed future!")
    }(ExecutionContext.global)

    // block until the future completes
    Await.ready(f, Duration.Inf)
  }
}


The relevant bit is the Future(...)(ExecutionContext.global). This says to use
the global execution context for the block in the Future. Under the hood, the
execution context turns the block into a Runnable and calls execute on it.

If you wanted to do it in a Java style, Future would look roughly like this:

val runnable = new Runnable() {
  override def run = {
    Thread.sleep(1000)
    logger.info("Executed future!")
  }
}
globalExecutor.execute(runnable)


When you run this program, you'll get a log statement like:

13:27:48.832 example.Exercise1$ INFO  [scala-execution-context-global-21]: Executed future!


Note the scala-execution-context-global-21 thread name – the global execution
context has its own internal executor, which manages its own threads. The
executor is technically free to execute the runnable however it likes, in any
order, on any thread pool. Most executors will put the runnable on a task
queue, which can either be bounded – it has a maximum size and will reject
tasks when the queue is full – or unbounded – you can keep adding tasks until
the JVM runs out of memory.
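To make the bounded case concrete, here's a sketch – the names are mine, not
from the post – of wrapping a ThreadPoolExecutor with a bounded queue in an
ExecutionContext:

```scala
import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit}
import scala.concurrent.ExecutionContext

object BoundedExample {
  // A two-thread executor whose task queue holds at most 100 runnables.
  // Once the queue is full, execute throws RejectedExecutionException
  // (the default AbortPolicy) rather than letting tasks pile up until
  // the JVM runs out of memory.
  private val executor = new ThreadPoolExecutor(
    2, 2, 0L, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue[Runnable](100)
  )

  val boundedEc: ExecutionContext = ExecutionContext.fromExecutor(executor)
}
```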

The goal of concurrency is to use as few resources as you can get away with,
including threads. Creating threads and context switching between threads is
expensive, so for fork/join pools you want only as many threads as you have
CPU cores. In addition, the more executor services you're using, the more work
the system has to do to manage them.

Using Future.apply here is technically an anti-pattern. In production code,
kicking off a Future is almost always done by an external service – either
you're calling a database and waiting for a result, or you're accumulating a
stream of bytes that will eventually turn into a parsable format, or you're
using a callback service like GRPC and writing to a Promise. These external
services will typically manage their own thread pools. In the majority of cases,
you'll be dealing with the transformation of an existing Future, rather than
creating your own.
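As a sketch of that last case – Callback and asFuture are names I've made up
for illustration, standing in for a real client library's API – bridging a
callback-style service to a Future with a Promise looks like this:

```scala
import scala.concurrent.{Future, Promise}

// Hypothetical callback interface, standing in for something like a gRPC stub.
trait Callback[T] {
  def onSuccess(value: T): Unit
  def onFailure(t: Throwable): Unit
}

object CallbackBridge {
  // The external service completes the Promise from its own thread pool;
  // our code only ever sees the resulting Future.
  def asFuture[T](register: Callback[T] => Unit): Future[T] = {
    val p = Promise[T]()
    register(new Callback[T] {
      def onSuccess(value: T): Unit = p.success(value)
      def onFailure(t: Throwable): Unit = p.failure(t)
    })
    p.future
  }
}
```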

Let's look at two futures:

object Exercise2 {
  private val logger = org.slf4j.LoggerFactory.getLogger(getClass)

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global

    val f1 = Future {
      logger.info("Executed future1!")
    }
    val f2 = Future {
      logger.info("Executed future2!")
    }

    val f1andf2 = f1.zip(f2)
    Await.ready(f1andf2, Duration.Inf)
  }
}


The two futures are created and placed into the task queue, and the global
execution context attempts to run them at the same time:

13:35:00.235 example.Exercise2$ INFO  [scala-execution-context-global-21]: Executed future1!
13:35:00.235 example.Exercise2$ INFO  [scala-execution-context-global-22]: Executed future2!


Note that there's no requirement that f1 and f2 execute in order: concurrency
means they can happen in any order, and parallelism means that they can both
execute at once. Here, the futures executed in parallel, each using a different
thread on a different CPU core.

Because Scala considers Future[_] to produce a value at some point in the
future, all computations have to go through operators that compose Future[_],
notably flatMap and map. Each of these also takes an implicit execution
context, and each callback is turned into a Runnable under the hood.

import ExecutionContext.Implicits.global

val f = Future {
  val s = "f1"
  logger.info(s)
  s
}.map { s =>
  val s2 = s + " f2"
  logger.info(s2)
  s2
}.map { s2 =>
  val s3 = s2 + " f3"
  logger.info(s3)
}


When we run this, all the map statements use the global execution context, and
are executed sequentially because each depends on the output of the previous
step.

14:12:14.406 example.Exercise3$ INFO  [scala-execution-context-global-21]: f1
14:12:14.408 example.Exercise3$ INFO  [scala-execution-context-global-21]: f1 f2
14:12:14.408 example.Exercise3$ INFO  [scala-execution-context-global-21]: f1 f2 f3


Whenever you have a Future block, you're using a Runnable. Whenever you're using
map, flatMap, and so on, that's essentially chaining a bunch of Runnables
together.



Note that these can run on the same thread, but it's entirely possible that each
of these tasks runs on a different thread. This can be needlessly expensive, as
a context switch between threads incurs an overhead for what should be a
straightforward operation.

Finally, ExecutionContext.global creates a ForkJoinPool for its executor, with
some tweaks to size it for the CPU cores. A more detailed breakdown of the
internals can be found here, but that's basically it.
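A rough approximation of that setup – ignoring the system properties and
uncaught-exception handling the real implementation deals with – looks like:

```scala
import java.util.concurrent.ForkJoinPool
import scala.concurrent.ExecutionContext

object GlobalSketch {
  // global sizes its pool to the number of available cores; the real code
  // also reads properties like scala.concurrent.context.numThreads, which
  // this sketch skips.
  private val parallelism = Runtime.getRuntime.availableProcessors()

  val ec: ExecutionContext =
    ExecutionContext.fromExecutor(new ForkJoinPool(parallelism))
}
```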

There is a blocking construct that can improve parallelism, but I have never
seen this used in production code – it's more maintainable to just use a
dedicated custom execution context that's tuned for IO.
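For completeness, blocking is just a hint to a BlockContext-aware pool (such as
global's ForkJoinPool) that the task will stall; a minimal sketch:

```scala
import scala.concurrent.{blocking, ExecutionContext, Future}

object BlockingExample {
  def fetch(): Future[String] = Future {
    // blocking tells the fork/join pool this task will stall, so it may
    // spawn a compensating thread to keep parallelism up.
    blocking {
      Thread.sleep(100) // simulated IO
      "done"
    }
  }(ExecutionContext.global)
}
```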


EXECUTIONCONTEXT.PARASITIC

The problems with Future underperforming in some scenarios were well known, and
in 2.13 there were a bunch of changes to streamline and optimize performance.

One solution to this is to use a "synchronous" execution context, also known as
a trampoline execution context. In 2.13, this is ExecutionContext.parasitic.

The pull request for ExecutionContext.parasitic explains the underlying
mechanism: if the future is completed, it will run on the registering thread,
and if the future is not yet completed, it will run on the completing thread.

This means that parasitic – despite being "synchronous" – is still not a
solution for using execution contexts in conjunction with thread local storage.
You will run into problems if you use an unmanaged parasitic with a system that
relies on thread local storage, such as MDC or OpenTelemetry.
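A minimal demonstration of the hazard, using a plain ThreadLocal as a stand-in
for MDC (the names here are mine):

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object ThreadLocalHazard {
  private val requestId = new ThreadLocal[String]

  def demo(): Option[String] = {
    requestId.set("req-1") // set on the registering (main) thread

    val f = Future {
      Thread.sleep(500) // still running when the map below is registered
      "done"
    }(ExecutionContext.global).map { _ =>
      // The future was not yet complete, so this runs on the completing
      // global thread, where the ThreadLocal was never set.
      Option(requestId.get())
    }(ExecutionContext.parasitic)

    Await.result(f, Duration.Inf)
  }
}
```

Here demo() almost always returns None: the map registered with parasitic ran
on the completing global thread, where requestId was never set.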

For example, if we use parasitic here:

object Exercise4 {
  private val logger = org.slf4j.LoggerFactory.getLogger(getClass)

  def toIntFuture(f: Future[String]): Future[Int] = {
    Thread.sleep(1000)
    f.map { s =>
      logger.info(s"toIntFuture: converting $s")
      s.toInt
    }(ExecutionContext.parasitic)
  }

  def main(args: Array[String]): Unit = {
    val stringFuture = Future {
      logger.info("starting!")
      "42"
    }(ExecutionContext.global)
    val f = toIntFuture(stringFuture)
    val result = Await.result(f, Duration.Inf)
    logger.info(s"result = $result")
  }
}


Then the map runs on the main thread.

19:28:39.413 example.Exercise4$ INFO  [scala-execution-context-global-21]: starting!
19:28:40.414 example.Exercise4$ INFO  [main]: toIntFuture: converting 42
19:28:40.467 example.Exercise4$ INFO  [main]: result = 42


But move the Thread.sleep:

object Exercise4 {
  private val logger = org.slf4j.LoggerFactory.getLogger(getClass)

  def toIntFuture(f: Future[String]): Future[Int] = {
    f.map { s =>
      logger.info(s"toIntFuture: converting $s")
      s.toInt
    }(ExecutionContext.parasitic)
  }

  def main(args: Array[String]): Unit = {
    val stringFuture = Future {
      logger.info("starting!")
      Thread.sleep(1000)
      "42"
    }(ExecutionContext.global)
    val f = toIntFuture(stringFuture)
    val result = Await.result(f, Duration.Inf)
    logger.info(s"result = $result")
  }
}


And you'll see parasitic run on the global execution context.

15:05:56.776 example.Exercise4$ INFO  [scala-execution-context-global-21]: starting!
15:05:57.779 example.Exercise4$ INFO  [scala-execution-context-global-21]: toIntFuture: converting 42
15:05:57.789 example.Exercise4$ INFO  [main]: result = 42


The parasitic execution context is both useful and dangerous. Not only does it
reduce the chance of a context switch, but it can also simplify code. For
example, it's common for methods to look like this:

class FooService @Inject()(fooDAO: FooDAO)(implicit ec: ExecutionContext) {
  def getAge(id: ID): Future[Int] = {
    fooDAO.get(id).map { foo =>
      foo.age
    }
  }
}


In this situation, the long-running operation to get a Foo shouldn't need an
execution context just to map from foo to foo.age. Using parasitic for trivial
maps like these, and removing the class-scoped implicit, forces each method to
consider how its computations should be executed.

class FooService @Inject()(fooDAO: FooDAO) {
  def getAge(id: ID): Future[Int] = {
    fooDAO.get(id).map { foo =>
      foo.age
    }
  }
}


However, you must be careful about how much work you're doing in the flatMap.
The DAO will probably be using a ThreadPoolExecutor internally to manage queries
to the database. If you use parasitic to piggyback a flatMap and are tying up
the thread with complex logic, then you are taking resources away from the DAO's
internal thread pool and also losing out on the work-stealing advantages you
would gain by using a fork/join pool:

class FooService @Inject()(fooDAO: FooDAO) {
  def unsafeMethod(id: ID): Future[Int] = {
    fooDAO.get(id).flatMap { foo =>
      complexAnalytics(foo) // potentially long-running CPU-bound work
    }(ExecutionContext.parasitic) // unsafe to use parasitic here
  }
}


Similarly, you should only use parasitic when you control the executing block's
logic. You should not use parasitic where you have a function or call-by-name
parameter, for example, as these could block.

def unsafe(f: String => Int): Future[Int] = {
  // we don't control execution here, so technically f could block or be expensive.
  internalFuture().map(s => f(s))(ExecutionContext.parasitic)
}


In this situation, you should be passing in an execution context as an implicit,
as only the caller will know what the function does.
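The safe variant just takes the context from the caller; a sketch, where
internalFuture is a stand-in for whatever produces the original Future:

```scala
import scala.concurrent.{ExecutionContext, Future}

object SafeExample {
  // Stand-in for an existing asynchronous call.
  private def internalFuture(): Future[String] = Future.successful("42")

  // The caller supplies the execution context, because only the caller
  // knows whether f is cheap enough to run inline.
  def safe(f: String => Int)(implicit ec: ExecutionContext): Future[Int] =
    internalFuture().map(s => f(s))
}
```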


MANAGING EXECUTORS

One problem with ExecutionContext is that it is a blank sheet of paper. There's
nothing to indicate what executor is actually at work, or when it's time to
switch to a different executor.

One useful technique in managing executors is to associate them with strongly
typed execution contexts. For example, Play has a CustomExecutionContext that
can be extended with your own custom types.

class DatabaseExecutionContext @Inject()(system: ActorSystem) extends CustomExecutionContext(system, "database-dispatcher")

abstract class CustomExecutionContext(system: ActorSystem, name: String) extends ExecutionContextExecutor {
  private val dispatcher: MessageDispatcher = system.dispatchers.lookup(name)

  override def execute(command: Runnable) = dispatcher.execute(command)

  override def reportFailure(cause: Throwable) = dispatcher.reportFailure(cause)
}

class DatabaseService @Inject()(implicit executionContext: DatabaseExecutionContext) {
  // ...
}


This is not only useful in making sure that you know what execution context
you're looking at, but it also helps in the logs, because the thread name will
show up as database-dispatcher-1.
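The dispatcher itself is defined in application.conf; a minimal sketch, with an
illustrative pool size:

```hocon
database-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 10
  }
}
```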

Rather than using an implicit parameter, you can use singletons, and enforce
your ExecutionContext style with ArchUnit.
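The singleton style looks roughly like this – the pool size and thread naming
are illustrative, not prescribed by the post:

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.ExecutionContext

object DatabaseExecutionContext {
  private val counter = new AtomicInteger()

  // Name the threads so they are identifiable in logs and thread dumps.
  private val factory: ThreadFactory = (r: Runnable) => {
    val t = new Thread(r, s"database-dispatcher-${counter.incrementAndGet()}")
    t.setDaemon(true)
    t
  }

  val instance: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(10, factory))
}
```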


BATCHINGDISPATCHER AND EXECUTIONCONTEXT.OPPORTUNISTIC

In addition to ExecutionContext.global and ExecutionContext.parasitic, there's
also ExecutionContext.opportunistic, although you'll need to do some work to dig
it out.

The official documentation for opportunistic is attached to the global scaladoc
under the "Batching short-lived nested tasks" section. You can read the source
code if it's easier.

The documentation primarily consists of the following:

> ExecutionContext.opportunistic uses the same thread pool as
> ExecutionContext.global. It attempts to batch nested task and execute them on
> the same thread as the enclosing task. This is ideally suited to execute
> short-lived tasks as it reduces the overhead of context switching.

There's a story attached to why opportunistic is not public.

The story begins with the pull request Make the global EC a BatchedExecutor
(performance). This PR made the global execution context implement
BatchingExecutor, which is what implements this batching behavior.

Then, it was discovered that Nested Future blocks do not run in parallel in
Scala 2.13.x, and it was A Thing.

This was unpopular, and the behavior was reverted in Revert
ExecutionContext.global to not be a BatchingExecutor. Then
ExecutionContext.opportunistic was added as a fallback for the batching
behavior, but kept as private[scala] so it wouldn't break binary compatibility.
And it remains in that state to this day.

ExecutionContext.opportunistic represents a middle ground between global and
parasitic where the execution will use a thread if it has to, but then try to
keep everything on that thread. This is similar to the behavior you get if you
use an Akka actor's context.dispatcher, although they are still distinct
implementations.

object Exercise5 {
  import scala.language.reflectiveCalls // the structural type below uses reflection

  // opportunistic is private[scala], so we reach it through a structural type
  val opportunistic: scala.concurrent.ExecutionContext =
    (scala.concurrent.ExecutionContext: {def opportunistic: scala.concurrent.ExecutionContextExecutor})
      .opportunistic

  private val logger = org.slf4j.LoggerFactory.getLogger(getClass)

  def slow(key: String): Future[String] = Future {
    logger.info(s"$key start")
    Thread.sleep(1000)
    logger.info(s"$key end")
    key
  }(opportunistic)

  def runAsyncSerial(): Future[Seq[String]] = {
    implicit val ec = opportunistic
    slow("A").flatMap { a =>
      Future.sequence(Seq(slow("B"), slow("C"), slow("D")))
    }
  }

  def main(args: Array[String]): Unit = {
    val f = runAsyncSerial()
    val result = Await.result(f, Duration.Inf)
    logger.info(s"result = $result")
  }
}


This runs all the futures on a single thread.

17:52:10.651 example.Exercise5$ INFO  [scala-execution-context-global-21]: A start
17:52:11.653 example.Exercise5$ INFO  [scala-execution-context-global-21]: A end
17:52:11.667 example.Exercise5$ INFO  [scala-execution-context-global-21]: D start
17:52:12.668 example.Exercise5$ INFO  [scala-execution-context-global-21]: D end
17:52:12.669 example.Exercise5$ INFO  [scala-execution-context-global-21]: C start
17:52:13.669 example.Exercise5$ INFO  [scala-execution-context-global-21]: C end
17:52:13.670 example.Exercise5$ INFO  [scala-execution-context-global-21]: B start
17:52:14.671 example.Exercise5$ INFO  [scala-execution-context-global-21]: B end
17:52:14.677 example.Exercise5$ INFO  [main]: result = List(B, C, D)


Would I recommend using ExecutionContext.opportunistic? Well, no.

Using ExecutionContext.opportunistic implies using ExecutionContext.global. It
assumes that you do not have your own dispatchers lying around. It also relies
heavily on the correct usage of the blocking construct on any long-running
and/or blocking tasks.

 * If you are using Akka/Pekko (or Play), you'll leverage the default
   dispatcher, which already implements BatchingExecutor.
 * If you have defined your own execution contexts, you cannot mix in the
   scala.concurrent.BatchingExecutor trait, as it is private. You cannot
   make your own opportunistic executor.
 * If you know you have long-running or blocking tasks, it's easier to use a
   dedicated ThreadPoolExecutor than the blocking construct – you can see
   the thread names and manage thread allocation.
 * If you have short-lived tasks, it's easier to use ExecutionContext.parasitic.

Given all of that, the only appropriate place I can see
ExecutionContext.opportunistic being used is an application that has enough
concurrency going on that context switching is a problem, but not enough design
in managing concurrent tasks to define and isolate different execution
contexts, while still appropriately wrapping long-running/blocking code in a
blocking construct – all without using Akka/Pekko or one of the other
concurrency management libraries like Cats IO or Monix.
