Showing posts with label Java. Show all posts

Monday 26 August 2013

Lock-Based vs Lock-Free Concurrent Algorithms

Last week I attended a review session of the new JSR166 StampedLock run by Heinz Kabutz at the excellent JCrete unconference. StampedLock is an attempt to address the contention issues that arise in a system when multiple readers concurrently access shared state. StampedLock is designed to perform better than ReentrantReadWriteLock by taking an optimistic read approach.

While attending the session a couple of things occurred to me. Firstly, I thought it was about time I reviewed the current status of Java lock implementations. Secondly, that although StampedLock looks like a good addition to the JDK, it seems to miss the fact that lock-free algorithms are often a better solution to the multiple reader case.

Test Case

To compare implementations I needed an API test case that would not favour a particular approach. For example, the API should be garbage free and allow the methods to be atomic. A simple test case is to design a spaceship that can be moved around a 2-dimensional space with the coordinates of its position available to be read atomically. At least 2 fields need to be read, or written, per transaction to make the concurrency interesting.
/**
 * Interface to a concurrent representation of a ship that can move around
 * a 2 dimensional space with updates and reads performed concurrently.
 */
public interface Spaceship
{
    /**
     * Read the position of the spaceship into the array of coordinates provided.
     *
     * @param coordinates into which the x and y coordinates should be read.
     * @return the number of attempts made to read the current state.
     */
    int readPosition(final int[] coordinates);

    /**
     * Move the position of the spaceship by a delta to the x and y coordinates.
     *
     * @param xDelta delta by which the spaceship should be moved in the x-axis.
     * @param yDelta delta by which the spaceship should be moved in the y-axis.
     * @return the number of attempts made to write the new coordinates.
     */
    int move(final int xDelta, final int yDelta);
}
The above API would be cleaner by factoring out an immutable Position object but I want to keep it garbage free and create the need to update multiple internal fields with minimal indirection. This API could easily be extended for a 3-dimensional space and require the implementations to be atomic.

Multiple implementations are built for each spaceship and exercised by a test harness. All the code and results for this blog can be found here.

The test harness will run each of the implementations in turn by using a megamorphic dispatch pattern to try and prevent inlining, lock-coarsening, and loop unrolling when accessing the concurrent methods.

Each implementation is subjected to 4 distinct threading scenarios that result in different contention profiles:
  • 1 reader - 1 writer
  • 2 readers - 1 writer
  • 3 readers - 1 writer
  • 2 readers - 2 writers
All tests are run with 64-bit Java 1.7.0_25, Linux 3.6.30, and a quad core 2.2GHz Ivy Bridge i7-3632QM. Throughput is measured over 5 second periods for each implementation with the tests repeated 5 times to ensure sufficient warm up. The results below are throughputs averaged per second over 5 runs. To approximate a typical Java deployment, no thread affinity or core isolation has been employed which would have reduced variance significantly.

Note: Other CPUs and operating systems can produce very different results.

Results

Figure 1.
Figure 2.
Figure 3.
Figure 4.

The raw data for the above charts can be found here.

Analysis

The real surprise for me from the results is the performance of ReentrantReadWriteLock. I cannot see a use for this implementation beyond a case where there is a heavy bias towards reads with very few writes. My main takeaways are:
  1. StampedLock is a major improvement over existing lock implementations especially with increasing numbers of reader threads.
  2. StampedLock has a complex API. It is very easy to mistakenly call the wrong method for locking actions.
  3. Synchronised is a good general purpose lock implementation when contention is from only 2 threads.
  4. ReentrantLock is a good general purpose lock implementation when thread counts grow as previously discovered.
  5. Choosing to use ReentrantReadWriteLock should be based on careful and appropriate measurement. As with all major decisions, measure and make decisions based on data.
  6. Lock-free implementations can offer significant throughput advantages over lock-based algorithms.
Conclusion

It is nice seeing the influence of lock-free techniques appearing in lock-based algorithms. The optimistic strategy employed on read is effectively a lock-free algorithm at the times when a writer is not updating.
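The optimistic read strategy can be sketched against the Spaceship interface from this post. This is a minimal illustration using java.util.concurrent.locks.StampedLock as shipped in Java 8, and not necessarily the implementation behind the charts above. The class name StampedLockSpaceship is an assumption, and the interface is repeated only so the block compiles on its own.

```java
import java.util.concurrent.locks.StampedLock;

interface Spaceship
{
    int readPosition(int[] coordinates);

    int move(int xDelta, int yDelta);
}

// Hypothetical implementation name, used for illustration only.
final class StampedLockSpaceship implements Spaceship
{
    private final StampedLock lock = new StampedLock();
    private int x;
    private int y;

    @Override
    public int readPosition(final int[] coordinates)
    {
        int tries = 1;

        // Optimistic attempt: no lock is taken, the stamp is validated afterwards.
        long stamp = lock.tryOptimisticRead();
        coordinates[0] = x;
        coordinates[1] = y;

        if (!lock.validate(stamp))
        {
            // A writer intervened (or held the lock), so fall back to a full read lock.
            ++tries;
            stamp = lock.readLock();
            try
            {
                coordinates[0] = x;
                coordinates[1] = y;
            }
            finally
            {
                lock.unlockRead(stamp);
            }
        }

        return tries;
    }

    @Override
    public int move(final int xDelta, final int yDelta)
    {
        final long stamp = lock.writeLock();
        try
        {
            x += xDelta;
            y += yDelta;
        }
        finally
        {
            lock.unlockWrite(stamp);
        }

        return 1;
    }
}
```

When no writer is active the read path never blocks and never writes to shared memory, which is why it behaves like a lock-free read at those times.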

In my experience of teaching and developing lock-free algorithms, not only do they provide significant throughput advantages as evidenced here, they also provide much lower latency with less variance.
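As an illustration of what a lock-free implementation of the Spaceship interface might look like, the sketch below packs both int coordinates into a single long and updates it with a compare-and-set loop. This is one possible approach under the assumption that the whole state fits in 64 bits; it is not necessarily the implementation measured above, and the class and helper names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

interface Spaceship
{
    int readPosition(int[] coordinates);

    int move(int xDelta, int yDelta);
}

// Hypothetical implementation name, used for illustration only.
final class LockFreeSpaceship implements Spaceship
{
    // x lives in the upper 32 bits, y in the lower 32 bits.
    private final AtomicLong position = new AtomicLong(pack(0, 0));

    static long pack(final int x, final int y)
    {
        return ((long)x << 32) | (y & 0xFFFFFFFFL);
    }

    static int unpackX(final long value)
    {
        return (int)(value >>> 32);
    }

    static int unpackY(final long value)
    {
        return (int)value;
    }

    @Override
    public int readPosition(final int[] coordinates)
    {
        // A single volatile read yields a consistent (x, y) pair.
        final long value = position.get();
        coordinates[0] = unpackX(value);
        coordinates[1] = unpackY(value);

        return 1;
    }

    @Override
    public int move(final int xDelta, final int yDelta)
    {
        int tries = 0;
        long current;
        long next;

        do
        {
            ++tries;
            current = position.get();
            next = pack(unpackX(current) + xDelta, unpackY(current) + yDelta);
        }
        while (!position.compareAndSet(current, next)); // retry if another writer won the race

        return tries;
    }
}
```

Readers never retry and writers only retry when they lose a race to another writer. The trick does not extend directly to a 3-dimensional space, where the state no longer fits in one 64-bit word and a different technique would be needed.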

Tuesday 16 July 2013

Java Garbage Collection Distilled

Serial, Parallel, Concurrent, CMS, G1, Young Gen, New Gen, Old Gen, Perm Gen, Eden, Tenured, Survivor Spaces, Safepoints, and the hundreds of JVM startup flags. Does this all baffle you when trying to tune the garbage collector while trying to get the required throughput and latency from your Java application? If it does then do not worry, you are not alone. Documentation describing garbage collection feels like man pages for an aircraft. Every knob and dial is detailed and explained but nowhere can you find a guide on how to fly. This article will attempt to explain the tradeoffs when choosing and tuning garbage collection algorithms for a particular workload.

The focus will be on Oracle Hotspot JVM and OpenJDK collectors as those are the ones in most common usage. Towards the end other commercial JVMs will be discussed to illustrate alternatives.

The Tradeoffs

Wise folk keep telling us, “You do not get something for nothing”. When we get something we usually have to give up something in return. When it comes to garbage collection we play with 3 major variables that set targets for the collectors:
  1. Throughput: The amount of work done by an application as a ratio of time spent in GC. Target throughput with -XX:GCTimeRatio=99 ; 99 is the default equating to 1% GC time.
  2. Latency: The time taken by systems in responding to events which is impacted by pauses introduced by garbage collection. Target latency for GC pauses with -XX:MaxGCPauseMillis=<n>.
  3. Memory: The amount of memory our systems use to store state, which is often copied and moved around when being managed. The set of active objects retained by the application at any point in time is known as the Live Set. Maximum heap size -Xmx<n> is a tuning parameter for setting the heap size available to an application.
Note: Often Hotspot cannot achieve these targets and will silently continue without warning, having missed its target by a great margin.
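Since Hotspot may silently miss these targets, it can help to compare the throughput target with what the JVM actually reports. The sketch below computes the GC time fraction implied by a GCTimeRatio value, 1 / (1 + n), and reads the accumulated collection time from the standard java.lang.management MXBeans. The class and method names are hypothetical, and the reported collection time is only an approximation whose meaning can differ between collectors.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

// Hypothetical helper, used for illustration only.
final class GcOverhead
{
    /** Fraction of time in GC implied by -XX:GCTimeRatio=n, i.e. 1 / (1 + n). */
    static double targetGcFraction(final int gcTimeRatio)
    {
        return 1.0 / (1.0 + gcTimeRatio);
    }

    /** Approximate fraction of JVM uptime reported as spent in collections so far. */
    static double observedGcFraction()
    {
        long gcMillis = 0;
        for (final GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans())
        {
            final long time = gc.getCollectionTime(); // -1 when the collector does not report it
            if (time > 0)
            {
                gcMillis += time;
            }
        }

        final long uptimeMillis = ManagementFactory.getRuntimeMXBean().getUptime();

        return uptimeMillis <= 0 ? 0.0 : (double)gcMillis / uptimeMillis;
    }

    public static void main(final String[] args)
    {
        System.out.printf("target   GC fraction: %.4f%n", targetGcFraction(99));
        System.out.printf("observed GC fraction: %.4f%n", observedGcFraction());
    }
}
```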

Latency is a distribution across events. It may be acceptable to have an increased average latency to reduce the worst-case latency, or make it less frequent. We should not interpret the term “real-time” to mean the lowest possible latency; rather real-time refers to having deterministic latency regardless of throughput.

For some application workloads, throughput is the most important target. An example would be a long running batch-processing job; it does not matter if a batch job is occasionally paused for a few seconds while garbage collection takes place, as long as the overall job can be completed sooner.

For virtually all other workloads, from human facing interactive applications to financial trading systems, if a system goes unresponsive for anything more than a few seconds or even milliseconds in some cases, it can spell disaster. In financial trading it is often worthwhile to trade off some throughput in return for consistent latency. We may also have applications that are limited by the amount of physical memory available and have to maintain a footprint, in which case we have to give up performance on both latency and throughput fronts.

Tradeoffs often play out as follows:
  • To a large extent the cost of garbage collection, as an amortized cost, can be reduced by providing the garbage collection algorithms with more memory.
  • The observed worst-case latency-inducing pauses due to garbage collecting can be reduced by containing the live set and keeping the heap size small.
  • The frequency with which pauses occur can be reduced by managing the heap and generation sizes, and by controlling the application’s object allocation rate.
  • The frequency of large pauses can be reduced by concurrently running the GC with the application, sometimes at the expense of throughput.

Object Lifetimes

Garbage collection algorithms are often optimised with the expectation that most objects live for a very short period of time, while relatively few live for very long. In most applications, objects that live for a significant period of time tend to constitute a very small percentage of objects allocated over time. In garbage collection theory this observed behavior is often known as “infant mortality” or the “weak generational hypothesis”. For example, loop Iterators are mostly short lived whereas static Strings are effectively immortal.

Experimentation has shown that generational garbage collectors can usually support an order-of-magnitude greater throughput than non-generational collectors do, and thus are almost ubiquitously used in server JVMs. By separating the generations of objects, we know that a region of newly allocated objects is likely to be very sparse for live objects. Therefore a collector that scavenges for the few live objects in this new region and copies them to another region for older objects can be very efficient. Hotspot garbage collectors record the age of an object in terms of the number of GC cycles survived.

Note: If your application consistently generates a lot of objects that live for a fairly long time then expect your application to be spending a significant portion of its time garbage collecting, and expect to be spending a significant portion of your time tuning the Hotspot garbage collectors. This is due to the reduced GC efficiency that happens when the generational “filter” is less effective, and resulting cost of collecting the longer living generations more frequently. Older generations are less sparse, and as a result the efficiency of older generation collection algorithms tends to be much lower. Generational garbage collectors tend to operate in two distinct collection cycles: Minor collections, when short-lived objects are collected, and the less frequent Major collections, when the older regions are collected.

Stop-The-World Events

The pauses that applications suffer during garbage collection are due to what are known as stop-the-world events. For garbage collectors to operate it is necessary, for practical engineering reasons, to periodically stop the running application so that memory can be managed. Depending on the algorithms, different collectors will stop-the-world at specific points of execution for varying durations of time. To bring an application to a total stop it is necessary to pause all the running threads. Garbage collectors do this by signaling the threads to stop when they come to a “safepoint”, which is a point during program execution at which all GC roots are known and all heap object contents are consistent. Depending on what a thread is doing it may take some time to reach a safepoint. Safepoint checks are normally performed on method returns and loop back edges, but can be optimized away in some places making them more dynamically rare. For example, if a thread is copying a large array, cloning a large object, or executing a monotonic counted loop with a finite bound, it may be many milliseconds before a safepoint is reached. Time To Safepoint (TTSP) is an important consideration in low-latency applications. This time can be surfaced by enabling the -XX:+PrintGCApplicationStoppedTime flag in addition to the other GC flags.

Note: For applications with a large number of running threads, when a stop-the-world event occurs a system will undergo significant scheduling pressure as the threads resume when released. Therefore algorithms with less reliance on stop-the-world events can potentially be more efficient.

Heap Organisation in Hotspot

To understand how the different collectors operate it is best to explore how the Java heap is organised to support generational collectors.

Eden is the region where most objects are initially allocated. The survivor spaces are a temporary store for objects that have survived a collection of the Eden space. Survivor space usage will be described when minor collections are discussed. Collectively Eden and the survivor spaces are known as the “young” or “new” generation.

Objects that live long enough are eventually promoted to the tenured space.

The perm generation is where the runtime stores objects it “knows” to be effectively immortal, such as Classes and static Strings. Unfortunately the common use of class loading on an ongoing basis in many applications makes the motivating assumption behind the perm generation wrong, i.e. that classes are immortal. In Java 7 interned Strings were moved from permgen to tenured, and from Java 8 the perm generation is no more and will not be discussed in this article. Most other commercial collectors do not use a separate perm space and tend to treat all long living objects as tenured.

Note: The Virtual spaces allow the collectors to adjust the size of regions to meet throughput and latency targets. Collectors keep statistics for each collection phase and adjust the region sizes accordingly in an attempt to reach the targets.
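To see how these regions are laid out in a running JVM, the heap memory pools can be listed through the standard java.lang.management API. The following is a small sketch; the class name is hypothetical, and the pool names printed depend on the collector in use (for example the Parallel collector reports names such as "PS Eden Space" and "PS Old Gen").

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryType;
import java.lang.management.MemoryUsage;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, used for illustration only.
final class HeapPools
{
    /** One descriptive line per heap memory pool (Eden, survivor, old generation, ...). */
    static List<String> describeHeapPools()
    {
        final List<String> lines = new ArrayList<>();
        for (final MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans())
        {
            if (pool.getType() != MemoryType.HEAP)
            {
                continue; // skip non-heap pools such as the code cache
            }

            final MemoryUsage usage = pool.getUsage(); // may be null if the pool is no longer valid
            if (usage != null)
            {
                lines.add(String.format(
                    "%-25s used=%,d committed=%,d max=%,d",
                    pool.getName(), usage.getUsed(), usage.getCommitted(), usage.getMax()));
            }
        }

        return lines;
    }

    public static void main(final String[] args)
    {
        for (final String line : describeHeapPools())
        {
            System.out.println(line);
        }
    }
}
```

The gap between committed and max for each pool roughly corresponds to the space the collector can still grow into.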

Object Allocation

To avoid contention each thread is assigned a Thread Local Allocation Buffer (TLAB) from which it allocates objects. Using TLABs allows object allocation to scale with number of threads by avoiding contention on a single memory resource. Object allocation via a TLAB is a very cheap operation; it simply bumps a pointer for the object size which takes roughly 10 instructions on most platforms. Heap memory allocation for Java is even cheaper than using malloc from the C runtime.
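The bump-the-pointer idea can be illustrated with a toy allocator over a fixed-size buffer. This is only a model of the technique using integer offsets; it is not how Hotspot implements TLABs internally, and the class name is hypothetical.

```java
// Toy model of bump-the-pointer allocation inside one thread-local buffer.
final class BumpAllocator
{
    private final int end; // one past the last usable offset
    private int top;       // next free offset

    BumpAllocator(final int capacity)
    {
        this.end = capacity;
    }

    /**
     * Reserve size bytes and return the start offset, or -1 if the buffer is
     * exhausted, at which point a real thread would request a new buffer.
     */
    int allocate(final int size)
    {
        if (size < 0 || size > end - top)
        {
            return -1;
        }

        final int start = top;
        top += size; // the "bump"

        return start;
    }

    int remaining()
    {
        return end - top;
    }
}
```

Because the buffer belongs to a single thread, the bump needs no atomic instruction or lock, which is what keeps the fast path so short.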


Note: Whereas individual object allocation is very cheap, the rate at which minor collections must occur is directly proportional to the rate of object allocation.

When a TLAB is exhausted a thread simply requests a new one from the Eden space. When Eden has been filled a minor collection commences.

Large objects (-XX:PretenureSizeThreshold=<n>) may fail to be accommodated in the young generation and thus have to be allocated in the old generation, e.g. a large array. If the threshold is set below TLAB size then objects that fit in the TLAB will not be created in the old generation. The new G1 collector handles large objects differently and will be discussed later in its own section.

Minor Collections

A minor collection is triggered when Eden becomes full. This is done by copying all the live objects in the new generation to either a survivor space or the tenured space as appropriate. Copying to the tenured space is known as promotion or tenuring. Promotion occurs for objects that are sufficiently old (-XX:MaxTenuringThreshold=<n>), or when the survivor space overflows.

Live objects are objects that are reachable by the application; any other objects cannot be reached and can therefore be considered dead. In a minor collection, the copying of live objects is performed by first following what are known as GC Roots, and iteratively copying anything reachable to the survivor space. GC Roots normally include references from application and JVM-internal static fields, and from thread stack-frames, all of which effectively point to the application’s reachable object graphs.

In generational collection, the GC Roots for the new generation’s reachable object graph also include any references from the old generation to the new generation. These references must also be processed to make sure all reachable objects in the new generation survive the minor collection. Identifying these cross-generational references is achieved by use of a “card table”. The Hotspot card table is an array of bytes in which each byte is used to track the potential existence of cross-generational references in a corresponding 512 byte region of the old generation. As references are stored to the heap, “store barrier” code will mark cards to indicate that a potential reference from the old generation to the new generation may exist in the associated 512 byte heap region. At collection time, the card table is used to scan for such cross-generational references, which effectively represent additional GC Roots into the new generation. Therefore a significant fixed cost of minor collections is directly proportional to the size of the old generation.
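The card-marking scheme can be modelled in a few lines. The sketch below keeps one byte per 512 byte region and marks the card covering any offset that receives a reference store. It is a toy model with a hypothetical class name: offsets stand in for addresses, and the value used for a dirty card is an arbitrary choice here rather than Hotspot's actual encoding.

```java
// Toy model of a card table: one byte per 512 byte region of the old generation.
final class CardTable
{
    static final int CARD_SHIFT = 9; // 2^9 = 512 bytes per card
    private static final byte DIRTY = 1;

    private final byte[] cards;

    CardTable(final long oldGenBytes)
    {
        final long cardSize = 1L << CARD_SHIFT;
        this.cards = new byte[(int)((oldGenBytes + cardSize - 1) >>> CARD_SHIFT)];
    }

    /** What a store barrier would do when a reference is written at this offset. */
    void recordStore(final long offset)
    {
        cards[(int)(offset >>> CARD_SHIFT)] = DIRTY;
    }

    boolean isDirty(final long offset)
    {
        return cards[(int)(offset >>> CARD_SHIFT)] == DIRTY;
    }

    /** Number of 512 byte regions a minor collection would need to scan. */
    int dirtyCount()
    {
        int count = 0;
        for (final byte card : cards)
        {
            if (card == DIRTY)
            {
                ++count;
            }
        }

        return count;
    }

    int cardCount()
    {
        return cards.length;
    }
}
```

Note that dirtyCount has to walk every card, so the scan itself grows with the size of the old generation even when few cards are dirty.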

There are two survivor spaces in the Hotspot new generation, which alternate in their “to-space” and “from-space” roles. At the beginning of a minor collection, the to-space survivor space is always empty, and acts as a target copy area for the minor collection. The previous minor collection’s target survivor space is part of the from-space, which also includes Eden, where live objects that need to be copied may be found.

The cost of a minor GC collection is usually dominated by the cost of copying objects to the survivor and tenured spaces. Objects that do not survive a minor collection are effectively free to be dealt with. The work done during a minor collection is directly proportional to the number of live objects found, and not to the size of the new generation. The total time spent doing minor collections can almost be halved each time the Eden size is doubled. Memory can therefore be traded for throughput. A doubling of Eden size will result in an increase in collection time per collection cycle, but this is relatively small if both the number of objects being promoted and the size of the old generation are constant.
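The arithmetic behind trading memory for throughput can be worked through with a simple model: the number of minor collections is the volume allocated divided by the Eden size, and the total pause time is that count multiplied by the pause per collection. The class name and the pause figures used below (10 ms and 11 ms) are illustrative assumptions, not measurements.

```java
// Simple model of minor collection cost; all figures are illustrative assumptions.
final class MinorGcModel
{
    /** Eden fills, and a minor collection runs, once per edenBytes allocated. */
    static long collections(final long allocatedBytes, final long edenBytes)
    {
        return allocatedBytes / edenBytes;
    }

    static double totalPauseMillis(
        final long allocatedBytes, final long edenBytes, final double pauseMillisPerCollection)
    {
        return collections(allocatedBytes, edenBytes) * pauseMillisPerCollection;
    }

    public static void main(final String[] args)
    {
        final long allocated = 100L << 30; // 100 GB allocated over the run

        // 512 MB Eden, assumed 10 ms per collection
        System.out.println(totalPauseMillis(allocated, 512L << 20, 10.0));

        // 1 GB Eden, assumed slightly longer 11 ms per collection
        System.out.println(totalPauseMillis(allocated, 1L << 30, 11.0));
    }
}
```

Under these assumptions the collection count drops from 200 to 100 and the total pause time falls from 2000 ms to 1100 ms, which is almost, but not quite, half.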

Note: In Hotspot minor collections are stop-the-world events. This is rapidly becoming a major issue as our heaps get larger with more live objects. We are already starting to see the need for concurrent collection of the young generation to reach pause-time targets.

Major Collections

Major collections collect the old generation so that objects can be promoted from the young generation. In most applications, the vast majority of program state ends up in the old generation. The greatest variety of GC algorithms exists for the old generation. Some will compact the whole space when it fills, whereas others will collect concurrently with the application in an effort to prevent it from filling up.

The old generation collector will try to predict when it needs to collect to avoid a promotion failure from the young generation. The collectors track a fill threshold for the old generation and begin collection when this threshold is passed. If this threshold is not sufficient to meet promotion requirements then a “FullGC” is triggered. A FullGC involves promoting all live objects from the young generation followed by a collection and compaction of the old generation. Promotion failure is a very expensive operation as state and promoted objects from this cycle must be unwound so the FullGC event can occur.

Note: To avoid promotion failure you will need to tune the padding that the old generation allows to accommodate promotions (-XX:PromotedPadding=<n>).

Note: When the Heap needs to grow a FullGC is triggered. These heap-resizing FullGCs can be avoided by setting -Xms and -Xmx to the same value.

Other than a FullGC, a compaction of the old generation is likely to be the largest stop-the-world pause an application will experience. The time for this compaction tends to grow linearly with the number of live objects in the tenured space.

The rate at which the tenured space fills up can sometimes be reduced by increasing the size of the survivor spaces and the age of objects before being promoted to the tenured generation. However, increasing the size of the survivor spaces and object age in Minor collections (-XX:MaxTenuringThreshold=<n>) before promotion can also increase the cost and pause times in the minor collections due to the increased copy cost between survivor spaces on minor collections.

Serial Collector

The Serial collector (-XX:+UseSerialGC) is the simplest collector and is a good option for single processor systems. It also has the smallest footprint of any collector. It uses a single thread for both minor and major collections. Objects are allocated in the tenured space using a simple bump the pointer algorithm. Major collections are triggered when the tenured space is full.

Parallel Collector

The Parallel collector comes in two forms. The Parallel collector (-XX:+UseParallelGC) uses multiple threads to perform minor collections of the young generation and a single thread for major collections on the old generation. The Parallel Old collector (-XX:+UseParallelOldGC), the default since Java 7u4, uses multiple threads for both minor and major collections. Objects are allocated in the tenured space using a simple bump the pointer algorithm. Major collections are triggered when the tenured space is full.

On multiprocessor systems the Parallel Old collector will give the greatest throughput of any collector. It has no impact on a running application until a collection occurs, and then will collect in parallel using multiple threads using the most efficient algorithm. This makes the Parallel Old collector very suitable for batch applications.

The cost of collecting the old generations is affected by the number of objects to retain to a greater extent than by the size of the heap. Therefore the efficiency of the Parallel Old collector can be increased to achieve greater throughput by providing more memory and accepting larger, but fewer, collection pauses.

Expect the fastest minor collections with this collector because the promotion to tenured space is a simple bump the pointer and copy operation.

For server applications the Parallel Old collector should be the first port-of-call. However if the major collection pauses are more than your application can tolerate then you need to consider employing a concurrent collector that collects the tenured objects concurrently while the application is running.

Note: Expect pauses in the order of one to five seconds per GB of live data on modern hardware while the old generation is compacted.

Note: The parallel collector can sometimes gain performance benefits from -XX:+UseNUMA on multi-socket CPU server applications by allocating Eden memory for threads local to the CPU socket. It is a shame this feature is not available to the other collectors.

Concurrent Mark Sweep (CMS) Collector

The CMS (-XX:+UseConcMarkSweepGC) collector runs in the Old generation collecting tenured objects that are no longer reachable during a major collection. It runs concurrently with the application with the goal of keeping sufficient free space in the old generation so that a promotion failure from the young generation does not occur.

Promotion failure will trigger a FullGC. CMS follows a multistep process:
  1. Initial Mark : Find GC Roots.
  2. Concurrent Mark: Mark all reachable objects from the GC Roots.
  3. Concurrent Pre-clean: Check for object references that have been updated and objects that have been promoted during the concurrent mark phase by remarking.
  4. Re-mark : Capture object references that have been updated since the Pre-clean stage.
  5. Concurrent Sweep: Update the free-lists by reclaiming memory occupied by dead objects.
  6. Concurrent Reset: Reset data structures for next run.
As tenured objects become unreachable, the space is reclaimed by CMS and put on free-lists. When promotion occurs, the free-lists must be searched for a suitable sized hole for the promoted object. This increases the cost of promotion and thus increases the cost of the Minor collections compared to the Parallel Collector.

Note: CMS is not a compacting collector, which over time can result in old generation fragmentation. Object promotion can fail because a large object may not fit in the available holes in the old generation. When this happens a “promotion failed” message is logged and a FullGC is triggered to compact the live tenured objects. For such compaction-driven FullGCs, expect pauses to be worse than major collections using the Parallel Old collector because CMS uses only a single thread for compaction.

CMS is mostly concurrent with the application, which has a number of implications. First, CPU time is taken by the collector, thus reducing the CPU available to the application. The amount of time required by CMS grows linearly with the amount of object promotion to the tenured space. Second, for some phases of the concurrent GC cycle, all application threads have to be brought to a safepoint for marking GC Roots and performing a parallel re-mark to check for mutation.

Note: If an application sees significant mutation of tenured objects then the re-mark phase can be significant; at the extremes it may take longer than a full compaction with the Parallel Old collector.

CMS makes FullGC a less frequent event at the expense of reduced throughput, more expensive minor collections, and greater footprint. The reduction in throughput can be anything from 10%-40% compared to the Parallel collector, depending on promotion rate. CMS also requires a 20% greater footprint to accommodate additional data structures and “floating garbage” that can be missed during the concurrent marking that gets carried over to the next cycle.

High promotion rates and resulting fragmentation can sometimes be reduced by increasing the size of both the young and old generation spaces.

Note: CMS can suffer “concurrent mode failures”, which can be seen in the logs, when it fails to collect at a sufficient rate to keep up with promotion. This can be caused when the collection commences too late, which can sometimes be addressed by tuning. But it can also occur when the collection rate cannot keep up with the high promotion rate or with the high object mutation rate of some applications. If the promotion rate, or mutation rate, of the application is too high then your application might require some changes to reduce the promotion pressure. Adding more memory to such a system can sometimes make the situation worse, as CMS would then have more memory to scan.

Garbage First (G1) Collector

G1 (-XX:+UseG1GC) is a new collector introduced in Java 6 and now officially supported as of Java 7u4. It is a partially concurrent collecting algorithm that also tries to compact the tenured space in smaller incremental stop-the-world pauses to try and minimize the FullGC events that plague CMS because of fragmentation. G1 is a generational collector that organizes the heap differently from the other collectors by dividing it into a large number (~2000) of fixed size regions of variable purpose, rather than contiguous regions for the same purpose.


G1 takes the approach of concurrently marking regions to track references between regions, and to focus collection on the regions with the most free space. These regions are then collected in stop-the-world pause increments by evacuating the live objects to an empty region, thus compacting in the process.  The regions to be collected in a cycle are known as the Collection Set.

Objects larger than 50% of a region are allocated in humongous regions, which are a multiple of region size. Allocation and collection of humongous objects can be very costly under G1, and to date has had little or no optimisation effort applied.
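The humongous rule is easy to express as arithmetic. The sketch below applies the "larger than 50% of a region" test from the text and rounds the object size up to a whole number of regions. The class and method names are hypothetical, and the 1 MB region size in the usage is just an example value.

```java
// Toy model of G1's humongous object rule as described in the text.
final class G1Humongous
{
    /** An object is treated as humongous when it is larger than half a region. */
    static boolean isHumongous(final long objectBytes, final long regionBytes)
    {
        return objectBytes > regionBytes / 2;
    }

    /** Humongous allocations occupy a whole number of regions (rounded up). */
    static long regionsNeeded(final long objectBytes, final long regionBytes)
    {
        return (objectBytes + regionBytes - 1) / regionBytes;
    }

    public static void main(final String[] args)
    {
        final long region = 1L << 20; // example: 1 MB regions

        System.out.println(isHumongous(600 * 1024, region));       // larger than half a region
        System.out.println(regionsNeeded(2560 * 1024, region));    // 2.5 MB object
    }
}
```

With 1 MB regions a 2.5 MB array takes three regions, so half a region is left unusable for the lifetime of that object.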

The challenge with any compacting collector is not the moving of objects but the updating of references to those objects. If an object is referenced from many regions then updating those references can take significantly longer than moving the object. G1 tracks which objects in a region have references from other regions via the “Remembered Sets”. Remembered Sets are collections of cards that have been marked for mutation. If the Remembered Sets become large then G1 can significantly slow down. When evacuating objects from one region to another, the length of the associated stop-the-world event tends to be proportional to the number of regions with references that need to be scanned and potentially patched.

Maintaining the Remembered Sets increases the cost of minor collections resulting in pauses greater than those seen with Parallel Old or CMS collectors for Minor collections.

G1 is target driven on latency -XX:MaxGCPauseMillis=<n>, default value = 200ms. The target will influence the amount of work done on each cycle on a best-efforts only basis. Setting targets in tens of milliseconds is mostly futile, and as of this writing targeting tens of milliseconds has not been a focus of G1.

G1 is a good general-purpose collector for larger heaps that have a tendency to become fragmented when an application can tolerate pauses in the 0.5-1.0 second range for incremental compactions. G1 tends to reduce the frequency of the worst-case pauses seen by CMS because of fragmentation at the cost of extended minor collections and incremental compactions of the old generation. Most pauses end up being constrained to regional rather than full heap compactions.

Like CMS, G1 can also fail to keep up with promotion rates, and will fall back to a stop-the-world FullGC. Just like CMS has “concurrent mode failure”, G1 can suffer an evacuation failure, seen in the logs as “to-space overflow”. This occurs when there are no free regions into which objects can be evacuated, which is similar to a promotion failure. If this occurs, try using a larger heap and more marking threads, but in some cases application changes may be necessary to reduce allocation rates.

A challenging problem for G1 is dealing with popular objects and regions. Incremental stop-the-world compaction works well when regions have live objects that are not heavily referenced from other regions. If an object or region is popular then the Remembered Set will be large, and G1 will try to avoid collecting those objects. Eventually it can have no choice, which results in very frequent mid-length pauses as the heap gets compacted.

Alternative Concurrent Collectors

CMS and G1 are often called mostly concurrent collectors. When you look at the total work performed it is clear that the young generation, promotion and even much of the old generation work is not concurrent at all. CMS is mostly concurrent for the old generation; G1 is much more of a stop-the-world incremental collector. Both CMS and G1 have significant and regularly occurring stop-the-world events, and worst-case scenarios that often make them unsuitable for strict low-latency applications, such as financial trading or reactive user interfaces.

Alternative collectors are available such as Oracle JRockit Real Time, IBM Websphere Real Time, and Azul Zing. The JRockit and Websphere collectors have latency advantages in most cases over CMS and G1 but often see throughput limitations and still suffer significant stop-the-world events. Zing is the only Java collector known to this author that can be truly concurrent for collection and compaction while maintaining a high-throughput rate for all generations. Zing does have some sub-millisecond stop-the-world events but these are for phase shifts in the collection cycle that are not related to live object set size.

JRockit RT can achieve typical pause times in the tens of milliseconds for high allocation rates at contained heap sizes but occasionally has to fall back to full compaction pauses. Websphere RT can achieve single-digit millisecond pause times via constrained allocation rates and live set sizes. Zing can achieve sub-millisecond pauses with high allocation rates by being concurrent for all phases, including during minor collections. Zing is able to maintain this consistent behavior regardless of heap size, allowing the user to apply large heap sizes as needed for keeping up with application throughput or object model state needs, without fear of increased pause times.

For all the concurrent collectors targeting latency you have to give up some throughput and gain footprint. Depending on the efficiency of the concurrent collector you may give up a little throughput but you are always adding significant footprint. If truly concurrent, with few stop-the-world events, then more CPU cores are needed to enable the concurrent operation and maintain throughput.

Note: All the concurrent collectors tend to function more efficiently when sufficient space is allocated. As a starting point rule of thumb, you should budget a heap of at least two to three times the size of the live set for efficient operation. However, space requirements for maintaining concurrent operation grow with application throughput, and the associated allocation and promotion rates. So for higher throughput applications a higher heap-size to live set ratio may be warranted. Given the huge memory spaces available to today’s systems, footprint is seldom an issue on the server side.

Garbage Collection Monitoring & Tuning

To understand how your application and garbage collector are behaving, start your JVM with at least the following settings:
-verbose:gc
-Xloggc:
-XX:+PrintGCDetails
-XX:+PrintGCDateStamps
-XX:+PrintTenuringDistribution
-XX:+PrintGCApplicationConcurrentTime 
-XX:+PrintGCApplicationStoppedTime

Then load the logs into a tool like Chewiebug for analysis.

To see the dynamic nature of GC, launch JVisualVM and install the Visual GC plugin. This will enable you to see the GC in action for your application as below.


To get an understanding of your application’s GC needs, you need representative load tests that can be executed repeatedly. As you get to grips with how each of the collectors works, run your load tests with different configurations as experiments until you reach your throughput and latency targets. It is important to measure latency from the end user perspective. This can be achieved by capturing the response time of every test request in a histogram, e.g. HdrHistogram or Disruptor Histogram. If you have latency spikes that are outside your acceptable range, then try to correlate these with the GC logs to determine if GC is the issue. It is possible other issues may be causing latency spikes. Another useful tool to consider is jHiccup, which can be used to track pauses within the JVM and across a system as a whole. Measure your idle systems for a few hours with jHiccup and you will often be very surprised.
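
As a rough illustration of capturing every response time in a histogram, the sketch below uses a deliberately crude power-of-two bucket histogram written only with the standard library. It is not the HdrHistogram or Disruptor Histogram API; the class name, the bucket scheme and the simulatedRequest() stand-in are all assumptions made for the example.

```java
public class LatencyHistogramSketch
{
    // Bucket i counts latencies in the range [2^i, 2^(i+1)) nanoseconds.
    private final long[] buckets = new long[64];
    private long count;

    public void record(final long latencyNs)
    {
        final long value = Math.max(1, latencyNs);
        buckets[63 - Long.numberOfLeadingZeros(value)]++;
        count++;
    }

    // Upper bound, in nanoseconds, of the bucket that contains the requested percentile.
    public long upperBoundAtPercentile(final double percentile)
    {
        final long target = (long)Math.ceil((percentile / 100.0) * count);
        long seen = 0;
        for (int i = 0; i < buckets.length; i++)
        {
            seen += buckets[i];
            if (seen > 0 && seen >= target)
            {
                return (i >= 62) ? Long.MAX_VALUE : (1L << (i + 1));
            }
        }

        return 0; // nothing recorded yet
    }

    // Hypothetical stand-in for issuing one request in a load test.
    private static int simulatedRequest(final int i)
    {
        return Integer.toString(i).hashCode();
    }

    public static void main(final String[] args)
    {
        final LatencyHistogramSketch histogram = new LatencyHistogramSketch();
        int sink = 0;

        for (int i = 0; i < 100000; i++)
        {
            final long start = System.nanoTime();
            sink += simulatedRequest(i);
            histogram.record(System.nanoTime() - start);
        }

        System.out.printf("median < %dns, worst < %dns (sink=%d)%n",
                          histogram.upperBoundAtPercentile(50.0),
                          histogram.upperBoundAtPercentile(100.0),
                          sink);
    }
}
```

Recording every request, rather than an average, is what makes the outliers visible so that they can be lined up against the timestamps in the GC log. A purpose-built histogram library gives far finer resolution than these coarse buckets.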

If latency spikes are due to GC then invest in tuning CMS or G1 to see if your latency targets can be met. Sometimes this may not be possible because of high allocation and promotion rates combined with low-latency requirements. GC tuning can become a highly skilled exercise that often requires application changes to reduce object allocation rates or object lifetimes. If this is the case then a commercial trade-off may be required between the time and resource spent on GC tuning and application changes, versus purchasing one of the commercial concurrent compacting JVMs such as JRockit Real Time or Azul Zing.

Thursday 27 June 2013

Printing Generated Assembly Code From The Hotspot JIT Compiler

Sometimes when profiling a Java application it is necessary to understand the assembly code generated by the Hotspot JIT compiler. This can be useful in determining what optimisation decisions have been made and how our code changes can affect the generated assembly code. It is also useful at times knowing what instructions are emitted when debugging a concurrent algorithm to ensure visibility rules have been applied as expected. I have found quite a few bugs in various JVMs this way.

This blog illustrates how to install a Disassembler Plugin and provides command line options for targeting a particular method.

Installation

Previously it was necessary to obtain a debug build for printing the assembly code generated by the Hotspot JIT for the Oracle/SUN JVM. Since Java 7, it has been possible to print the generated assembly code if a Disassembler Plugin is installed in a standard Oracle Hotspot JVM. To install the plugin for 64-bit Linux follow the steps below:
  1. Download the appropriate binary, or build from source, from https://kenai.com/projects/base-hsdis/downloads
  2. On Linux rename linux-hsdis-amd64.so to libhsdis-amd64.so
  3. Copy the shared library to $JAVA_HOME/jre/lib/amd64/server
You now have the plugin installed!
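
A quick way to confirm the copy in step 3 landed where the JVM will look is to check for the file from Java. This is only a sketch: the class name is made up for the example, and it assumes the 64-bit Linux layout and JAVA_HOME environment variable used in the steps above.

```java
import java.io.File;

public class HsdisCheck
{
    // Builds the location described in the installation steps for 64-bit Linux.
    public static File expectedLibrary(final String javaHome)
    {
        return new File(new File(javaHome, "jre/lib/amd64/server"), "libhsdis-amd64.so");
    }

    public static void main(final String[] args)
    {
        final String javaHome = System.getenv("JAVA_HOME");
        if (javaHome == null)
        {
            System.out.println("JAVA_HOME is not set");
            return;
        }

        final File library = expectedLibrary(javaHome);
        System.out.println(library + (library.isFile() ? " found" : " missing"));
    }
}
```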

Test Program

To test the plugin we need some code that is both interesting to a programmer and executes sufficiently hot to be optimised by the JIT. Some details of when the JIT will optimise can be found here. The code below can be used to measure the average latency between two threads by reading and writing volatile fields. These volatile fields are interesting because they require associated hardware fences to honour the Java Memory Model.
import static java.lang.System.out;

public class InterThreadLatency
{
    private static final int REPETITIONS = 100 * 1000 * 1000;

    private static volatile int ping = -1;
    private static volatile int pong = -1;

    public static void main(final String[] args)
        throws Exception
    {
        for (int i = 0; i < 5; i++)
        {
            final long duration = runTest();

            out.printf("%d - %dns avg latency - ping=%d pong=%d\n",
                       i,
                       duration / (REPETITIONS * 2),
                       ping,
                       pong);
        }
    }

    private static long runTest() throws InterruptedException
    {
        final Thread pongThread = new Thread(new PongRunner());
        final Thread pingThread = new Thread(new PingRunner());
        pongThread.start();
        pingThread.start();

        final long start = System.nanoTime();
        pongThread.join();

        return System.nanoTime() - start;
    }

    public static class PingRunner implements Runnable
    {
        public void run()
        {
            for (int i = 0; i < REPETITIONS; i++)
            {
                ping = i;

                while (i != pong)
                {
                    // busy spin
                }
            }
        }
    }

    public static class PongRunner implements Runnable
    {
        public void run()
        {
            for (int i = 0; i < REPETITIONS; i++)
            {
                while (i != ping)
                {
                    // busy spin
                }

                pong = i;
            }
        }
    }
}
Printing Assembly Code

It is possible to print all generated assembly code with the following statement.

java -XX:+UnlockDiagnosticVMOptions -XX:+PrintAssembly InterThreadLatency

However, this can put you in the situation of not being able to see the forest for the trees. It is generally much more useful to target a particular method. For this test, the run() method will be optimised and generated twice by Hotspot: once for the OSR version, and then again for the standard JIT version. The standard JIT version follows.

java -XX:+UnlockDiagnosticVMOptions '-XX:CompileCommand=print,*PongRunner.run' InterThreadLatency

Compiled method (c2)   10531    5             InterThreadLatency$PongRunner::run (30 bytes)
 total in heap  [0x00007fed81060850,0x00007fed81060b30] = 736
 relocation     [0x00007fed81060970,0x00007fed81060980] = 16
 main code      [0x00007fed81060980,0x00007fed81060a00] = 128
 stub code      [0x00007fed81060a00,0x00007fed81060a18] = 24
 oops           [0x00007fed81060a18,0x00007fed81060a30] = 24
 scopes data    [0x00007fed81060a30,0x00007fed81060a78] = 72
 scopes pcs     [0x00007fed81060a78,0x00007fed81060b28] = 176
 dependencies   [0x00007fed81060b28,0x00007fed81060b30] = 8
Decoding compiled method 0x00007fed81060850:
Code:
[Entry Point]
[Constants]
  # {method} 'run' '()V' in 'InterThreadLatency$PongRunner'
  #           [sp+0x20]  (sp of caller)
  0x00007fed81060980: mov    0x8(%rsi),%r10d
  0x00007fed81060984: shl    $0x3,%r10
  0x00007fed81060988: cmp    %r10,%rax
  0x00007fed8106098b: jne    0x00007fed81037a60  ;   {runtime_call}
  0x00007fed81060991: xchg   %ax,%ax
  0x00007fed81060994: nopl   0x0(%rax,%rax,1)
  0x00007fed8106099c: xchg   %ax,%ax
[Verified Entry Point]
  0x00007fed810609a0: sub    $0x18,%rsp
  0x00007fed810609a7: mov    %rbp,0x10(%rsp)    ;*synchronization entry
                                                ; - InterThreadLatency$PongRunner::run@-1 (line 58)
  0x00007fed810609ac: xor    %r11d,%r11d
  0x00007fed810609af: mov    $0x7ad0fcbf0,%r10  ;   {oop(a 'java/lang/Class' = 'InterThreadLatency')}
  0x00007fed810609b9: jmp    0x00007fed810609d0
  0x00007fed810609bb: nopl   0x0(%rax,%rax,1)   ; OopMap{r10=Oop off=64}
                                                ;*goto
                                                ; - InterThreadLatency$PongRunner::run@15 (line 60)
  0x00007fed810609c0: test   %eax,0xaa1663a(%rip)        # 0x00007fed8ba77000
                                                ;*goto
                                                ; - InterThreadLatency$PongRunner::run@15 (line 60)
                                                ;   {poll}
  0x00007fed810609c6: nopw   0x0(%rax,%rax,1)   ;*iload_1
                                                ; - InterThreadLatency$PongRunner::run@8 (line 60)
  0x00007fed810609d0: mov    0x74(%r10),%r9d    ;*getstatic ping
                                                ; - InterThreadLatency::access$000@0 (line 3)
                                                ; - InterThreadLatency$PongRunner::run@9 (line 60)
  0x00007fed810609d4: cmp    %r9d,%r11d
  0x00007fed810609d7: jne    0x00007fed810609c0
  0x00007fed810609d9: mov    %r11d,0x78(%r10)
  0x00007fed810609dd: lock addl $0x0,(%rsp)     ;*putstatic pong
                                                ; - InterThreadLatency::access$102@2 (line 3)
                                                ; - InterThreadLatency$PongRunner::run@19 (line 65)
  0x00007fed810609e2: inc    %r11d              ;*iinc
                                                ; - InterThreadLatency$PongRunner::run@23 (line 58)
  0x00007fed810609e5: cmp    $0x5f5e100,%r11d
  0x00007fed810609ec: jl     0x00007fed810609d0  ;*if_icmpeq
                                                ; - InterThreadLatency$PongRunner::run@12 (line 60)
  0x00007fed810609ee: add    $0x10,%rsp
  0x00007fed810609f2: pop    %rbp
  0x00007fed810609f3: test   %eax,0xaa16607(%rip)        # 0x00007fed8ba77000
                                                ;   {poll_return}
  0x00007fed810609f9: retq                      ;*iload_1
                                                ; - InterThreadLatency$PongRunner::run@8 (line 60)
  0x00007fed810609fa: hlt    
  0x00007fed810609fb: hlt    
  0x00007fed810609fc: hlt    
  0x00007fed810609fd: hlt    
  0x00007fed810609fe: hlt    
  0x00007fed810609ff: hlt    
[Exception Handler]
[Stub Code]
  0x00007fed81060a00: jmpq   0x00007fed8105eaa0  ;   {no_reloc}
[Deopt Handler Code]
  0x00007fed81060a05: callq  0x00007fed81060a0a
  0x00007fed81060a0a: subq   $0x5,(%rsp)
  0x00007fed81060a0f: jmpq   0x00007fed81038c00  ;   {runtime_call}
  0x00007fed81060a14: hlt    
  0x00007fed81060a15: hlt    
  0x00007fed81060a16: hlt    
  0x00007fed81060a17: hlt    
OopMapSet contains 1 OopMaps

#0 
OopMap{r10=Oop off=64}

An Interesting Observation

The two instructions that implement the write to pong in the assembly code above (highlighted in red in the original listing) are very interesting: the mov to 0x78(%r10) and the lock addl that follows it. When a volatile field is written, under the Java Memory Model the write must be sequentially consistent, i.e. not appear to be reordered due to optimisations normally applied such as staging the write to the store buffer. This can be achieved by inserting the appropriate memory barriers. In the case above, Hotspot has chosen to enforce the ordering by issuing a MOV instruction (register to memory address, i.e. the write) followed by a LOCK ADD instruction (adding zero to the location at the stack pointer, a no-op used as a fence idiom) that has ordering semantics. This could be less than ideal on an x86 processor. The same action could have been performed more efficiently and correctly with a single LOCK XCHG instruction for the write. This makes me wonder if there are some significant compromises in the JVM to make it portable across many architectures, rather than be the best it can on x86.

Wednesday 17 October 2012

Compact Off-Heap Structures/Tuples In Java

In my last post I detailed the implications of the access patterns your code takes to main memory.  Since then I've had a lot of questions about what can be done in Java to enable more predictable memory layout.  There are patterns that can be applied using array backed structures which I will discuss in another post.   This post will explore how to simulate a feature sorely missing in Java - arrays of structures similar to what C has to offer.

Structures are very useful, both on the stack and the heap.  To my knowledge it is not possible to simulate this feature on the Java stack.  Not being able to do this on the stack is such a shame because it greatly limits the performance of some parallel algorithms; however, that is a rant for another day.

In Java, all user defined types have to exist on the heap.  The Java heap is managed by the garbage collector in the general case, however there is more to the wider heap in a Java process.  With the introduction of direct ByteBuffer, memory can be allocated which is not tracked by the garbage collector because it can be available to native code for tasks like avoiding the copying of data to and from the kernel for IO.  So one reasonable method of managing structures is to fake them within a ByteBuffer.  This can allow compact data representations, but has performance and size limitations.  For example, it is not possible to have a ByteBuffer greater than 2GB, and all access is bounds checked which impacts performance.  An alternative exists using Unsafe that is both faster and not size constrained like ByteBuffer.
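
To make the ByteBuffer option concrete, here is a minimal sketch of faking fixed-size records inside a direct ByteBuffer.  The record layout (a long id followed by an int quantity) and the class name are hypothetical, chosen only to keep the example short; it uses nothing beyond the standard java.nio API.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class ByteBufferRecords
{
    private static final int ID_OFFSET = 0;        // long, 8 bytes
    private static final int QUANTITY_OFFSET = 8;  // int, 4 bytes
    private static final int RECORD_SIZE = 12;

    private final ByteBuffer buffer;

    public ByteBufferRecords(final int numRecords)
    {
        // ByteBuffer capacity is an int, which is where the 2GB ceiling comes from.
        final long requiredBytes = (long)numRecords * RECORD_SIZE;
        if (requiredBytes > Integer.MAX_VALUE)
        {
            throw new IllegalArgumentException("Too large for a single ByteBuffer: " + requiredBytes);
        }

        buffer = ByteBuffer.allocateDirect((int)requiredBytes).order(ByteOrder.nativeOrder());
    }

    public void put(final int index, final long id, final int quantity)
    {
        final int base = index * RECORD_SIZE;
        buffer.putLong(base + ID_OFFSET, id);            // absolute put, bounds checked
        buffer.putInt(base + QUANTITY_OFFSET, quantity);
    }

    public long getId(final int index)
    {
        return buffer.getLong((index * RECORD_SIZE) + ID_OFFSET);
    }

    public int getQuantity(final int index)
    {
        return buffer.getInt((index * RECORD_SIZE) + QUANTITY_OFFSET);
    }

    public static void main(final String[] args)
    {
        final ByteBufferRecords records = new ByteBufferRecords(1000);
        records.put(999, 42L, 7);
        System.out.println(records.getId(999) + " x " + records.getQuantity(999));
    }
}
```

Every get and put here goes through the buffer's index checks, and the constructor has to reject anything that does not fit in an int.  Those are the two limitations described above.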

The approach I'm about to detail is not traditional Java.  If your problem space is dealing with big data, or extreme performance, then there are benefits to be had.  If your data sets are small, and performance is not an issue, then run away now to avoid getting sucked into the dark arts of native memory management.

The benefits of the approach I'm about to detail are:
  1. Significantly improved performance 
  2. More compact data representation
  3. Ability to work with very large data sets while avoiding nasty GC pauses[1]
With all choices there are consequences.  By taking the approach detailed below you take responsibility for some of the memory management yourself.  Getting it wrong can lead to memory leaks, or worse, you can crash the JVM!  Proceed with caution...

Suitable Example - Trade Data

A common challenge faced in finance applications is capturing and working with very large volumes of order and trade data.  For the example I will create a large table of in-memory trade data that can have analysis queries run against it.  This table will be built using 2 contrasting approaches.  Firstly, I'll take the traditional Java approach of creating a large array and reference individual Trade objects.  Secondly, I keep the usage code identical but replace the large array and Trade objects with an off-heap array of structures that can be manipulated via a Flyweight pattern.

If for the traditional Java approach I used some other data structure, such as a Map or Tree, then the memory footprint would be even greater and the performance lower.

Traditional Java Approach
public class TestJavaMemoryLayout
{
    private static final int NUM_RECORDS = 50 * 1000 * 1000;

    private static JavaMemoryTrade[] trades;

    public static void main(final String[] args)
    {
        for (int i = 0; i < 5; i++)
        {
            System.gc();
            perfRun(i);
        }
    }

    private static void perfRun(final int runNum)
    {
        long start = System.currentTimeMillis();

        init();

        System.out.format("Memory %,d total, %,d free\n",
                          Runtime.getRuntime().totalMemory(),
                          Runtime.getRuntime().freeMemory());

        long buyCost = 0;
        long sellCost = 0;

        for (int i = 0; i < NUM_RECORDS; i++)
        {
            final JavaMemoryTrade trade = get(i);

            if (trade.getSide() == 'B')
            {
                buyCost += (trade.getPrice() * trade.getQuantity());
            }
            else
            {
                sellCost += (trade.getPrice() * trade.getQuantity());
            }
        }

        long duration = System.currentTimeMillis() - start;
        System.out.println(runNum + " - duration " + duration + "ms");
        System.out.println("buyCost = " + buyCost + " sellCost = " + sellCost);
    }

    private static JavaMemoryTrade get(final int index)
    {
        return trades[index];
    }

    public static void init()
    {
        trades = new JavaMemoryTrade[NUM_RECORDS];

        final byte[] londonStockExchange = {'X', 'L', 'O', 'N'};
        final int venueCode = pack(londonStockExchange);

        final byte[] billiton = {'B', 'H', 'P'};
        final int instrumentCode = pack( billiton);

        for (int i = 0; i < NUM_RECORDS; i++)
        {
            JavaMemoryTrade trade = new JavaMemoryTrade();
            trades[i] = trade;

            trade.setTradeId(i);
            trade.setClientId(1);
            trade.setVenueCode(venueCode);
            trade.setInstrumentCode(instrumentCode);

            trade.setPrice(i);
            trade.setQuantity(i);

            trade.setSide((i & 1) == 0 ? 'B' : 'S');
        }
    }

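    // Packs up to 4 ASCII bytes into an int, first byte in the most significant position.
    // The switch cases fall through deliberately so that every byte present is included.
    private static int pack(final byte[] value)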
    {
        int result = 0;
        switch (value.length)
        {
            case 4:
                result = (value[3]);
            case 3:
                result |= ((int)value[2] << 8);
            case 2:
                result |= ((int)value[1] << 16);
            case 1:
                result |= ((int)value[0] << 24);
                break;

            default:
                throw new IllegalArgumentException("Invalid array size");
        }

        return result;
    }

    private static class JavaMemoryTrade
    {
        private long tradeId;
        private long clientId;
        private int venueCode;
        private int instrumentCode;
        private long price;
        private long quantity;
        private char side;

        public long getTradeId()
        {
            return tradeId;
        }

        public void setTradeId(final long tradeId)
        {
            this.tradeId = tradeId;
        }

        public long getClientId()
        {
            return clientId;
        }

        public void setClientId(final long clientId)
        {
            this.clientId = clientId;
        }

        public int getVenueCode()
        {
            return venueCode;
        }

        public void setVenueCode(final int venueCode)
        {
            this.venueCode = venueCode;
        }

        public int getInstrumentCode()
        {
            return instrumentCode;
        }

        public void setInstrumentCode(final int instrumentCode)
        {
            this.instrumentCode = instrumentCode;
        }

        public long getPrice()
        {
            return price;
        }

        public void setPrice(final long price)
        {
            this.price = price;
        }

        public long getQuantity()
        {
            return quantity;
        }

        public void setQuantity(final long quantity)
        {
            this.quantity = quantity;
        }

        public char getSide()
        {
            return side;
        }

        public void setSide(final char side)
        {
            this.side = side;
        }
    }
}
Compact Off-Heap Structures
import sun.misc.Unsafe;

import java.lang.reflect.Field;

public class TestDirectMemoryLayout
{
    private static final Unsafe unsafe;
    static
    {
        try
        {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            unsafe = (Unsafe)field.get(null);
        }
        catch (Exception e)
        {
            throw new RuntimeException(e);
        }
    }

    private static final int NUM_RECORDS = 50 * 1000 * 1000;

    private static long address;
    private static final DirectMemoryTrade flyweight = new DirectMemoryTrade();

    public static void main(final String[] args)
    {
        for (int i = 0; i < 5; i++)
        {
            System.gc();
            perfRun(i);
        }
    }

    private static void perfRun(final int runNum)
    {
        long start = System.currentTimeMillis();

        init();

        System.out.format("Memory %,d total, %,d free\n",
                          Runtime.getRuntime().totalMemory(),
                          Runtime.getRuntime().freeMemory());

        long buyCost = 0;
        long sellCost = 0;

        for (int i = 0; i < NUM_RECORDS; i++)
        {
            final DirectMemoryTrade trade = get(i);

            if (trade.getSide() == 'B')
            {
                buyCost += (trade.getPrice() * trade.getQuantity());
            }
            else
            {
                sellCost += (trade.getPrice() * trade.getQuantity());
            }
        }

        long duration = System.currentTimeMillis() - start;
        System.out.println(runNum + " - duration " + duration + "ms");
        System.out.println("buyCost = " + buyCost + " sellCost = " + sellCost);

        destroy();
    }

    private static DirectMemoryTrade get(final int index)
    {
        final long offset = address + (index * DirectMemoryTrade.getObjectSize());
        flyweight.setObjectOffset(offset);
        return flyweight;
    }

    public static void init()
    {
        final long requiredHeap = NUM_RECORDS * DirectMemoryTrade.getObjectSize();
        address = unsafe.allocateMemory(requiredHeap);

        final byte[] londonStockExchange = {'X', 'L', 'O', 'N'};
        final int venueCode = pack(londonStockExchange);

        final byte[] billiton = {'B', 'H', 'P'};
        final int instrumentCode = pack( billiton);

        for (int i = 0; i < NUM_RECORDS; i++)
        {
            DirectMemoryTrade trade = get(i);

            trade.setTradeId(i);
            trade.setClientId(1);
            trade.setVenueCode(venueCode);
            trade.setInstrumentCode(instrumentCode);

            trade.setPrice(i);
            trade.setQuantity(i);

            trade.setSide((i & 1) == 0 ? 'B' : 'S');
        }
    }

    private static void destroy()
    {
        unsafe.freeMemory(address);
    }

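    // Packs up to 4 ASCII bytes into an int, first byte in the most significant position.
    // The switch cases fall through deliberately so that every byte present is included.
    private static int pack(final byte[] value)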
    {
        int result = 0;
        switch (value.length)
        {
            case 4:
                result |= (value[3]);
            case 3:
                result |= ((int)value[2] << 8);
            case 2:
                result |= ((int)value[1] << 16);
            case 1:
                result |= ((int)value[0] << 24);
                break;

            default:
                throw new IllegalArgumentException("Invalid array size");
        }

        return result;
    }

    private static class DirectMemoryTrade
    {
        private static long offset = 0;

        private static final long tradeIdOffset = offset += 0;
        private static final long clientIdOffset = offset += 8;
        private static final long venueCodeOffset = offset += 8;
        private static final long instrumentCodeOffset = offset += 4;
        private static final long priceOffset = offset += 4;
        private static final long quantityOffset = offset += 8;
        private static final long sideOffset = offset += 8;

        private static final long objectSize = offset += 2;

        private long objectOffset;

        public static long getObjectSize()
        {
            return objectSize;
        }

        void setObjectOffset(final long objectOffset)
        {
            this.objectOffset = objectOffset;
        }

        public long getTradeId()
        {
            return unsafe.getLong(objectOffset + tradeIdOffset);
        }

        public void setTradeId(final long tradeId)
        {
            unsafe.putLong(objectOffset + tradeIdOffset, tradeId);
        }

        public long getClientId()
        {
            return unsafe.getLong(objectOffset + clientIdOffset);
        }

        public void setClientId(final long clientId)
        {
            unsafe.putLong(objectOffset + clientIdOffset, clientId);
        }

        public int getVenueCode()
        {
            return unsafe.getInt(objectOffset + venueCodeOffset);
        }

        public void setVenueCode(final int venueCode)
        {
            unsafe.putInt(objectOffset + venueCodeOffset, venueCode);
        }

        public int getInstrumentCode()
        {
            return unsafe.getInt(objectOffset + instrumentCodeOffset);
        }

        public void setInstrumentCode(final int instrumentCode)
        {
            unsafe.putInt(objectOffset + instrumentCodeOffset, instrumentCode);
        }

        public long getPrice()
        {
            return unsafe.getLong(objectOffset + priceOffset);
        }

        public void setPrice(final long price)
        {
            unsafe.putLong(objectOffset + priceOffset, price);
        }

        public long getQuantity()
        {
            return unsafe.getLong(objectOffset + quantityOffset);
        }

        public void setQuantity(final long quantity)
        {
            unsafe.putLong(objectOffset + quantityOffset, quantity);
        }

        public char getSide()
        {
            return unsafe.getChar(objectOffset + sideOffset);
        }

        public void setSide(final char side)
        {
            unsafe.putChar(objectOffset + sideOffset, side);
        }
    }
}
Results
Intel i7-860 @ 2.8GHz, 8GB RAM DDR3 1333MHz, 
Windows 7 64-bit, Java 1.7.0_07
=============================================
java -server -Xms4g -Xmx4g TestJavaMemoryLayout
Memory 4,116,054,016 total, 1,108,901,104 free
0 - duration 19334ms
Memory 4,116,054,016 total, 1,109,964,752 free
1 - duration 14295ms
Memory 4,116,054,016 total, 1,108,455,504 free
2 - duration 14272ms
Memory 3,817,799,680 total, 815,308,600 free
3 - duration 28358ms
Memory 3,817,799,680 total, 810,552,816 free
4 - duration 32487ms

java -server TestDirectMemoryLayout
Memory 128,647,168 total, 126,391,384 free
0 - duration 983ms
Memory 128,647,168 total, 126,992,160 free
1 - duration 958ms
Memory 128,647,168 total, 127,663,408 free
2 - duration 873ms
Memory 128,647,168 total, 127,663,408 free
3 - duration 886ms
Memory 128,647,168 total, 127,663,408 free
4 - duration 884ms

Intel i7-2760QM @ 2.40GHz, 8GB RAM DDR3 1600MHz, 
Linux 3.4.11 kernel 64-bit, Java 1.7.0_07
=================================================
java -server -Xms4g -Xmx4g TestJavaMemoryLayout
Memory 4,116,054,016 total, 1,108,912,960 free
0 - duration 12262ms
Memory 4,116,054,016 total, 1,109,962,832 free
1 - duration 9822ms
Memory 4,116,054,016 total, 1,108,458,720 free
2 - duration 10239ms
Memory 3,817,799,680 total, 815,307,640 free
3 - duration 21558ms
Memory 3,817,799,680 total, 810,551,856 free
4 - duration 23074ms

java -server TestDirectMemoryLayout 
Memory 123,994,112 total, 121,818,528 free
0 - duration 634ms
Memory 123,994,112 total, 122,455,944 free
1 - duration 619ms
Memory 123,994,112 total, 123,103,320 free
2 - duration 546ms
Memory 123,994,112 total, 123,103,320 free
3 - duration 547ms
Memory 123,994,112 total, 123,103,320 free
4 - duration 534ms
Analysis

Let's compare the results to the 3 benefits promised above.

1.  Significantly improved performance

The evidence here is pretty clear cut.  Using the off-heap structures approach is more than an order of magnitude faster.  At the most extreme, in the 5th run on the Sandy Bridge processor, there is a 43.2 times difference in the duration to complete the task.  It is also a nice illustration of how well Sandy Bridge does with predictable access patterns to data.  Not only is the performance significantly better, it is also more consistent.  As the heap becomes fragmented, and thus access patterns become more random, the performance degrades as can be seen in the later runs with the standard Java approach.

2.  More compact data representation

For our off-heap representation each object requires 42 bytes.  To store 50 million of these, as in the example, we require 2,100,000,000 bytes.  The memory required by the JVM heap is:

   memory required = total memory - free memory - base JVM needs 

     2,883,248,712 = 3,817,799,680 - 810,551,856 - 123,999,112

This implies the JVM needs ~40% more memory to represent the same data.  The reason for this overhead is the array of references to the Java objects plus the object headers.  In a previous post I discussed object layout in Java.
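
The arithmetic behind this comparison can be replayed in a few lines of Java.  The sketch below only restates the figures quoted above; the class and method names are invented for the example.  With those inputs the overhead works out at a little over 37%, which is the "~40%" quoted.

```java
public class FootprintArithmetic
{
    public static final long RECORDS = 50L * 1000 * 1000;
    public static final long OFF_HEAP_RECORD_BYTES = 42;

    public static long offHeapBytes()
    {
        return RECORDS * OFF_HEAP_RECORD_BYTES;
    }

    // memory required = total memory - free memory - base JVM needs
    public static long heapBytes(final long total, final long free, final long baseJvm)
    {
        return total - free - baseJvm;
    }

    public static double overheadPercent(final long heapBytes, final long offHeapBytes)
    {
        return (100.0 * (heapBytes - offHeapBytes)) / offHeapBytes;
    }

    public static void main(final String[] args)
    {
        final long offHeap = offHeapBytes();
        final long onHeap = heapBytes(3817799680L, 810551856L, 123999112L);

        System.out.printf("off-heap %,d bytes, on-heap %,d bytes%n", offHeap, onHeap);
        System.out.printf("overhead %.1f%%, %.1f bytes per record on heap%n",
                          overheadPercent(onHeap, offHeap),
                          (double)onHeap / RECORDS);
    }
}
```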

When working with very large data sets this overhead can become a significant limiting factor.

3.  Ability to work with very large data sets while avoiding nasty GC pauses

The sample code above forces a GC cycle before each run, which can improve the consistency of the results in some cases.  Feel free to remove the call to System.gc() and observe the implications for yourself.  If you run the tests adding the following command line arguments then the garbage collector will output in painful detail what happened.

-XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -XX:+PrintHeapAtGC -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCApplicationStoppedTime -XX:+PrintSafepointStatistics

From analysing the output I can see the application underwent a total of 29 GC cycles.  Pause times are listed below by extracting the lines from the output indicating when the application threads are stopped.
With System.gc() before each run
================================
Total time for which application threads were stopped: 0.0085280 seconds
Total time for which application threads were stopped: 0.7280530 seconds
Total time for which application threads were stopped: 8.1703460 seconds
Total time for which application threads were stopped: 5.6112210 seconds
Total time for which application threads were stopped: 1.2531370 seconds
Total time for which application threads were stopped: 7.6392250 seconds
Total time for which application threads were stopped: 5.7847050 seconds
Total time for which application threads were stopped: 1.3070470 seconds
Total time for which application threads were stopped: 8.2520880 seconds
Total time for which application threads were stopped: 6.0949910 seconds
Total time for which application threads were stopped: 1.3988480 seconds
Total time for which application threads were stopped: 8.1793240 seconds
Total time for which application threads were stopped: 6.4138720 seconds
Total time for which application threads were stopped: 4.4991670 seconds
Total time for which application threads were stopped: 4.5612290 seconds
Total time for which application threads were stopped: 0.3598490 seconds
Total time for which application threads were stopped: 0.7111000 seconds
Total time for which application threads were stopped: 1.4426750 seconds
Total time for which application threads were stopped: 1.5931500 seconds
Total time for which application threads were stopped: 10.9484920 seconds
Total time for which application threads were stopped: 7.0707230 seconds

Without System.gc() before each run
===================================
Test run times
0 - duration 12120ms
1 - duration 9439ms
2 - duration 9844ms
3 - duration 20933ms
4 - duration 23041ms

Total time for which application threads were stopped: 0.0170860 seconds
Total time for which application threads were stopped: 0.7915350 seconds
Total time for which application threads were stopped: 10.7153320 seconds
Total time for which application threads were stopped: 5.6234650 seconds
Total time for which application threads were stopped: 1.2689950 seconds
Total time for which application threads were stopped: 7.6238170 seconds
Total time for which application threads were stopped: 6.0114540 seconds
Total time for which application threads were stopped: 1.2990070 seconds
Total time for which application threads were stopped: 7.9918480 seconds
Total time for which application threads were stopped: 5.9997920 seconds
Total time for which application threads were stopped: 1.3430040 seconds
Total time for which application threads were stopped: 8.0759940 seconds
Total time for which application threads were stopped: 6.3980610 seconds
Total time for which application threads were stopped: 4.5572100 seconds
Total time for which application threads were stopped: 4.6193830 seconds
Total time for which application threads were stopped: 0.3877930 seconds
Total time for which application threads were stopped: 0.7429270 seconds
Total time for which application threads were stopped: 1.5248070 seconds
Total time for which application threads were stopped: 1.5312130 seconds
Total time for which application threads were stopped: 10.9120250 seconds
Total time for which application threads were stopped: 7.3528590 seconds

It can be seen from the output that a significant proportion of the time is spent in the garbage collector.  When your threads are stopped your application is not responsive.  These tests have been done with default GC settings.  It is possible to tune the GC for better results but this can require significant skill and effort.  The only JVM I know of that copes well by not imposing long pause times, even under high-throughput conditions, is Azul's, with its concurrent compacting collector.
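If you want a rough in-process view of how much of a run is spent in the collector, rather than reading the log lines above, the standard management beans can be sampled before and after a workload.  The following is only a minimal sketch: the class name GcTimeProbe and the workload are hypothetical, and the reported figure is the collectors' own approximate accumulated time, which is not the same thing as the stopped-time lines in the log.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;

final class GcTimeProbe
{
    /** Sum of the approximate collection time (ms) reported by all collectors so far. */
    static long totalGcMillis()
    {
        long total = 0;
        for (final GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans())
        {
            final long time = gc.getCollectionTime(); // -1 when the collector does not report it
            if (time > 0)
            {
                total += time;
            }
        }

        return total;
    }

    public static void main(final String[] args)
    {
        final long gcBefore = totalGcMillis();
        final long start = System.nanoTime();

        // Hypothetical workload: build up a lot of small, long-lived objects
        final List<long[]> state = new ArrayList<long[]>();
        for (int i = 0; i < 1000000; i++)
        {
            state.add(new long[4]);
        }

        final long elapsedMs = (System.nanoTime() - start) / 1000000;
        final long gcMs = totalGcMillis() - gcBefore;
        System.out.format("elapsed=%dms gc=%dms objects=%d%n", elapsedMs, gcMs, state.size());
    }
}
```

The numbers will vary with heap size and collector, so treat them as an indication of the trend rather than a precise measurement.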

When profiling this application, I can see that the majority of the time is spent allocating the objects and promoting them to the old generation because they do not fit in the young generation.  The initialisation costs can be removed from the timing but that is not realistic.  If the traditional Java approach is taken the state needs to be built up before the query can take place.  The end user of an application has to wait for the state to be built up and the query executed.

This test is really quite trivial.  Imagine working with similar data sets but at the 100 GB scale.

Note: When the garbage collector compacts a region, then objects that were next to each other can be moved far apart.  This can result in TLB and other cache misses.

Side Note On Serialization

A huge benefit of using off-heap structures in this manner is how they can be very easily serialised to network, or storage, by a simple memory copy as I have shown in the previous post.  This way we can completely bypass intermediate buffer and object allocation.
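As a minimal sketch of that idea, the example below uses a direct ByteBuffer rather than Unsafe, and a hypothetical 12 byte record (a long id followed by an int quantity).  The names OffHeapCopySketch, buildRecords and copyOf are mine for illustration.  The point is that the whole structure moves with one bulk copy and no per-record objects are created.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class OffHeapCopySketch
{
    // Hypothetical fixed-size record: long id (8 bytes) + int quantity (4 bytes)
    static final int RECORD_SIZE = 12;

    static ByteBuffer buildRecords(final int count)
    {
        final ByteBuffer records =
            ByteBuffer.allocateDirect(count * RECORD_SIZE).order(ByteOrder.nativeOrder());

        for (int i = 0; i < count; i++)
        {
            // Absolute writes, so the buffer position stays at 0
            records.putLong(i * RECORD_SIZE, i);
            records.putInt(i * RECORD_SIZE + 8, i * 10);
        }

        return records;
    }

    /** Copy the whole structure with a single bulk put. */
    static ByteBuffer copyOf(final ByteBuffer source)
    {
        final ByteBuffer target =
            ByteBuffer.allocateDirect(source.capacity()).order(source.order());

        final ByteBuffer view = source.duplicate(); // shares memory, independent position/limit
        view.clear();                               // cover the full capacity
        target.put(view);
        target.flip();

        return target;
    }

    public static void main(final String[] args)
    {
        final ByteBuffer copy = copyOf(buildRecords(1000));
        System.out.println(copy.getLong(999 * RECORD_SIZE) + " " + copy.getInt(999 * RECORD_SIZE + 8));
    }
}
```

The same buffer could instead be handed to a FileChannel or SocketChannel write call, which is where bypassing the intermediate buffers pays off.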

Conclusion

If you are willing to do some C style programming for large datasets it is possible to control the memory layout in Java by going off-heap.  If you do, the benefits in performance, compactness, and avoiding GC issues are significant.  However, this is an approach that should not be used for all applications.  Its benefits are only noticeable for very large datasets, or the extremes of performance in throughput and/or latency.

I hope the Java community can collectively realise the importance of supporting structures both on the heap and the stack.  John Rose has done some excellent work in this area defining how tuples could be added to the JVM.  His talk on Arrays 2.0 from the JVM Language Summit this year is really worth a watch.  John discusses options for arrays of structures, and structures of arrays, in his talk.  If the tuples, as proposed by John, were available then the test described here could have comparable performance and be a more pleasant programming style.  The whole array of structures could be allocated in a single action thus bypassing the copy of individual objects across generations, and it would be stored in a compact contiguous fashion.  This would remove the significant GC issues for this class of problem.

Lately, I was comparing standard data structures between Java and .Net.  In some cases I observed a 6-10X performance advantage to .Net for things like maps and dictionaries when .Net used native structure support.  Let's get this into Java as soon as possible!

It is also pretty obvious from the results that if we are to use Java for real-time analysis on big data, then our standard garbage collectors need to significantly improve and support true concurrent operations.

[1] - To my knowledge the only JVM that deals well with very large heaps is Azul Zing

Thursday 5 July 2012

Native C/C++ Like Performance For Java Object Serialisation

Do you ever wish you could turn a Java object into a stream of bytes as fast as it can be done in a native language like C++?  If you use standard Java Serialization you could be disappointed with the performance.  Java Serialization was designed for a very different purpose than serialising objects as quickly and compactly as possible.

Why do we need fast and compact serialisation?  Many of our systems are distributed and we need to communicate by passing state between processes efficiently.  This state lives inside our objects.  I've profiled many systems and often a large part of the cost is the serialisation of this state to-and-from byte buffers.  I've seen a significant range of protocols and mechanisms used to achieve this.  At one end of the spectrum are the easy to use but inefficient protocols like Java Serialisation, XML and JSON.  At the other end of this spectrum are the binary protocols that can be very fast and efficient but require deeper understanding and skill.

In this article I will illustrate the performance gains that are possible when using simple binary protocols and introduce a little known technique available in Java to achieve similar performance to what is possible with native languages like C or C++.

The three approaches to be compared are:
  1. Java Serialization: The standard method in Java of having an object implement Serializable.
  2. Binary via ByteBuffer: A simple protocol using the ByteBuffer API to write the fields of an object in binary format.  This is our baseline for what is considered a good binary encoding approach.
  3. Binary via Unsafe: Introduction to Unsafe and its collection of methods that allow direct memory manipulation.  Here I will show how to get similar performance to C/C++.

The Code
import sun.misc.Unsafe;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.Arrays;

public final class TestSerialisationPerf
{
    public static final int REPETITIONS = 1 * 1000 * 1000;

    private static final ObjectToBeSerialised ITEM =
        new ObjectToBeSerialised(
            1010L, true, 777, 99,
            new double[]{0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0},
            new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});


    public static void main(final String[] arg) throws Exception
    {
        for (final PerformanceTestCase testCase : testCases)
        {
            for (int i = 0; i < 5; i++)
            {
                testCase.performTest();

                System.out.format("%d %s\twrite=%,dns read=%,dns total=%,dns\n",
                                  i,
                                  testCase.getName(),
                                  testCase.getWriteTimeNanos(),
                                  testCase.getReadTimeNanos(),
                                  testCase.getWriteTimeNanos() + 
                                  testCase.getReadTimeNanos());

                if (!ITEM.equals(testCase.getTestOutput()))
                {
                    throw new IllegalStateException("Objects do not match");
                }

                System.gc();
                Thread.sleep(3000);
            }
        }
    }

    private static final PerformanceTestCase[] testCases =
    {
        new PerformanceTestCase("Serialisation", REPETITIONS, ITEM)
        {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();

            public void testWrite(ObjectToBeSerialised item) throws Exception
            {
                for (int i = 0; i < REPETITIONS; i++)
                {
                    baos.reset();

                    ObjectOutputStream oos = new ObjectOutputStream(baos);
                    oos.writeObject(item);
                    oos.close();
                }
            }

            public ObjectToBeSerialised testRead() throws Exception
            {
                ObjectToBeSerialised object = null;
                for (int i = 0; i < REPETITIONS; i++)
                {
                    ByteArrayInputStream bais = 
                        new ByteArrayInputStream(baos.toByteArray());
                    ObjectInputStream ois = new ObjectInputStream(bais);
                    object = (ObjectToBeSerialised)ois.readObject();
                }

                return object;
            }
        },

        new PerformanceTestCase("ByteBuffer", REPETITIONS, ITEM)
        {
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

            public void testWrite(ObjectToBeSerialised item) throws Exception
            {
                for (int i = 0; i < REPETITIONS; i++)
                {
                    byteBuffer.clear();
                    item.write(byteBuffer);
                }
            }

            public ObjectToBeSerialised testRead() throws Exception
            {
                ObjectToBeSerialised object = null;
                for (int i = 0; i < REPETITIONS; i++)
                {
                    byteBuffer.flip();
                    object = ObjectToBeSerialised.read(byteBuffer);
                }

                return object;
            }
        },

        new PerformanceTestCase("UnsafeMemory", REPETITIONS, ITEM)
        {
            UnsafeMemory buffer = new UnsafeMemory(new byte[1024]);

            public void testWrite(ObjectToBeSerialised item) throws Exception
            {
                for (int i = 0; i < REPETITIONS; i++)
                {
                    buffer.reset();
                    item.write(buffer);
                }
            }

            public ObjectToBeSerialised testRead() throws Exception
            {
                ObjectToBeSerialised object = null;
                for (int i = 0; i < REPETITIONS; i++)
                {
                    buffer.reset();
                    object = ObjectToBeSerialised.read(buffer);
                }

                return object;
            }
        },
    };
}

abstract class PerformanceTestCase
{
    private final String name;
    private final int repetitions;
    private final ObjectToBeSerialised testInput;
    private ObjectToBeSerialised testOutput;
    private long writeTimeNanos;
    private long readTimeNanos;

    public PerformanceTestCase(final String name, final int repetitions,
                               final ObjectToBeSerialised testInput)
    {
        this.name = name;
        this.repetitions = repetitions;
        this.testInput = testInput;
    }

    public String getName()
    {
        return name;
    }

    public ObjectToBeSerialised getTestOutput()
    {
        return testOutput;
    }

    public long getWriteTimeNanos()
    {
        return writeTimeNanos;
    }

    public long getReadTimeNanos()
    {
        return readTimeNanos;
    }

    public void performTest() throws Exception
    {
        final long startWriteNanos = System.nanoTime();
        testWrite(testInput);
        writeTimeNanos = (System.nanoTime() - startWriteNanos) / repetitions;

        final long startReadNanos = System.nanoTime();
        testOutput = testRead();
        readTimeNanos = (System.nanoTime() - startReadNanos) / repetitions;
    }

    public abstract void testWrite(ObjectToBeSerialised item) throws Exception;
    public abstract ObjectToBeSerialised testRead() throws Exception;
}

class ObjectToBeSerialised implements Serializable
{
    private static final long serialVersionUID = 10275539472837495L;

    private final long sourceId;
    private final boolean special;
    private final int orderCode;
    private final int priority;
    private final double[] prices;
    private final long[] quantities;

    public ObjectToBeSerialised(final long sourceId, final boolean special,
                                final int orderCode, final int priority,
                                final double[] prices, final long[] quantities)
    {
        this.sourceId = sourceId;
        this.special = special;
        this.orderCode = orderCode;
        this.priority = priority;
        this.prices = prices;
        this.quantities = quantities;
    }

    public void write(final ByteBuffer byteBuffer)
    {
        byteBuffer.putLong(sourceId);
        byteBuffer.put((byte)(special ? 1 : 0));
        byteBuffer.putInt(orderCode);
        byteBuffer.putInt(priority);

        byteBuffer.putInt(prices.length);
        for (final double price : prices)
        {
            byteBuffer.putDouble(price);
        }

        byteBuffer.putInt(quantities.length);
        for (final long quantity : quantities)
        {
            byteBuffer.putLong(quantity);
        }
    }

    public static ObjectToBeSerialised read(final ByteBuffer byteBuffer)
    {
        final long sourceId = byteBuffer.getLong();
        final boolean special = 0 != byteBuffer.get();
        final int orderCode = byteBuffer.getInt();
        final int priority = byteBuffer.getInt();

        final int pricesSize = byteBuffer.getInt();
        final double[] prices = new double[pricesSize];
        for (int i = 0; i < pricesSize; i++)
        {
            prices[i] = byteBuffer.getDouble();
        }

        final int quantitiesSize = byteBuffer.getInt();
        final long[] quantities = new long[quantitiesSize];
        for (int i = 0; i < quantitiesSize; i++)
        {
            quantities[i] = byteBuffer.getLong();
        }

        return new ObjectToBeSerialised(sourceId, special, orderCode, 
                                        priority, prices, quantities);
    }

    public void write(final UnsafeMemory buffer)
    {
        buffer.putLong(sourceId);
        buffer.putBoolean(special);
        buffer.putInt(orderCode);
        buffer.putInt(priority);
        buffer.putDoubleArray(prices);
        buffer.putLongArray(quantities);
    }

    public static ObjectToBeSerialised read(final UnsafeMemory buffer)
    {
        final long sourceId = buffer.getLong();
        final boolean special = buffer.getBoolean();
        final int orderCode = buffer.getInt();
        final int priority = buffer.getInt();
        final double[] prices = buffer.getDoubleArray();
        final long[] quantities = buffer.getLongArray();

        return new ObjectToBeSerialised(sourceId, special, orderCode, 
                                        priority, prices, quantities);
    }

    public boolean equals(final Object o)
    {
        if (this == o)
        {
            return true;
        }
        if (o == null || getClass() != o.getClass())
        {
            return false;
        }

        final ObjectToBeSerialised that = (ObjectToBeSerialised)o;

        if (orderCode != that.orderCode)
        {
            return false;
        }
        if (priority != that.priority)
        {
            return false;
        }
        if (sourceId != that.sourceId)
        {
            return false;
        }
        if (special != that.special)
        {
            return false;
        }
        if (!Arrays.equals(prices, that.prices))
        {
            return false;
        }
        if (!Arrays.equals(quantities, that.quantities))
        {
            return false;
        }

        return true;
    }
}

class UnsafeMemory
{
    private static final Unsafe unsafe;
    static
    {
        try
        {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            unsafe = (Unsafe)field.get(null);
        }
        catch (Exception e)
        {
            throw new RuntimeException(e);
        }
    }

    private static final long byteArrayOffset = unsafe.arrayBaseOffset(byte[].class);
    private static final long longArrayOffset = unsafe.arrayBaseOffset(long[].class);
    private static final long doubleArrayOffset = unsafe.arrayBaseOffset(double[].class);

    private static final int SIZE_OF_BOOLEAN = 1;
    private static final int SIZE_OF_INT = 4;
    private static final int SIZE_OF_LONG = 8;

    private int pos = 0;
    private final byte[] buffer;

    public UnsafeMemory(final byte[] buffer)
    {
        if (null == buffer)
        {
            throw new NullPointerException("buffer cannot be null");
        }

        this.buffer = buffer;
    }

    public void reset()
    {
        this.pos = 0;
    }

    public void putBoolean(final boolean value)
    {
        unsafe.putBoolean(buffer, byteArrayOffset + pos, value);
        pos += SIZE_OF_BOOLEAN;
    }

    public boolean getBoolean()
    {
        boolean value = unsafe.getBoolean(buffer, byteArrayOffset + pos);
        pos += SIZE_OF_BOOLEAN;

        return value;
    }

    public void putInt(final int value)
    {
        unsafe.putInt(buffer, byteArrayOffset + pos, value);
        pos += SIZE_OF_INT;
    }

    public int getInt()
    {
        int value = unsafe.getInt(buffer, byteArrayOffset + pos);
        pos += SIZE_OF_INT;

        return value;
    }

    public void putLong(final long value)
    {
        unsafe.putLong(buffer, byteArrayOffset + pos, value);
        pos += SIZE_OF_LONG;
    }

    public long getLong()
    {
        long value = unsafe.getLong(buffer, byteArrayOffset + pos);
        pos += SIZE_OF_LONG;

        return value;
    }

    public void putLongArray(final long[] values)
    {
        putInt(values.length);

        long bytesToCopy = values.length << 3;
        unsafe.copyMemory(values, longArrayOffset,
                          buffer, byteArrayOffset + pos,
                          bytesToCopy);
        pos += bytesToCopy;
    }

    public long[] getLongArray()
    {
        int arraySize = getInt();
        long[] values = new long[arraySize];

        long bytesToCopy = values.length << 3;
        unsafe.copyMemory(buffer, byteArrayOffset + pos,
                          values, longArrayOffset,
                          bytesToCopy);
        pos += bytesToCopy;

        return values;
    }

    public void putDoubleArray(final double[] values)
    {
        putInt(values.length);

        long bytesToCopy = values.length << 3;
        unsafe.copyMemory(values, doubleArrayOffset,
                          buffer, byteArrayOffset + pos,
                          bytesToCopy);
        pos += bytesToCopy;
    }

    public double[] getDoubleArray()
    {
        int arraySize = getInt();
        double[] values = new double[arraySize];

        long bytesToCopy = values.length << 3;
        unsafe.copyMemory(buffer, byteArrayOffset + pos,
                          values, doubleArrayOffset,
                          bytesToCopy);
        pos += bytesToCopy;

        return values;
    }
}

Results
2.8GHz Nehalem - Java 1.7.0_04
==============================
0 Serialisation  write=2,517ns read=11,570ns total=14,087ns
1 Serialisation  write=2,198ns read=11,122ns total=13,320ns
2 Serialisation  write=2,190ns read=11,011ns total=13,201ns
3 Serialisation  write=2,221ns read=10,972ns total=13,193ns
4 Serialisation  write=2,187ns read=10,817ns total=13,004ns
0 ByteBuffer     write=264ns   read=273ns    total=537ns
1 ByteBuffer     write=248ns   read=243ns    total=491ns
2 ByteBuffer     write=262ns   read=243ns    total=505ns
3 ByteBuffer     write=300ns   read=240ns    total=540ns
4 ByteBuffer     write=247ns   read=243ns    total=490ns
0 UnsafeMemory   write=99ns    read=84ns     total=183ns
1 UnsafeMemory   write=53ns    read=82ns     total=135ns
2 UnsafeMemory   write=63ns    read=66ns     total=129ns
3 UnsafeMemory   write=46ns    read=63ns     total=109ns
4 UnsafeMemory   write=48ns    read=58ns     total=106ns

2.4GHz Sandy Bridge - Java 1.7.0_04
===================================
0 Serialisation  write=1,940ns read=9,006ns total=10,946ns
1 Serialisation  write=1,674ns read=8,567ns total=10,241ns
2 Serialisation  write=1,666ns read=8,680ns total=10,346ns
3 Serialisation  write=1,666ns read=8,623ns total=10,289ns
4 Serialisation  write=1,715ns read=8,586ns total=10,301ns
0 ByteBuffer     write=199ns   read=198ns   total=397ns
1 ByteBuffer     write=176ns   read=178ns   total=354ns
2 ByteBuffer     write=174ns   read=174ns   total=348ns
3 ByteBuffer     write=172ns   read=183ns   total=355ns
4 ByteBuffer     write=174ns   read=180ns   total=354ns
0 UnsafeMemory   write=38ns    read=75ns    total=113ns
1 UnsafeMemory   write=26ns    read=52ns    total=78ns
2 UnsafeMemory   write=26ns    read=51ns    total=77ns
3 UnsafeMemory   write=25ns    read=51ns    total=76ns
4 UnsafeMemory   write=27ns    read=50ns    total=77ns

Analysis

Writing and reading back a single relatively small object on my fast 2.4 GHz Sandy Bridge laptop can take ~10,000ns using Java Serialization, whereas with Unsafe this can come down to well under 100ns, even accounting for the test code itself.  To put this in context, when using Java Serialization the costs are on par with a network hop!  Now that would be very costly if your transport is a fast IPC mechanism on the same system.

There are numerous reasons why Java Serialisation is so costly.  For example it writes out the fully qualified class and field names for each object plus version information.  Also ObjectOutputStream keeps track of every object it has written so that repeated references can be conflated into back-references, and this bookkeeping is only discarded on reset() or close().  Java Serialisation requires 340 bytes for this example object, yet we only require 185 bytes for the binary versions.  Details for the Java Serialization format can be found here.  If I had not used arrays for the majority of data, then the serialised object would have been significantly larger with Java Serialization because of the field names.  In my experience text based protocols like XML and JSON can be even less efficient than Java Serialization.  Also be aware that Java Serialization is the standard mechanism employed for RMI.
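The 185 byte figure follows directly from the field layout used by the write methods above: 8 (sourceId) + 1 (special) + 4 (orderCode) + 4 (priority) + 4 + 10 x 8 (prices) + 4 + 10 x 8 (quantities) = 185.  A small sketch to check the arithmetic, using a hypothetical class EncodedSizeCheck that writes the same sequence of fields through a ByteBuffer:

```java
import java.nio.ByteBuffer;

final class EncodedSizeCheck
{
    /** Size in bytes of the binary layout, derived from the field types. */
    static int expectedSize(final int priceCount, final int quantityCount)
    {
        return 8                        // sourceId (long)
            + 1                         // special (boolean written as one byte)
            + 4                         // orderCode (int)
            + 4                         // priority (int)
            + 4 + (priceCount * 8)      // length prefix + doubles
            + 4 + (quantityCount * 8);  // length prefix + longs
    }

    /** Write the same sequence of fields through a ByteBuffer and count the bytes. */
    static int measuredSize(final double[] prices, final long[] quantities)
    {
        final ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.putLong(1010L);
        buffer.put((byte)1);
        buffer.putInt(777);
        buffer.putInt(99);

        buffer.putInt(prices.length);
        for (final double price : prices)
        {
            buffer.putDouble(price);
        }

        buffer.putInt(quantities.length);
        for (final long quantity : quantities)
        {
            buffer.putLong(quantity);
        }

        return buffer.position();
    }

    public static void main(final String[] args)
    {
        System.out.println(expectedSize(10, 10));                       // prints 185
        System.out.println(measuredSize(new double[10], new long[10])); // prints 185
    }
}
```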

The real issue is the number of instructions to be executed.  The Unsafe method wins by a significant margin because in Hotspot, and many other JVMs, the optimiser treats these operations as intrinsics and replaces the call with assembly instructions to perform the memory manipulation.  For primitive types this results in a single x86 MOV instruction which can often happen in a single cycle.  The details can be seen by having Hotspot output the optimised code as I described in a previous article.

Now it has to be said that "with great power comes great responsibility" and if you use Unsafe it is effectively the same as programming in C, and with that can come memory access violations when you get offsets wrong.

Adding Some Context

"What about the likes of Google Protocol Buffers?", I hear you cry out.  These are very useful libraries and can often offer better performance and more flexibility than Java Serialisation.  However they are not remotely close to the performance of using Unsafe like I have shown here.  Protocol Buffers solve a different problem and provide nice self-describing messages which work well across languages.  Please test with different protocols and serialisation techniques to compare results.

Also the astute among you will be asking, "What about Endianness (byte-ordering) of the integers written?"  With Unsafe the bytes are written in native order.  This is great for IPC and between systems of the same type.  When systems use differing formats then conversion will be necessary.
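To make the byte-ordering point concrete, here is a minimal sketch using ByteBuffer and Integer.reverseBytes from the JDK.  The class ByteOrderSketch and its method names are hypothetical.  It shows the same int encoded in both orders, and the swap a reader would apply when its native order differs from the writer's.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

final class ByteOrderSketch
{
    /** Encode an int with the given byte order and return the raw bytes. */
    static byte[] encode(final int value, final ByteOrder order)
    {
        final ByteBuffer buffer = ByteBuffer.allocate(4).order(order);
        buffer.putInt(value);

        return buffer.array();
    }

    /** Convert a value that was read using the opposite byte order. */
    static int swap(final int value)
    {
        return Integer.reverseBytes(value);
    }

    public static void main(final String[] args)
    {
        System.out.println("native order: " + ByteOrder.nativeOrder());
        System.out.println(Arrays.toString(encode(0x01020304, ByteOrder.BIG_ENDIAN)));
        System.out.println(Arrays.toString(encode(0x01020304, ByteOrder.LITTLE_ENDIAN)));
        System.out.println(Integer.toHexString(swap(0x01020304)));
    }
}
```

Long.reverseBytes does the same job for 64-bit values.  Doubles can be converted via Double.doubleToRawLongBits and Double.longBitsToDouble around the swap.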

How do we deal with multiple versions of a class or determining what class an object belongs to?  I want to keep this article focused but let's say a simple integer to indicate the implementation class is all that is required for a header.  This integer can be used to look up the appropriate implementation for the de-serialisation operation.
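One way such a header could work is sketched below.  Everything here is hypothetical and for illustration only: the TypeHeaderSketch class, the Decoder interface and the two type ids are my own names, and ByteBuffer stands in for whichever buffer abstraction is in use.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

final class TypeHeaderSketch
{
    /** Hypothetical per-type decoder. */
    interface Decoder
    {
        Object decode(ByteBuffer buffer);
    }

    // Hypothetical type ids written as the first int of every message
    static final int TYPE_LONG_VALUE = 1;
    static final int TYPE_INT_PAIR = 2;

    private static final Map<Integer, Decoder> DECODERS = new HashMap<Integer, Decoder>();
    static
    {
        DECODERS.put(TYPE_LONG_VALUE, new Decoder()
        {
            public Object decode(final ByteBuffer buffer)
            {
                return buffer.getLong();
            }
        });

        DECODERS.put(TYPE_INT_PAIR, new Decoder()
        {
            public Object decode(final ByteBuffer buffer)
            {
                return new int[]{buffer.getInt(), buffer.getInt()};
            }
        });
    }

    /** Read the header int, then hand the body to the matching decoder. */
    static Object read(final ByteBuffer buffer)
    {
        final int typeId = buffer.getInt();
        final Decoder decoder = DECODERS.get(typeId);
        if (null == decoder)
        {
            throw new IllegalArgumentException("Unknown type id: " + typeId);
        }

        return decoder.decode(buffer);
    }

    public static void main(final String[] args)
    {
        final ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.putInt(TYPE_LONG_VALUE).putLong(42L);
        buffer.flip();

        System.out.println(read(buffer)); // prints 42
    }
}
```

If the ids are small and dense, an array indexed by id would avoid the boxing of the map key.  A version number can be carried in the same way as a second header field.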

An argument I often hear against binary protocols, and for text protocols, is "what about human readability and debugging?"  There is an easy solution to this.  Develop a tool for reading the binary format!
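Such a tool need not be complicated.  As a minimal sketch, the hypothetical BinaryDumpSketch below walks the layout produced by the ByteBuffer variant of write() above and renders it as text.  For the Unsafe variant the buffer would need to be read in native byte order.

```java
import java.nio.ByteBuffer;

final class BinaryDumpSketch
{
    /** Render one encoded object, in the ByteBuffer layout, as a line of text. */
    static String describe(final ByteBuffer buffer)
    {
        final StringBuilder sb = new StringBuilder();
        sb.append("sourceId=").append(buffer.getLong());
        sb.append(" special=").append(0 != buffer.get());
        sb.append(" orderCode=").append(buffer.getInt());
        sb.append(" priority=").append(buffer.getInt());

        final int priceCount = buffer.getInt();
        sb.append(" prices=[");
        for (int i = 0; i < priceCount; i++)
        {
            if (i > 0)
            {
                sb.append(", ");
            }
            sb.append(buffer.getDouble());
        }
        sb.append("]");

        final int quantityCount = buffer.getInt();
        sb.append(" quantities=[");
        for (int i = 0; i < quantityCount; i++)
        {
            if (i > 0)
            {
                sb.append(", ");
            }
            sb.append(buffer.getLong());
        }
        sb.append("]");

        return sb.toString();
    }

    public static void main(final String[] args)
    {
        final ByteBuffer buffer = ByteBuffer.allocate(64);
        buffer.putLong(1010L).put((byte)1).putInt(777).putInt(99);
        buffer.putInt(2).putDouble(0.5).putDouble(1.5);
        buffer.putInt(1).putLong(7L);
        buffer.flip();

        System.out.println(describe(buffer));
    }
}
```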

Conclusion

In conclusion it is possible to achieve native C/C++-like levels of performance in Java for serialising an object to-and-from a byte stream by effectively using the same techniques.  The UnsafeMemory class, for which I've provided a skeleton implementation, could easily be expanded to encapsulate this behaviour and thus protect oneself from many of the potential issues when dealing with such a sharp tool.
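One such protection is a range check before every access, so that a wrong offset fails with an exception rather than corrupting memory.  This is a minimal sketch only; the BoundsGuard class and checkRange method are hypothetical names and not part of the skeleton above.

```java
final class BoundsGuard
{
    /**
     * Throw if the range [pos, pos + length) does not fit inside a buffer of
     * the given capacity.  Written to avoid int overflow in pos + length.
     */
    static void checkRange(final int pos, final int length, final int capacity)
    {
        if (pos < 0 || length < 0 || pos > capacity - length)
        {
            throw new IndexOutOfBoundsException(
                "pos=" + pos + " length=" + length + " capacity=" + capacity);
        }
    }

    public static void main(final String[] args)
    {
        checkRange(1020, 4, 1024); // the last int slot of a 1024 byte buffer is fine

        try
        {
            checkRange(1021, 4, 1024); // one byte too far
        }
        catch (final IndexOutOfBoundsException ex)
        {
            System.out.println("rejected: " + ex.getMessage());
        }
    }
}
```

In UnsafeMemory this could be called at the top of each put and get, for example checkRange(pos, SIZE_OF_INT, buffer.length) in putInt().  The check costs a few instructions, so whether to keep it on the hot path is a judgement call.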

Now for the burning question.  Would it not be so much better if Java offered a Marshallable interface as an alternative to Serializable, providing natively what I've effectively done with Unsafe?