How is Chronicle Queue being used for Big Data solutions in Java, and how does it work under the covers?
Chronicle Queue is a persisted journal of messages which supports concurrent writers and readers even across multiple JVMs on the same machine. Every reader sees every message, and a reader can join at any time and still see every message. We assume that you can read/scan through messages fast enough that, even if you are not interested in most messages, it will still be fast enough. These are not consumers as such, and messages do not disappear after reading them.
Retaining every message has a number of advantages:
-
a message can be replayed as many times as is needed.
-
a day of production messages can be replayed in a testing environment months later.
-
reduces the requirement for logging almost entirely, speeding up your application.
-
greater transparency leads to optimisations that you might have missed without a detailed record of every event that you process.
One of our clients, first implemented their trading system without Chronicle Queue and achieved an average latency of 35 micro-seconds tick to trade. However, after switching to Chronicle Queue, the latency was reduced to 23 micro-seconds.
Chronicle Queue is designed to comfortably support hundreds of thousands of messages per second. It can also handle multi-second bursts into the millions of messages per second. One of our clients reported they are handling bursts of 24 million messages per second with a cluster of 6 servers.
Chronicle Queue is an unbounded queue without flow control between the producer and the consumer. Replication supports flow control, but this is not used by default. A lack of flow control is known as a "producer-centric" solution.
To put that in context, most messaging solutions have flow control; this is a "consumer-centric" solution. Consumer-centric solutions make sense in many applications, especially between servers and client GUIs. Any data that you send to a client GUI is for display to a human. Desktop applications can be run on a variety of machines, over a variety of networks, and a human can only see a limited rate of data. Updates of more than about 20 times a second just appear as a blur, and are counter productive.
It makes sense for a desktop application to receive only as much data as it can handle, and to push back on the server when it cannot handle the data rate.
Reactive Streams are an excellent way to model such client centric solutions.
Chronicle Queue deliberately does not use flow control, as flow control is not always possible, or even desirable.
Some examples of systems where Chronicle Queue is often used are:
-
Market data gateways. You cannot slow down an exchange because you are not keeping up.
-
Compliance systems. You have to feed information to them, but you never want to be slowed down by them.
-
Core trading systems. As these feed from Market Data gateways, you want them to be as fast as possible, all the time.
In a Producer-centric solution, the producer is not slowed down by a slow consumer, and the consumer is never more than main memory behind the producer. The consumer might be an overnight batch job, and be a whole day, or week behind.
The producer-centric solution has a number of advantages:
-
You can reproduce a bug, even if it only occurs once in a million messages, by replaying all the messages which led to that bug triggering. More importantly, you can have confidence that the bug, rather than just a bug has been fixed.
-
You can test every micro-service independently, as there is no flow control interaction between them. If you have flow control, say 20 services, any one of those services could slow down any producer, until your entire system locks up. This means the only real performance test is a complete system test.
-
You can test a micro-service, replaying from the same input file repeatedly, without the producer or down stream consumers running.
-
You can restart and upgrade a service, by replaying its outputs to ensure it is committed to the decisions/outcomes it has produced in the past. This allows you to change your service and know that, even those your new version might have made different decision, it can honour those already made.
-
Flow control is designed to smooth out burst and jitter in the system; but it can also hide such jitter as result.
The main disadvantage is that this assumes disk space is cheap. If you look at retail prices for enterprise grade disks, you can get TBs of disk for a few hundred dollars. Many investment banks do not take this approach, and internal charge rates for managed storage can be orders of magnitude higher. I have worked on systems that have more free memory than free disk space.
What might be surprising is that Chronicle Queue is written entirely in pure Java
.
It can outperform many data storage solutions written in C
.
You might be wondering, how is this possible given that well written C
is usually faster than Java.
You need a degree of protection between your application and your data storage to minimise the risk of corruption.
As Java
uses a JVM
, it already has an abstraction layer, and a degree of protection.If an application throws an exception, this does not mean the data structure is corrupted.
To get a degree of isolation in C
, many data storage solutions use TCP
.
The overhead of using TCP
, even over loopback, can exceed the benefit of using C
, and the throughput/latencies of Chronicle Queue can be 10x
greater by being able to use the data in-process.
Chronicle Queue supports sharing of the data structure in memory for multiple JVMs, avoiding the need to use TCP to share data.
One of our key objectives is transparency.We have worked hard to make sure you can see exactly what has been written to this data structure.
We use YAML
as this is a format which is designed to be readable.
JSON
and XML
, we see, as a subset of Javascript
and SGML
, and not as readable.We support JSON
for messaging with browsers.
For performance reasons we use a binary form of YAML
, which can be automatically translated to YAML
for viewing.
import net.openhft.chronicle.wire.SelfDescribingMarshallable;
public class Order extends SelfDescribingMarshallable {
String symbol;
Side side;
double limitPrice, quantity;
public Order(String symbol, Side side, double limitPrice, double quantity) {
this.symbol = symbol;
this.side = side;
this.limitPrice = limitPrice;
this.quantity = quantity;
}
}
SelfDescribingMarshallable
provides efficient, generic toString
, equals
, and hashCode
, as well as readMarshallable
and writeMarshallable
for serialisation.
If you cannot extend this class, you can implement the Marshallable
interface.
File dir = new File(OS.getTarget() + "/deleteme-" + Time.uniqueId());
try (ChronicleQueue queue = ChronicleQueue.singleBuilder(dir).build()) {
ExcerptAppender appender = queue.acquireAppender();
appender.writeDocument(new Order("Symbol", Side.Buy, 1.2345, 1e6)); // (1)
appender.writeDocument(w -> w.write("newOrder").object(new Order("Symbol2", Side.Sell, 2.999, 10e6))); // (2)
// System.out.print(queue.dump());
}
-
Written as
keys
andvalues
. -
Written as a command message with a "typed" payload.
In a real unit test we would do an assertEquals(expectedString, queue.dump());
--- !!meta-data #binary
header: !SCQStore {
wireType: !WireType BINARY,
writePosition: 413,
roll: !SCQSRoll {
length: !int 86400000,
format: yyyyMMdd,
epoch: 0
},
indexing: !SCQSIndexing {
indexCount: !short 16384,
indexSpacing: 16,
index2Index: 0,
lastIndex: 0
},
lastAcknowledgedIndexReplicated: 0
}
# position: 268
--- !!data #binary
symbol: Symbol
side: Buy
limitPrice: 1.2345
quantity: 1000000.0
# position: 329
--- !!data #binary
newOrder: !Order {
symbol: Symbol2,
side: Sell,
limitPrice: 2.999,
quantity: 10000000.0
}
...
# 83885663 bytes remaining
You will note that YAML
supports typed data
, enumerated values
, comments
, and message start
and end
markers.
Chronicle Queue is design for sequential writes and reads. It also supports random access, and updates in-place. Although you cannot change the size of an existing entry, you can pad an entry for future use.
This append-only structure is more efficient for passing data between threads using the CPU L2 cache
coherence bus, It can also be faster than attempting to pass an object between threads, as it avoids random access which can be
common in Java
objects where there can be a lot of reference chasing.
It is also more efficient for persistence to disk; HDD and SSD are much more efficient when being accessed sequentially.
The append-only structure makes replication much simpler as well.
Chronicle Queue is built on a class called MappedBytes
in Chronicle-Bytes
.
This visualises the file to act as an unbounded array of bytes mapped to a file.
As you append data, it will add memory mappings transparently. The file grows as you write more data.
The key benefit of using memory-mapped files, is that you are no longer limited by the size of your JVM
, or even the size of your main memory. You are only limited by the amount of disk space you have. If you want to load 100
TB into a JVM
for replay the operating system does all the heavy lifting for you.
Another benefit of using a memory-mapped file is the ability to bind a portion of memory to an object.
The key attributes in the header are bound when first loading, and after that they work like a normal object, updating off-heap memory and the file in a thread-safe manner.
You can perform operations like compareAndSet
, atomic add
, or set max value
(a set which only ever increases the value).
As the data access is thread-safe, it can be shared between threads, or processes, as fast as the time it takes for an L2 cache miss; up to 25
nano-seconds.
Each record is a "Size Prefixed Blob", where the first four bytes contain a 30
bit length of the message. The top two bits are used to record:
-
whether this message is user-data, or meta-data required to support the queue itself,
-
a bit to flag whether the message is complete or not.
When the message is not complete, it cannot be read. However, if the length is known, a writer can skip such messages, and attempt to write after it.
For example:
Thread1
is in the middle of writing a message, but it knows the message length; it can write 4
bytes which indicate the length.
Thread2
can see that there will be a message, and skip over it looking for a place to write.
In this way, multiple threads can be writing to the queue concurrently. Any message which is detected as bad (for example, the thread died), can be marked as meta-data and skipped by the reader.
There is a special value ("poison pill" value), which indicates the file has been rolled. This ensures that all writers and readers roll at the same point, in a timely manner.
Header example:
--- !!meta-data #binary # (1)
header: !SCQStore { # (2)
wireType: !WireType BINARY,
writePosition: 413, # (3)
roll: !SCQSRoll { # (4)
length: !int 86400000,
format: yyyyMMdd,
epoch: 0
},
indexing: !SCQSIndexing { # (5)
indexCount: !short 16384,
indexSpacing: 16,
index2Index: 0,
lastIndex: 0
},
lastAcknowledgedIndexReplicated: 0 # (6)
}
-
First message is meta-data written in binary
-
Type of header is aliased as the name
SCQStore
. -
writePosition
is the first bound value. It is the highest known byte which has been written to, and is updated atomically. -
Roll cycle is daily.
-
This class controls how it will be indexed on-demand. This adds meta-data entries for indexed lookup.
-
Highest message index which was acknowledged by a replica.
For us a key feature of Chronicle Queue is not just how the data structure is arranged, but also how transparently this binary data structure can be inspected.
Note
|
The SCQStore "bootstraps" the queue itself. If you provided another, custom implementation, the queue could behave as you wish, provided it support the same interface. The rolling and indexing strategies can also be customized.
|
If we look at the last message, you can see the message type, the type of the payload, and the value of all the fields.
--- !!data #binary
newOrder: !Order {
symbol: Symbol2,
side: Sell,
limitPrice: 2.999,
quantity: 10000000.0
}
For the most latency-sensitive systems, you may want to keep your allocation rate to below 300
KB/s.
At this rate you will produce less than 24
GB of garbage a day, and
if your Eden space
is larger than this, you can run all day without a minor collection. A GC is something that you can do as an overnight maintainence task.
Reduce your garbage-per-day to less than 5
GB, and you might be able to run all week without a GC.
We have a number of strategies to minimise garbage; the key one being that we translate directly between on-heap and native memory without intermediate temporary objects.
We use object pools where appropriate, and we support reading into mutable objects.
For text data we support both a String
pool and reading to/from StringBuilder
.