io.turbodsl: Asynchronous code in Kotlin made simple

A DSL-engine to turbo-charge your Kotlin development

Miguel T
TurboDSL

--

 - ..- .-. -... --- -.. ... .-.. - ..- .-. -... --- -.. ... .-..
___________ ______
___ ___/ __/ / ________ _____ ___
__ /__ __ _,___ / /_ ____ __ __ \ / ___/ / /
____ // / / // ___// __ \ / __ \ ____ / / / \ \ / /
__ // /_/ // / / (_/ // (_/ / __ /_/ /___\ \ / /___
_____/ \__,_//_/ /_,___/ \____/ _________//_____//______/

io.turbodsl
- ..- .-. -... --- -.. ... .-.. - ..- .-. -... --- -.. ... .-..

On any Kotlin application, working with asynchronous code is far easier than in Java. Even more, with coroutines and suspending functions it is a blast. But some challenges are found whenever you need to address several edge-cases: exceptions, retry-patterns, timeouts.

A simple example:

A task requires to execute 3 different jobs in parallel and process all three results.

Using pure Kotlin statements:

runBlocking(context = Dispatchers.IO) {
// Launch two coroutines with async
val job1 = async {
// Simulate a long-running task that will calculate some Int value
printTest("start job1")
delay(1_000L)
printTest("finish job1")
return@async 100
}
val job2 = async {
// Simulate a long-running task that will calculate some String value
printTest("start job2")
delay(3_000L)
printTest("finish job2")
return@async "test"
}
val job3 = async {
// Simulate a long-running task that will calculate some Boolean value
printTest("start job3")
delay(2_000L)
printTest("finish job3")
return@async true
}
// At this point, both job1 and job2 are started an executed
printTest("awaiting results")
// Wait for both results concurrently (non-blocking)
val value1 = job1.await()
printTest("job1 results ready")
val value2 = job2.await()
printTest("job2 results ready")
val value3 = job3.await()
printTest("job3 results ready")
printTest("all results ready - proceeding to final processing")

// Process results
// Simulate a long-running task that will process results and return a String value
printTest("start job4")
delay(2_000L)
printTest("finish job4")
printTest("Final: $value1 - $value2 - $value3")
}

This is quite simple but the problems arises when…

  • Each job could potentially take N seconds
  • Each job could potentially fail

Attempting to add logic to deal with such edge-cases will start to get convoluted — even worse: what if there are other tasks that use a similar pattern but with 4, 5, 6… N jobs running in parallel?

Yes, it can be done, but… io.turbodsl does it for you!

Using io.turbodsl statements:

io.turbodsl adds some wrappers around common Kotlin patterns to make everything much simpler by using DSL (Domain Specific Language) expressions:

TurboScope.execute<String> {
async(
job1 = asyncJob<Int>(name = "job1") {
// Simulate a long-running task that will calculate some Int value
printTest("start $name")
delay(1_000L)
printTest("finish $name")
100
},
job2 = asyncJob<String>(name = "job2") {
// Simulate a long-running task that will calculate some String value
printTest("start $name")
delay(3_000L)
printTest("finish $name")
"test"
},
job3 = asyncJob<Boolean>(name = "job3") {
// Simulate a long-running task that will calculate some Boolean value
printTest("start $name")
delay(2_000L)
printTest("finish $name")
true
}
) { ok, r1, r2, r3 ->
job(name = "job4", context = this) {
with(context) {
if (ok) {
val job1 = r1.success()
val job2 = r2.success()
val job3 = r3.success()
printTest("all results ready - proceeding to final processing")
// Simulate a long-running task that will process results and return a String value
printTest("start ${this@job.name}")
delay(2_000L)
printTest("finish ${this@job.name}")
"$job1 - $job2 - $job3"
} else {
"none"
}
}
}
}
}.let {
printTest("Final: $it")
}

This is a 1:1 implementation from the Kotlin code above.

How does this code work?

  1. TurboScope.execute allows to start writing all DSL expressions.
  2. async creates and asynchronous-scope where 3 asynchronous-jobs are executed.
  3. It triggers all 3 jobs at once and when all of them are completed, analyzes the results, executing the corresponding lambda, which is a result-scope.
  4. Flag ok indicates if any result r contains an error. If true, then it is safe to call success() on any r to access its value.

What else does io.turbodsl offer?

DSL expressions to make code much easier to write and maintain

  • Each DSL expression “creates” a scope
  • Group all related code into a job or nested jobs
  • Define async blocks receiving any number of asyncJobs
TurboDSL.execute<Unit> {
async(
job1 = asyncJob<Boolean> {
// Evaluate some condition that requires another job result
job<Rules>(context = job<Int> {..}){..}.last
},
job2 = asyncJob<BigDecimal> {
// Trigger multiple jobs to gather and summarize all the data
async(
job1 = asyncJob<Invoice>{..},
job2 = asyncJob<Customer>{..},
job3 = asuncJob<Discounts>{..}
) { ok, r1, r2, r3 ->
if (ok) {
r1.success().total - r2.success().credit - r3.success().amount
} else {
BigDecimal.ZERO
}
}
}
) { ok, r1, r2 ->
if (ok) {
val condition = r1.success() // this is already a Boolean
val balance = r2.success() // this is already a BigDecimal
// do something with both results
:
}
}
}

Timeout mechanisms

  • Any single scope supports a timeout limit
// Limit the whole execution within 2 seconds
TurboDSL.execute<Unit>(timeout = 2_000L) {
async(
// Limit condition processing to 1 second
job1 = asyncJob<Boolean>(timeout = 1_000L) {
// Evaluate some condition that requires another job result
job<Rules>(context = job<Int> {..}){..}.last
},
job2 = asyncJob<BigDecimal> {
// Trigger multiple jobs to gather and summarize all the data
async(
job1 = asyncJob<Invoice>{..},
// Limit customer processing to half a second
job2 = asyncJob<Customer>(timeout = 500L){..},
job3 = asuncJob<Discounts>{..}
) { ok, r1, r2, r3 ->
if (ok) {
r1.success().total - r2.success().credit - r3.success().amount
} else {
BigDecimal.ZERO
}
}
}
) { ok, r1, r2 ->
if (ok) {
val condition = r1.success() // this is already a Boolean
val balance = r2.success() // this is already a BigDecimal
// do something with both results
:
}
}
}

Initial delay

  • Any single scope supports an initial delay
// Limit the whole execution within 2 seconds
TurboDSL.execute<Unit>(timeout = 2_000L) {
async(
// Limit condition processing to 1 second
job1 = asyncJob<Boolean>(timeout = 1_000L) {
// Evaluate some condition that requires another job result
job<Rules>(context = job<Int> {..}){..}.last
},
job2 = asyncJob<BigDecimal> {
// Trigger multiple jobs to gather and summarize all the data
async(
// `job1` will be delayed by 10millis
job1 = asyncJob<Invoice>(delay = 10L){..},
// Limit customer processing to half a second
job2 = asyncJob<Customer>(timeout = 500L){..},
// `job3` will be delayed by 5millis
job3 = asuncJob<Discounts>(delay = 5L){..}
) { ok, r1, r2, r3 ->
if (ok) {
r1.success().total - r2.success().credit - r3.success().amount
} else {
BigDecimal.ZERO
}
}
}
) { ok, r1, r2 ->
if (ok) {
val condition = r1.success() // this is already a Boolean
val balance = r2.success() // this is already a BigDecimal
// do something with both results
:
}
}
}

3 different retry mechanisms

  • When failures are found while executing a scope, it could be retried N times
  • A delay can be added for each retry
  • When all the retries fail, a default value can be provided
  • A retry could be attempted only for timeouts, only for errors, or always (any error)
// Limit the whole execution within 2 seconds, retrying up to 2 more times
TurboDSL.execute<Unit>(
timeout = 2_000L,
retryMode = RuntimeScope.RetryMode.Always,
retry = 2,
) {
async(
// Limit condition processing to 1 second, retrying up to 3 more times
// whenever a timeout happens. If timeout continues, return false.
job1 = asyncJob<Boolean>(
timeout = 1_000L,
retryMode = RuntimeScope.RetryMode.OnTimeoutOnly,
retry = 3,
retryDefault = false,
) {
// Evaluate some condition that requires another job result
job<Rules>(context = job<Int> {..}){..}.last
},
job2 = asyncJob<BigDecimal> {
// Trigger multiple jobs to gather and summarize all the data
async(
// `job1` will be delayed by 10millis
job1 = asyncJob<Invoice>(delay = 10L){..},
// Limit customer processing to half a second
job2 = asyncJob<Customer>(timeout = 500L){..},
// `job3` will be delayed by 5millis and retried on errors up to
// 3 more times, with a delay of 10millis.
job3 = asuncJob<Discounts>(
delay = 5L,
retryMode = RuntimeScope.RetryMode.OnErrorOnly,
retry = 3,
retryDelay = 10L,
){..}
) { ok, r1, r2, r3 ->
if (ok) {
r1.success().total - r2.success().credit - r3.success().amount
} else {
BigDecimal.ZERO
}
}
}
) { ok, r1, r2 ->
if (ok) {
val condition = r1.success() // this is already a Boolean
val balance = r2.success() // this is already a BigDecimal
// do something with both results
:
}
}
}

3 different strategies when processing asynchronous jobs

  • Multiple jobs executing in parallel could have different results: success or failure
  • The parent scope executing all those jobs should wait for all the results.
  • All asynchronous results use AsyncResult which defines a strict hierarchy:
sealed class AsynResult<out T> {
sealed class Completed<out T>: AsyncResult<T> {
data class Success<out T> : Completed<T>
data class Failure : Completed<Nothing>
data object Cancelled : AsyncResult<Nothing>
}
  • Strategy CancelNone uses a coroutine supervisor-scope that will wait for each asynchronous job to be completed — the results are then collected an exposed to the parent as Failure or Success.
  • Strategy CancelFirst uses a standard coroutine-scope that will cancel any pending jobs as soon as an error is detected. Thus, results will be exposed as Failure, Success, or Cancelled
  • Strategy CancelAll uses a standard coroutine-scope that will mark as all jobs as Cancelled, except for the one that caused the error, marking it as a Failure.
// Limit the whole execution within 2 seconds, retrying up to 2 more times
TurboDSL.execute<Unit>(
timeout = 2_000L,
retryMode = RuntimeScope.RetryMode.Always,
retry = 2,
) {
async(
// Limit condition processing to 1 second, retrying up to 3 more times
// whenever a timeout happens. If timeout continues, return false.
// If at least one async-job fail, retry.
asyncMode = AsyncScope.AsyncMode.CancelAll,
job1 = asyncJob<Boolean>(
timeout = 1_000L,
retryMode = RuntimeScope.RetryMode.OnTimeoutOnly,
retry = 3,
retryDefault = false,
) {
// Evaluate some condition that requires another job result
job<Rules>(context = job<Int> {..}){..}.last
},
job2 = asyncJob<BigDecimal> {
// Trigger multiple jobs to gather and summarize all the data
async(
// `job1` will be delayed by 10millis
job1 = asyncJob<Invoice>(delay = 10L){..},
// Limit customer processing to half a second
job2 = asyncJob<Customer>(timeout = 500L){..},
// `job3` will be delayed by 5millis and retried on errors up to
// 3 more times, with a delay of 10millis.
job3 = asuncJob<Discounts>(
delay = 5L,
retryMode = RuntimeScope.RetryMode.OnErrorOnly,
retry = 3,
retryDelay = 10L,
){..}
) { ok, r1, r2, r3 ->
if (ok) {
r1.success().total - r2.success().credit - r3.success().amount
} else {
BigDecimal.ZERO
}
}
}
) { ok, r1, r2 ->
if (ok) {
val condition = r1.success() // this is already a Boolean
val balance = r2.success() // this is already a BigDecimal
// do something with both results
:
}
}
}

Other features:

  • Any scope can receive a CoroutineContext — defaults to Dispatchers.IO
  • A context can be specified within any scope to be used as a input paramater
  • Support for generics for context and return-values
  • Each scope can be defined with a custom name, for debugging / logging
  • A pluggable logging framework

io.turbodsl is not a replacement for Kotlin coroutines, but just a complement to simplify your codebase.

To include io.turbodsl into your project:

// You must include githubpackages as a maven-repo
repositories {
mavenCentral()
// or Github packages
maven {
name = "GitHubPackages"
url = uri("https://maven.pkg.github.com/migueltt/io-turbodsl")
credentials { ... }
}
:
}
dependencies {
implementation("io-turbodsl:io-turbodsl-core:+")
}

For more details, check https://www.turbodsl.io

--

--