io.turbodsl: Asynchronous code in Kotlin made simple
A DSL-engine to turbo-charge your Kotlin development
- ..- .-. -... --- -.. ... .-.. - ..- .-. -... --- -.. ... .-..
___________ ______
___ ___/ __/ / ________ _____ ___
__ /__ __ _,___ / /_ ____ __ __ \ / ___/ / /
____ // / / // ___// __ \ / __ \ ____ / / / \ \ / /
__ // /_/ // / / (_/ // (_/ / __ /_/ /___\ \ / /___
_____/ \__,_//_/ /_,___/ \____/ _________//_____//______/
io.turbodsl
- ..- .-. -... --- -.. ... .-.. - ..- .-. -... --- -.. ... .-..
On any Kotlin application, working with asynchronous code is far easier than in Java. Even more, with coroutines and suspending functions it is a blast. But some challenges are found whenever you need to address several edge-cases: exceptions, retry-patterns, timeouts.
A simple example:
A task requires to execute 3 different jobs in parallel and process all three results.
Using pure Kotlin statements:
runBlocking(context = Dispatchers.IO) {
// Launch two coroutines with async
val job1 = async {
// Simulate a long-running task that will calculate some Int value
printTest("start job1")
delay(1_000L)
printTest("finish job1")
return@async 100
}
val job2 = async {
// Simulate a long-running task that will calculate some String value
printTest("start job2")
delay(3_000L)
printTest("finish job2")
return@async "test"
}
val job3 = async {
// Simulate a long-running task that will calculate some Boolean value
printTest("start job3")
delay(2_000L)
printTest("finish job3")
return@async true
}
// At this point, both job1 and job2 are started an executed
printTest("awaiting results")
// Wait for both results concurrently (non-blocking)
val value1 = job1.await()
printTest("job1 results ready")
val value2 = job2.await()
printTest("job2 results ready")
val value3 = job3.await()
printTest("job3 results ready")
printTest("all results ready - proceeding to final processing")
// Process results
// Simulate a long-running task that will process results and return a String value
printTest("start job4")
delay(2_000L)
printTest("finish job4")
printTest("Final: $value1 - $value2 - $value3")
}
This is quite simple but the problems arises when…
- Each job could potentially take N seconds
- Each job could potentially fail
Attempting to add logic to deal with such edge-cases will start to get convoluted — even worse: what if there are other tasks that use a similar pattern but with 4, 5, 6… N jobs running in parallel?
Yes, it can be done, but… io.turbodsl
does it for you!
Using io.turbodsl statements:
io.turbodsl
adds some wrappers around common Kotlin patterns to make everything much simpler by using DSL (Domain Specific Language) expressions:
TurboScope.execute<String> {
async(
job1 = asyncJob<Int>(name = "job1") {
// Simulate a long-running task that will calculate some Int value
printTest("start $name")
delay(1_000L)
printTest("finish $name")
100
},
job2 = asyncJob<String>(name = "job2") {
// Simulate a long-running task that will calculate some String value
printTest("start $name")
delay(3_000L)
printTest("finish $name")
"test"
},
job3 = asyncJob<Boolean>(name = "job3") {
// Simulate a long-running task that will calculate some Boolean value
printTest("start $name")
delay(2_000L)
printTest("finish $name")
true
}
) { ok, r1, r2, r3 ->
job(name = "job4", context = this) {
with(context) {
if (ok) {
val job1 = r1.success()
val job2 = r2.success()
val job3 = r3.success()
printTest("all results ready - proceeding to final processing")
// Simulate a long-running task that will process results and return a String value
printTest("start ${this@job.name}")
delay(2_000L)
printTest("finish ${this@job.name}")
"$job1 - $job2 - $job3"
} else {
"none"
}
}
}
}
}.let {
printTest("Final: $it")
}
This is a 1:1 implementation from the Kotlin code above.
How does this code work?
TurboScope.execute
allows to start writing all DSL expressions.async
creates and asynchronous-scope where 3 asynchronous-jobs are executed.- It triggers all 3 jobs at once and when all of them are completed, analyzes the results, executing the corresponding lambda, which is a result-scope.
- Flag
ok
indicates if any resultr
contains an error. Iftrue
, then it is safe to callsuccess()
on anyr
to access its value.
What else does io.turbodsl
offer?
DSL expressions to make code much easier to write and maintain
- Each DSL expression “creates” a scope
- Group all related code into a
job
or nestedjob
s - Define
async
blocks receiving any number ofasyncJob
s
TurboDSL.execute<Unit> {
async(
job1 = asyncJob<Boolean> {
// Evaluate some condition that requires another job result
job<Rules>(context = job<Int> {..}){..}.last
},
job2 = asyncJob<BigDecimal> {
// Trigger multiple jobs to gather and summarize all the data
async(
job1 = asyncJob<Invoice>{..},
job2 = asyncJob<Customer>{..},
job3 = asuncJob<Discounts>{..}
) { ok, r1, r2, r3 ->
if (ok) {
r1.success().total - r2.success().credit - r3.success().amount
} else {
BigDecimal.ZERO
}
}
}
) { ok, r1, r2 ->
if (ok) {
val condition = r1.success() // this is already a Boolean
val balance = r2.success() // this is already a BigDecimal
// do something with both results
:
}
}
}
Timeout mechanisms
- Any single scope supports a timeout limit
// Limit the whole execution within 2 seconds
TurboDSL.execute<Unit>(timeout = 2_000L) {
async(
// Limit condition processing to 1 second
job1 = asyncJob<Boolean>(timeout = 1_000L) {
// Evaluate some condition that requires another job result
job<Rules>(context = job<Int> {..}){..}.last
},
job2 = asyncJob<BigDecimal> {
// Trigger multiple jobs to gather and summarize all the data
async(
job1 = asyncJob<Invoice>{..},
// Limit customer processing to half a second
job2 = asyncJob<Customer>(timeout = 500L){..},
job3 = asuncJob<Discounts>{..}
) { ok, r1, r2, r3 ->
if (ok) {
r1.success().total - r2.success().credit - r3.success().amount
} else {
BigDecimal.ZERO
}
}
}
) { ok, r1, r2 ->
if (ok) {
val condition = r1.success() // this is already a Boolean
val balance = r2.success() // this is already a BigDecimal
// do something with both results
:
}
}
}
Initial delay
- Any single scope supports an initial delay
// Limit the whole execution within 2 seconds
TurboDSL.execute<Unit>(timeout = 2_000L) {
async(
// Limit condition processing to 1 second
job1 = asyncJob<Boolean>(timeout = 1_000L) {
// Evaluate some condition that requires another job result
job<Rules>(context = job<Int> {..}){..}.last
},
job2 = asyncJob<BigDecimal> {
// Trigger multiple jobs to gather and summarize all the data
async(
// `job1` will be delayed by 10millis
job1 = asyncJob<Invoice>(delay = 10L){..},
// Limit customer processing to half a second
job2 = asyncJob<Customer>(timeout = 500L){..},
// `job3` will be delayed by 5millis
job3 = asuncJob<Discounts>(delay = 5L){..}
) { ok, r1, r2, r3 ->
if (ok) {
r1.success().total - r2.success().credit - r3.success().amount
} else {
BigDecimal.ZERO
}
}
}
) { ok, r1, r2 ->
if (ok) {
val condition = r1.success() // this is already a Boolean
val balance = r2.success() // this is already a BigDecimal
// do something with both results
:
}
}
}
3 different retry mechanisms
- When failures are found while executing a scope, it could be retried N times
- A delay can be added for each retry
- When all the retries fail, a default value can be provided
- A retry could be attempted only for timeouts, only for errors, or always (any error)
// Limit the whole execution within 2 seconds, retrying up to 2 more times
TurboDSL.execute<Unit>(
timeout = 2_000L,
retryMode = RuntimeScope.RetryMode.Always,
retry = 2,
) {
async(
// Limit condition processing to 1 second, retrying up to 3 more times
// whenever a timeout happens. If timeout continues, return false.
job1 = asyncJob<Boolean>(
timeout = 1_000L,
retryMode = RuntimeScope.RetryMode.OnTimeoutOnly,
retry = 3,
retryDefault = false,
) {
// Evaluate some condition that requires another job result
job<Rules>(context = job<Int> {..}){..}.last
},
job2 = asyncJob<BigDecimal> {
// Trigger multiple jobs to gather and summarize all the data
async(
// `job1` will be delayed by 10millis
job1 = asyncJob<Invoice>(delay = 10L){..},
// Limit customer processing to half a second
job2 = asyncJob<Customer>(timeout = 500L){..},
// `job3` will be delayed by 5millis and retried on errors up to
// 3 more times, with a delay of 10millis.
job3 = asuncJob<Discounts>(
delay = 5L,
retryMode = RuntimeScope.RetryMode.OnErrorOnly,
retry = 3,
retryDelay = 10L,
){..}
) { ok, r1, r2, r3 ->
if (ok) {
r1.success().total - r2.success().credit - r3.success().amount
} else {
BigDecimal.ZERO
}
}
}
) { ok, r1, r2 ->
if (ok) {
val condition = r1.success() // this is already a Boolean
val balance = r2.success() // this is already a BigDecimal
// do something with both results
:
}
}
}
3 different strategies when processing asynchronous jobs
- Multiple jobs executing in parallel could have different results: success or failure
- The parent scope executing all those jobs should wait for all the results.
- All asynchronous results use
AsyncResult
which defines a strict hierarchy:
sealed class AsynResult<out T> {
sealed class Completed<out T>: AsyncResult<T> {
data class Success<out T> : Completed<T>
data class Failure : Completed<Nothing>
data object Cancelled : AsyncResult<Nothing>
}
- Strategy
CancelNone
uses a coroutine supervisor-scope that will wait for each asynchronous job to be completed — the results are then collected an exposed to the parent asFailure
orSuccess
. - Strategy
CancelFirst
uses a standard coroutine-scope that will cancel any pending jobs as soon as an error is detected. Thus, results will be exposed asFailure
,Success
, orCancelled
- Strategy
CancelAll
uses a standard coroutine-scope that will mark as all jobs asCancelled
, except for the one that caused the error, marking it as aFailure
.
// Limit the whole execution within 2 seconds, retrying up to 2 more times
TurboDSL.execute<Unit>(
timeout = 2_000L,
retryMode = RuntimeScope.RetryMode.Always,
retry = 2,
) {
async(
// Limit condition processing to 1 second, retrying up to 3 more times
// whenever a timeout happens. If timeout continues, return false.
// If at least one async-job fail, retry.
asyncMode = AsyncScope.AsyncMode.CancelAll,
job1 = asyncJob<Boolean>(
timeout = 1_000L,
retryMode = RuntimeScope.RetryMode.OnTimeoutOnly,
retry = 3,
retryDefault = false,
) {
// Evaluate some condition that requires another job result
job<Rules>(context = job<Int> {..}){..}.last
},
job2 = asyncJob<BigDecimal> {
// Trigger multiple jobs to gather and summarize all the data
async(
// `job1` will be delayed by 10millis
job1 = asyncJob<Invoice>(delay = 10L){..},
// Limit customer processing to half a second
job2 = asyncJob<Customer>(timeout = 500L){..},
// `job3` will be delayed by 5millis and retried on errors up to
// 3 more times, with a delay of 10millis.
job3 = asuncJob<Discounts>(
delay = 5L,
retryMode = RuntimeScope.RetryMode.OnErrorOnly,
retry = 3,
retryDelay = 10L,
){..}
) { ok, r1, r2, r3 ->
if (ok) {
r1.success().total - r2.success().credit - r3.success().amount
} else {
BigDecimal.ZERO
}
}
}
) { ok, r1, r2 ->
if (ok) {
val condition = r1.success() // this is already a Boolean
val balance = r2.success() // this is already a BigDecimal
// do something with both results
:
}
}
}
Other features:
- Any scope can receive a
CoroutineContext
— defaults toDispatchers.IO
- A
context
can be specified within any scope to be used as a input paramater - Support for generics for
context
and return-values - Each scope can be defined with a custom name, for debugging / logging
- A pluggable logging framework
io.turbodsl
is not a replacement for Kotlin coroutines, but just a complement to simplify your codebase.
To include io.turbodsl
into your project:
// You must include githubpackages as a maven-repo
repositories {
mavenCentral()
// or Github packages
maven {
name = "GitHubPackages"
url = uri("https://maven.pkg.github.com/migueltt/io-turbodsl")
credentials { ... }
}
:
}
dependencies {
implementation("io-turbodsl:io-turbodsl-core:+")
}
For more details, check https://www.turbodsl.io