Structured Concurrency in C#

11/10/2023
C#.NETTask

Did you ever hear about "Structured Concurrency"? If not, this article is for you. We will discover what it is, why it is useful, and what it could look like in C#.

What is Structured Concurrency?

Structured Concurrency is a concept that helps to write more robust and maintainable asynchronous code. It is a concept that is not new, but it is not widely known. It is used in languages like Kotlin, Swift, and Python. The idea is to have a way to group asynchronous operations together and to make sure that all operations are completed before the group is completed. This is done by using a special construct called a Task Scope. A Task Scope is a construct that is similar to a try-catch-finally block. It is used to group asynchronous operations together and to make sure that all operations are completed before the group is completed.

Here a very easy image to grasp how it would look like from a flow-perspective:

Flow

How would such a Task Scope look like? Let's have a look at an example in Kotlin:

public class TaskScope
{
    private readonly CancellationTokenSource _cts = new();
    private readonly ConcurrentBag<Task> _tasks = new();

    private TaskScope() { }

    public static async Task Create(Func<TaskScope, Task> action)
    {
        await using var scope = new TaskScope();
        await action(scope);
        await scope.WaitForAll();
    }
    
    public static async Task Create(Action<TaskScope> action)
    {
        await using var scope = new TaskScope();
        action(scope);
        await scope.WaitForAll();
    }

    public async ValueTask DisposeAsync()
    {
        _cts.Cancel();
        await WaitForAll();
    }

    public Task Run(Func<CancellationToken, Task> action)
    {
        var task = Task.Run(async () =>
        {
            try
            {
                await action(_cts.Token);
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                _cts.Cancel();
                throw;
            }
        });

        _tasks.Add(task);
        return task;
    }

    private async Task WaitForAll()
    {
        try
        {
            await Task.WhenAll(_tasks.ToArray());
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            throw;
        }
    }
}

Obviously, this implementation is super simplistic and only covers the cases I wanna showcase here. The idea is that we define our scopes inside the Create method. So everything inside Create will be executed in a scope. So they belong logically together - if one exception is thrown from one of them, we cancel all Tasks alltogether. So how can we use this:

var sw = Stopwatch.StartNew();
var tasks = TaskScope.Create(group => 
{
    group.Run(async token => await Task.Delay(100, token));
    group.Run(async token => await Task.Delay(200, token));
});     

// This runs 200 ms
await tasks;
Console.WriteLine($"Elapsed: {sw.ElapsedMilliseconds} ms");

sw.Restart();
var failedTasks = TaskScope.Create(group =>
{
    group.Run(async token =>
    {
        await Task.Delay(100, token);
        throw new Exception("Boooommm!!!");
    });
    group.Run(async token => await Task.Delay(1000, token));
});

// Runs 100 ms as the first task fails and cancels the rest.
// Also bubbles up the exception.
try
{
    await failedTasks;
}
catch (Exception ex)
{
    Console.WriteLine(ex.Message);
}

// This runs 100 ms
Console.WriteLine($"Elapsed: {sw.ElapsedMilliseconds} ms");

// Prints True
Console.WriteLine(failedTasks.IsFaulted);

As you can see, we can group tasks together and make sure that they are all completed before the scope is completed. If one of the tasks fails, all other tasks are canceled and the exception is bubbled up. This is a very powerful concept that helps to write more robust and maintainable asynchronous code. A more sophisticated implementation can be found here.

Resources

5
An error has occurred. This application may no longer respond until reloaded. Reload x