Spark Architecture In Depth

In this post I will walk you through the architecture of Apache Spark. Using a question-and-answer approach with just six questions, you will get a clear understanding of how this massively parallel processing architecture works. For developers who find it hard to debug their code, I have also added explicit debugging steps at the end.

How are the master and worker set up?

  • Before setting anything up, Java must be installed on all the machines.
  • After downloading the Spark binaries from the official website, go to SPARK_HOME/sbin. There you will find start-master.sh; running this script sets up the master node of your cluster.
  • To set up the rest of the nodes as slaves, repeat step 1 on each machine, go to SPARK_HOME/sbin and run ./start-slave.sh spark://IP_ADDRESS_OF_YOUR_MASTER:7077, which registers that machine as a slave node.

This gives you a basic cluster setup with one master node and the rest of the machines acting as slave nodes. A minimal application sketch for connecting to this master follows.
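
Below is a minimal sketch of how an application can then point itself at this standalone master. The IP address, port and application name are placeholders for illustration; replace them with the values from your own cluster.

import org.apache.spark.{SparkConf, SparkContext}

object StandaloneConnectSketch {
  def main(args: Array[String]): Unit = {
    // Point the application at the master started with start-master.sh.
    // 192.168.1.10 is a placeholder for your master's IP; 7077 is the default port.
    val conf = new SparkConf()
      .setAppName("standalone-connect-sketch")
      .setMaster("spark://192.168.1.10:7077")
    val sc = new SparkContext(conf)

    // Run a trivial job just to confirm that the slave (worker) nodes respond.
    println(sc.parallelize(1 to 100).sum())

    sc.stop()
  }
}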

How is the application launched in client mode versus cluster mode? (i.e. does the driver run on the client machine or inside the cluster?)

Client   :- The main method (the driver) runs on the client machine itself, and it contacts the master to get the IP addresses of the workers to which the tasks are sent.
Cluster  :- The Application Master runs the main method inside the cluster, so the driver does not live on the client machine.
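
If you are unsure which mode a running application was submitted in, a quick, illustrative check is to read the standard spark.submit.deployMode property from the SparkContext configuration. This is only a sketch; the property is set by spark-submit and defaults to client.

import org.apache.spark.{SparkConf, SparkContext}

object DeployModeSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("deploy-mode-sketch"))

    // "client" means the driver (main method) runs on the submitting machine;
    // "cluster" means it runs inside the cluster, in the Application Master.
    println("Deploy mode: " + sc.getConf.get("spark.submit.deployMode", "client"))

    sc.stop()
  }
}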

How does the calling flow look in Spark (a diagrammatic explanation)?

Basic :-

The application runs on the master and the tasks are submitted to YARN, which allocates the resources. These resources are granted to the Application Master through a handshake. The tasks are then sent to the worker nodes, where they are executed, and the results are sent back to the driver.

Naming :-

(Application Master   —–   Resource Manager) [Master Node]

(Node Manager) [Slave Node]

Calling Spark Application Process :-

  • The application is submitted to YARN using --master yarn. The client machine picks up the configuration file spark-defaults.conf and loads it; the manually passed arguments are then parsed and merged with these defaults into a single Map collection (see the sketch after this list).
  • The deployment process then begins: the class org.apache.spark.deploy.yarn.Client is invoked, after which the Application Master container is started.
  • On your screen you will now see an INFO log from the client class stating that the Application Master container is being brought up with some amount of RAM (e.g. around 41 GB) plus roughly 384 MB of overhead: INFO Client: Will allocate AM container, with 41344 MB memory including 384 MB overhead.
  • The application JAR runs under the Application Master, which keeps you updated about the Spark application and reports its host, e.g. ApplicationMaster host: 10.135.2.148
  • The Application Master then calls YARN to get the executor IP addresses to which the tasks can be sent.
  • Usual default ports :-
    SPARK_MASTER_PORT=7077
    SPARK_MASTER_WEBUI_PORT=8080
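
The sketch below illustrates the merging step described above: values that would normally come from spark-defaults.conf are combined with manually passed settings into one Map, with the manual values taking precedence. It is only an illustration of the idea, not the actual Spark deploy code, and all keys and values are example placeholders.

object ConfMergeSketch {
  def main(args: Array[String]): Unit = {
    // Values that would normally be read from spark-defaults.conf
    val defaults = Map(
      "spark.master"          -> "yarn",
      "spark.executor.memory" -> "4g"
    )

    // Values passed manually, e.g. via --conf on spark-submit
    val manual = Map(
      "spark.executor.memory" -> "8g",   // overrides the file default
      "spark.executor.cores"  -> "4"
    )

    // Manually passed arguments win over the file defaults
    val merged = defaults ++ manual
    merged.foreach { case (k, v) => println(s"$k=$v") }
  }
}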

What is data locality?

Data locality describes where the data sits on the cluster relative to the code that processes it: in the same JVM where the task is running, on the same node, or somewhere else entirely. A sketch after the list below shows how to inspect the preferred locations of an RDD's partitions.

Types of locality :-

  • PROCESS_LOCAL data is in the same JVM as the running code. This is the best locality possible.
  • NODE_LOCAL data is on the same node, for example in HDFS on the same node or in another executor on the same node. This is a little slower than PROCESS_LOCAL because the data has to travel between processes.
  • NO_PREF data is accessed equally quickly from anywhere and has no locality preference.
  • RACK_LOCAL data is on a different server of the same rack, so it needs to be sent over the network, typically through a single switch.
  • ANY data is elsewhere on the network and not in the same rack.
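
The following sketch shows one way to inspect locality for an HDFS-backed RDD: RDD.preferredLocations reports the hosts on which each partition's data lives, which is what the scheduler uses when it tries to assign PROCESS_LOCAL or NODE_LOCAL tasks. The file path is a placeholder.

import org.apache.spark.{SparkConf, SparkContext}

object LocalitySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("locality-sketch"))

    // For an HDFS file, the preferred locations are the hosts holding its blocks.
    val lines = sc.textFile("hdfs:///data/README.txt")
    lines.partitions.foreach { p =>
      println(s"partition ${p.index} -> ${lines.preferredLocations(p).mkString(", ")}")
    }

    sc.stop()
  }
}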

Practical Explanation :-

  • In this UI you can see the Index, the ID, the number of attempts until the task succeeded, the Locality Level (whether the data is on the same executor (PROCESS_LOCAL), on another executor of the same node (NODE_LOCAL), on another node of the same rack (RACK_LOCAL), or anywhere for pending tasks (ANY)), the Executor ID/Host, the Launch Time, the Duration, the GC Time (time spent reclaiming memory) and the Shuffle Read Size/Records.
  • The UI also shows the executor IP addresses to which the tasks of the job are assigned, the time each task takes to complete, and its locality level (i.e. where the data is kept).

Data Locality Altering Arguments :-

By altering the parameters below as required, we can change how long Spark waits for data locality and thereby optimize how our data is processed; a sketch showing how to set them follows the list.

  • spark.locality.wait (default: 3s). How long to wait to launch a data-local task before giving up and launching it on a less-local node. The same wait is used to step through multiple locality levels (process-local, node-local, rack-local and then any). It is also possible to customize the waiting time for each level by setting spark.locality.wait.node, etc. You should increase this setting if your tasks are long and see poor locality, but the default usually works well.
  • spark.locality.wait.node (default: spark.locality.wait). Customize the locality wait for node locality. For example, you can set this to 0 to skip node locality and search immediately for rack locality (if your cluster has rack information).
  • spark.locality.wait.process (default: spark.locality.wait). Customize the locality wait for process locality. This affects tasks that attempt to access cached data in a particular executor process.
  • spark.locality.wait.rack (default: spark.locality.wait). Customize the locality wait for rack locality.
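
A minimal sketch of how these parameters can be set programmatically on a SparkConf (they can equally go into spark-defaults.conf or be passed with --conf). The values here are arbitrary examples, not recommendations.

import org.apache.spark.SparkConf

object LocalityWaitSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("locality-wait-sketch")
      .set("spark.locality.wait", "10s")     // overall wait before dropping to a less-local level
      .set("spark.locality.wait.node", "0s") // skip node locality and go straight to rack locality
      .set("spark.locality.wait.rack", "5s") // wait for rack locality before falling back to ANY

    // Print back only the locality-related settings
    conf.getAll.filter(_._1.startsWith("spark.locality")).foreach(println)
  }
}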

How does persistence work at the memory level (including spills to disk)?

Spark caches data with an LRU (least recently used) eviction policy. With the persist() method we can pass a parameter that specifies how the data should be cached in RAM (and, if necessary, on disk).

Commands :-

scala> val lines = sc.textFile("README.txt")

scala> lines.getStorageLevel
res0: org.apache.spark.storage.StorageLevel = StorageLevel(disk=false, memory=false, offheap=false, deserialized=false, replication=1)

Caching ways :-

  • cache()   —-> only one storage level is used (MEMORY_ONLY)
  • persist() —-> different storage levels can be specified (see the sketch below)
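
A short sketch contrasting the two, assuming the same README.txt file used earlier; cache() is simply shorthand for persist(StorageLevel.MEMORY_ONLY) on an RDD.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("persist-sketch"))

    // cache() always uses the default level, MEMORY_ONLY
    val cached = sc.textFile("README.txt").cache()

    // persist() accepts any of the storage levels listed below,
    // e.g. spilling partitions that do not fit in memory to disk
    val persisted = sc.textFile("README.txt").persist(StorageLevel.MEMORY_AND_DISK)

    println(cached.getStorageLevel)
    println(persisted.getStorageLevel)

    sc.stop()
  }
}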

Memory Arguments :-

  • MEMORY_ONLY
Store the RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions are not cached and are recomputed on the fly each time they are needed. This is the default level.
  • MEMORY_AND_DISK
Store the RDD as deserialized Java objects in the JVM. Partitions that do not fit in memory are stored on disk and read from there when needed, instead of being recomputed on the fly.
  • MEMORY_ONLY_SER
Store the RDD as serialized Java objects (one byte array per partition). This is more space-efficient but more CPU-intensive, as the Java objects have to be deserialized before computing.
  • MEMORY_AND_DISK_SER
Similar to MEMORY_ONLY_SER, but partitions that do not fit in memory spill to disk instead of being recomputed on the fly.
  • DISK_ONLY
Store the RDD partitions only on disk.
  • MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.
Same as the levels above, but replicate each partition on two cluster nodes.

One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations.

Steps for debugging using the Spark UI?

Step 1 :- Find your application in the Spark UI.

Step 2 :- Check the jobs for your application and the time each of them takes.

Step 3 :- In a job there can be many stages, so the time taken by each stage should be kept to a minimum. Spark uses a Directed Acyclic Graph (DAG) to build the execution plan across the stages of the job.

Step 4 :- Here you have to focus on the garbage collection time, the data locality and the input data size; a sketch for labelling your jobs in the UI follows.
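
As a small aid for the steps above, the sketch below labels jobs with a group ID and description so they are easy to spot on the Jobs page of the UI. The group names, descriptions and file path are arbitrary examples.

import org.apache.spark.{SparkConf, SparkContext}

object UiDebugSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ui-debug-sketch"))

    // The description appears next to the job in the Spark UI,
    // which makes checking each job's duration (Step 2) much easier.
    sc.setJobGroup("step-1", "Count lines of the input file")
    println(sc.textFile("README.txt").count())

    sc.setJobGroup("step-2", "Count distinct words")
    println(sc.textFile("README.txt").flatMap(_.split("\\s+")).distinct().count())

    sc.stop()
  }
}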
