Monday, 26 December 2011

Java Sequential IO Performance

Many applications record a series of events to file-based storage for later use.  This can be anything from logging and auditing, through to keeping a transaction redo log in an event-sourced design or its close relative, CQRS.

Java has a number of means by which a file can be sequentially written to, or read back again.  This article explores some of these mechanisms to understand their performance characteristics.  For the scope of this article I will be using pre-allocated files because I want to focus on performance.  Constantly extending a file imposes a significant performance overhead and adds jitter to an application, resulting in highly variable latency.  "Why does a pre-allocated file give better performance?", I hear you ask.  Well, on disk a file is made up of a series of blocks/pages containing the data.  Firstly, it is important that these blocks are contiguous to provide fast sequential access.  Secondly, meta-data must be allocated to describe this file on disk and saved within the file-system.  A typical large file will have a number of "indirect" blocks allocated to describe the chain of data blocks containing the file contents, and these form part of the meta-data.  I'll leave it as an exercise for the reader, or maybe a later article, to explore the performance impact of not pre-allocating the data files.  If you have used a database you may have noticed that it pre-allocates the files it will require.
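To make the pre-allocation point concrete, below is a minimal sketch of pre-allocating a file by writing zeroed pages, which is the same approach the test code later in this article takes.  Note that simply calling RandomAccessFile.setLength() is often not enough on its own, because many file systems will then just record a sparse file without actually allocating the data blocks.

import java.io.RandomAccessFile;

public final class Preallocator
{
    private static final int PAGE_SIZE = 1024 * 4;
    private static final byte[] BLANK_PAGE = new byte[PAGE_SIZE];

    // Write zeroed pages so the file system actually allocates the data blocks,
    // rather than relying on setLength() which may only create a sparse file.
    public static void preallocate(final String fileName, final long sizeInBytes)
        throws Exception
    {
        final RandomAccessFile file = new RandomAccessFile(fileName, "rw");
        try
        {
            for (long i = 0; i < sizeInBytes; i += PAGE_SIZE)
            {
                file.write(BLANK_PAGE, 0, PAGE_SIZE);
            }
        }
        finally
        {
            file.close();
        }
    }
}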

The Test

I want to experiment with 2 file sizes.  One that is sufficiently large to test sequential access, but can easily fit in the file-system cache, and another that is much larger so that the cache subsystem is forced to retire pages so that new ones can be loaded.  For these two cases I'll use 400MB and 8GB respectively.  I'll also loop over the files a number of times to show the pre and post warm-up characteristics.

I'll test 4 means of writing and reading back files sequentially:
  1. RandomAccessFile using a vanilla byte[] of page size.
  2. Buffered FileInputStream and FileOutputStream.
  3. NIO FileChannel with ByteBuffer of page size.
  4. Memory mapping a file using NIO and direct MappedByteBuffer.
The tests are run on a 2.0GHz Sandy Bridge CPU with 8GB RAM, an Intel 320 SSD on Fedora Core 15 64-bit Linux with an ext4 file system, and Oracle JDK 1.6.0_30.

The Code
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

import static java.lang.Integer.MAX_VALUE;
import static java.lang.System.out;
import static java.nio.channels.FileChannel.MapMode.READ_ONLY;
import static java.nio.channels.FileChannel.MapMode.READ_WRITE;

public final class TestSequentialIoPerf
{
    public static final int PAGE_SIZE = 1024 * 4;
    public static final long FILE_SIZE = PAGE_SIZE * 2000L * 1000L;
    public static final String FILE_NAME = "test.dat";
    public static final byte[] BLANK_PAGE = new byte[PAGE_SIZE];

    public static void main(final String[] arg) throws Exception
    {
        preallocateTestFile(FILE_NAME);

        for (final PerfTestCase testCase : testCases)
        {
            for (int i = 0; i < 5; i++)
            {
                System.gc();
                long writeDurationMs = testCase.test(PerfTestCase.Type.WRITE,
                                                     FILE_NAME);

                System.gc();
                long readDurationMs = testCase.test(PerfTestCase.Type.READ,
                                                    FILE_NAME);

                long bytesReadPerSec = (FILE_SIZE * 1000L) / readDurationMs;
                long bytesWrittenPerSec = (FILE_SIZE * 1000L) / writeDurationMs;

                out.format("%s\twrite=%,d\tread=%,d bytes/sec\n",
                           testCase.getName(),
                           bytesWrittenPerSec, bytesReadPerSec);
            }
        }

        deleteFile(FILE_NAME);
    }

    private static void preallocateTestFile(final String fileName)
        throws Exception
    {
        RandomAccessFile file = new RandomAccessFile(fileName, "rw");

        for (long i = 0; i < FILE_SIZE; i += PAGE_SIZE)
        {
            file.write(BLANK_PAGE, 0, PAGE_SIZE);
        }

        file.close();
    }

    private static void deleteFile(final String testFileName) throws Exception
    {
        File file = new File(testFileName);
        if (!file.delete())
        {
            out.println("Failed to delete test file=" + testFileName);
            out.println("Windows does not allow mapped files to be deleted.");
        }
    }

    public abstract static class PerfTestCase
    {
        public enum Type { READ, WRITE }

        private final String name;
        private int checkSum;

        public PerfTestCase(final String name)
        {
            this.name = name;
        }

        public String getName()
        {
            return name;
        }

        public long test(final Type type, final String fileName)
        {
            long start = System.currentTimeMillis();

            try
            {
                switch (type)
                {
                    case WRITE:
                    {
                        checkSum = testWrite(fileName);
                        break;
                    }

                    case READ:
                    {
                        final int checkSum = testRead(fileName);
                        if (checkSum != this.checkSum)
                        {
                            final String msg = getName() +
                                " expected=" + this.checkSum +
                                " got=" + checkSum;
                            throw new IllegalStateException(msg);
                        }
                        break;
                    }
                }
            }
            catch (Exception ex)
            {
                ex.printStackTrace();
            }

            return System.currentTimeMillis() - start;
        }

        public abstract int testWrite(final String fileName) throws Exception;
        public abstract int testRead(final String fileName) throws Exception;
    }

    private static PerfTestCase[] testCases =
    {
        new PerfTestCase("RandomAccessFile")
        {
            public int testWrite(final String fileName) throws Exception
            {
                RandomAccessFile file = new RandomAccessFile(fileName, "rw");
                final byte[] buffer = new byte[PAGE_SIZE];
                int pos = 0;
                int checkSum = 0;

                for (long i = 0; i < FILE_SIZE; i++)
                {
                    byte b = (byte)i;
                    checkSum += b;

                    buffer[pos++] = b;
                    if (PAGE_SIZE == pos)
                    {
                        file.write(buffer, 0, PAGE_SIZE);
                        pos = 0;
                    }
                }

                file.close();

                return checkSum;
            }

            public int testRead(final String fileName) throws Exception
            {
                RandomAccessFile file = new RandomAccessFile(fileName, "r");
                final byte[] buffer = new byte[PAGE_SIZE];
                int checkSum = 0;
                int bytesRead;

                while (-1 != (bytesRead = file.read(buffer)))
                {
                    for (int i = 0; i < bytesRead; i++)
                    {
                        checkSum += buffer[i];
                    }
                }

                file.close();

                return checkSum;
            }
        },

        new PerfTestCase("BufferedStreamFile")
        {
            public int testWrite(final String fileName) throws Exception
            {
                int checkSum = 0;
                OutputStream out = 
                    new BufferedOutputStream(new FileOutputStream(fileName));

                for (long i = 0; i < FILE_SIZE; i++)
                {
                    byte b = (byte)i;
                    checkSum += b;
                    out.write(b);
                }

                out.close();

                return checkSum;
            }

            public int testRead(final String fileName) throws Exception
            {
                int checkSum = 0;
                InputStream in = 
                    new BufferedInputStream(new FileInputStream(fileName));

                int b;
                while (-1 != (b = in.read()))
                {
                    checkSum += (byte)b;
                }

                in.close();

                return checkSum;
            }
        },


        new PerfTestCase("BufferedChannelFile")
        {
            public int testWrite(final String fileName) throws Exception
            {
                FileChannel channel = 
                    new RandomAccessFile(fileName, "rw").getChannel();
                ByteBuffer buffer = ByteBuffer.allocate(PAGE_SIZE);
                int checkSum = 0;

                for (long i = 0; i < FILE_SIZE; i++)
                {
                    byte b = (byte)i;
                    checkSum += b;
                    buffer.put(b);

                    if (!buffer.hasRemaining())
                    {
                        buffer.flip();
                        channel.write(buffer);
                        buffer.clear();
                    }
                }

                channel.close();

                return checkSum;
            }

            public int testRead(final String fileName) throws Exception
            {
                FileChannel channel = 
                    new RandomAccessFile(fileName, "rw").getChannel();
                ByteBuffer buffer = ByteBuffer.allocate(PAGE_SIZE);
                int checkSum = 0;

                while (-1 != (channel.read(buffer)))
                {
                    buffer.flip();

                    while (buffer.hasRemaining())
                    {
                        checkSum += buffer.get();
                    }

                    buffer.clear();
                }

                channel.close();

                return checkSum;
            }
        },

        new PerfTestCase("MemoryMappedFile")
        {
            public int testWrite(final String fileName) throws Exception
            {
                FileChannel channel = 
                    new RandomAccessFile(fileName, "rw").getChannel();
                MappedByteBuffer buffer = 
                    channel.map(READ_WRITE, 0,
                                Math.min(channel.size(), MAX_VALUE));
                int checkSum = 0;

                for (long i = 0; i < FILE_SIZE; i++)
                {
                    if (!buffer.hasRemaining())
                    {
                        buffer = 
                            channel.map(READ_WRITE, i,
                                        Math.min(channel.size() - i , MAX_VALUE));
                    }

                    byte b = (byte)i;
                    checkSum += b;
                    buffer.put(b);
                }

                channel.close();

                return checkSum;
            }

            public int testRead(final String fileName) throws Exception
            {
                FileChannel channel = 
                    new RandomAccessFile(fileName, "rw").getChannel();
                MappedByteBuffer buffer = 
                    channel.map(READ_ONLY, 0,
                                Math.min(channel.size(), MAX_VALUE));
                int checkSum = 0;

                for (long i = 0; i < FILE_SIZE; i++)
                {
                    if (!buffer.hasRemaining())
                    {
                        buffer = 
                            channel.map(READ_ONLY, i,
                                        Math.min(channel.size() - i, MAX_VALUE));
                    }

                    checkSum += buffer.get();
                }

                channel.close();

                return checkSum;
            }
        },
    };
}
Results

400MB file
===========
RandomAccessFile    write=379,610,750   read=1,452,482,269 bytes/sec
RandomAccessFile    write=294,041,636   read=1,494,890,510 bytes/sec
RandomAccessFile    write=250,980,392   read=1,422,222,222 bytes/sec
RandomAccessFile    write=250,366,748   read=1,388,474,576 bytes/sec
RandomAccessFile    write=260,394,151   read=1,422,222,222 bytes/sec

BufferedStreamFile  write=98,178,331    read=286,433,566 bytes/sec
BufferedStreamFile  write=100,244,738   read=288,857,545 bytes/sec
BufferedStreamFile  write=82,948,562    read=154,100,827 bytes/sec
BufferedStreamFile  write=108,503,311   read=153,869,271 bytes/sec
BufferedStreamFile  write=113,055,478   read=152,608,047 bytes/sec

BufferedChannelFile write=228,443,948   read=356,173,913 bytes/sec
BufferedChannelFile write=265,629,053   read=374,063,926 bytes/sec
BufferedChannelFile write=223,825,136   read=1,539,849,624 bytes/sec
BufferedChannelFile write=232,992,036   read=1,539,849,624 bytes/sec
BufferedChannelFile write=212,779,220   read=1,534,082,397 bytes/sec

MemoryMappedFile    write=300,955,180   read=305,899,925 bytes/sec
MemoryMappedFile    write=313,149,847   read=310,538,286 bytes/sec
MemoryMappedFile    write=326,374,501   read=303,857,566 bytes/sec
MemoryMappedFile    write=327,680,000   read=304,535,315 bytes/sec
MemoryMappedFile    write=326,895,450   read=303,632,320 bytes/sec

8GB File
============
RandomAccessFile    write=167,402,321   read=251,922,012 bytes/sec
RandomAccessFile    write=193,934,802   read=257,052,307 bytes/sec
RandomAccessFile    write=192,948,159   read=248,460,768 bytes/sec
RandomAccessFile    write=191,814,180   read=245,225,408 bytes/sec
RandomAccessFile    write=190,635,762   read=275,315,073 bytes/sec

BufferedStreamFile  write=154,823,102   read=248,355,313 bytes/sec
BufferedStreamFile  write=152,083,913   read=253,418,301 bytes/sec
BufferedStreamFile  write=133,099,369   read=146,056,197 bytes/sec
BufferedStreamFile  write=131,065,708   read=146,217,827 bytes/sec
BufferedStreamFile  write=132,694,052   read=148,116,004 bytes/sec

BufferedChannelFile write=186,703,740   read=215,075,218 bytes/sec
BufferedChannelFile write=190,591,410   read=211,030,680 bytes/sec
BufferedChannelFile write=187,220,038   read=223,087,606 bytes/sec
BufferedChannelFile write=191,585,397   read=221,297,747 bytes/sec
BufferedChannelFile write=192,653,214   read=211,789,038 bytes/sec

MemoryMappedFile    write=123,023,322   read=231,530,156 bytes/sec
MemoryMappedFile    write=121,961,023   read=230,403,600 bytes/sec
MemoryMappedFile    write=123,317,778   read=229,899,250 bytes/sec
MemoryMappedFile    write=121,472,738   read=231,739,745 bytes/sec
MemoryMappedFile    write=120,362,615   read=231,190,382 bytes/sec

Analysis

For years I was a big fan of using RandomAccessFile directly because of the control it gives and the predictable execution.  I never found using buffered streams to be useful from a performance perspective and this still seems to be the case.

In more recent testing I've found that using NIO FileChannel and ByteBuffer does much better.  With Java 7 the flexibility of this programming approach has been improved for random access with SeekableByteChannel.
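As a brief illustration of that flexibility, here is a minimal sketch, assuming the test.dat file generated by the test above, of positioning a Java 7 SeekableByteChannel before continuing with sequential reads:

import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public final class SeekableReadExample
{
    private static final int PAGE_SIZE = 1024 * 4;

    public static void main(final String[] args) throws Exception
    {
        final ByteBuffer buffer = ByteBuffer.allocate(PAGE_SIZE);

        // Java 7 try-with-resources; FileChannel also implements SeekableByteChannel
        try (SeekableByteChannel channel =
                 Files.newByteChannel(Paths.get("test.dat"), StandardOpenOption.READ))
        {
            channel.position(PAGE_SIZE * 1000L); // jump to an arbitrary page boundary
            channel.read(buffer);                // then carry on reading sequentially
        }
    }
}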

It seems that, for reading, RandomAccessFile and NIO do very well, with memory-mapped files winning for writes in some cases.

I've seen these results vary greatly depending on platform.  File system, OS, storage devices, and available memory all have a significant impact.  In a few cases I've seen memory-mapped files perform significantly better than the others but this needs to be tested on your platform because your mileage may vary...

A special note should be made for the use of memory-mapped large files when pushing for maximum throughput.  I've often found the OS can become unresponsive due to the pressure put on the virtual memory sub-system.

Conclusion

There is a significant difference in performance between the different means of doing sequential file IO from Java.  Not all methods are even remotely equal.  For most IO I've found the use of ByteBuffers and Channels to be the best optimised parts of the IO libraries.  If buffered streams are your IO libraries of choice, then it is worth branching out and getting familiar with the implementations of Channel and Buffer, or even falling back and using the good old RandomAccessFile.

Tuesday, 22 November 2011

Biased Locking, OSR, and Benchmarking Fun

After my last post on Java Lock Implementations, I got a lot of good feedback about my results and micro-benchmark design approach.  As a result I now understand JVM warmup, On Stack Replacement (OSR) and Biased Locking somewhat better than before.  Special thanks to Dave Dice from Oracle, and Cliff Click & Gil Tene from Azul, for their very useful feedback.

In the last post I concluded, based on my experiments, that biased locking was no longer necessary on modern CPUs.  While this conclusion is understandable given the data gathered in the experiment, it was not valid because the experiment did not take account of some JVM warm up behaviour that I was unaware of.

In this post I will re-run the experiment taking into account the feedback and present some new results.  I shall also expand on the changes I've made to the test and why it is important to consider the JVM warm-up behaviour when writing micro-benchmarks, or even very lean Java applications with quick start up time.

On Stack Replacement (OSR)

Java virtual machines will compile code to achieve greater performance based on runtime profiling.  Some VMs run an interpreter for the majority of code and replace hot areas with compiled code following the 80/20 rule.  Other VMs compile all code simply at first then replace the simple code with more optimised code based on profiling.  Oracle Hotspot and Azul are examples of the first type and Oracle JRockit is an example of the second.

Oracle Hotspot will count method invocations plus loop branch-backs within a method, and if this count exceeds 10K in server mode the method will be compiled.  With normal JIT'ing the compiled code can be used when the method is next called.  However, if a loop is still iterating it may make sense to replace the method before the loop completes, especially if it has many iterations to go.  OSR is the means by which a method gets replaced with a compiled version part way through iterating a loop.
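If you want to observe this for yourself, the following is a minimal sketch, assuming HotSpot's -XX:+PrintCompilation output, that triggers an OSR compile because the hot loop lives inside a single long-running invocation of main().  OSR compiles are marked with a '%' in that output.

public final class OsrExample
{
    public static void main(final String[] args)
    {
        // Run with: java -server -XX:+PrintCompilation OsrExample
        // The loop below exceeds the compile threshold within a single invocation,
        // so the method is replaced on-stack part way through the loop.
        long sum = 0L;
        for (int i = 0; i < 100 * 1000 * 1000; i++)
        {
            sum += i;
        }

        System.out.println("sum = " + sum);
    }
}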

I was under the impression that normal JIT'ing and OSR would result in similar code.  Cliff Click pointed out that it is much harder for a runtime to optimise a loop part way through, and especially difficult if nested.  For example, bounds checking within the loop may not be possible to eliminate. Cliff will blog in more detail on this shortly.

What this means is that you are likely to get better optimised code by doing a small number of shorter warm ups than a single large one.  You can see in the code below how I do 10 shorter runs in a loop before the main large run compared to the last article where I did a single large warm-up run.

Biased Locking

Dave Dice pointed out that Hotspot does not enable objects for biased locking in the first few seconds (4s at present) of JVM startup. This is because some benchmarks, and NetBeans, have a lot of thread contention on start up and the revocation cost is significant.

After this first few seconds of start-up delay, all objects in Oracle Hotspot are by default created with biased locking enabled.  The delay can be removed by configuring -XX:BiasedLockingStartupDelay=0.

This point, combined with knowing more about OSR, is important for micro-benchmarks.  It is also important to be aware of these points if you have a lean Java application that starts in a few seconds.

The Code
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.CyclicBarrier;

import static java.lang.System.out;

public final class TestLocks implements Runnable
{
    public enum LockType {JVM, JUC}
    public static LockType lockType;

    public static final long WARMUP_ITERATIONS = 100L * 1000L;
    public static final long ITERATIONS = 500L * 1000L * 1000L;
    public static long counter = 0L;

    public static final Object jvmLock = new Object();
    public static final Lock jucLock = new ReentrantLock();
    private static int numThreads;

    private final long iterationLimit;
    private final CyclicBarrier barrier;

    public TestLocks(final CyclicBarrier barrier, final long iterationLimit)
    {
        this.barrier = barrier;
        this.iterationLimit = iterationLimit;
    }

    public static void main(final String[] args) throws Exception
    {
        lockType = LockType.valueOf(args[0]);
        numThreads = Integer.parseInt(args[1]);

        for (int i = 0; i < 10; i++)
        {
            runTest(numThreads, WARMUP_ITERATIONS);
            counter = 0L;
        }

        final long start = System.nanoTime();
        runTest(numThreads, ITERATIONS);
        final long duration = System.nanoTime() - start;

        out.printf("%d threads, duration %,d (ns)\n", numThreads, duration);
        out.printf("%,d ns/op\n", duration / ITERATIONS);
        out.printf("%,d ops/s\n", (ITERATIONS * 1000000000L) / duration);
        out.println("counter = " + counter);
    }

    private static void runTest(final int numThreads, final long iterationLimit)
        throws Exception
    {
        CyclicBarrier barrier = new CyclicBarrier(numThreads);
        Thread[] threads = new Thread[numThreads];

        for (int i = 0; i < threads.length; i++)
        {
            threads[i] = new Thread(new TestLocks(barrier, iterationLimit));
        }

        for (Thread t : threads)
        {
            t.start();
        }

        for (Thread t : threads)
        {
            t.join();
        }
    }

    public void run()
    {
        try
        {
            barrier.await();
        }
        catch (Exception e)
        {
            // don't care
        }

        switch (lockType)
        {
            case JVM: jvmLockInc(); break;
            case JUC: jucLockInc(); break;
        }
    }

    private void jvmLockInc()
    {
        long count = iterationLimit / numThreads;
        while (0 != count--)
        {
            synchronized (jvmLock)
            {
                ++counter;
            }
        }
    }

    private void jucLockInc()
    {
        long count = iterationLimit / numThreads;
        while (0 != count--)
        {
            jucLock.lock();
            try
            {
                ++counter;
            }
            finally
            {
                jucLock.unlock();
            }
        }
    }
}

Script to run tests:

set -x
for i in {1..8}
do 
    java -server -XX:-UseBiasedLocking TestLocks JVM $i
done

for i in {1..8}
do 
    java -server -XX:+UseBiasedLocking -XX:BiasedLockingStartupDelay=0 TestLocks JVM $i
done

for i in {1..8}
do 
    java -server TestLocks JUC $i
done

Results

The tests are carried out with 64-bit Linux (Fedora Core 15) and Oracle JDK 1.6.0_29.

Nehalem 2.8GHz - Ops/Sec
Threads    -UseBiasedLocking    +UseBiasedLocking    ReentrantLock
1          53,283,461           450,950,969          62,876,566
2          18,519,295           18,108,615           10,217,186
3          13,349,605           13,416,198           14,108,622
4          8,120,172            8,040,773            14,207,310
5          4,725,114            4,551,766            14,302,683
6          5,133,706            5,246,548            14,676,616
7          5,473,652            5,585,666            18,145,525
8          5,514,056            5,414,171            19,010,725


Sandy Bridge 2.0GHz - Ops/Sec
Threads    -UseBiasedLocking    +UseBiasedLocking    ReentrantLock
1          34,500,407           396,511,324          43,148,808
2          20,899,076           19,742,639           6,038,923
3          9,288,039            11,957,032           24,147,807
4          5,618,862            5,589,289            9,082,961
5          5,609,932            5,592,574            9,389,243
6          5,742,907            5,760,558            12,518,728
7          6,699,201            6,641,886            13,684,475
8          6,957,824            6,925,410            14,819,005

Observations
  1. Biased locking has a huge benefit in the un-contended single threaded case.
  2. Biased locking when un-contended, and not revoked, only adds 4-5 cycles of cost.  This is the cost when having a cache hit for the lock structures, on top of the code protected in the critical section.
  3. -XX:BiasedLockingStartupDelay=0 needs to be set for lean applications and micro-benchmarks.
  4. Avoiding OSR does not make a material difference to this set of test results.  This is likely to be because the loop is so simple or other costs are dominating.
  5. For the current implementations, ReentrantLocks scale better than synchronised locks under contention, except in the case of 2 contending threads.
Conclusion

My tests in the last post are invalid for the testing of an un-contended biased lock, because the lock was not actually biased.  If you are designing code following the single writer principle, and therefore having un-contended locks when using 3rd party libraries, then having biased locking enabled is a significant performance boost.

Saturday, 19 November 2011

Java Lock Implementations

We all use 3rd party libraries as a normal part of development.  Generally, we have no control over their internals.  The libraries provided with the JDK are a typical example.   Many of these libraries employ locks to manage contention.

JDK locks come with two implementations.  One uses atomic CAS style instructions to manage the claim process.  CAS instructions tend to be the most expensive type of CPU instructions and on x86 have memory ordering semantics.  Often locks are un-contended, which gives rise to a possible optimisation whereby a lock can be biased to the un-contended thread using techniques to avoid the use of atomic instructions.  This biasing allows a lock, in theory, to be quickly reacquired by the same thread.  If the lock turns out to be contended by multiple threads the algorithm will revert from being biased and fall back to the standard approach using atomic instructions.  Biased locking became the default lock implementation with Java 6.

When respecting the single writer principle, biased locking should be your friend.  Lately, when using the sockets API, I decided to measure the lock costs and was surprised by the results.  I found that my un-contended thread was incurring a bit more cost than I expected from the lock.  I put together the following test to compare the cost of the current lock implementations available in Java 6.

The Test

For the test I shall increment a counter within a lock, and increase the number of contending threads on the lock.  This test will be repeated for the 3 major lock implementations available to Java:
  1. Atomic locking on Java language monitors
  2. Biased locking on Java language monitors
  3. ReentrantLock introduced with the java.util.concurrent package in Java 5.
I'll also run the tests on the 3 most recent generations of the Intel CPU.  For each CPU I'll execute the tests up to the maximum number of concurrent threads the core count will support.

The tests are carried out with 64-bit Linux (Fedora Core 15) and Oracle JDK 1.6.0_29.

The Code
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.CyclicBarrier;

import static java.lang.System.out;

public final class TestLocks implements Runnable
{
    public enum LockType { JVM, JUC }
    public static LockType lockType;

    public static final long ITERATIONS = 500L * 1000L *1000L;
    public static long counter = 0L;

    public static final Object jvmLock = new Object();
    public static final Lock jucLock = new ReentrantLock();
    private static int numThreads;
    private static CyclicBarrier barrier;

    public static void main(final String[] args) throws Exception
    {
        lockType = LockType.valueOf(args[0]);
        numThreads = Integer.parseInt(args[1]);
        
        runTest(numThreads); // warm up
        counter = 0L;

        final long start = System.nanoTime();
        runTest(numThreads);
        final long duration = System.nanoTime() - start;

        out.printf("%d threads, duration %,d (ns)\n", numThreads, duration);
        out.printf("%,d ns/op\n", duration / ITERATIONS);
        out.printf("%,d ops/s\n", (ITERATIONS * 1000000000L) / duration);
        out.println("counter = " + counter);
    }

    private static void runTest(final int numThreads) throws Exception
    {
        barrier = new CyclicBarrier(numThreads);
        Thread[] threads = new Thread[numThreads];

        for (int i = 0; i < threads.length; i++)
        {
            threads[i] = new Thread(new TestLocks());
        }

        for (Thread t : threads)
        {
            t.start();
        }

        for (Thread t : threads)
        {
            t.join();
        }
    }

    public void run()
    {
        try
        {
            barrier.await();
        }
        catch (Exception e)
        {
            // don't care
        }

        switch (lockType)
        {
            case JVM: jvmLockInc(); break;
            case JUC: jucLockInc(); break;
        }
    }

    private void jvmLockInc()
    {
        long count = ITERATIONS / numThreads;
        while (0 != count--)
        {
            synchronized (jvmLock)
            {
                ++counter;
            }
        }
    }

    private void jucLockInc()
    {
        long count = ITERATIONS / numThreads;
        while (0 != count--)
        {
            jucLock.lock();
            try
            {
                ++counter;
            }
            finally
            {
                jucLock.unlock();
            }
        }
    }
}
Script to run the tests:

set -x
for i in {1..8}; do java -XX:-UseBiasedLocking TestLocks JVM $i; done
for i in {1..8}; do java -XX:+UseBiasedLocking TestLocks JVM $i; done
for i in {1..8}; do java TestLocks JUC $i; done

The Results

Figure 1.

Figure 2.

Figure 3.

Observations
  1. Biased locking, in the un-contended case, is ~10% more expensive than the atomic locking.  It seems that for recent CPU generations the cost of atomic instructions is less than the necessary housekeeping for biased locks.  Prior to Nehalem, lock instructions would assert a lock on the memory bus to perform these atomic operations, and each would cost more than 100 cycles.  Since Nehalem, atomic instructions can be handled local to a CPU core, and typically cost only 10-20 cycles if they do not need to wait on the store buffer to empty while enforcing memory ordering semantics.
  2. As contention increases, language monitor locks quickly reach a throughput limit regardless of thread count.
  3. ReentrantLock gives the best un-contended performance and scales significantly better with increasing contention compared to language monitors using synchronized.
  4. ReentrantLock has an odd characteristic of reduced performance when 2 threads are contending.  This deserves further investigation.
  5. Sandy Bridge suffers from the increased latency of atomic instructions I detailed in a previous article when the contended thread count is low.  As the contended thread count continues to increase, the cost of the kernel arbitration tends to dominate and Sandy Bridge shows its strength with increased memory throughput.
Conclusion

Biased locking should no longer be the default lock implementation on modern Intel processors.  I recommend you measure your applications and experiment with the -XX:-UseBiasedLocking JVM option to determine if you can benefit from using an atomic lock based algorithm for the un-contended case.

When developing your own concurrent libraries I would recommend ReentrantLock rather than using the synchronized keyword due to the significantly better performance on x86, if a lock-free alternative algorithm is not a viable option.

Update 20-Nov-2011

Dave Dice has pointed out that biased locking is not implemented for the locks created in the first few seconds of the JVM startup.  I'll re-run my tests this week and post the results.  I've had some more quality feedback that suggests my results could be potentially invalid.  Micro benchmarks can be tricky but the advice of measuring your own application in the large still stands.

A re-run of the tests can be seen in this follow-on blog taking account of Dave's feedback.

Saturday, 5 November 2011

Locks & Condition Variables - Latency Impact

In a previous article on Inter-Thread Latency I showed how it is possible to signal a state change between 2 threads with less than 50ns of latency.  To many developers, writing concurrent code using locks is a scary experience.  Writing concurrent code using lock-free algorithms, i.e. algorithms that rely on the use of memory barriers and an intimate understanding of the underlying memory models, can be totally terrifying.  To me lock-free / non-blocking algorithms are like playing with explosives or corrosive chemicals, if you do not understand what you are doing, or show the ultimate respect, then very bad things can, and most likely will, happen!

In this article, I'd like to illustrate the impact of using locks and the resulting latency they can impose on your designs.  I want to use a very similar algorithm to that used in my previous inter-thread latency article to illustrate the ping-pong effect of handing control back and forth between 2 threads.  In this case, rather than using a couple of volatile variables, I will employ a pair of condition variables to signal a state change so control can be passed back and forth.

The Code
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static java.lang.System.out;

public final class LockedSignallingLatency
{
    private static final int ITERATIONS = 10 * 1000 * 1000;

    private static final Lock lock = new ReentrantLock();
    private static final Condition sendCondition = lock.newCondition();
    private static final Condition echoCondition = lock.newCondition();

    private static long sendValue = -1L;
    private static long echoValue = -1L;

    public static void main(final String[] args)
        throws Exception
    {
        final Thread sendThread = new Thread(new SendRunner());
        final Thread echoThread = new Thread(new EchoRunner());

        final long start = System.nanoTime();

        echoThread.start();
        sendThread.start();

        sendThread.join();
        echoThread.join();

        final long duration = System.nanoTime() - start;

        out.printf("duration %,d (ns)\n", duration);
        out.printf("%,d ns/op\n", duration / (ITERATIONS * 2L));
        out.printf("%,d ops/s\n", (ITERATIONS * 2L * 1000000000L) / duration);
    }

    public static final class SendRunner implements Runnable
    {
        public void run()
        {
            for (long i = 0; i < ITERATIONS; i++)
            {
                lock.lock();
                try
                {
                    sendValue = i;
                    sendCondition.signal();
                }
                finally
                {
                    lock.unlock();
                }

                lock.lock();
                try
                {
                    while (echoValue != i)
                    {
                        echoCondition.await();
                    }
                }
                catch (final InterruptedException ex)
                {
                    break;
                }
                finally
                {
                    lock.unlock();
                }

            }
        }
    }

    public static final class EchoRunner implements Runnable
    {
        public void run()
        {
            for (long i = 0; i < ITERATIONS; i++)
            {
                lock.lock();
                try
                {
                    while (sendValue != i)
                    {
                        sendCondition.await();
                    }
                }
                catch (final InterruptedException ex)
                {
                    break;
                }
                finally
                {
                    lock.unlock();
                }

                lock.lock();
                try
                {
                    echoValue = i;
                    echoCondition.signal();
                }
                finally
                {
                    lock.unlock();
                }
            }
        }
    }
}
Test Results

Windows 7 Professional 64-bit - Oracle JDK 1.6.0 - Nehalem 2.8 GHz

$ start /AFFINITY 0x14 /B /WAIT java LockedSignallingLatency
duration 41,649,616,343 (ns)
2,082 ns/op
480,196 ops/s

$ java LockedSignallingLatency
duration 73,789,456,491 (ns)
3,689 ns/op
271,041 ops/s

Linux Fedora Core 15 64-bit - Oracle JDK 1.6.0 - Nehalem 2.8 GHz

$ taskset -c 2,4 java LockedSignallingLatency
duration 40,469,689,559 (ns)
2,023 ns/op
494,197 ops/s

$ java LockedSignallingLatency
duration 169,795,756,230 (ns)
8,489 ns/op
117,788 ops/s

Linux Fedora Core 15 64-bit - Oracle JDK 1.6.0 - Sandybridge 2.0 GHz

$ taskset -c 2,4 java LockedSignallingLatency
duration 47,209,549,484 (ns)
2,360 ns/op
423,643 ops/s

$ java LockedSignallingLatency
duration 336,168,489,093 (ns)
16,808 ns/op
59,493 ops/s

Observations

The above is a typical set of results I've seen in the middle of the range from multiple runs.  There are a couple of interesting observations I'd like to expand on.

Firstly, this is 3 orders of magnitude greater latency than what I illustrated in the previous article using just memory barriers to signal between threads.  This cost comes about because the kernel needs to get involved to arbitrate between the threads for the lock, and then manage the scheduling for the threads to awaken when the condition is signalled.  The one-way latency to signal a change is pretty much the same as what is considered current state of the art for network hops between nodes via a switch.  It is possible to get ~1µs latency with InfiniBand and less than 5µs with 10GigE and user-space IP stacks.

Secondly, the impact is clear when letting the OS choose which CPUs the threads get scheduled on rather than pinning them manually.  I've observed this same issue across many use cases whereby Linux, in the default configuration for its scheduler, will greatly impact the performance of a low-latency system by scheduling threads on different cores, resulting in cache pollution.  Windows by default seems to do a better job of this.

I recently had an interesting discussion with Cliff Click about using condition variables and their cost.  He pointed out a problem he was seeing.  If you look at the case where a sleeping thread gets signalled within the lock, it goes to run and then discovers it cannot get the lock because the signalling thread already has the lock, so it gets put back to sleep until the signalling thread releases the lock, thus causing more work than necessary.  Modern schedulers would benefit from being more aware of communication mechanisms between threads to have more efficient location and rescheduling logic.  As we go more concurrent and parallel our schedulers need to become more aware of IPC mechanisms.

Conclusion

When designing a low-latency system it is crucial to avoid the use of locks and condition variables for the main transaction flows.  Non-blocking or lock-free algorithms are key to achieving ultra-low latency but can be very difficult to prove correct.   I would not recommend designing lock-free algorithms for business logic but they can be very effectively employed for low-level infrastructure components.  The business logic is best run on single threads following the Single Writer Principle from my previous article.

Wednesday, 19 October 2011

Smart Batching

How often have we all heard that “batching” will increase latency?  As someone with a passion for low-latency systems this surprises me.  In my experience when batching is done correctly, not only does it increase throughput, it can also reduce average latency and keep it consistent.

Well then, how can batching magically reduce latency?  It comes down to what algorithm and data structures are employed.  In a distributed environment we often have to batch up messages/events into network packets to achieve greater throughput.  We also employ similar techniques in buffering writes to storage to reduce the number of IOPS.  That storage could be a block device backed file-system or a relational database.  Most IO devices can only handle a modest number of IO operations per second, so it is best to fill those operations efficiently.  Many approaches to batching involve waiting for a timeout to occur and this will by its very nature increase latency.  The batch can also get filled before the timeout occurs, making the latency even more unpredictable.

Figure 1.

Figure 1. above depicts decoupling the access to an IO device, and therefore the contention for access to it, by introducing a queue like structure to stage the messages/events to be sent and a thread doing the batching for writing to the device.

The Algorithm

An approach to batching uses the following algorithm in Java pseudo code:
public final class NetworkBatcher
    implements Runnable
{
    private final NetworkFacade network;
    private final Queue<Message> queue;
    private final ByteBuffer buffer;

    public NetworkBatcher(final NetworkFacade network,
                          final int maxPacketSize,
                          final Queue<Message> queue)
    {
        this.network = network;
        buffer = ByteBuffer.allocate(maxPacketSize);
        this.queue = queue;
    }

    public void run()
    {
        while (!Thread.currentThread().isInterrupted())
        {
            while (null == queue.peek())
            {
                employWaitStrategy(); // block, spin, yield, etc.
            }

            Message msg;
            while (null != (msg = queue.poll()))
            {
                if (msg.size() > buffer.remaining())
                {
                    sendBuffer();
                }

                buffer.put(msg.getBytes());
            }

            sendBuffer();
        }
    }

    private void sendBuffer()
    {
        buffer.flip();
        network.send(buffer);
        buffer.clear();
    }
}

Basically, wait for data to become available and as soon as it is, send it right away.  While sending a previous message or waiting on new messages, a burst of traffic may arrive which can all be sent in a batch, up to the size of the buffer, to the underlying resource.  This approach can use a ConcurrentLinkedQueue, which provides low latency and avoids locks.  However it has an issue in that it creates no back pressure to stall producing/publishing threads if they outpace the batcher, so the queue can grow out of control because it is unbounded.  I’ve often had to wrap ConcurrentLinkedQueue to track its size and thus create back pressure.  This size tracking can add 50% to the processing cost of using this queue in my experience.
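To illustrate the wrapping approach, below is a minimal sketch (class and method names are my own, and the bound is only approximate when multiple producers race) that tracks the size of a ConcurrentLinkedQueue with an AtomicInteger so producers can be stalled when a capacity is exceeded:

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public final class BoundedEventQueue<E>
{
    private final ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<E>();
    private final AtomicInteger size = new AtomicInteger(0);
    private final int capacity;

    public BoundedEventQueue(final int capacity)
    {
        this.capacity = capacity;
    }

    public void put(final E e) throws InterruptedException
    {
        // Create back pressure by stalling the producer while the queue is full.
        // Note: the check-then-offer is racy, so the bound is approximate under
        // concurrent producers.
        while (size.get() >= capacity)
        {
            Thread.sleep(1); // block, spin, yield, etc.
        }

        queue.offer(e);
        size.incrementAndGet();
    }

    public E poll()
    {
        final E e = queue.poll();
        if (null != e)
        {
            size.decrementAndGet();
        }

        return e;
    }
}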

This algorithm respects the single writer principle and can often be employed when writing to a network or storage device, and thus avoid lock contention in third party API libraries.  By avoiding the contention we avoid the J-Curve latency profile normally associated with contention on resources, due to the queuing effect on locks.  With this algorithm, as load increases, latency stays constant until the underlying device is saturated with traffic resulting in a more "bathtub" profile than the J-Curve.

Let’s take a worked example of handling 10 messages that arrive as a burst of traffic.  In most systems traffic comes in bursts and is seldom uniformly spaced out in time.  One approach will assume no batching, with the threads writing to the device API directly as in Figure 1. above.  The other will use a lock-free data structure to collect the messages plus a single thread consuming messages in a loop as per the algorithm above.  For the example let’s assume it takes 100µs to write a single buffer to the network device as a synchronous operation and have it acknowledged.  The buffer will ideally be less than the MTU of the network in size when latency is critical.  Many network sub-systems are asynchronous and support pipelining but we will make the above assumption to clarify the example.  If the network operation is using a protocol like HTTP under REST or Web Services then this assumption matches the underlying implementation.

                 Best (µs)   Average (µs)   Worst (µs)   Packets Sent
Serial           100         500            1,000        10
Smart Batching   100         150            200          1-2

The absolute lowest latency will be achieved if a message is sent from the thread originating the data directly to the resource, if the resource is un-contended.   The table above shows what happens when contention occurs and a queuing effect kicks in. With the serial approach 10 individual packets will have to be sent and these typically need to queue on a lock managing access to the resource, therefore they get processed sequentially.  The above figures assume the locking strategy works perfectly with no perceivable overhead which is unlikely in a real application.

For the batching solution it is likely all 10 messages will be picked up in the first batch if the concurrent queue is efficient, thus giving the best case latency scenario.  In the worst case only one message is sent in the first batch with the other nine following in the next.  Therefore in the worst case scenario one message has a latency of 100µs and the following 9 have a latency of 200µs, giving a worst case average of 190µs, which is significantly better than the serial approach.

This is one good example when the simplest solution is just a bit too simple because of the contention.  The batching solution helps achieve consistent low-latency under burst conditions and is best for throughput.  It also has a nice effect across the network on the receiving end in that the receiver has to process fewer packets and therefore makes the communication more efficient both ends.

Most hardware handles data in buffers up to a fixed size for efficiency.  For a storage device this will typically be a 4KB block.  For networks this will be the MTU and is typically 1500 bytes for Ethernet.  When batching, it is best to understand the underlying hardware and write batches down in ideal buffer size to be optimally efficient.  However keep in mind that some devices need to envelope the data, e.g. the Ethernet and IP headers for network packets so the buffer needs to allow for this.
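As a rough illustration of allowing for the envelope, the constants below assume a standard Ethernet MTU and IPv4/TCP headers without options; the real numbers depend on your network and protocol stack:

public final class PacketSizing
{
    public static final int ETHERNET_MTU = 1500;
    public static final int IPV4_HEADER = 20;  // without options
    public static final int TCP_HEADER = 20;   // without options

    // Leave room for the protocol envelope when choosing the batch buffer size.
    public static final int MAX_PAYLOAD = ETHERNET_MTU - IPV4_HEADER - TCP_HEADER; // 1460 bytes
}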

There will always be an increased latency from a thread switch and the cost of exchange via the data structure.  However there are a number of very good non-blocking structures available using lock-free techniques.  For the Disruptor this type of exchange can be achieved in as little as 50-100ns thus making the choice of taking the smart batching approach a no brainer for low-latency or high-throughput distributed systems.

This technique can be employed for many problems and not just IO.  The core of the Disruptor uses this technique to help rebalance the system when the publishers burst and outpace the EventProcessors.  The algorithm can be seen inside the BatchEventProcessor.

Note: For this algorithm to work the queueing structure must handle the contention better than the underlying resource.  Many queue implementations are extremely poor at managing contention.  Use science and measure before coming to a conclusion.

Batching with the Disruptor

The code below shows the same algorithm in action using the Disruptor's EventHandler mechanism.  In my experience, this is a very effective technique for handling any IO device efficiently and keeping latency low when dealing with load or burst traffic.
public final class NetworkBatchHandler
    implements EventHandler<Message>
{
    private final NetworkFacade network;
    private final ByteBuffer buffer;

    public NetworkBatchHandler(final NetworkFacade network,
                               final int maxPacketSize)
    {
        this.network = network;
        buffer = ByteBuffer.allocate(maxPacketSize);
    }
    
    public void onEvent(Message msg, long sequence, boolean endOfBatch) 
        throws Exception
    {
        if (msg.size() > buffer.remaining())
        {
            sendBuffer();
        }

        buffer.put(msg.getBytes());
        
        if (endOfBatch)
        {
            sendBuffer();
        }
    } 

    private void sendBuffer()
    {
        buffer.flip();
        network.send(buffer);
        buffer.clear();
    }    
}
The endOfBatch parameter greatly simplifies the handling of the batch compared to the double loop in the algorithm above.

I have simplified the examples to illustrate the algorithm.  Clearly error handling and other edge conditions need to be considered.

Separation of IO from Work Processing

There is another very good reason to separate the IO from the threads doing the work processing.  Handing off the IO to another thread means the worker thread, or threads, can continue processing without blocking in a nice cache friendly manner. I've found this to be critical in achieving high-performance throughput.

If the underlying IO device or resource becomes briefly saturated then the messages can be queued for the batcher thread allowing the work processing threads to continue.  The batching thread then feeds the messages to the IO device in the most efficient way possible allowing the data structure to handle the burst and if full apply the necessary back pressure, thus providing a good separation of concerns in the workflow.

Conclusion

So there you have it.  Smart Batching can be employed in concert with the appropriate data structures to achieve consistent low-latency and maximum throughput.

Thursday, 22 September 2011

Single Writer Principle

When trying to build a highly scalable system the single biggest limitation on scalability is having multiple writers contend for any item of data or resource.  Sure, algorithms can be bad, but let’s assume they have a reasonable Big O notation so we'll focus on the scalability limitations of the systems design. 

I keep seeing people just accept having multiple writers as the norm.  There is a lot of research in computer science for managing this contention that boils down to 2 basic approaches.  One is to provide mutual exclusion to the contended resource while the mutation takes place; the other is to take an optimistic strategy and swap in the changes if the underlying resource has not changed while you created the new copy. 

Mutual Exclusion

Mutual exclusion is the means by which only one writer can have access to a protected resource at a time, and is usually implemented with a locking strategy.  Locking strategies require an arbitrator, usually the operating system kernel, to get involved when the contention occurs to decide who gains access and in what order.  This can be a very expensive process often requiring many more CPU cycles than the actual transaction to be applied to the business logic would use.  Those waiting to enter the critical section, in advance of performing the mutation, must queue, and this queuing effect (Little's Law) causes latency to become unpredictable and ultimately restricts throughput.

Optimistic Concurrency Control

Optimistic strategies involve taking a copy of the data, modifying it, then copying back the changes if data has not mutated in the meantime.  If a change has happened in the meantime you repeat the process until successful.  This repeating of the process increases with contention and therefore causes a queuing effect just like with mutual exclusion.  If you work with a source code control system, such as Subversion or CVS, then you are using this algorithm every day.  Optimistic strategies can work with data but do not work so well with resources such as hardware because you cannot take a copy of the hardware!  The ability to perform the changes atomically to data is made possible by CAS instructions offered by the hardware.
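As a simple illustration of the optimistic approach in code, the sketch below uses AtomicLong.compareAndSet() to retry an increment until the underlying value has not changed in the meantime:

import java.util.concurrent.atomic.AtomicLong;

public final class OptimisticCounter
{
    private final AtomicLong value = new AtomicLong(0L);

    public long increment()
    {
        long current;
        long next;
        do
        {
            current = value.get();  // take a copy of the current state
            next = current + 1;     // modify the copy
        }
        while (!value.compareAndSet(current, next)); // swap in if unchanged, else retry

        return next;
    }
}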

Most locking strategies are composed from optimistic strategies for changing the lock state or mutual exclusion primitive.

Managing Contention vs. Doing Real Work

CPUs can typically process one or more instructions per cycle.  For example, modern Intel CPU cores each have 6 execution units that can be doing a combination of arithmetic, branch logic, word manipulation and memory loads/stores in parallel.  If while doing work the CPU core incurs a cache miss, and has to go to main memory, it will stall for hundreds of cycles until the result of that memory request returns.  To try and improve things the CPU will make some speculative guesses as to what a memory request will return to continue processing.  If a second miss occurs the CPU will no longer speculate and simply wait for the memory request to return because it cannot typically keep the state for speculative execution beyond 2 cache misses.  Managing cache misses is the single largest limitation to scaling the performance of our current generation of CPUs.

Now what does this have to do with managing contention?  Well if two or more threads are using locks to provide mutual exclusion, at best they will be going to the L3 cache, or over a socket interconnect, to access the shared state of the lock using CAS operations.  These lock/CAS instructions cost 10s of cycles in the best case when un-contended, plus they cause out-of-order execution in the CPU to be suspended and load/store buffers to be flushed.  At worst, collisions occur and the kernel will need to get involved and put one or more of the threads to sleep until the lock is released.  This rescheduling of the blocked thread will result in cache pollution.  The situation can be even worse when the thread is re-scheduled on another core with a cold cache, resulting in many cache misses.

For highly contended data it is very easy to get into a situation whereby the system spends significantly more time managing contention than doing real work.  The table below gives an idea of basic costs for managing contention when the program state is very small and easy to reload from the L2/L3 cache, never mind main memory. 

Method                           Time (ms)
One Thread                       300
One Thread with Memory Barrier   4,700
One Thread with CAS              5,700
Two Threads with CAS             18,000
One Thread with Lock             10,000
Two Threads with Lock            118,000

This table illustrates the costs of incrementing a 64-bit counter 500 million times using a variety of techniques on a 2.4GHz Westmere processor.   I can hear people coming back with “but this is a trivial example and real-world applications are not that contended”.  This is true but remember real-world applications have way more state, and what do you think happens to all that state which is warm in cache when the context switch occurs???  By measuring the basic cost of contention it is possible to extrapolate the scalability limits of a system which has contention points.  As multi-core becomes ever more significant another approach is required.  My last post illustrates the micro level effects of CAS operations on modern CPUs, whereby Sandy Bridge can be worse for CAS and locks.
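For illustration, the sketch below shows the shape of the increment loops behind the single-threaded rows of that table.  This is my assumption of the kind of code measured, not the exact benchmark used, and the counter fields are named for the technique they demonstrate:

import java.util.concurrent.atomic.AtomicLong;

public final class CounterIncrementCosts
{
    private static final long ITERATIONS = 500L * 1000L * 1000L;

    private static long plainCounter = 0L;
    private static volatile long volatileCounter = 0L;  // each write incurs a memory barrier
    private static final AtomicLong casCounter = new AtomicLong(0L);
    private static final Object lock = new Object();
    private static long lockedCounter = 0L;

    public static void main(final String[] args)
    {
        for (long i = 0; i < ITERATIONS; i++)
        {
            plainCounter++;                // "One Thread"
        }

        for (long i = 0; i < ITERATIONS; i++)
        {
            volatileCounter++;             // "One Thread with Memory Barrier"
        }

        for (long i = 0; i < ITERATIONS; i++)
        {
            casCounter.incrementAndGet();  // "One Thread with CAS"
        }

        for (long i = 0; i < ITERATIONS; i++)
        {
            synchronized (lock)            // "One Thread with Lock"
            {
                ++lockedCounter;
            }
        }
    }
}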

Single Writer Designs

Now, what if you could design a system whereby any item of data, or resource, is only mutated by a single writer/thread?  It is actually easier than you think in my experience.  It is OK if multiple threads, or other execution contexts, read the same data.  CPUs can broadcast read only copies of data to other cores via the cache coherency sub-system.  This has a cost but it scales very well.

If you have a system that can honour this single writer principle then each execution context can spend all its time and resources processing the logic for its purpose, and not be wasting cycles and resource on dealing with the contention problem.  You can also scale up without limitation until the hardware is saturated.  There is also a really nice benefit when working on architectures, such as x86/x64, that have a hardware memory model in which load/store memory operations have preserved order: memory barriers are not required if you adhere strictly to the single writer principle.  On x86/x64 "loads can be re-ordered with older stores" according to the memory model, so memory barriers are required when multiple threads mutate the same data across cores.  The single writer principle avoids this issue because it never has to deal with writing the latest version of a data item that may have been written by another thread and is currently in the store buffer of another core.
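A minimal sketch of the idea (the class is illustrative, not from any library): only one designated thread ever mutates the counter, so no CAS or lock is required, and readers simply observe the latest published value.

import java.util.concurrent.atomic.AtomicLong;

public final class SingleWriterCounter
{
    private final AtomicLong counter = new AtomicLong(0L);

    // Called from the single writer thread only.
    public void increment()
    {
        // lazySet() is an ordered store; on x86/x64 it compiles down to a plain store
        // because stores are not re-ordered with other stores, so no lock or fence
        // instruction is needed when there is exactly one writer.
        counter.lazySet(counter.get() + 1);
    }

    // Can be called from any number of reader threads.
    public long get()
    {
        return counter.get();
    }
}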

So how can we drive towards single writer designs?  I’ve found it is a very natural thing.  Consider how humans, or any other autonomous creatures of nature, operate with their model of the world.  We all have our own model of the world contained in our own heads, i.e. we have a copy of the world state for our own use.  We mutate the state in our heads based on inputs (events/messages) we receive via our senses.  As we process these inputs and apply them to our model we may take action that produces outputs, which others can take as their own inputs.  None of us reach directly into each other’s heads and mess with the neurons.  If we did this it would be a serious breach of encapsulation!  Originally, Object Oriented (OO) design was all about message passing, and somehow along the way we bastardised the message passing to be method calls and even allowed direct field manipulation – Yuk!  Whose bright idea was it to allow public access to fields of an object?  You deserve your own special hell.

At university I studied transputers and interesting languages like Occam.  I thought very elegant designs appeared by having the nodes collaborate via message passing rather than mutating shared state.  I’m sure some of this has inspired the Disruptor.  My experience with the Disruptor has shown that it is possible to build systems with one or more orders of magnitude better throughput than locking or contended state based approaches.  It also gives much more predictable latency that stays constant until the hardware is saturated rather than the traditional J-curve latency profile.

It is interesting to see the emergence of numerous approaches that lend themselves to single writer solutions such as Node.js, Erlang, Actor patterns, and SEDA to name a few.  Unfortunately most use queue based implementations underneath, which breaks the single writer principle, whereas the Disruptor strives to separate the concerns so that the single writer principle can be preserved for the common cases.

Now I’m not saying locks and optimistic strategies are bad and should not be used.  They are excellent for many problems.  For example, bootstrapping a concurrent system or making major state changes in configuration or reference data.  However if the main flow of transactions acts on contended data, and locks or optimistic strategies have to be employed, then the scalability is fundamentally limited.

The Principle at Scale

This principle works at all levels of scale.  Mandelbrot got this so right.  CPU cores are just nodes of execution and the cache system provides message passing for communication.  The same patterns apply if the processing node is a server and the communication system is a local network.  If a service, in SOA architecture parlance, is the only service that can write to its data store it can be made to scale and perform much better.  If instead the underlying data is stored in a database and other services go directly to that data, without sending a message to the service that owns the data, then the data is contended and requires the database to manage the contention and coherence of that data.  This prevents the service from caching copies of the data for faster response to the clients and restricts how the data can be sharded.  Encapsulation has just been broken at a more macro level when multiple different services write to the same data store.

Summary

If a system is decomposed into components that keep their own relevant state model, without a central shared model, and all communication is achieved via message passing then you have a system without contention naturally.  This type of system obeys the single writer principle if the message passing sub-system is not implemented as queues.  If you cannot move straight to a model like this, but are finding scalability issues related to contention, then start by asking the question, “How do I change this code to preserve the Single Writer Principle and thus avoid the contention?”

The Single Writer Principle is that for any item of data, or resource, that item of data should be owned by a single execution context for all mutations.