An asynchronous lock free ring buffer for logging

03/01/2024

In this blog post, I showcase a very simple lock-free ring buffer for logging. I'll show why it's useful and how it works.

What is asynchronous lock-free logging?

Lock-free logging is a way that doesn't block the thread doing the logging. This is useful when you want to log a lot of information but don't want to block the thread doing the logging. Imagine a case where you have a burst of messages you want to log. And now you also have a few more threads that are doing logging. If you use a lock-based logging system, the threads will block each other. This means that the threads must wait for each other to finish logging. As you see, that is not your everyday use case - so while asynchronous lock-free logging is a thing, it's not something that you will use every day.

Ring buffer

A ring buffer is a data structure that stores data in a circular buffer. It's a very simple data structure that is used in many places. For example, it's used in the Linux kernel to store data in the network stack. The advantage is that we can build a lock-free writer and reader. There is a very good graphic from here:

img

Source: https://www.codeproject.com/Articles/43510/Lock-Free-Single-Producer-Single-Consumer-Circular

So, every message producer will write to the head of the ring buffer. And every consumer will read from the tail of the ring buffer. The head and the tail will move in a circular fashion. Of course, two threads at a time could try to write to the head of the ring buffer. That's why we need to use atomic operations to make sure that only one thread at a time can write to the head of the ring buffer. The same is true for the tail of the ring buffer. Here is a basic implementation:

public class LockFreeRingBuffer<T> where T : class
{
    private readonly T[] _buffer;
    private readonly int _capacity;
    private int _head;
    private int _tail;

    public LockFreeRingBuffer(int capacity)
    {
        _capacity = capacity;
        _buffer = new T[_capacity];
        _head = 0;
        _tail = 0;
    }

    public bool TryWrite(T value)
    {
        do
        {
            var currentTail = _tail;
            var nextTail = (currentTail + 1) % _capacity;

            // Check if the buffer is full
            if (nextTail == Volatile.Read(ref _head))
            {
                return false;
            }

            // Attempt to update the _tail index atomically
            if (Interlocked.CompareExchange(ref _tail, nextTail, currentTail) == currentTail)
            {
                _buffer[currentTail] = value;
                return true;
            }
        }
        while (true);
    }

    public bool TryRead(out T? value)
    {
        do
        {
            var currentHead = _head;
            if (currentHead == Volatile.Read(ref _tail))
            {
                value = default;
                return false;
            }

            // Attempt to update the _head index atomically
            var item = _buffer[currentHead];
            if (Interlocked.CompareExchange(ref _head, (currentHead + 1) % _capacity, currentHead) == currentHead)
            {
                value = item;
                return true;
            }
        }
        while (true);
    }
}

The atomicity is done via the Interlocked.CompareExchange method. This method will compare the value of the variable with the expected value. If two threads try to update the same variable simultaneously, then only one of them will succeed. The other one will have to try again.

Logging

Now, on top of that, we can build our logger. The logger will have a background thread that will read from the ring buffer and write to the console (but it could also be a file).

public sealed class AsyncLogger : IAsyncDisposable
{
    private readonly LockFreeRingBuffer<string> _ringBuffer;
    private readonly CancellationTokenSource _cancellationTokenSource;
    private readonly ManualResetEvent _newMessageEvent;
    private readonly Task _logProcessorTask;
    private bool _disposed;

    public AsyncLogger()
    {
        _ringBuffer = new LockFreeRingBuffer<string>(2);
        _cancellationTokenSource = new CancellationTokenSource();
        _newMessageEvent = new ManualResetEvent(false);
        _logProcessorTask = Task.Run(ProcessLogs);
    }

    public void Log(string message)
    {
        ObjectDisposedException.ThrowIf(_disposed, this);

        while (!_ringBuffer.TryWrite(message))
        {
            // Handle buffer being full, e.g., wait, retry, or drop the message.
        }

        _newMessageEvent.Set();
    }

    private void ProcessLogs()
    {
        while (!_cancellationTokenSource.IsCancellationRequested)
        {
            _newMessageEvent.WaitOne();
            ProcessAllAvailableMessages();
            _newMessageEvent.Reset();
        }

        // Final flush of all messages before exiting
        ProcessAllAvailableMessages();
    }

    private void ProcessAllAvailableMessages()
    {
        while (_ringBuffer.TryRead(out var logMessage))
        {
            // Process the log message
            Console.WriteLine(logMessage);
        }
    }

    public async ValueTask DisposeAsync()
    {
        await _cancellationTokenSource.CancelAsync();
        _newMessageEvent.Set(); // Ensure the log processing task wakes up to process remaining messages
        await _logProcessorTask;
        _cancellationTokenSource.Dispose();
        _newMessageEvent.Close();

        _disposed = true;
    }
}

The Log method will write to the ring buffer. The ProcessLogs method will read from the ring buffer and write to the console. We utilize the ManualResetEvent to signal that there is a new message in the ring buffer. The ProcessLogs method will wait until the ManualResetEvent is set. Then it will read all the messages from the ring buffer and write them to the console. The DisposeAsync method will cancel the background task and wait for it to finish.

The usage is then very simple:

await using var logger = new AsyncLogger();
logger.Log("Info: Application started");
logger.Log("Warning: Low memory");
logger.Log("Error: Out of memory");
logger.Log("Debug: Memory usage: 1.5 GB");
logger.Log("Info: Application stopped");

And that is it. It is a very, very simple way of achieving an asynchronous lock-free logger. The whole source code can, as always, be found in the resources.

Resources

  • Source code to this blog post: here
  • All my sample code is hosted in this repository: here
7
An error has occurred. This application may no longer respond until reloaded. Reload x