6 Key considerations in processing large files in Java

Q1. What are the key considerations in processing large files?
A1. Before jumping into coding, get the requirements.

#1 Trade-offs among CPU, Memory Usage & Disk I/O

Processing a file involves reading it from the disk, processing it (e.g. parsing an XML document and transforming it), and writing the result back to the disk. There is a trade-off in deciding what is more important to you: better I/O, better CPU usage, or better memory usage. It is important to conduct profiling to monitor CPU usage, memory usage, and I/O efficiency.

1) Reading the data from the disk can be I/O-heavy.
2) Storing the read data in the Java heap to process it can be memory-heavy.
3) Parsing (e.g. XML/JSON to Java POJOs) & transforming (e.g. XML to Avro) the data can be CPU & memory heavy.
4) Writing the processed data back to the disk can be I/O-heavy.
5) Serialization of structured data is key to transmitting information over networks or storing data. Serialization & deserialization are CPU-heavy.

CPU bound & I/O bound are two opposites. A CPU-bound program is bottlenecked by the CPU, whereas an I/O (i.e. Input/Output) bound program is bottlenecked by reading from or writing to a disk or network.

#2 File type considerations

Consider the file types to be processed: only ASCII (i.e. text), only binary, or both ASCII and binary.

If you need to handle splittable ASCII files like comma-delimited or tab-delimited text files, you could write a simple bash script that divides the files into smaller pieces (i.e. chunks) and read them as usual.

But if you have a requirement to handle binary formats like PDFs, then the file-splitting approach won’t work. If it is an XML file, then favor streaming using a parser like StAX (Streaming API for XML). StAX can be used for reading & writing with good CPU & memory efficiency, as sketched below.
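Here is a minimal StAX sketch that streams through a large XML file one element at a time. The file name employees.xml and the employee element with a name attribute are assumptions for illustration only:

import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamReader;
import java.io.FileInputStream;

public class StaxEmployeeReader {
    public static void main(String[] args) throws Exception {
        XMLInputFactory factory = XMLInputFactory.newInstance();
        try (FileInputStream in = new FileInputStream("employees.xml")) {   // hypothetical file
            // The reader pulls events one at a time, so the whole document is never held in memory
            XMLStreamReader reader = factory.createXMLStreamReader(in);
            while (reader.hasNext()) {
                if (reader.next() == XMLStreamConstants.START_ELEMENT
                        && "employee".equals(reader.getLocalName())) {
                    // Process one record at a time, e.g. map it to a POJO
                    System.out.println(reader.getAttributeValue(null, "name"));
                }
            }
            reader.close();
        }
    }
}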

When working with Big Data, using XML and JSON file formats is a common mistake, as they are not splittable. The Big Data ecosystem favors container file formats like Sequence Files, Avro, Parquet, ORC, etc.

Files stored in ORC, Parquet, and Avro formats can be split across multiple disks, which lends them to scalability and parallel processing. You cannot split JSON and XML files, and that limits their scalability and parallelism.

All three formats carry the data schema in the files themselves, which is to say they’re self-describing. You can take an ORC, Parquet, or Avro file from one cluster and load it on a completely different machine, and the machine will know what the data is and be able to process it.

Choosing an appropriate file format can have some significant benefits like:

1) Faster read times.
2) Faster write times.
3) Splittable files.
4) Schema evolution support.
5) Advanced compression support.

Hadoop offers several file formats, and choosing the right one matters: these file formats are splittable & compressible to take advantage of distributed computing, and each split chunk can be given to an executor to process.

#3 Processing in chunks

In order to better utilize I/O, CPU, and memory, you will have to read, parse, and write in chunks. One option is to process regions of data incrementally using memory-mapped files. The good thing about memory-mapped files is that their contents live outside the Java heap and do not consume swap/paging space, since they are backed by the file data on disk. But you can still get OutOfMemoryError for very large files, for example when a single mapping exceeds the 2 GB limit of a MappedByteBuffer or when the decoded data is pulled onto the heap. The Spring Batch framework also allows you to read, process, and write data in chunks, and if your processing requires talking to many systems via different protocols like FTP, HTTP, etc., then make use of Spring Integration together with Spring Batch.
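As a rough sketch of chunk-oriented processing with Spring Batch (Spring Batch 4.x style Java config; the step name, chunk size of 1000, and the reader/processor/writer beans are assumptions and would be defined for your own file format):

import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class ChunkedFileJobConfig {

    @Bean
    public Step fileStep(StepBuilderFactory steps,
                         ItemReader<String> reader,
                         ItemProcessor<String, String> processor,
                         ItemWriter<String> writer) {
        // Reads, processes & writes 1000 records per chunk (i.e. per transaction),
        // so the whole file never has to sit in memory at once
        return steps.get("fileStep")
                .<String, String>chunk(1000)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }
}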

Apache Spark uses RDDs (i.e. Resilient Distributed Datasets). RDDs are split into partitions to be processed and written in parallel. These partitions are logical chunks of data made up of records. Inside a partition, data is processed sequentially. You can control the number of partitions of an RDD using the repartition or coalesce transformations.

#4 Parallel processing

A message queue/topic allows a bunch of subscribers to pull messages and process them in parallel, so you can scale out with multiple consumers. For example, with Apache Kafka you can publish messages/events to topics, and they get persisted. They don’t get removed when consumers receive them. This allows you to replay messages, but more importantly, it allows a multitude of consumers to process logic based on the same messages/events. By having a notion of parallelism (the partition) within the topics, Kafka is able to provide both ordering guarantees and load balancing over a pool of consumer processes. A message in Kafka is a key-value pair with a small amount of associated metadata.
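A minimal sketch of a Kafka consumer that processes records in parallel with the other consumers in the same group; the broker address, group id, and topic name are assumptions:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class FileLineConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");      // assumed broker address
        props.put("group.id", "file-processors");               // consumers in this group share the partitions
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("file-lines"));   // hypothetical topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record.value());                     // your per-record processing
                }
            }
        }
    }

    private static void process(String line) { /* parse / transform / persist */ }
}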

Parallel processing technologies like MapReduce & Apache Spark can read a file into RDDs (i.e. Resilient Distributed Datasets), which are then processed and written in parallel across many distributed worker machines (e.g. hundreds to thousands) in a cluster. This follows the shared-nothing architecture, which means each machine has dedicated CPU, memory & I/O for processing its own partition (i.e. chunk of the RDD).

You can also introduce multi-threading with a finite pool of threads to improve CPU and I/O efficiency at the cost of memory. Measure the performance of a single-threaded job to see if it meets your needs before adding the complexity of multi-threading. Each thread can process a separate chunk.

#5 Serialization

Verbose serialization formats such as XML or JSON can lead to poor performance. Protocol Buffers (aka Protobuf), Apache Thrift, Avro, and Fast Buffers are more efficient data formats that generate serialization and deserialization code from a data structure definition. Developers define the data structures in an Interface Definition Language (IDL) file, and a tool parses this file to generate the serialization and deserialization code. Protobuf has the following advantages (see the sketch after this list):

1) Numbered fields in proto definitions remove the need for ugly version checks, and are backward compatible.
2) Encoding and decoding are faster than JSON.
3) Less boilerplate code than JSON; it has a proper schema to define types like int32, etc., and the required, optional, and repeated keywords in Protocol Buffers definitions are extremely powerful.
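For illustration, a minimal sketch of using Protobuf-generated Java classes. The Employee message is hypothetical, assumed to have been generated by protoc from an employee.proto with numbered fields int32 id = 1 and string name = 2:

import com.google.protobuf.InvalidProtocolBufferException;

public class ProtobufRoundTrip {
    public static void main(String[] args) throws InvalidProtocolBufferException {
        // Build and serialize: the numbered fields keep the format backward compatible
        Employee employee = Employee.newBuilder()     // Employee is a hypothetical generated class
                .setId(42)
                .setName("Jane Doe")
                .build();
        byte[] bytes = employee.toByteArray();        // compact binary encoding

        // Deserialize on the other side of the wire (or after reading from disk)
        Employee decoded = Employee.parseFrom(bytes);
        System.out.println(decoded.getName());
    }
}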

#6 Multiple passes

When transforming some files, you may need to run multiple passes over the file contents, as there could be relationships across the records within the same file. For example, a JSON file containing thousands of “employee” records may have manager and subordinate (i.e. parent/child) relationships. So,

Pass 1: Read all the records and create a list of POJOs, or persist them into a staging area.

Pass 2: Loop through the POJOs or read the staging data to enrich it with additional information, for example grouping with group ids, evaluating the parent/child relationship with a parent id, etc.
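A minimal two-pass sketch over in-memory POJOs, assuming a hypothetical Employee with an id and a managerId, and no cycles in the hierarchy:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TwoPassEnricher {

    // Hypothetical POJO; in practice pass 1 would populate it with a streaming JSON parser
    static class Employee {
        long id;
        Long managerId;        // null for the top of the hierarchy
        Employee manager;
        int hierarchyLevel;
    }

    static void enrich(List<Employee> employees) {
        // Pass 1: index every record by id (or persist to a staging area instead)
        Map<Long, Employee> byId = new HashMap<>();
        for (Employee e : employees) {
            byId.put(e.id, e);
        }

        // Pass 2: resolve the parent/child relationships, then compute hierarchy levels
        for (Employee e : employees) {
            e.manager = (e.managerId == null) ? null : byId.get(e.managerId);
        }
        for (Employee e : employees) {
            int level = 0;
            for (Employee m = e.manager; m != null; m = m.manager) {
                level++;                              // count the ancestors up to the root
            }
            e.hierarchyLevel = level;
        }
    }
}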

When there are relationships across different files, the order in which the files are processed is important. This may require persisting the key employee information to a SQL or NoSQL database in the first pass for all the files, and then computing the group ids, hierarchy levels, etc. in the second pass by reading from the database. Finally, the enriched data can be written to an Avro or Parquet file.

Processing large files with examples

Example 1:

The producer/consumer paradigm is used via a BlockingQueue. The main thread spawns 1 producer thread and x number of consumer threads in a fixed thread pool. The producer is responsible for reading one line at a time from the file and inserting it into a “BlockingQueue”. The consumers are responsible for removing the inserted lines from the “BlockingQueue” and processing them, as sketched below.

This is covered in detail – Processing large files efficiently in Java – multi-threaded code.
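A minimal sketch of this set-up; the file name, queue capacity, and number of consumers are assumptions to be tuned for your workload:

import java.io.BufferedReader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ProducerConsumerFileProcessor {

    private static final String POISON_PILL = "__EOF__";    // sentinel telling consumers to stop

    public static void main(String[] args) throws Exception {
        int consumers = 4;                                    // x consumer threads
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10_000);
        ExecutorService pool = Executors.newFixedThreadPool(consumers + 1);

        // 1 producer: reads a line at a time and puts it on the queue
        pool.submit(() -> {
            try (BufferedReader reader = Files.newBufferedReader(Paths.get("large-file.txt"))) {  // hypothetical file
                String line;
                while ((line = reader.readLine()) != null) {
                    queue.put(line);
                }
                for (int i = 0; i < consumers; i++) {
                    queue.put(POISON_PILL);                   // one pill per consumer
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            return null;
        });

        // x consumers: take lines off the queue and process them
        for (int i = 0; i < consumers; i++) {
            pool.submit(() -> {
                String line;
                while (!POISON_PILL.equals(line = queue.take())) {
                    process(line);
                }
                return null;
            });
        }
        pool.shutdown();
    }

    private static void process(String line) { /* parse / transform / persist the line */ }
}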

Example 2:

Spring Batch allows you to write multi-threaded steps. Spring Batch also supports “remote partitioning” and “remote chunking”.

Remote partitioning is a master/slave step configuration that allows partitions of data to be processed in parallel. For example, if you were processing a database table, partition 1 may be account ids 0-100, partition 2 account ids 101-200, etc.

Remote chunking is also a master/slave configuration, but the data is read by the master and sent over the wire to the slave for processing.

Example 3:

Apache Spark can read files from either a Unix file system or HDFS (i.e. Hadoop Distributed File System) and convert the contents to RDDs (i.e. Resilient Distributed Datasets), which are like chunks, and pass each chunk to an executor to process in parallel. Executors can reside on separate machines in a distributed architecture.
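A minimal Spark sketch using the Java API; the HDFS paths, partition count, and the per-record transformation are assumptions:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkFileJob {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("large-file-job");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Each HDFS block / split becomes a partition, processed by a separate executor task
            JavaRDD<String> lines = sc.textFile("hdfs:///data/large-file.txt");     // hypothetical path

            JavaRDD<String> transformed = lines
                    .repartition(100)                          // tune parallelism to the cluster size
                    .map(line -> line.toUpperCase());          // placeholder per-record transformation

            transformed.saveAsTextFile("hdfs:///data/output"); // written in parallel, one part file per partition
        }
    }
}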

Q2. What are the different ways to read a small data file in Java?
A2. There are many ways. Here are some examples and timings using a 5MB file, 250MB file, and a 1GB file.

1. Reading a 5MB file line by line with the Scanner class

Total elapsed time: 271 ms. Light on memory usage, but heavy on I/O.
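A minimal sketch of this approach (the file name is an assumption):

import java.io.File;
import java.io.FileNotFoundException;
import java.util.Scanner;

public class ScannerRead {
    public static void main(String[] args) throws FileNotFoundException {
        long start = System.currentTimeMillis();
        try (Scanner scanner = new Scanner(new File("data-5mb.txt"))) {   // hypothetical file
            while (scanner.hasNextLine()) {
                String line = scanner.nextLine();                          // process the line here
            }
        }
        System.out.println("Total elapsed time: " + (System.currentTimeMillis() - start) + " ms");
    }
}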

2. Reading a 5MB file with Java NIO using memory mapped files

Total elapsed time: 43 ms. Efficient on I/O. More work is required to process the buffer.
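A rough sketch of this approach (file name is an assumption). Mapping gives you raw bytes rather than lines, so the decoding and line splitting are up to you:

import java.io.RandomAccessFile;
import java.nio.CharBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;

public class MappedBufferRead {
    public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();
        try (RandomAccessFile file = new RandomAccessFile("data-5mb.txt", "r");   // hypothetical file
             FileChannel channel = file.getChannel()) {
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
            buffer.load();                                   // fault the mapped pages in up front
            // "More work is required to process the buffer": decode and split into lines yourself
            CharBuffer chars = StandardCharsets.UTF_8.decode(buffer);
            System.out.println("Decoded " + chars.length() + " chars");
        }
        System.out.println("Total elapsed time: " + (System.currentTimeMillis() - start) + " ms");
    }
}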

3. Reading a 5MB file line by line with Java 8 Stream

Total elapsed time: 160 ms.
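A minimal sketch of this approach (the file name is an assumption):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class StreamRead {
    public static void main(String[] args) throws IOException {
        long start = System.currentTimeMillis();
        // Files.lines streams lazily, so the whole file is never held in memory
        try (Stream<String> lines = Files.lines(Paths.get("data-5mb.txt"))) {   // hypothetical file
            lines.forEach(line -> { /* process the line here */ });
        }
        System.out.println("Total elapsed time: " + (System.currentTimeMillis() - start) + " ms");
    }
}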

4. Reading a 5MB file with Java 7 Files & Paths classes

Total elapsed time: 150 ms.
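A minimal sketch of this approach (the file name is an assumption):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

public class FilesRead {
    public static void main(String[] args) throws IOException {
        long start = System.currentTimeMillis();
        // readAllLines pulls every line into memory, so it only suits smaller files
        List<String> lines = Files.readAllLines(Paths.get("data-5mb.txt"), StandardCharsets.UTF_8);  // hypothetical file
        for (String line : lines) {
            // process the line here
        }
        System.out.println("Total elapsed time: " + (System.currentTimeMillis() - start) + " ms");
    }
}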

Reading a 250MB file

1. Scanner approach: Total elapsed time: 7062 ms

2. Mapped Byte Buffer: Total elapsed time: 1220 ms

3. Java 8 Stream: Total elapsed time: 1024 ms

4. Java 7 Files: Total elapsed time: 3400 ms

Reading a 1GB file

1. Scanner approach: Total elapsed time: 15627 ms

2. Mapped Byte Buffer: Exception in thread “main” java.lang.OutOfMemoryError: Java heap space

3. Java 8 Stream: Total elapsed time: 3124 ms

4. Java 7 Files: Total elapsed time: 13657 ms

The approach #2 OutOfMemoryError was due to loading the whole file into memory with “buffer.load()”. This can be fixed by revising the code to read incrementally with “buffer.get()”, as sketched below.
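A rough sketch of what the revised approach might look like (the file name and block size are assumptions):

import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class MappedBufferChunkedRead {
    public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();
        try (RandomAccessFile file = new RandomAccessFile("data-1gb.txt", "r");   // hypothetical file
             FileChannel channel = file.getChannel()) {
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
            byte[] chunk = new byte[8 * 1024];
            while (buffer.hasRemaining()) {
                int length = Math.min(chunk.length, buffer.remaining());
                buffer.get(chunk, 0, length);        // pull a small block at a time off the mapping
                // process the chunk here (e.g. scan for line breaks and decode)
            }
        }
        System.out.println("Total elapsed time: " + (System.currentTimeMillis() - start) + " ms");
    }
}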

Total elapsed time: 460 ms.

Conclusion – MappedByteBuffer wins for file sizes up to 1 GB

Java NIO’s MappedByteBuffer performs the best for large files, followed by the Java 8 Stream approach. Monitor your application to see if it is more I/O bound, memory bound, or CPU bound.

When reading a file larger than 1 GB into memory

You can get “OutOfMemoryError”s, so you need to stream your reading & writing to prevent them. For example, using:

#1. Apache Commons IO library’s LineIterator (see the sketch after this list)

#2. java.util.Scanner streaming, e.g. to read from “System.in”

#3. Java 8 onwards, reader.lines()

#4. JAXB with StAX for streaming large XMLs

1) JAXB with StAX Tutorial step by step for unmarshalling.

2) JAXB with StAX Tutorial step by step for marshalling
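As referenced in #1 above, a minimal sketch of streaming with Apache Commons IO’s LineIterator (the file name is an assumption):

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import java.io.File;
import java.io.IOException;

public class LineIteratorRead {
    public static void main(String[] args) throws IOException {
        // LineIterator streams the file line by line without holding it all in memory
        LineIterator it = FileUtils.lineIterator(new File("data-2gb.txt"), "UTF-8");   // hypothetical file
        try {
            while (it.hasNext()) {
                String line = it.nextLine();   // process the line here
            }
        } finally {
            LineIterator.closeQuietly(it);
        }
    }
}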

Q3. What are the different data sizes, and what technologies can be used to process them?
A3. In general, data sizes can be classified as shown below.

1) Small data is < 10 GB in multiple files. It fits in a single machine’s memory, but process it by streaming to conserve memory. Java’s file processing APIs, the Apache Commons IO File APIs, the Spring Batch framework or the Java EE 7 batch processing framework can be used.

2) Medium size data is 10 GB to 1 TB in multiple files. It fits on a single machine’s disk, but you won’t be able to read all the contents into memory, so process it by splitting or streaming. The Spring Batch framework or the Java EE 7 batch processing framework can be used.

3) Big data is > 1 TB in multiple files. It is stored across multiple machines and processed in a distributed fashion, e.g. by running a MapReduce or Spark job.

Processing medium size data? Look at Spring Batch: Spring Batch industrial strength tutorial – part 1.

Processing big data? Look at Apache Spark – parallel processing. Apache Spark interview questions & answers | Apache Spark Tutorials

Reading a ~2GB file into memory and its limitations

String & Array limitations: Java String array limitations and OutOfMemoryError.

What is next?

In the next post, we will introduce file parsing and multi-threading to improve efficiency.

Why Big Data?

1. Why are low latency & Big Data skills more sought-after?

2. 150+ Big Data using Hadoop/Spark/Kafka Interview Q&As with diagrams, examples & code

