Showing posts with label IO. Show all posts
Showing posts with label IO. Show all posts

Monday, 26 December 2011

Java Sequential IO Performance

Many applications record a series of events to file-based storage for later use.  This can be anything from logging and auditing, through to keeping a transaction redo log in an event sourced design or its close relative CQRS

Java has a number of means by which a file can be sequentially written to, or read back again.  This article explores some of these mechanisms to understand their performance characteristics.  For the scope of this article I will be using pre-allocated files because I want to focus on performance.  Constantly extending a file imposes a significant performance overhead and adds jitter to an application resulting in highly variable latency.  "Why is a pre-allocated file better performance?", I hear you ask.  Well, on disk a file is made up from a series of blocks/pages containing the data.  Firstly, it is important that these blocks are contiguous to provide fast sequential access.   Secondly, meta-data must be allocated to describe this file on disk and saved within the file-system.  A typical large file will have a number of "indirect" blocks allocated to describe the chain of data-blocks containing the file contents that make up part of this meta-data.   I'll leave it as an exercise for the reader, or maybe a later article, to explore the performance impact of not preallocating the data files.  If you have used a database you may have noticed that it preallocates the files it will require.

The Test

I want to experiment with 2 file sizes.  One that is sufficiently large to test sequential access, but can easily fit in the file-system cache, and another that is much larger so that the cache subsystem is forced to retire pages so that new ones can be loaded.  For these two cases I'll use 400MB and 8GB respectively.  I'll also loop over the files a number of times to show the pre and post warm-up characteristics.

I'll test 4 means of writing and reading back files sequentially:
  1. RandomAccessFile using a vanilla byte[] of page size.
  2. Buffered FileInputStream and FileOutputStream.
  3. NIO FileChannel with ByteBuffer of page size.
  4. Memory mapping a file using NIO and direct MappedByteBuffer.
The tests are run on a 2.0Ghz Sandy Bridge CPU with 8GB RAM, an Intel 320 SSD on Fedora Core 15 64-bit Linux with an ext4 file system, and Oracle JDK 1.6.0_30.

The Code
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

import static java.lang.Integer.MAX_VALUE;
import static java.lang.System.out;
import static java.nio.channels.FileChannel.MapMode.READ_ONLY;
import static java.nio.channels.FileChannel.MapMode.READ_WRITE;

public final class TestSequentialIoPerf
{
    public static final int PAGE_SIZE = 1024 * 4;
    public static final long FILE_SIZE = PAGE_SIZE * 2000L * 1000L;
    public static final String FILE_NAME = "test.dat";
    public static final byte[] BLANK_PAGE = new byte[PAGE_SIZE];

    public static void main(final String[] arg) throws Exception
    {
        preallocateTestFile(FILE_NAME);

        for (final PerfTestCase testCase : testCases)
        {
            for (int i = 0; i < 5; i++)
            {
                System.gc();
                long writeDurationMs = testCase.test(PerfTestCase.Type.WRITE,
                                                     FILE_NAME);

                System.gc();
                long readDurationMs = testCase.test(PerfTestCase.Type.READ,
                                                    FILE_NAME);

                long bytesReadPerSec = (FILE_SIZE * 1000L) / readDurationMs;
                long bytesWrittenPerSec = (FILE_SIZE * 1000L) / writeDurationMs;

                out.format("%s\twrite=%,d\tread=%,d bytes/sec\n",
                           testCase.getName(),
                           bytesWrittenPerSec, bytesReadPerSec);
            }
        }

        deleteFile(FILE_NAME);
    }

    private static void preallocateTestFile(final String fileName)
        throws Exception
    {
        RandomAccessFile file = new RandomAccessFile(fileName, "rw");

        for (long i = 0; i < FILE_SIZE; i += PAGE_SIZE)
        {
            file.write(BLANK_PAGE, 0, PAGE_SIZE);
        }

        file.close();
    }

    private static void deleteFile(final String testFileName) throws Exception
    {
        File file = new File(testFileName);
        if (!file.delete())
        {
            out.println("Failed to delete test file=" + testFileName);
            out.println("Windows does not allow mapped files to be deleted.");
        }
    }

    public abstract static class PerfTestCase
    {
        public enum Type { READ, WRITE }

        private final String name;
        private int checkSum;

        public PerfTestCase(final String name)
        {
            this.name = name;
        }

        public String getName()
        {
            return name;
        }

        public long test(final Type type, final String fileName)
        {
            long start = System.currentTimeMillis();

            try
            {
                switch (type)
                {
                    case WRITE:
                    {
                        checkSum = testWrite(fileName);
                        break;
                    }

                    case READ:
                    {
                        final int checkSum = testRead(fileName);
                        if (checkSum != this.checkSum)
                        {
                            final String msg = getName() +
                                " expected=" + this.checkSum +
                                " got=" + checkSum;
                            throw new IllegalStateException(msg);
                        }
                        break;
                    }
                }
            }
            catch (Exception ex)
            {
                ex.printStackTrace();
            }

            return System.currentTimeMillis() - start;
        }

        public abstract int testWrite(final String fileName) throws Exception;
        public abstract int testRead(final String fileName) throws Exception;
    }

    private static PerfTestCase[] testCases =
    {
        new PerfTestCase("RandomAccessFile")
        {
            public int testWrite(final String fileName) throws Exception
            {
                RandomAccessFile file = new RandomAccessFile(fileName, "rw");
                final byte[] buffer = new byte[PAGE_SIZE];
                int pos = 0;
                int checkSum = 0;

                for (long i = 0; i < FILE_SIZE; i++)
                {
                    byte b = (byte)i;
                    checkSum += b;

                    buffer[pos++] = b;
                    if (PAGE_SIZE == pos)
                    {
                        file.write(buffer, 0, PAGE_SIZE);
                        pos = 0;
                    }
                }

                file.close();

                return checkSum;
            }

            public int testRead(final String fileName) throws Exception
            {
                RandomAccessFile file = new RandomAccessFile(fileName, "r");
                final byte[] buffer = new byte[PAGE_SIZE];
                int checkSum = 0;
                int bytesRead;

                while (-1 != (bytesRead = file.read(buffer)))
                {
                    for (int i = 0; i < bytesRead; i++)
                    {
                        checkSum += buffer[i];
                    }
                }

                file.close();

                return checkSum;
            }
        },

        new PerfTestCase("BufferedStreamFile")
        {
            public int testWrite(final String fileName) throws Exception
            {
                int checkSum = 0;
                OutputStream out = 
                    new BufferedOutputStream(new FileOutputStream(fileName));

                for (long i = 0; i < FILE_SIZE; i++)
                {
                    byte b = (byte)i;
                    checkSum += b;
                    out.write(b);
                }

                out.close();

                return checkSum;
            }

            public int testRead(final String fileName) throws Exception
            {
                int checkSum = 0;
                InputStream in = 
                    new BufferedInputStream(new FileInputStream(fileName));

                int b;
                while (-1 != (b = in.read()))
                {
                    checkSum += (byte)b;
                }

                in.close();

                return checkSum;
            }
        },


        new PerfTestCase("BufferedChannelFile")
        {
            public int testWrite(final String fileName) throws Exception
            {
                FileChannel channel = 
                    new RandomAccessFile(fileName, "rw").getChannel();
                ByteBuffer buffer = ByteBuffer.allocate(PAGE_SIZE);
                int checkSum = 0;

                for (long i = 0; i < FILE_SIZE; i++)
                {
                    byte b = (byte)i;
                    checkSum += b;
                    buffer.put(b);

                    if (!buffer.hasRemaining())
                    {
                        buffer.flip();
                        channel.write(buffer);
                        buffer.clear();
                    }
                }

                channel.close();

                return checkSum;
            }

            public int testRead(final String fileName) throws Exception
            {
                FileChannel channel = 
                    new RandomAccessFile(fileName, "rw").getChannel();
                ByteBuffer buffer = ByteBuffer.allocate(PAGE_SIZE);
                int checkSum = 0;

                while (-1 != (channel.read(buffer)))
                {
                    buffer.flip();

                    while (buffer.hasRemaining())
                    {
                        checkSum += buffer.get();
                    }

                    buffer.clear();
                }

                return checkSum;
            }
        },

        new PerfTestCase("MemoryMappedFile")
        {
            public int testWrite(final String fileName) throws Exception
            {
                FileChannel channel = 
                    new RandomAccessFile(fileName, "rw").getChannel();
                MappedByteBuffer buffer = 
                    channel.map(READ_WRITE, 0,
                                Math.min(channel.size(), MAX_VALUE));
                int checkSum = 0;

                for (long i = 0; i < FILE_SIZE; i++)
                {
                    if (!buffer.hasRemaining())
                    {
                        buffer = 
                            channel.map(READ_WRITE, i,
                                        Math.min(channel.size() - i , MAX_VALUE));
                    }

                    byte b = (byte)i;
                    checkSum += b;
                    buffer.put(b);
                }

                channel.close();

                return checkSum;
            }

            public int testRead(final String fileName) throws Exception
            {
                FileChannel channel = 
                    new RandomAccessFile(fileName, "rw").getChannel();
                MappedByteBuffer buffer = 
                    channel.map(READ_ONLY, 0,
                                Math.min(channel.size(), MAX_VALUE));
                int checkSum = 0;

                for (long i = 0; i < FILE_SIZE; i++)
                {
                    if (!buffer.hasRemaining())
                    {
                        buffer = 
                            channel.map(READ_WRITE, i,
                                        Math.min(channel.size() - i , MAX_VALUE));
                    }

                    checkSum += buffer.get();
                }

                channel.close();

                return checkSum;
            }
        },
    };
}
Results

400MB file
===========
RandomAccessFile    write=379,610,750   read=1,452,482,269 bytes/sec
RandomAccessFile    write=294,041,636   read=1,494,890,510 bytes/sec
RandomAccessFile    write=250,980,392   read=1,422,222,222 bytes/sec
RandomAccessFile    write=250,366,748   read=1,388,474,576 bytes/sec
RandomAccessFile    write=260,394,151   read=1,422,222,222 bytes/sec

BufferedStreamFile  write=98,178,331    read=286,433,566 bytes/sec
BufferedStreamFile  write=100,244,738   read=288,857,545 bytes/sec
BufferedStreamFile  write=82,948,562    read=154,100,827 bytes/sec
BufferedStreamFile  write=108,503,311   read=153,869,271 bytes/sec
BufferedStreamFile  write=113,055,478   read=152,608,047 bytes/sec

BufferedChannelFile write=228,443,948   read=356,173,913 bytes/sec
BufferedChannelFile write=265,629,053   read=374,063,926 bytes/sec
BufferedChannelFile write=223,825,136   read=1,539,849,624 bytes/sec
BufferedChannelFile write=232,992,036   read=1,539,849,624 bytes/sec
BufferedChannelFile write=212,779,220   read=1,534,082,397 bytes/sec

MemoryMappedFile    write=300,955,180   read=305,899,925 bytes/sec
MemoryMappedFile    write=313,149,847   read=310,538,286 bytes/sec
MemoryMappedFile    write=326,374,501   read=303,857,566 bytes/sec
MemoryMappedFile    write=327,680,000   read=304,535,315 bytes/sec
MemoryMappedFile    write=326,895,450   read=303,632,320 bytes/sec

8GB File
============
RandomAccessFile    write=167,402,321   read=251,922,012 bytes/sec
RandomAccessFile    write=193,934,802   read=257,052,307 bytes/sec
RandomAccessFile    write=192,948,159   read=248,460,768 bytes/sec
RandomAccessFile    write=191,814,180   read=245,225,408 bytes/sec
RandomAccessFile    write=190,635,762   read=275,315,073 bytes/sec

BufferedStreamFile  write=154,823,102   read=248,355,313 bytes/sec
BufferedStreamFile  write=152,083,913   read=253,418,301 bytes/sec
BufferedStreamFile  write=133,099,369   read=146,056,197 bytes/sec
BufferedStreamFile  write=131,065,708   read=146,217,827 bytes/sec
BufferedStreamFile  write=132,694,052   read=148,116,004 bytes/sec

BufferedChannelFile write=186,703,740   read=215,075,218 bytes/sec
BufferedChannelFile write=190,591,410   read=211,030,680 bytes/sec
BufferedChannelFile write=187,220,038   read=223,087,606 bytes/sec
BufferedChannelFile write=191,585,397   read=221,297,747 bytes/sec
BufferedChannelFile write=192,653,214   read=211,789,038 bytes/sec

MemoryMappedFile    write=123,023,322   read=231,530,156 bytes/sec
MemoryMappedFile    write=121,961,023   read=230,403,600 bytes/sec
MemoryMappedFile    write=123,317,778   read=229,899,250 bytes/sec
MemoryMappedFile    write=121,472,738   read=231,739,745 bytes/sec
MemoryMappedFile    write=120,362,615   read=231,190,382 bytes/sec

Analysis

For years I was a big fan of using RandomAccessFile directly because of the control it gives and the predictable execution.  I never found using buffered streams to be useful from a performance perspective and this still seems to be the case.

In more recent testing I've found that using NIO FileChannel and ByteBuffer are doing much better. With Java 7 the flexibility of this programming approach has been improved for random access with SeekableByteChannel.

It seems that for reading RandomAccessFile and NIO do very well with Memory Mapped files winning for writes in some cases.

I've seen these results vary greatly depending on platform.  File system, OS, storage devices, and available memory all have a significant impact.  In a few cases I've seen memory-mapped files perform significantly better than the others but this needs to be tested on your platform because your mileage may vary...

A special note should be made for the use of memory-mapped large files when pushing for maximum throughput.  I've often found the OS can become unresponsive due the the pressure put on the virtual memory sub-system.

Conclusion

There is a significant difference in performance for the different means of doing sequential file IO from Java.  Not all methods are even remotely equal.  For most IO I've found the use of ByteBuffers and Channels to be the best optimised parts of the IO libraries.  If buffered streams are your IO libraries of choice, then it is worth branching out and and getting familiar with the implementations of Channel and Buffer or even falling back and using the good old RandomAccessFile.

Wednesday, 19 October 2011

Smart Batching

How often have we all heard that “batching” will increase latency?  As someone with a passion for low-latency systems this surprises me.  In my experience when batching is done correctly, not only does it increase throughput, it can also reduce average latency and keep it consistent.

Well then, how can batching magically reduce latency?  It comes down to what algorithm and data structures are employed.  In a distributed environment we are often having to batch up messages/events into network packets to achieve greater throughput.  We also employ similar techniques in buffering writes to storage to reduce the number of IOPS. That storage could be a block device backed file-system or a relational database.  Most IO devices can only handle a modest number of IO operations per second, so it is best to fill those operations efficiently.  Many approaches to batching involve waiting for a timeout to occur and this will by its very nature increase latency.  The batch can also get filled before the timeout occurs making the latency even more unpredictable.

Figure 1.

Figure 1. above depicts decoupling the access to an IO device, and therefore the contention for access to it, by introducing a queue like structure to stage the messages/events to be sent and a thread doing the batching for writing to the device.

The Algorithm

An approach to batching uses the following algorithm in Java pseudo code:
public final class NetworkBatcher
    implements Runnable
{
    private final NetworkFacade network;
    private final Queue<Message> queue;
    private final ByteBuffer buffer;

    public NetworkBatcher(final NetworkFacade network,
                          final int maxPacketSize,
                          final Queue<Message> queue)
    {
        this.network = network;
        buffer = ByteBuffer.allocate(maxPacketSize);
        this.queue = queue;
    }

    public void run()
    {
        while (!Thread.currentThread().isInterrupted())
        {
            while (null == queue.peek())
            {
                employWaitStrategy(); // block, spin, yield, etc.
            }

            Message msg;
            while (null != (msg = queue.poll()))
            {
                if (msg.size() > buffer.remaining())
                {
                    sendBuffer();
                }

                buffer.put(msg.getBytes());
            }

            sendBuffer();
        }
    }

    private void sendBuffer()
    {
        buffer.flip();
        network.send(buffer);
        buffer.clear();
    }
}

Basically, wait for data to become available and as soon as it is, send it right away.  While sending a previous message or waiting on new messages, a burst of traffic may arrive which can all be sent in a batch, up to the size of the buffer, to the underlying resource.  This approach can use ConcurrentLinkedQueue which provides low-latency and avoid locks.  However it has an issue in not creating back pressure to stall producing/publishing threads if they are outpacing the batcher whereby the queue could grow out of control because it is unbounded.  I’ve often had to wrap ConcurrentLinkedQueue to track its size and thus create back pressure.  This size tracking can add 50% to the processing cost of using this queue in my experience.

This algorithm respects the single writer principle and can often be employed when writing to a network or storage device, and thus avoid lock contention in third party API libraries.  By avoiding the contention we avoid the J-Curve latency profile normally associated with contention on resources, due to the queuing effect on locks.  With this algorithm, as load increases, latency stays constant until the underlying device is saturated with traffic resulting in a more "bathtub" profile than the J-Curve.

Let’s take a worked example of handling 10 messages that arrive as a burst of traffic.  In most systems traffic comes in bursts and is seldom uniformly spaced out in time.  One approach will assume no batching and the threads write to device API directly as in Figure 1. above.  The other will use a lock free data structure to collect the messages plus a single thread consuming messages in a loop as per the algorithm above.   For the example let’s assume it takes 100µs to write a single buffer to the network device as a synchronous operation and have it acknowledged.  The buffer will ideally be less than the MTU of the network in size when latency is critical.  Many network sub-systems are asynchronous and support pipelining but we will make the above assumption to clarify the example.  If the network operation is using a protocol like HTTP under REST or Web Services then this assumption matches the underlying implementation.

Best (µs)Average (µs)Worst (µs)Packets Sent
Serial1005001,00010
Smart Batching1001502001-2

The absolute lowest latency will be achieved if a message is sent from the thread originating the data directly to the resource, if the resource is un-contended.   The table above shows what happens when contention occurs and a queuing effect kicks in. With the serial approach 10 individual packets will have to be sent and these typically need to queue on a lock managing access to the resource, therefore they get processed sequentially.  The above figures assume the locking strategy works perfectly with no perceivable overhead which is unlikely in a real application.

For the batching solution it is likely all 10 packets will be picked up in first batch if the concurrent queue is efficient, thus giving the best case latency scenario.  In the worst case only one message is sent in the first batch with the other nine following in the next.  Therefore in the worst case scenario one message has a latency of 100µs and the following 9 have a latency of 200µs thus giving a worst case average of 190µs which is significantly better than the serial approach.

This is one good example when the simplest solution is just a bit too simple because of the contention.  The batching solution helps achieve consistent low-latency under burst conditions and is best for throughput.  It also has a nice effect across the network on the receiving end in that the receiver has to process fewer packets and therefore makes the communication more efficient both ends.

Most hardware handles data in buffers up to a fixed size for efficiency.  For a storage device this will typically be a 4KB block.  For networks this will be the MTU and is typically 1500 bytes for Ethernet.  When batching, it is best to understand the underlying hardware and write batches down in ideal buffer size to be optimally efficient.  However keep in mind that some devices need to envelope the data, e.g. the Ethernet and IP headers for network packets so the buffer needs to allow for this.

There will always be an increased latency from a thread switch and the cost of exchange via the data structure.  However there are a number of very good non-blocking structures available using lock-free techniques.  For the Disruptor this type of exchange can be achieved in as little as 50-100ns thus making the choice of taking the smart batching approach a no brainer for low-latency or high-throughput distributed systems.

This technique can be employed for many problems and not just IO.  The core of the Disruptor uses this technique to help rebalance the system when the publishers burst and outpace the EventProcessors.  The algorithm can be seen inside the BatchEventProcessor.

Note: For this algorithm to work the queueing structure must handle the contention better than the underlying resource.  Many queue implementations are extremely poor at managing contention.  Use science and measure before coming to a conclusion.

Batching with the Disruptor

The code below shows the same algorithm in action using the Disruptor's EventHandler mechanism.  In my experience, this is a very effective technique for handling any IO device efficiently and keeping latency low when dealing with load or burst traffic.
public final class NetworkBatchHandler
    implements EventHander<Message>
{
    private final NetworkFacade network;
    private final ByteBuffer buffer;

    public NetworkBatchHandler(final NetworkFacade network,
                               final int maxPacketSize)
    {
        this.network = network;
        buffer = ByteBuffer.allocate(maxPacketSize);
    }
    
    public void onEvent(Message msg, long sequence, boolean endOfBatch) 
        throws Exception
    {
        if (msg.size() > buffer.remaining())
        {
            sendBuffer();
        }

        buffer.put(msg.getBytes());
        
        if (endOfBatch)
        {
            sendBuffer();
        }
    } 

    private void sendBuffer()
    {
        buffer.flip();
        network.send(buffer);
        buffer.clear();
    }    
}
The endOfBatch parameter greatly simplifies the handling of the batch compared to the double loop in the algorithm above.

I have simplified the examples to illustrate the algorithm.  Clearly error handling and other edge conditions need to be considered.

Separation of IO from Work Processing

There is another very good reason to separate the IO from the threads doing the work processing.  Handing off the IO to another thread means the worker thread, or threads, can continue processing without blocking in a nice cache friendly manner. I've found this to be critical in achieving high-performance throughput.

If the underlying IO device or resource becomes briefly saturated then the messages can be queued for the batcher thread allowing the work processing threads to continue.  The batching thread then feeds the messages to the IO device in the most efficient way possible allowing the data structure to handle the burst and if full apply the necessary back pressure, thus providing a good separation of concerns in the workflow.

Conclusion

So there you have it.  Smart Batching can be employed in concert with the appropriate data structures to achieve consistent low-latency and maximum throughput.