There are a number of technologies to ingest & run analytical queries over Big Data (i.e. large volumes of data). Big Data is used in Business Intelligence (i.e. BI) reporting, Data Science, Machine Learning, and Artificial Intelligence (i.e. AI). Processing a large volume of data is intensive on disk I/O, CPU, and memory. Big Data processing is based on distributed computing, where multiple nodes across several machines process data in parallel. Each node has its own dedicated hard disk, CPU, memory, etc. This is known as the “Shared-nothing architecture”. A Hadoop cluster is a collection of such nodes.
Here are a few technologies you can use as depicted below:
Hadoop is suitable for massive offline batch processing of structured, semi-structured & unstructured data by building a data lake, whereas MPP (i.e. Massively Parallel Processing) databases like Amazon Redshift, Azure SQL Data Warehouse, GCP’s BigQuery, etc are built for online analytics over large volumes of highly structured data.
The data lakes can be built on HDFS (i.e. Hadoop Distributed File System) or on cloud storage like AWS S3 with Amazon EMR or Databricks for compute, Azure Blob Storage with Azure Data Factory or Databricks for compute, or Google Cloud Storage with Dataproc for compute. Amazon EMR, Azure Data Factory, GCP Dataproc & Databricks make use of Apache Spark as the computation engine on the respective data storage. In a Hadoop ecosystem the storage & compute are tightly coupled, whereas in cloud based data lakes the storage (e.g. AWS S3, Azure Blob Storage, Google Cloud Storage, etc) & computation (e.g. Amazon EMR, Azure Data Factory, GCP Dataproc, Databricks, etc) are loosely coupled.
Learning Hadoop is a good way to understand distributed storage & compute. The ability to store & compute (i.e. process) massive amount of data in parallel is achieved with distributed computing using shared nothing architecture where you can add more nodes/computers to scale horizontally. You also have special data formats other than csv (i.e. comma separated values) that are more efficient for processing large volumes of data as discussed below.
Q1. What is Hadoop?
A1. Hadoop is an open-source software framework for storing large amounts of data and processing/querying those data on a cluster with multiple nodes of commodity hardware (i.e. low cost hardware) for the slave nodes.
Hadoop is a master-slave architecture where a few (say 1 to 7) highly available (i.e. HA) master nodes coordinate 10s to 1000s of slave nodes (aka worker or core nodes) that store & process data in parallel. Hadoop can be easily scaled out horizontally by adding more slave nodes, hence it is known as linearly scalable.
Adding one slave node to a cluster of n slave nodes adds roughly 1/n more computing power, so every additional node gives you better throughput.
Learn more about real-time, low latency, high throughput via distributed systems.
In short, Hadoop consists of
1. HDFS (Hadoop Distributed File System): HDFS allows you to store huge amounts of data in a distributed and redundant manner. For example, a 1 GB (i.e. 1024 MB) text file can be split into 8 blocks of 128 MB each and stored on 8 different slave nodes in a Hadoop cluster. Each block can be replicated 3 times for fault tolerance so that if 1 node goes down, you have backups. HDFS is good for sequential write-once-and-read-many times type access.
2. MapReduce: A computational framework. This processes large amounts of data in a distributed and parallel manner. When you do a query on the above 1 GB file for all users with age > 18, there will be say “8 map” functions running in parallel to extract users with age > 18 within its 128MB split file, and then the “reduce” function will run to combine all the individual outputs into a single final result.
3. YARN (Yet Another Resource Negotiator): A framework for job scheduling and cluster resource management. Apache YARN has ResourceManagers, which are deployed on master nodes, & NodeManagers, which are deployed on the slave nodes. The ResourceManagers coordinate the NodeManagers as to what tasks (e.g. MapReduce, Spark executor, etc) are run on which nodes. The NodeManagers report CPU, memory, disk, and network usage to the ResourceManagers so that new tasks can be directed based on resource availability. The ResourceManager makes use of the Scheduler & ApplicationsManager components of YARN. Even though Apache YARN is part of Hadoop, it can also be used outside Hadoop as a stand-alone resource manager.
4. Hadoop eco system, with 15+ frameworks & tools like Sqoop, Flume, Kafka, Pig, Hive, Spark, Impala, etc to ingest data into HDFS, to wrangle data (i.e. transform, enrich, aggregate, etc) within HDFS, and to query data from HDFS for business intelligence & analytics. Some tools like Pig & Hive are abstraction layers on top of MapReduce, whilst the other tools like Spark & Impala are improved architecture/design from MapReduce for much improved latencies to support near real-time (i.e. NRT) & real-time processing.
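The MapReduce flow described in item 2 can be illustrated with a small single-process simulation. This is only a sketch: the user records and the function names (make_splits, map_split, reduce_outputs) are made up for illustration, and threads stand in for the mappers that a real cluster would run on separate nodes.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical sample data: (name, age) records standing in for the 1 GB file.
users = [(f"user{i}", 10 + (i % 30)) for i in range(80)]

NUM_SPLITS = 8  # mirrors the 8 map functions in the text


def make_splits(records, num_splits):
    """Cut the record list into num_splits contiguous chunks."""
    size = -(-len(records) // num_splits)  # ceiling division
    return [records[i:i + size] for i in range(0, len(records), size)]


def map_split(split):
    """Map step: each mapper filters only its own split."""
    return [(name, age) for name, age in split if age > 18]


def reduce_outputs(mapped):
    """Reduce step: combine the per-split outputs into one result."""
    result = []
    for part in mapped:
        result.extend(part)
    return result


splits = make_splits(users, NUM_SPLITS)
with ThreadPoolExecutor(max_workers=NUM_SPLITS) as pool:
    mapped = list(pool.map(map_split, splits))  # mappers run side by side
adults = reduce_outputs(mapped)
print(len(splits), len(adults))
```

Each mapper sees only its own split, which is why the map step parallelizes so easily; only the reduce step needs to see all the partial outputs.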
Q2. Why are organizations moving from traditional data warehouse tools to smarter data hubs based on Hadoop eco systems?
A2. Organizations are investing in enhancing their
Existing data infrastructure:
- predominantly using “structured data” stored on high-end & expensive hardware
- predominantly processed as ETL batch jobs for ingesting data into RDBMS and data warehouse systems for data mining, analysis & reporting to make key business decisions.
- predominantly handle data volumes in gigabytes to terabytes
Smarter data infrastructure based on Hadoop where
- structured (e.g. RDBMS), unstructured (e.g. images, PDFs, docs), & semi-structured (e.g. logs, XMLs) data can be stored on cheaper commodity machines for the slave nodes in a scalable and fault tolerant manner.
- data can be ingested via batch jobs and near real time (i.e. NRT, 200ms to 2 seconds) streaming (e.g. Flume & Kafka).
- data can be queried with low latency (i.e under 100ms) capabilities with tools like Spark & Impala.
- larger data volumes in terabytes to petabytes can be stored.
which empowers organizations to make better business decisions with smarter & bigger data with more powerful tools to ingest data, to wrangle stored data (e.g. aggregate, enrich, transform, etc), and to query the wrangled data with low-latency capabilities for reporting & business intelligence.
Q3. How does a smarter & bigger data hub architecture differ from a traditional data warehouse architecture?
Traditional Enterprise Data Warehouse Architecture
Hadoop based Data Hub Architecture
A typical Hadoop cluster will have a few master nodes (say 3 to 5) and 10s to 100s of slave nodes. The slave nodes are also called worker or core nodes. HDFS is for data storage & YARN is for data compute. For example, Spark & Hive jobs can be run on YARN. Storage & compute are co-located to reduce network I/O. Name Node, Standby Name Node, Journal Node, Resource Manager, Data Node, Node Manager, etc are software daemon processes that run on the nodes.
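Note: In a Hadoop 2 high availability setup, the role of the secondary name node is taken over by a “standby name node”, which forms an active-standby (i.e. active-passive) high availability pair with the active name node via a locking mechanism with the help of ZooKeeper locks.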
The Resource Manager used to be called the JobTracker, and the Node Managers used to be called TaskTrackers in MapReduce version 1. A Resource Manager receives the requests for MapReduce execution from the client (e.g. from an edge node). A Resource Manager talks to the NameNode to determine the location of the data. It finds the best Node Managers to execute the tasks based on the data locality (proximity of the data) and the available slots to execute a task on a given node. It also monitors the individual Node Managers and submits the overall status of the job back to the client. When the Resource Manager is down, HDFS will still be functional, but new MapReduce executions cannot be started and the existing MapReduce jobs will be halted.
Mapper and Reducer tasks are executed on Data Nodes administered by the Node Managers. The Resource Manager assigns Mapper and Reducer tasks to the Node Managers, which stay in constant communication with the Resource Manager, signalling the progress of the tasks in execution. A Node Manager failure is not considered fatal. When a Node Manager becomes unresponsive, the Resource Manager will assign the tasks that were running on it to another node.
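The two ideas above, picking a node by data locality and free slots, and handing a task to another node when a Node Manager becomes unresponsive, can be sketched as follows. This is a toy model and not how YARN is implemented: the NodeManager class, the block ids, and the selection rule are all assumptions made for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class NodeManager:
    name: str
    free_slots: int
    local_blocks: set = field(default_factory=set)
    alive: bool = True


def pick_node(nodes, block_id):
    """Prefer a live node that holds the block locally, then the most free slots."""
    candidates = [n for n in nodes if n.alive and n.free_slots > 0]
    if not candidates:
        return None
    return max(candidates, key=lambda n: (block_id in n.local_blocks, n.free_slots))


def assign(nodes, block_id):
    """Assign one task for block_id and consume a slot on the chosen node."""
    node = pick_node(nodes, block_id)
    if node is not None:
        node.free_slots -= 1
    return node


nodes = [
    NodeManager("nm1", free_slots=2, local_blocks={"blk_1"}),
    NodeManager("nm2", free_slots=4, local_blocks={"blk_2"}),
    NodeManager("nm3", free_slots=1, local_blocks={"blk_1", "blk_2"}),
]

first = assign(nodes, "blk_1")   # a node holding blk_1 locally wins over nm2's free slots
first.alive = False              # simulate the Node Manager becoming unresponsive
retry = assign(nodes, "blk_1")   # the task is handed to another node
print(first.name, retry.name)
```

Note that nm2 has the most free slots but is never chosen for blk_1 while a node with a local copy is available, which is the point of data locality.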
Client nodes (aka Edge node or Gateway node)
Client node(s) contain Hadoop binaries and configurations, but will not be running any daemon processes. They run Hadoop based applications to ingest data into the Hadoop cluster or process data in HDFS. For example, running MapReduce jobs, or Spark jobs via the “spark-submit” command. The jobs are submitted from the client node to the Resource Manager running on the cluster. Data can also be staged on client node(s) before being ingested into HDFS. So, the client nodes need to have enough storage.
Q. Why not submit jobs to the cluster from one of the node managers?
A. Submitting jobs from the node managers themselves, as opposed to from separate client nodes, can cause contention issues, especially when you try to submit say 100s of jobs. The job submissions spawn new JVMs, and they need to compete with the MapReduce jobs that are scheduled or running.
Q. What services are deployed on the master nodes?
A. The following diagram depicts the horizontally scalable Hadoop cluster with master, slave, and edge nodes.
Hadoop will not function without the master nodes. So the master nodes need to be designed with the high availability & reliability in mind.
Journal Nodes receive changes to files in HDFS from the NameNode. An odd number of journal nodes (at least 3) should be running in a cluster.
ZooKeeper coordinates the distributed components such as the journal nodes, name nodes, HMasters, etc. For example, it detects a failure of the current active name node and elects the standby name node to be the new leader. The same can be said for the HMaster. This gives high availability.
100s if not 1000s of slave nodes can be added to horizontally scale with just say 3, 5 or 7 master nodes.
Q. Why is an odd number of master nodes used?
A. It is because Zookeeper requires a majority to function. If you had 3 nodes in your cluster, you would need at least 2 nodes to be up for the zookeeper to function.
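The majority rule is easy to check with a little arithmetic. The sketch below (the function names are illustrative) shows why an even number of nodes buys nothing extra: 4 nodes tolerate the same single failure as 3 nodes.

```python
def majority(ensemble_size):
    """Smallest number of nodes that forms a strict majority."""
    return ensemble_size // 2 + 1


def tolerated_failures(ensemble_size):
    """How many nodes can fail while a majority is still up."""
    return ensemble_size - majority(ensemble_size)


for n in (3, 4, 5, 6, 7):
    print(n, majority(n), tolerated_failures(n))
```

Going from 3 to 4 nodes raises the required majority from 2 to 3 without increasing the number of failures you can survive, which is why 3, 5 or 7 are the usual choices.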
Learn more about Zookeeper at Apache Zookeeper interview Questions & Answers.
Q. What are journal nodes? What is a Quorum Journal Manager (i.e. QJM)?
A. The NameNode stores the file & directory information of the HDFS. It stores the HDFS file to HDFS block mapping. The NameNode is a single point of failure in an HDFS cluster. If a cluster had a single NameNode, and that host or process became unavailable, the cluster as a whole would be unavailable until the NameNode was either restarted or brought up on a new host. So, we have a standby NameNode to provide high availability (i.e. HA).
The standby NameNode keeps its state synchronized with the active NameNode by communicating with a group of separate daemons called JournalNodes. When any namespace modification is performed by the active NameNode, it durably logs a record of the modification to a majority of the JournalNodes.
Apache ZooKeeper is a highly available service for maintaining small amounts of coordination data, and HDFS relies on Zookeeper for failure detection & active NameNode election. The HDFS implements HA (i.e. High Availability) via Quorum Journal Manager (i.e. QJM).
Q. What happens in Hadoop when you copy a large file say 1 GB from a local Unix file system to HDFS mount point like /data?
A. The file copied to HDFS /data will exist as blocks of say 128MB on different DataNodes on different slave nodes in the cluster. An HDFS is nothing but a collection of DataNodes across 10s to 1000s of slave nodes.
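As a rough illustration, a 1 GB file with 128 MB blocks works out to 8 blocks, and with a replication factor of 3 each block lives on 3 different DataNodes. The sketch below uses a toy round-robin placement and hypothetical DataNode names; real HDFS placement is rack aware and is decided by the NameNode.

```python
BLOCK_MB = 128      # block size used in the text
REPLICATION = 3     # replication factor used in the text


def split_into_blocks(file_mb, block_mb=BLOCK_MB):
    """Return the size in MB of each block the file is cut into."""
    full, rest = divmod(file_mb, block_mb)
    return [block_mb] * full + ([rest] if rest else [])


def place_replicas(num_blocks, datanodes, replication=REPLICATION):
    """Toy round-robin placement: each block goes to `replication` distinct nodes."""
    placement = {}
    for b in range(num_blocks):
        placement[b] = [datanodes[(b + r) % len(datanodes)] for r in range(replication)]
    return placement


blocks = split_into_blocks(1024)
datanodes = [f"dn{i}" for i in range(1, 9)]   # hypothetical 8 slave nodes
placement = place_replicas(len(blocks), datanodes)
print(len(blocks), placement[0])
```

The last block of a file is simply smaller when the file size is not a multiple of the block size, e.g. a 300 MB file becomes blocks of 128, 128 and 44 MB.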
Q. How would you add a new slave node to a Hadoop cluster?
A. It is as easy as installing or copying the Hadoop installation & main configuration files (core-site.xml, hdfs-site.xml, and yarn-site.xml) to the new node, adding the new server name to the $HADOOP_HOME/conf/slaves files (etc/hadoop/slaves in Hadoop 2, etc/hadoop/workers in Hadoop 3), and then starting the DataNode daemon on the new node and restarting all the other existing DataNode daemons for the config change to take effect.
All nodes must be able to communicate with each other. For the nodes to reach each other by host name, edit the /etc/hosts file on each node to add the IP addresses and host names of the master & slave servers.
You also need to make sure that the firewall ports are open & passwordless ssh is set up by generating public/private keys.
The main Hadoop configuration files, core-site.xml and hdfs-site.xml, are where you set all the port numbers, the data replication factor for fault tolerance, the location of the FSImage, which holds a checkpoint of the file system namespace (the changes since that checkpoint are recorded in the edit log), etc.
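To give a feel for what these files contain, here is a small sketch that renders a Hadoop-style configuration document from a dictionary. The property names (fs.defaultFS, dfs.replication, dfs.blocksize, dfs.namenode.name.dir) are standard Hadoop settings, but the host name, port, and path values are hypothetical and only there for illustration.

```python
import xml.etree.ElementTree as ET


def render_site_xml(properties):
    """Render a dict as a Hadoop-style <configuration> document."""
    root = ET.Element("configuration")
    for name, value in properties.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = str(value)
    return ET.tostring(root, encoding="unicode")


# Illustrative values only; the host name and path are hypothetical.
core_site = {"fs.defaultFS": "hdfs://namenode.example.com:8020"}
hdfs_site = {
    "dfs.replication": 3,                       # replication factor
    "dfs.blocksize": 128 * 1024 * 1024,         # 128 MB blocks, in bytes
    "dfs.namenode.name.dir": "/data/hdfs/nn",   # where the NameNode stores the FSImage
}

print(render_site_xml(core_site))
print(render_site_xml(hdfs_site))
```

Each setting ends up as a property element with a name and a value child, which is the layout you will see when you open core-site.xml or hdfs-site.xml on a node.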
Q. How do you manage the Hadoop cluster?
A. When you have 100s to 1000s of nodes, it can be cumbersome to individually modify, copy and manage config files across all the nodes. Hadoop only has a command-line interface and an API. This is where tools like Apache Ambari & Cloudera Manager come in handy to manage nodes & configurations via a web interface.
Q4. What are the benefits of Hadoop based Data cluster?
1) Improves the overall SLAs (i.e. Service Level Agreements) as the data volume & complexity grows. E.g. “Shared Nothing” architecture, parallel processing, memory intensive processing frameworks like Spark & Impala, and resource preemption in YARN’s capacity scheduler.
2) Scaling data warehouses can be expensive. Adding high-end hardware capacity & licensing data warehouse tools can cost significantly more. Hadoop based solutions can not only be cheaper, with commodity hardware used for the slave nodes & open-source tools, but can also complement the data warehouse solution by offloading data transformations to Hadoop tools like Spark & Impala for more efficient parallel processing of Big Data. This will also free up the data warehouse resources.
3) Exploration of new avenues & leads. Hadoop can provide an exploratory sandbox for the data scientists to discover potentially valuable data from social media, log files, emails, etc that are not normally available in data warehouses.
4) Better flexibility. Often business requirements change, and this requires changes to schema & reports. Hadoop based solutions are not only flexible to handle evolving schemas, but also can handle semi-structured & unstructured data from disparate sources like social media, application log files, images, PDFs, and document files.
Q5. What are key steps in BigData solutions?
A5. Ingesting Data, Storing Data (i.e. Data Modelling), and processing data (i.e data wrangling, data transformations & querying data).
1) Ingesting Data:
Extracting data from various sources like
1. RDBMSs (Relational Database Management Systems) like Oracle, MySQL, etc.
2. ERP (Enterprise Resource Planning) systems like SAP.
3. CRM (Customer Relationship Management) systems like Siebel, Salesforce, etc.
4. Social Media feeds & log files.
5. Flat files, documents, and images.
and storing them on a data hub based on the “Hadoop Distributed File System”, which is abbreviated as HDFS. Data can be ingested via batch jobs (e.g. running every 15 minutes, once every night, etc), streaming in near-real-time (i.e. 100ms to 2 minutes) and streaming in real-time (i.e. under 100ms).
One common term used in Hadoop is “Schema-On-Read”. This means unprocessed (aka raw) data can be loaded into HDFS, with a structure applied at processing time based on the requirements of the processing application. This is different from “Schema-On-Write”, which is used in RDBMSs, where the schema needs to be defined before the data can be loaded.
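A small sketch of schema-on-read follows: the same raw text is stored once, and two consumers apply different schemas when they read it. The sample data, the schema layout, and the read_with_schema helper are all assumptions made for this illustration.

```python
import csv
import io

# Raw data is stored exactly as it arrived; no schema was enforced on write.
raw = "1,alice,34\n2,bob,17\n3,carol,52\n"

# Two hypothetical consumers apply different schemas to the same raw bytes.
full_schema = [("id", int), ("name", str), ("age", int)]
ages_only = [("id", int), (None, None), ("age", int)]  # None means "skip this column"


def read_with_schema(text, schema):
    """Apply column names and types while reading, not while writing."""
    rows = []
    for record in csv.reader(io.StringIO(text)):
        row = {}
        for (name, cast), value in zip(schema, record):
            if name is not None:
                row[name] = cast(value)
        rows.append(row)
    return rows


print(read_with_schema(raw, full_schema))
print(read_with_schema(raw, ages_only))
```

With schema-on-write, the equivalent of full_schema would have to be fixed before the first record was loaded; here a new consumer can bring its own schema later without reloading anything.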
2) Storing Data:
Data can be stored on HDFS or NoSQL databases like HBase. HDFS is optimized for sequential access & the usage pattern of “Write-Once & Read-Many”. HDFS has high read & write rates as it can parallelize I/O across multiple drives. HBase sits on top of HDFS and stores data as key/value pairs in a columnar fashion. Columns are clubbed together as column families. HBase is suited for random read/write access. Before data can be stored in Hadoop, you need to consider the following
1) Data Storage Formats: There are a number of file formats (e.g. CSV, JSON, sequence, AVRO, Parquet, etc) & data compression algorithms (e.g. snappy, LZO, gzip, bzip2, etc) that can be applied. Each has particular strengths. For example, bzip2 is splittable and LZO becomes splittable once the files are indexed, whereas gzip is not splittable.
2) Data Modelling: Despite the schema-less nature of Hadoop, schema design is an important consideration. This includes directory structures and schema of objects stored in HBase, Hive and Impala. Hadoop often serves as a data hub for the entire organization, and the data is intended to be shared. Hence, carefully structured & organized storage of your data is important.
3) Metadata management: Metadata related to stored data.
4) Multitenancy: Smarter data hubs host multiple users, groups, and applications. This often results in challenges relating to governance, standardization, and management.
3) Processing Data:
Hadoop’s processing frameworks use HDFS. They rely on the “Shared Nothing” architecture, in which each node of a distributed system is completely independent of the other nodes in the system. There are no shared resources like CPU, memory, and disk storage that can become a bottleneck. Hadoop’s processing frameworks like Spark, Pig, Hive, Impala, etc process distinct subsets of the data, and there is no need to manage access to shared data. “Shared nothing” architectures are very
1. scalable as more nodes can be added without further contention.
2. fault tolerant as each node is independent, and there are no single points of failure, and the system can quickly recover from a failure of an individual node.
Q6. How would you go about choosing among the different file formats for storing and processing data?
A6. One of the key design decisions is regarding file formats based on the
1) Usage patterns like accessing 5 columns out of 50 columns vs accessing most of the columns.
2) Splittability to be processed in parallel.
3) Block compression saving storage space vs read/write/transfer performance
4) Schema evolution to add fields, modify fields, and rename fields.
1. CSV Files:
CSV files are common for exchanging data between Hadoop & external systems. CSVs are readable & parsable. CSVs are handy for bulk loading from databases to Hadoop or into an analytic database. When using CSV files in Hadoop, never include header or footer lines. Each line of the file should contain a record. CSV files have limited support for schema evolution, as new fields can only be appended to the end of a record and existing fields can never be deleted. CSV files do not support block compression, hence compressing a CSV file comes at a significant read performance cost.
2. JSON Files:
JSON records are different from JSON files. In Hadoop, each line of a JSON file is its own JSON record, unlike a regular JSON document where a single structure spans the whole file. As JSON stores both schema & data together for each record, it enables full schema evolution & splittability. JSON files do not support block level compression.
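Here is a minimal sketch of why one-record-per-line JSON is splittable and tolerant of schema changes. The sample records and the parse_split helper are hypothetical; the point is only that each line parses on its own and carries its own field names.

```python
import json

# Hypothetical content: one complete JSON record per line.
# The third record carries an extra field that the older records lack.
data = (
    '{"id": 1, "name": "alice"}\n'
    '{"id": 2, "name": "bob"}\n'
    '{"id": 3, "name": "carol", "country": "AU"}\n'
)

lines = data.splitlines()

# Every line is self-contained, so the file can be divided among
# workers at any line boundary and each part parsed independently.
split_a, split_b = lines[:2], lines[2:]


def parse_split(split):
    """Parse one split without needing any other part of the file."""
    return [json.loads(line) for line in split]


records = parse_split(split_a) + parse_split(split_b)
countries = [r.get("country", "unknown") for r in records]
print(countries)
```

The price for this flexibility is that the field names are repeated in every record, which makes JSON files larger than binary formats holding the same data.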
3. Sequence Files:
Sequence files store data in binary format with a similar structure to CSV files. Like CSV, Sequence files do not store metadata, hence the only schema evolution supported is appending new fields to the end of the record. Unlike CSV files, Sequence files do support block compression. Sequence files are also splittable. Sequence files can be used to solve the “small files problem”, for example by combining many smaller XML files into one file with the filename stored as the key and the file contents as the value. Due to the complexity in reading sequence files, they are more suited for in-flight (i.e. intermediate) data storage.
Note: A SequenceFile is Java centric and cannot be used cross-platform.
4. Avro Files:
Avro files are suited for long term storage with schema. Avro files store metadata with the data, but also allow specification of an independent schema for reading the file. This enables full schema evolution support, allowing you to rename, add, and delete fields and change data types of fields by defining a new independent schema. An Avro file defines the schema in JSON format, and the data is stored in a compact binary format. Avro files are also splittable & support block compression. They are more suited to usage patterns where row level access is required, meaning all the columns in the row are queried. They are not suited when a row has 50+ columns and the usage pattern requires only 10 or fewer columns to be accessed. The Parquet file format is more suited for this columnar access usage pattern.
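The idea of reading old data with a new, independent schema can be sketched without the Avro library. The code below is a simplified simulation of Avro-style schema resolution (field aliases for renames, defaults for added fields, dropped fields ignored); it does not cover type promotion or the binary encoding, and the User schemas and the resolve function are made up for illustration.

```python
# Schemas written as Avro-style JSON structures (shown here as Python dicts).
writer_schema = {
    "type": "record", "name": "User",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"},
        {"name": "fax", "type": "string"},
    ],
}
reader_schema = {
    "type": "record", "name": "User",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "full_name", "type": "string", "aliases": ["name"]},  # renamed field
        {"name": "country", "type": "string", "default": "unknown"},   # added field
        # "fax" is absent here, so it is dropped on read
    ],
}


def resolve(record, writer, reader):
    """Project a record written with `writer` onto the `reader` schema."""
    written = {f["name"] for f in writer["fields"]}
    out = {}
    for f in reader["fields"]:
        names = [f["name"]] + f.get("aliases", [])
        source = next((n for n in names if n in written), None)
        if source is not None:
            out[f["name"]] = record[source]
        elif "default" in f:
            out[f["name"]] = f["default"]
        else:
            raise ValueError(f"no value or default for field {f['name']}")
    return out


old_record = {"id": 7, "name": "alice", "fax": "n/a"}
print(resolve(old_record, writer_schema, reader_schema))
```

The old record never had to be rewritten: the rename, the added field, and the deleted field are all handled at read time by the reader schema.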
5. Columnar Formats: e.g. RCFile, ORC
RDBMSs store records in a row-oriented fashion, as this is efficient for cases where many columns of a record need to be fetched. Row oriented writing is also efficient if all the column values are known at the time of writing a record to the disk. But this approach would not be efficient to fetch just 10% of the columns in a row or if all the column values are not known at the time of writing. This is where columnar files make more sense. So the columnar format works well
- for skipping I/O and decompression on columns that are not part of the query.
- for queries that only access a small subset of columns.
- for data-warehousing-type applications where users want to aggregate certain columns over a large collection of records.
RC & ORC formats were written specifically for Hive, and are not as general purpose as Parquet.
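The difference between the two layouts can be shown with a toy in-memory model that counts how many values each layout has to touch to aggregate 5 columns out of 50. This is not how RCFile, ORC or Parquet are implemented; the table sizes and function names are assumptions for illustration.

```python
NUM_ROWS = 1000
NUM_COLS = 50
columns = [f"c{i}" for i in range(NUM_COLS)]

# Row-oriented layout: one list of values per record.
row_store = [[r * NUM_COLS + c for c in range(NUM_COLS)] for r in range(NUM_ROWS)]

# Column-oriented layout: one list of values per column.
col_store = {name: [row[i] for row in row_store] for i, name in enumerate(columns)}


def sum_row_store(wanted):
    """Every full row is read, even though only a few columns are needed."""
    idx = [columns.index(w) for w in wanted]
    values_read = 0
    totals = dict.fromkeys(wanted, 0)
    for row in row_store:
        values_read += len(row)          # the whole record is read
        for w, i in zip(wanted, idx):
            totals[w] += row[i]
    return totals, values_read


def sum_col_store(wanted):
    """Only the requested columns are read."""
    values_read = 0
    totals = {}
    for w in wanted:
        values_read += len(col_store[w])
        totals[w] = sum(col_store[w])
    return totals, values_read


wanted = ["c0", "c1", "c2", "c3", "c4"]
row_totals, row_read = sum_row_store(wanted)
col_totals, col_read = sum_col_store(wanted)
print(row_read, col_read)
```

Both layouts produce the same totals, but the columnar layout touches only a tenth of the values in this example, which is where the I/O saving comes from.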
6. Parquet Files:
Parquet is a columnar file format like RC and ORC. Parquet files support block compression and are optimized for query performance, as 10 or fewer columns can be selected from records with 50+ columns. Parquet file write performance is slower than non-columnar file formats. Parquet also supports limited schema evolution by allowing new columns to be added at the end. Parquet can be read and written to with Avro APIs and Avro schemas.
In summary, favor columnar file formats like Parquet. You can get better performance if you partition the data, compress the data, and store it in columnar formats like Apache Parquet. This will also reduce cost, as cloud based SQL engines like Athena charge you by the amount of data scanned per query.
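A back-of-the-envelope estimate shows how partitioning, compression and a columnar layout combine to cut the data scanned. Everything numeric here is an assumption for illustration: the 1 TB table, the 4:1 compression ratio, the 10 partitions, and the price per TB are placeholders, not official figures, and the model assumes equally sized columns and partitions.

```python
def scanned_bytes(table_bytes, total_cols, cols_read, partitions, partitions_read,
                  compression_ratio=1.0, columnar=True):
    """Rough estimate of the bytes a query scans."""
    size = table_bytes * compression_ratio
    size *= partitions_read / partitions       # partition pruning
    if columnar:
        size *= cols_read / total_cols         # column pruning
    return size


def query_cost(bytes_scanned, price_per_tb):
    """Cost of one query when billed by the amount of data scanned."""
    return bytes_scanned / 1024**4 * price_per_tb


TABLE = 1024**4          # hypothetical 1 TB of raw CSV
PRICE_PER_TB = 5.0       # placeholder price, check your provider's pricing

# Unpartitioned, uncompressed CSV: the whole table is scanned.
csv_scan = scanned_bytes(TABLE, 50, 5, 1, 1, columnar=False)

# Parquet, 10 partitions with 1 read, 5 of 50 columns, assumed 4:1 compression.
parquet_scan = scanned_bytes(TABLE, 50, 5, 10, 1, compression_ratio=0.25)

print(query_cost(csv_scan, PRICE_PER_TB), query_cost(parquet_scan, PRICE_PER_TB))
```

Under these assumptions the Parquet query scans a small fraction of what the CSV query scans, and the bill shrinks by the same factor.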