Wednesday, 19 October 2011

Smart Batching

How often have we all heard that “batching” will increase latency?  As someone with a passion for low-latency systems, this surprises me.  In my experience, when batching is done correctly it not only increases throughput, it can also reduce average latency and keep it consistent.

Well then, how can batching magically reduce latency?  It comes down to the algorithm and data structures employed.  In a distributed environment we often have to batch up messages/events into network packets to achieve greater throughput.  We employ similar techniques when buffering writes to storage to reduce the number of IOPS, whether that storage is a block-device-backed file system or a relational database.  Most IO devices can only handle a modest number of IO operations per second, so it is best to fill those operations efficiently.  Many approaches to batching involve waiting for a timeout to occur, and this will by its very nature increase latency.  The batch can also get filled before the timeout occurs, making the latency even more unpredictable.

Figure 1.

Figure 1. above depicts decoupling access to an IO device, and therefore the contention for it, by introducing a queue-like structure to stage the messages/events to be sent, and a thread that does the batching when writing to the device.

The Algorithm

An approach to batching uses the following algorithm in Java pseudo code:
import java.nio.ByteBuffer;
import java.util.Queue;

public final class NetworkBatcher
    implements Runnable
{
    private final NetworkFacade network;
    private final Queue<Message> queue;
    private final ByteBuffer buffer;

    public NetworkBatcher(final NetworkFacade network,
                          final int maxPacketSize,
                          final Queue<Message> queue)
    {
        this.network = network;
        buffer = ByteBuffer.allocate(maxPacketSize);
        this.queue = queue;
    }

    public void run()
    {
        while (!Thread.currentThread().isInterrupted())
        {
            // Wait for at least one message to become available.
            while (null == queue.peek())
            {
                employWaitStrategy(); // block, spin, yield, etc.
            }

            // Drain whatever has accumulated, flushing whenever the buffer is full.
            // Note: assumes no single message exceeds the buffer capacity.
            Message msg;
            while (null != (msg = queue.poll()))
            {
                if (msg.size() > buffer.remaining())
                {
                    sendBuffer();
                }

                buffer.put(msg.getBytes());
            }

            sendBuffer();
        }
    }

    private void employWaitStrategy()
    {
        Thread.yield(); // placeholder: choose a strategy to suit your latency requirements
    }

    private void sendBuffer()
    {
        buffer.flip();
        network.send(buffer);
        buffer.clear();
    }
}

Basically, wait for data to become available and, as soon as it is, send it right away.  While sending a previous message or waiting on new messages, a burst of traffic may arrive which can all be sent in a batch, up to the size of the buffer, to the underlying resource.  This approach can use a ConcurrentLinkedQueue, which provides low latency and avoids locks.  However, it has an issue in that it creates no back pressure to stall producing/publishing threads if they outpace the batcher, so the queue can grow out of control because it is unbounded.  I’ve often had to wrap ConcurrentLinkedQueue to track its size and thus create back pressure.  In my experience this size tracking can add 50% to the processing cost of using the queue.
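As an illustration, a minimal sketch of such a size-tracking wrapper follows.  It is not a general-purpose queue; it exposes only the operations the batcher needs, Message is the same hypothetical type used above, and the bound is only approximate under heavy contention.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public final class BoundedMessageQueue
{
    private final Queue<Message> queue = new ConcurrentLinkedQueue<Message>();
    private final AtomicInteger size = new AtomicInteger();
    private final int capacity;

    public BoundedMessageQueue(final int capacity)
    {
        this.capacity = capacity;
    }

    public boolean offer(final Message msg)
    {
        // The check-then-act is not atomic, so the bound is approximate,
        // but it is enough to signal back pressure to producers.
        if (size.get() >= capacity)
        {
            return false; // producer must wait or retry
        }

        queue.offer(msg);
        size.incrementAndGet();
        return true;
    }

    public Message poll()
    {
        final Message msg = queue.poll();
        if (null != msg)
        {
            size.decrementAndGet();
        }

        return msg;
    }

    public Message peek()
    {
        return queue.peek();
    }
}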

This algorithm respects the single writer principle and can often be employed when writing to a network or storage device, thus avoiding lock contention in third-party API libraries.  By avoiding the contention we avoid the J-curve latency profile normally associated with contention on resources, caused by the queuing effect on locks.  With this algorithm, as load increases, latency stays constant until the underlying device is saturated with traffic, resulting in a more "bathtub" profile than the J-curve.

Let’s take a worked example of handling 10 messages that arrive as a burst of traffic.  In most systems traffic comes in bursts and is seldom uniformly spaced out in time.  One approach assumes no batching, with the threads writing to the device API directly as in Figure 1. above.  The other uses a lock-free data structure to collect the messages, plus a single thread consuming messages in a loop as per the algorithm above.  For the example let’s assume it takes 100µs to write a single buffer to the network device as a synchronous operation and have it acknowledged.  The buffer should ideally be less than the MTU of the network in size when latency is critical.  Many network sub-systems are asynchronous and support pipelining, but we will make the above assumption to clarify the example.  If the network operation is using a protocol like HTTP under REST or Web Services then this assumption matches the underlying implementation.

                 Best (µs)   Average (µs)   Worst (µs)   Packets Sent
Serial           100         500            1,000        10
Smart Batching   100         150            200          1-2

The absolute lowest latency will be achieved if a message is sent from the thread originating the data directly to the resource, provided the resource is uncontended.  The table above shows what happens when contention occurs and a queuing effect kicks in.  With the serial approach, 10 individual packets have to be sent and these typically need to queue on a lock managing access to the resource, so they get processed sequentially.  The figures above assume the locking strategy works perfectly with no perceptible overhead, which is unlikely in a real application.

For the batching solution it is likely all 10 packets will be picked up in the first batch if the concurrent queue is efficient, thus giving the best-case latency scenario.  In the worst case only one message is sent in the first batch, with the other nine following in the next.  Therefore in the worst-case scenario one message has a latency of 100µs and the following 9 have a latency of 200µs, giving a worst-case average of 190µs, which is significantly better than the serial approach.

This is a good example of when the simplest solution is just a bit too simple because of the contention.  The batching solution helps achieve consistent low latency under burst conditions and is best for throughput.  It also has a nice effect across the network on the receiving end, in that the receiver has to process fewer packets, making the communication more efficient at both ends.

Most hardware handles data in buffers up to a fixed size for efficiency.  For a storage device this will typically be a 4KB block.  For networks this will be the MTU, which is typically 1500 bytes for Ethernet.  When batching, it is best to understand the underlying hardware and write batches down in the ideal buffer size to be optimally efficient.  However, keep in mind that some devices need to envelop the data, e.g. the Ethernet and IP headers for network packets, so the buffer needs to allow for this.
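For example, when targeting Ethernet the batch buffer might be sized roughly as follows.  This is a sketch with typical header sizes assumed: 20 bytes for IPv4 and 20 bytes for TCP, ignoring options and VLAN tags.

// Rough sizing of the batch buffer so one batch fits a single Ethernet frame.
final int ethernetMtu = 1500;
final int ipv4Header = 20;
final int tcpHeader = 20;
final int maxPayload = ethernetMtu - ipv4Header - tcpHeader; // 1,460 bytes

final ByteBuffer buffer = ByteBuffer.allocate(maxPayload);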

There will always be some increased latency from a thread switch and the cost of exchange via the data structure.  However, there are a number of very good non-blocking structures available using lock-free techniques.  For the Disruptor this type of exchange can be achieved in as little as 50-100ns, making the choice of taking the smart batching approach a no-brainer for low-latency or high-throughput distributed systems.

This technique can be employed for many problems and not just IO.  The core of the Disruptor uses this technique to help rebalance the system when the publishers burst and outpace the EventProcessors.  The algorithm can be seen inside the BatchEventProcessor.
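To give a flavour of that, a simplified sketch of the core loop follows.  This is not the actual BatchEventProcessor source; checked exceptions and lifecycle handling are omitted, and the event type is assumed to be the Message type used above.

long nextSequence = sequence.get() + 1L;
while (running)
{
    // Wait until at least one event is available, then consume everything
    // published so far as a single batch.
    final long availableSequence = sequenceBarrier.waitFor(nextSequence);

    while (nextSequence <= availableSequence)
    {
        final Message event = ringBuffer.get(nextSequence);
        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
        nextSequence++;
    }

    sequence.set(availableSequence);
}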

Note: For this algorithm to work the queueing structure must handle the contention better than the underlying resource.  Many queue implementations are extremely poor at managing contention.  Use science and measure before coming to a conclusion.

Batching with the Disruptor

The code below shows the same algorithm in action using the Disruptor's EventHandler mechanism.  In my experience, this is a very effective technique for handling any IO device efficiently and keeping latency low when dealing with load or burst traffic.
import java.nio.ByteBuffer;

import com.lmax.disruptor.EventHandler;

public final class NetworkBatchHandler
    implements EventHandler<Message>
{
    private final NetworkFacade network;
    private final ByteBuffer buffer;

    public NetworkBatchHandler(final NetworkFacade network,
                               final int maxPacketSize)
    {
        this.network = network;
        buffer = ByteBuffer.allocate(maxPacketSize);
    }

    public void onEvent(final Message msg, final long sequence, final boolean endOfBatch)
        throws Exception
    {
        // Flush if the next message will not fit in the remaining buffer space.
        if (msg.size() > buffer.remaining())
        {
            sendBuffer();
        }

        buffer.put(msg.getBytes());

        // The Disruptor signals the last event of the current batch, so the
        // buffer is flushed without waiting on a timeout.
        if (endOfBatch)
        {
            sendBuffer();
        }
    }

    private void sendBuffer()
    {
        buffer.flip();
        network.send(buffer);
        buffer.clear();
    }
}
The endOfBatch parameter greatly simplifies the handling of the batch compared to the double loop in the algorithm above.
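To show how the handler might be wired in, here is a minimal sketch.  It assumes a Disruptor 3.x style API, a Message type with a no-argument constructor to act as the event factory, and a hypothetical copyFrom() method for moving data into the pre-allocated event.

Disruptor<Message> disruptor =
    new Disruptor<>(Message::new, 1024, DaemonThreadFactory.INSTANCE);

disruptor.handleEventsWith(new NetworkBatchHandler(network, 1460));
RingBuffer<Message> ringBuffer = disruptor.start();

// Publishing side: claim a slot, copy the data in, then publish.
long sequence = ringBuffer.next();
try
{
    ringBuffer.get(sequence).copyFrom(sourceMessage); // hypothetical copy method
}
finally
{
    ringBuffer.publish(sequence);
}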

I have simplified the examples to illustrate the algorithm.  Clearly error handling and other edge conditions need to be considered.

Separation of IO from Work Processing

There is another very good reason to separate the IO from the threads doing the work processing.  Handing off the IO to another thread means the worker thread, or threads, can continue processing without blocking, in a nice cache-friendly manner.  I've found this to be critical in achieving high-performance throughput.

If the underlying IO device or resource becomes briefly saturated, the messages can be queued for the batcher thread, allowing the work-processing threads to continue.  The batching thread then feeds the messages to the IO device in the most efficient way possible, allowing the data structure to absorb the burst and, if it fills up, apply the necessary back pressure, thus providing a good separation of concerns in the workflow.
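As a minimal wiring sketch of that separation, using the NetworkBatcher and the hypothetical Message/NetworkFacade types from the example above:

// One dedicated batcher thread owns the IO device (single writer).
final Queue<Message> queue = new ConcurrentLinkedQueue<>();
final Thread batcherThread =
    new Thread(new NetworkBatcher(network, 1460, queue), "network-batcher");
batcherThread.setDaemon(true);
batcherThread.start();

// On a worker thread: hand off the message and carry on processing
// without blocking on the IO device.
queue.offer(message);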

Conclusion

So there you have it.  Smart Batching can be employed in concert with the appropriate data structures to achieve consistent low-latency and maximum throughput.

20 comments:

  1. (unrelated to this blog post)
    Is there any news on the LMAX collections library that was mentioned in the InfoQ presentation?

  2. I have one question that may be related to batching:
    Assume we want to write 1M bytes to a TCP socket:
    Method 1: split it into small packets with a size close to the MTU and send them one by one.
    Method 2: try to send the whole buffer and let the API figure out how much can be sent.
    Which method is better?
    How about writing 1M bytes to disk?

  3. Taras,

    LMAX currently has no plans to open-source its collections library.

  4. Xin,

    The advantages from batching come when the messages are smaller than the MTU or block size. If greater than MTU or block size then it is best to let the library just write as normal.

  5. Xin,

    I also meant to point out that even if your messages are larger than the block/MTU size you may still want to use this technique to avoid contention and allow the work thread to continue processing without waiting on the IO device.

  6. Why would rolling your own wait strategy with CLQ do better than LinkedBlockingQueue, which has one designed in?

    Replies
    1. Not sure I'm answering the right question here, but I'll have a go at what I think you are asking. LBQ only has a wait strategy based on locks, which causes contention and huge latencies when the OS is used to signal on the condition variables. Rolling your own wait strategy is more development work but can scale significantly better and in some cases achieve lower latencies. Wait strategies are often a conflated concern that I like to factor out in a design.
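      For illustration only, a minimal sketch of a hand-rolled backoff idle strategy (not the LMAX implementation; the thresholds are arbitrary assumptions) that spins, then yields, then parks, keeping OS condition variables off the hot path:

      // Sketch only: back off progressively as the idle count grows.
      private void employWaitStrategy(final long idleCount)
      {
          if (idleCount < 100)
          {
              // busy spin: lowest latency, burns a CPU core
          }
          else if (idleCount < 200)
          {
              Thread.yield();
          }
          else
          {
              java.util.concurrent.locks.LockSupport.parkNanos(1000L); // ~1µs
          }
      }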

  7. Wouldn't it be better for the network writer thread to round robin from one ring-buffer per producer instead of having them contend on a single ring-buffer?

    Replies
    1. If your single buffer is well designed and lock-free, it tends not to be an issue for contention. It also lends itself better to bulk sequential reads, whereas multiple buffers would require scatter/gather operations.

      I also find a single buffer allows for a more predictable memory footprint.

      The key to all this is having a good lock-free buffer implementation.

  8. Hi Martin,

    I have a use-case for something like this in an Akka system. Are you aware of anyone else who's tried this (in Akka) before?

    (I've posted on the Akka user group too!)

    Replies
    1. For the thread and replies on the Akka user group see this thread and links therein: https://groups.google.com/d/msg/akka-user/P0PMxj5zhwM/vv51gncjwTAJ

  9. I'm going through every entry on this blog and this strategy seems to conflict with the recommendation to use asynchronous I/O in other places. Wouldn't we be forced to stick to synchronous I/O here, so that if there is an error in the network batcher we have not already consumed the next events from the disruptor? I'm also thinking of the recommendation to use asynchronous database drivers here too.

    This would allow retrying without too much effort.

    Replies
    1. This technique is very useful when dealing with synchronous IO and "hiding" it.

      To deal with asynchronous IO and handling failure it is better to have higher level application replay. When it comes to distributed systems I just expect all remote calls to fail occasionally and be able to cope with it as a first class design principle.

      In either case it allows for very fast non-blocking interactions and the ability to use the IO efficiently by batching.

    2. For the replay to work, reporting on success or failure of operations is needed. When it comes to the disruptor, it's still not clear to me how you report this. Would a second disruptor be used to signal what succeeded or failed back to the higher level components?

      Equally unclear is what happens if sequences of data have to be honoured as sub-streams within the stream. In such a scenario I worry that an occasional failure would cause me to send later data first.

    3. The Disruptor is just a data structure, not a complete system :-)

      Let's consider what can go wrong.

      If the IO fails it is usually something very bad and cannot be retried, e.g. hardware failure, disk full, etc. So you better have a resilient system and fail over to a secondary or be running multi-master. If your storage is remote NFS then you have asked for a world of hurt.

      If something can be retried, such as when a receiving network node was restarting, then things need to be replayed. With the Disruptor this can be achieved with a gating Sequence that keeps a reserve. However, what if you go beyond the reserve? For this you need a persistent log to replay from. Even better if you are replaying messages containing CRDTs so that ordering does not matter. If sequence matters then the receiver needs to be able to cope with idempotent delivery.

      In a distributed world you need a higher level protocol to report on the success or failure of operations. The network, sync or async sends, cannot alone provide that. What if a failure happens before you receive the reply from a sync call? The world is really async in hardware and the lower layers.

    4. I know it's not a complete system and that's the reason I'm asking these questions. Looking at articles and wikis on it, there aren't that many recommendations on how to best interact between the disruptor and other systems.

      The reason I was pointing out asynchronous I/O and errors is I'm not sure how to keep this reserve in the first place. A sequence immediately advances upon return from the event handler. If I need to retry it seems the best way so far to keep this reserve is to make sure the event handler does not return before it's successful or it has exhausted the number of retries.

      The other thing I can do with these sequences is to chain event handler B on the result of A, etc. Again, it's still not clear to me how a number of retries could be modelled like this.

      Looking around at the code, I see there is a polling consumer that manipulates the sequences directly and calls another event handler itself. If nothing else, it does seem to indicate it's possible to do creative things here.

      Are you saying this cursor, the sequence, can be stopped from advancing under certain conditions?

    5. It is possible to take control of the cursor and sequence advancement with the Disruptor. I do not contribute to, or support, this project any longer. To get advice I'd recommend you ask in the Disruptor Google Group.

      https://groups.google.com/forum/#!forum/lmax-disruptor

      These days I use more flexible and higher performance data structures for such requirements. Examples can be found in the Aeron messaging system.

      https://github.com/real-logic/Aeron

    6. I apologize if this went too far in the direction of support for the disruptor. I was trying to best understand the smart batch implementation you highlighted and got carried away.

      I'm in the process of reading Agrona, I'll get to Aeron eventually.

      Again, thanks for your time, it's really appreciated.

  10. Amazing article.
    I have a question: in this article you are talking about IO.
    I was thinking about another situation: for example, imagine you have a REST API that needs to store some data in a database (very simple).
    Usually you'll find the REST service working synchronously, storing row by row. I made some tests, and collecting all requests asynchronously in a queue and doing just one batch update on the database improves performance dramatically (let's say 10x on my humble macOS machine).

    What do you think about using this in the scenario I mentioned?
    I find it very difficult to sell this idea, because all devs seem in love with the "sync approach".

    Replies
    1. This is a good technique for increasing throughput to a DB, especially when doing inserts.
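      As an illustration, a minimal JDBC sketch of the batch-insert side (table, column names, and the Message accessors are hypothetical; assumes auto-commit is disabled and rows have been collected on a queue as in the article):

      try (PreparedStatement stmt = connection.prepareStatement(
          "INSERT INTO events (id, payload) VALUES (?, ?)"))
      {
          Message msg;
          while (null != (msg = queue.poll()))
          {
              stmt.setLong(1, msg.getId());      // hypothetical accessor
              stmt.setBytes(2, msg.getBytes());
              stmt.addBatch();
          }

          stmt.executeBatch(); // one round trip instead of one per row
          connection.commit();
      }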
