Hadoop Ecosystem

A complete overview on Apache Hadoop

Lorenzo D'Isidoro
Jun 7, 2022

This article grew out of my desire to share some notes I took during the Big Data Specialization offered by the University of California San Diego on Coursera. In my opinion the first course, called Introduction to Big Data, might be a bit boring for those already working in the world of Big Data, because it focuses on a high-level overview of the main concepts.
However, I found the lessons on Hadoop useful: in just a few minutes they explain the ecosystem, how its components interact, and when to use these tools.

So I decided to follow this part of the course more carefully to draw out some interesting ideas and considerations. I would also like to show you how to set up a Hadoop cluster locally using Docker, in case you want to test Hadoop quickly and easily. This article is a simple collection of my notes on the Hadoop ecosystem. I hope it is useful to you and convinces you to start the course.

Overview

The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage.
(Source: https://hadoop.apache.org/)

The frameworks and applications in the Hadoop ecosystem share several overarching themes. In general, the main goals are:

  • Scalability: store large volumes of data on commodity hardware.
  • Fault Tolerance: handle errors and keep meeting requirements despite failures.
  • Support for Different Data Types: handle any given type of data.
  • Shared Environment: even modest-sized clusters can have many cores, so it is important to allow multiple jobs to execute simultaneously.
  • Providing Value: the ecosystem includes a wide range of open source projects backed by a large active community (e.g. HDFS, YARN, MapReduce, …).
Image 1: The Hadoop Ecosystem (Source: Introduction to Big Data)

As you can see from the image above, the ecosystem is made up of several components:

  • HDFS, which stands for Hadoop Distributed File System, is the foundation for many Big Data frameworks because it provides scalable and reliable storage. As the size of your data increases, you can add commodity hardware to HDFS to increase storage capacity, so it lets you scale out your resources.
  • Hadoop YARN provides flexible scheduling and resource management over the HDFS storage (e.g. YARN is used at Yahoo to schedule jobs across 40,000 servers).
  • MapReduce is a software framework for easily writing applications which process vast amounts of data (multi-terabyte data-sets) in-parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner (Source: https://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html). It is a programming model that simplifies parallel computing: instead of dealing with the complexities of synchronization and scheduling, you only need to know and use two functions, map and reduce. The framework is inspired by the map and reduce functions used in functional programming.
  • Hive and Pig are two additional programming models on top of MapReduce to augment data modeling of MapReduce with relational algebra and data flow modeling respectively. Hive was created at Facebook to issue SQL-like queries using MapReduce on their data in HDFS and Pig was created at Yahoo to model data flow based programs using MapReduce.
  • Giraph was built for processing large-scale graphs efficiently.
  • Storm, Spark, and Flink were built for real-time and in-memory processing of big data on top of the YARN resource scheduler and HDFS. In-memory processing is a powerful way of running big data applications even faster, achieving up to 100x better performance for some tasks.
  • ZooKeeper provides the centralized management system that running all of these tools requires: synchronization, configuration, and high availability.

Hadoop Distributed File System

HDFS serves as the foundation for most tools in the Hadoop ecosystem and allows you to store and access large datasets. It provides fault tolerance and high-throughput access to application data, and it is suited to applications that have to manage large data sets.

HDFS achieves scalability by partitioning, or splitting, large files across multiple computers. This allows parallel access to very large files, since the computations run in parallel on each node where the data is stored. Typical file sizes range from gigabytes to terabytes. The default chunk (block) size, the size of each piece of a file, was 64 megabytes in Hadoop 1.x and is 128 megabytes from Hadoop 2.x onward, but you can configure it to a different size.
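To make the splitting concrete, here is a minimal sketch of the arithmetic. It is not HDFS code: the function name `split_into_blocks` is hypothetical, and the 64 MB figure quoted above is used as the default only for illustration (the block size is passed as a parameter).

```python
MB = 1024 * 1024


def split_into_blocks(file_size_bytes, block_size_bytes=64 * MB):
    """Return the size of each block a file of the given size would occupy."""
    if file_size_bytes <= 0:
        return []
    full_blocks, remainder = divmod(file_size_bytes, block_size_bytes)
    sizes = [block_size_bytes] * full_blocks
    if remainder:
        # The last block only holds the leftover bytes; it is not padded.
        sizes.append(remainder)
    return sizes


blocks = split_into_blocks(200 * MB)
print(len(blocks), [size // MB for size in blocks])
```

With a 200 MB file the sketch yields three full 64 MB blocks plus one 8 MB block. Passing `block_size_bytes=128 * MB` shows how a larger block size reduces the number of pieces.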

Fault Tolerance

HDFS is designed for fault tolerance: to prevent data loss when a node fails, it replicates, or makes copies of, file blocks on different nodes. By default, HDFS maintains three copies of every block.

Image 1.1: The file is split into blocks (64 MB by default in Hadoop 1.x) and the number of replicas created depends on the replication factor (3 by default).

If any node fails, its data blocks are still accessible from the other nodes that hold copies of the same data. Because the replicas are stored on different machines, no data is lost.
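The effect of replication can be sketched with a toy model. The round-robin placement below is an assumption made for brevity (real HDFS uses a rack-aware placement policy), and the node names and function names are hypothetical.

```python
def place_replicas(num_blocks, nodes, replication=3):
    """Assign each block to `replication` distinct nodes (toy round-robin policy)."""
    if replication > len(nodes):
        raise ValueError("replication factor cannot exceed the number of nodes")
    return {
        block: [nodes[(block + i) % len(nodes)] for i in range(replication)]
        for block in range(num_blocks)
    }


def readable_blocks(placement, failed_nodes):
    """Return the blocks that still have at least one replica on a live node."""
    return {
        block
        for block, holders in placement.items()
        if any(node not in failed_nodes for node in holders)
    }


nodes = ["dn1", "dn2", "dn3", "dn4"]  # hypothetical DataNode names
placement = place_replicas(num_blocks=4, nodes=nodes)

print(readable_blocks(placement, failed_nodes={"dn1"}))
print(readable_blocks(placement, failed_nodes={"dn1", "dn2", "dn3"}))
```

In this model every block survives the loss of one node, or even of two, because each block lives on three distinct nodes. Only when three nodes fail does a block (block 0 here) lose all of its replicas.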

NameNode and DataNode

HDFS has a master/slave architecture. An HDFS cluster consists of a single NameNode, a master server that manages the file system namespace and regulates access to files by clients. In addition, there are a number of DataNodes, usually one per node in the cluster, which manage storage attached to the nodes that they run on. HDFS exposes a file system namespace and allows user data to be stored in files. Internally, a file is split into one or more blocks and these blocks are stored in a set of DataNodes. The NameNode executes file system namespace operations like opening, closing, and renaming files and directories. It also determines the mapping of blocks to DataNodes. The DataNodes are responsible for serving read and write requests from the file system’s clients. The DataNodes also perform block creation, deletion, and replication upon instruction from the NameNode (Source www.hadoop.apache.org/docs).

Image 1.2: Source www.hadoop.apache.org/docs

Basically, the NameNode issues messages to the DataNodes across the cluster and is responsible for metadata, while the DataNodes provide block storage. The NameNode is therefore the administrator, or coordinator, of the HDFS cluster.

  • When a file is created, the NameNode records its name, its location in the directory hierarchy, and other metadata. It also decides which DataNodes will store the contents of the file and remembers this mapping.
  • A DataNode runs on each node in the cluster and is responsible for storing the file blocks. It listens for commands from the NameNode for block creation, deletion, and replication. Replication provides two key capabilities: fault tolerance and data locality. Nodes in the system can be in different locations, where a location may mean a specific rack or a data center in a different town. Location matters because we want to move the computation to the data and not the other way around.
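The division of labour between the two roles can be sketched as follows. This is a deliberately simplified toy, not the HDFS API: the classes `ToyNameNode` and `ToyDataNode`, the helper functions, and the round-robin choice of targets are all hypothetical, and the client here writes to every replica itself, whereas real HDFS forwards data through a pipeline of DataNodes.

```python
class ToyDataNode:
    """Stores block contents; knows nothing about file names."""

    def __init__(self, name):
        self.name = name
        self.blocks = {}  # block id -> bytes


class ToyNameNode:
    """Keeps only metadata: which blocks form a file and where they live."""

    def __init__(self, datanode_names, replication=2):
        self.datanode_names = list(datanode_names)
        self.replication = replication
        self.block_map = {}  # path -> list of (block id, [DataNode names])
        self._next_block_id = 0

    def allocate_block(self, path):
        block_id = self._next_block_id
        self._next_block_id += 1
        count = len(self.datanode_names)
        targets = [self.datanode_names[(block_id + i) % count]
                   for i in range(self.replication)]
        self.block_map.setdefault(path, []).append((block_id, targets))
        return block_id, targets

    def get_block_locations(self, path):
        return self.block_map[path]


def write_file(namenode, datanodes, path, data, block_size):
    """Client side: ask the NameNode where to write, then store each block."""
    for offset in range(0, len(data), block_size):
        block_id, targets = namenode.allocate_block(path)
        for name in targets:
            datanodes[name].blocks[block_id] = data[offset:offset + block_size]


def read_file(namenode, datanodes, path):
    """Client side: ask the NameNode for locations, then read from DataNodes."""
    return b"".join(datanodes[holders[0]].blocks[block_id]
                    for block_id, holders in namenode.get_block_locations(path))


datanodes = {name: ToyDataNode(name) for name in ["dn1", "dn2", "dn3"]}
namenode = ToyNameNode(datanodes)
write_file(namenode, datanodes, "/notes.txt", b"hello hadoop", block_size=5)
print(namenode.get_block_locations("/notes.txt"))
print(read_file(namenode, datanodes, "/notes.txt"))
```

Note that the file contents never pass through the toy NameNode: it only hands out block ids and locations, which mirrors the metadata-only role described above.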

Data Types

HDFS provides a set of formats for common data types, and the set is extensible: you can provide custom formats for your own data types. For example:

  • Text files can be read line by line or a word at a time
  • Geospatial data can be read as vectors or rasters
  • Genomic sequence data can be read in FASTA or FASTQ format
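The first item in the list above can be illustrated with two small readers over the same text. This is plain Python rather than a Hadoop input format, and the function names are hypothetical; it only shows how the choice of record (a line or a word) changes what the processing code receives.

```python
import io


def read_lines(stream):
    """Yield one record per line, without the trailing newline."""
    for line in stream:
        yield line.rstrip("\n")


def read_words(stream):
    """Yield one record per whitespace-separated word."""
    for line in stream:
        yield from line.split()


text = "my apple is red\nmy rose is blue\n"
print(list(read_lines(io.StringIO(text))))
print(list(read_words(io.StringIO(text))))
```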

YARN: Resource Manager For Hadoop

The fundamental idea of YARN is to split up the functionalities of resource management and job scheduling/monitoring into separate daemons.
The idea is to have a global ResourceManager (RM) and per-application ApplicationMaster (AM). An application is either a single job or a DAG of jobs. The ResourceManager and the NodeManager form the data-computation framework.
The ResourceManager is the ultimate authority that arbitrates resources among all the applications in the system. The NodeManager is the per-machine framework agent who is responsible for containers, monitoring their resource usage (CPU, memory, disk, network) and reporting the same to the ResourceManager/Scheduler.
(Source www.hadoop.apache.org/docs)

Image 2: YARN Architecture (Source www.hadoop.apache.org/docs)

YARN interacts with applications and schedules resources for their use. It enables running multiple applications over HDFS, increases resource efficiency, and lets you go beyond MapReduce or even beyond the data-parallel programming model.
The original Hadoop stack had no resource manager, so one of its biggest limitations was its inability to support non-MapReduce applications.

Adding YARN in between HDFS and the applications enabled new systems to be built, focusing on different types of big data applications such as Giraph for graph data analysis, Storm for streaming data analysis, and Spark for in-memory analysis.

Based on Image 2:

  • The ResourceManager controls all the resources and decides who gets what.
  • The NodeManager operates at machine level and is in charge of a single machine.
  • Each application gets an ApplicationMaster: it negotiates resources from the ResourceManager and talks to the NodeManagers to get its tasks completed.
  • The container is an abstract notion that signifies a resource: a collection of CPU, memory, disk, network, and other resources within the compute node. To simplify, and be less precise, you can think of a container as a machine.
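The roles above can be sketched as a toy allocator. Everything here is hypothetical: the class names, the first-fit policy, and the resource figures are assumptions for illustration, and real YARN schedulers are considerably more sophisticated.

```python
from dataclasses import dataclass


@dataclass
class ToyNode:
    """Free resources that a NodeManager would report for one machine."""
    name: str
    free_memory_mb: int
    free_vcores: int


class ToyResourceManager:
    """Grants containers on a first-fit basis (an assumed, simplistic policy)."""

    def __init__(self, nodes):
        self.nodes = nodes

    def allocate(self, app_id, memory_mb, vcores):
        """Return a container description, or None if no node has room."""
        for node in self.nodes:
            if node.free_memory_mb >= memory_mb and node.free_vcores >= vcores:
                node.free_memory_mb -= memory_mb
                node.free_vcores -= vcores
                return {"app": app_id, "node": node.name,
                        "memory_mb": memory_mb, "vcores": vcores}
        return None


rm = ToyResourceManager([ToyNode("n1", 4096, 2), ToyNode("n2", 8192, 4)])

# Each call plays the part of an ApplicationMaster negotiating a container.
first = rm.allocate("app1", memory_mb=3072, vcores=2)
second = rm.allocate("app2", memory_mb=2048, vcores=1)
third = rm.allocate("app1", memory_mb=8192, vcores=1)
print(first, second, third, sep="\n")
```

Two different applications end up with containers on the same cluster, which is the shared-environment goal in miniature. The third request is refused because no single node has 8 GB free any more.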

MapReduce

MapReduce is a scalable programming model that simplifies distributed processing of data. Map-Reduce consists of three main steps: Mapping, Shuffling and Reducing. An easy way to think about a MapReduce job is to compare it with act of ‘delegating’ a large task to a group of people, and then combining the result of each person’s effort, to produce the final outcome.
(Source: http://words.sdsc.edu/words-data-science/mapreduce)

MapReduce is a programming model for the Hadoop ecosystem. It relies on YARN to schedule and execute parallel processing over the distributed file blocks in HDFS. There are several tools that use the MapReduce model to provide a higher level interface to other programming models. Hive has a SQL-like interface that adds capabilities that help with relational data modeling. And Pig is a high level data flow language that adds capabilities that help with process map modeling.

The MapReduce programming model greatly simplifies running code in parallel, since you don’t have to deal with parallel-processing issues (e.g. how many nodes to run on). Instead, you only need to create map and reduce tasks, and you don’t have to worry about multiple threads, synchronization, or concurrency issues.

Map and reduce are two concepts based on functional programming, where the output of a function is based solely on its input. In map, the operation is applied to each data element. In reduce, the operation summarizes the elements in some manner.
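The functional-programming origin of the two names can be seen with Python’s built-in `map` and `functools.reduce`. The word list is made up for illustration.

```python
from functools import reduce

words = ["apple", "rose", "apple"]

# map: apply the same operation to every element independently.
pairs = list(map(lambda word: (word, 1), words))

# reduce: summarize the elements, here by adding up the counts.
total = reduce(lambda acc, pair: acc + pair[1], pairs, 0)

print(pairs)
print(total)
```

Because each call inside `map` depends only on its own input element, the calls could run on different machines without coordination, which is what makes the model easy to parallelize.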

Example: Let’s examine the WordCount program. It can run on several files, but to keep things simple we assume a single big input file.

  • Before WordCount runs, the input file is stored in HDFS. As you now know, HDFS partitions the file into blocks spread across multiple nodes in the cluster.
  • In this case there are four partitions, labeled A, B, C, and D. The first step in MapReduce is to run a map operation on each node.
  • As the input partitions are read from HDFS, map is called for each line in the input.
  • Map creates a key-value pair for each word on the line, with the word as the key and 1 as the value. In this example, the word apple is read from the line in partition A, so map emits (apple, 1).
  • All the key-value pairs that were output from map are sorted based on their key, and the pairs with the same word are moved, or shuffled, to the same node.
  • The reduce operation executes on these nodes to add up the values of key-value pairs with the same key. For example, (apple, 1) and another (apple, 1) become (apple, 2). The result of reduce is a single key-value pair for each word that was read from the input file. The key is the word, and the value is the number of occurrences.

So, just to recap, the map parallelization is over the input, as each partition gets processed one line at a time. To achieve this type of data parallelism we must decide on the data granularity of each parallel computation. In this case, it is a line.
After that there will be parallel grouping of data in the shuffle and sort phase. This time, the parallelization is over the intermediate data.
After the grouping of the intermediate data the reduce step gets parallelized to construct one output file.
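The three phases can be simulated in a single process. This is a sketch of the model, not Hadoop code: the function names, the sample lines in partitions A to D, and the byte-sum partitioning rule are all assumptions for illustration.

```python
from collections import defaultdict


def map_line(line):
    """Map: emit (word, 1) for every word on one input line."""
    return [(word, 1) for word in line.split()]


def shuffle(mapped_pairs, num_reducers):
    """Shuffle: send all pairs with the same key to the same reducer."""
    buckets = [defaultdict(list) for _ in range(num_reducers)]
    for word, count in mapped_pairs:
        # Toy partitioning rule: a stable function of the key.
        reducer = sum(word.encode("utf-8")) % num_reducers
        buckets[reducer][word].append(count)
    return buckets


def reduce_word(word, counts):
    """Reduce: add up all the values collected for one key."""
    return word, sum(counts)


def word_count(partitions, num_reducers=2):
    mapped = [pair
              for lines in partitions.values()
              for line in lines
              for pair in map_line(line)]
    result = {}
    for bucket in shuffle(mapped, num_reducers):
        for word in sorted(bucket):  # each reducer sees its keys in sorted order
            key, total = reduce_word(word, bucket[word])
            result[key] = total
    return result


# Hypothetical content for the four partitions of the input file.
partitions = {
    "A": ["my apple is red"],
    "B": ["my rose is blue"],
    "C": ["you are the apple"],
    "D": ["of my eyes"],
}
print(word_count(partitions))
```

In a real cluster the map calls for A, B, C, and D would run on the nodes holding those blocks, and each bucket would be handled by a separate reduce task. Here the loops merely stand in for that parallelism.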

When not to use Hadoop technologies

There are certain kinds of tasks that you would not want to use MapReduce or Hadoop for:

  • Frequently changing data: if your data changes frequently, MapReduce is slow, since it reads the entire input data set each time.
  • Dependent tasks: MapReduce greatly simplifies your job as a designer, since you do not have to deal with synchronization issues. However, this means that computations that do have dependencies cannot be expressed with MapReduce.
  • Interactive analysis: MapReduce does not return any results until the entire process is finished, and it must read the entire input data set. This makes it unsuitable for interactive applications, where results must be presented to the user very quickly while the user waits for a response.
  • Real-time analytics: Hadoop works on batch processing, so the response time is high. It is still possible to use HDFS and make the processing real time: store the Big Data in HDFS and use Spark over it.
  • Small dataset to process: Hadoop is good for data parallelism, that is, the simultaneous execution of the same function on multiple nodes across the elements of a dataset. With a small dataset there is little to parallelize, so the overhead of distributing the work outweighs the benefit.

How to set up a Hadoop cluster locally using Docker

To set up the Hadoop ecosystem locally we will use the Docker images from the GitHub project big-data-europe/docker-hadoop.
The prerequisites for moving forward with this guide include installing Docker, docker-compose, and Git on your machine.

First of all, clone the project

git clone git@github.com:big-data-europe/docker-hadoop.git

As written in the README.md, to deploy an example HDFS cluster, run:

docker-compose up -d

docker-compose now uses the docker-compose.yml file to pull the needed images and open the required ports. You can list the running containers with the following command:

docker ps

A local Hadoop cluster is now available with:

  • 3 slaves, aka DataNodes
  • 1 HDFS NameNode
  • 1 YARN ResourceManager
  • 1 HistoryServer
  • 1 NodeManager

Copy data on Hadoop cluster

You can check the current status of the whole system from the NameNode at http://localhost:9870. It is now possible to copy data into the Hadoop Distributed File System, so enter the NameNode container by running this command:

docker exec -it namenode bash

Now let’s create a folder with two files inside to be used with WordCount. You will remember this application from the example above in the MapReduce section.

$ mkdir files
$ cd files
$ echo "My apple is red and my rose is blue..." > file1.txt
$ echo "You are the apple of my eyes..." > file2.txt

Create the input directory on HDFS running the following command:

hadoop fs -mkdir -p hadoop_files

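Put the input files into that directory on HDFS, where their blocks are stored on the DataNodes, and list its content by running the following commands (from inside the files folder created above):

$ hdfs dfs -copyFromLocal file1.txt file2.txt hadoop_files
$ hdfs dfs -ls hadoop_files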

Run WordCount on Hadoop

WordCount is part of the hadoop-mapreduce-examples artifact, which can be downloaded from the Maven repository. After downloading it, copy the jar from your host machine into the NameNode container:

$ docker cp \
/hadoop-mapreduce-examples-3.3.3-sources.jar \
<NameNode_ContainerID>:hadoop-mapreduce-examples-3.3.3-sources.jar

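Back inside the NameNode container, run the application from the directory that contains the jar (docker cp resolves container paths relative to the root directory, /):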

$ hadoop jar hadoop-mapreduce-examples-3.3.3-sources.jar \
org.apache.hadoop.examples.WordCount \
hadoop_files \
wordcount_output

Then check the output in the wordcount_output folder. The HDFS cat command reads a file in HDFS and prints its content to the console:

hdfs dfs -cat wordcount_output/part-r-00000
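As a local cross-check, the same counts can be computed without a cluster. This sketch assumes that the bundled WordCount example splits lines on whitespace and is case-sensitive, so My and my are different keys and blue... keeps its dots. The two strings are the contents of file1.txt and file2.txt created earlier.

```python
from collections import Counter

lines = [
    "My apple is red and my rose is blue...",  # content of file1.txt
    "You are the apple of my eyes...",         # content of file2.txt
]

# Split on whitespace only, keeping case and punctuation (assumed behaviour).
counts = Counter(word for line in lines for word in line.split())

for word in sorted(counts):
    print(f"{word}\t{counts[word]}")
```

If the assumption holds, the job output should list the same words with the same counts, for example 2 for apple.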
