Apache Spark Memo

Here I am collecting notes while learning Spark, so that people like me can benefit from this collection.

1. Spark Core

  1. The stage creation rule is based on the idea of pipelining as many narrow transformations as possible into a single stage. Once the stages are determined, Spark generates tasks from them. Every stage except the last creates ShuffleMapTasks, whose output is written out for the shuffle that feeds the next stage. The last stage creates ResultTasks, because it contains the action that produces the results.

  2. The number of tasks generated depends on how your input data is split into partitions. Suppose that you have three different files on three different nodes, each small enough to form a single partition. The first stage will then generate 3 tasks, one task per partition. Therefore, you should not map your processing steps to tasks directly: a task belongs to a stage and corresponds to one partition.

  3. At a high level, there are two kinds of transformations that can be applied to RDDs: narrow transformations and wide transformations. Wide transformations result in stage boundaries. A narrow transformation does not require data to be shuffled across partitions, for example map and filter. A wide transformation requires data to be shuffled, for example reduceByKey.

  4. Partition - all the data you work with in Spark is split into partitions. What is a single partition, and how is it determined? The partition size depends entirely on the data source you use. Most Spark methods for reading data let you specify the number of partitions you want in your RDD (for example, the minPartitions argument of sc.textFile). When you read a file from HDFS, Spark uses Hadoop’s InputFormat to read it, and by default each input split returned by the InputFormat is mapped to a single partition of the RDD. For most files on HDFS, one input split is generated per HDFS block, which is approximately 64 MB or 128 MB of data, depending on the configured block size. It is only approximate because HDFS splits data on exact byte boundaries, whereas processing splits it on record boundaries: for a text file the record delimiter is the newline character, for a sequence file it is the block end, and so on. The main exception to this rule is compressed files: if the whole text file is compressed with a non-splittable codec such as gzip, it cannot be split into records, so the whole file becomes a single input split, and thus a single partition in Spark, and you have to repartition it manually.

  5. Spark’s basic abstraction is the Resilient Distributed Dataset, or RDD. The RDD is how Spark simplifies complex operations like join or groupBy and hides the fact that, under the hood, you are dealing with fragmented data. That fragmentation is what enables Spark to execute in parallel, and the level of fragmentation is a function of the number of partitions of your RDD. The number of partitions matters because each task in a stage operates on one partition at a time (and loads the data in that partition into memory). Consequently, if you have fewer partitions than available cores (task slots), you will end up under-utilizing your cluster. Furthermore, since fewer partitions means more data in each partition, you increase the memory pressure on your program. On the flip side, with too many partitions, performance may degrade as you take a greater hit from network and disk I/O. Ultimately this concept ties into Spark’s notion of parallelism and how you can tune it to optimize performance.

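  6. Set the spark.cleaner.ttl property (a duration in seconds) to trigger periodic automatic cleanup of old metadata, such as generated stages and tasks. This property applies to older Spark 1.x releases.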

  7. Keep in mind that repartitioning your data is a fairly expensive operation, because it shuffles all the data. Spark also has an optimized version of repartition() called coalesce() that minimizes data movement by avoiding a full shuffle, but only if you are decreasing the number of RDD partitions.

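  8. Under the legacy static memory manager (before Spark 1.6), about 60% of an executor’s heap is used for storing cached data, because spark.storage.memoryFraction defaults to 0.6, and the remaining 40% is left for computing (shuffle buffers and user code). Since Spark 1.6, the unified memory manager lets execution and storage share a single region (sized by spark.memory.fraction) and borrow from each other, so the split is no longer fixed.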

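  9. cache() and persist() are also lazy operations: the data is only cached when an action first computes the RDD. However, unpersist() is not lazy; it takes effect when it is called, without waiting for an action.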
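
The stage rule in items 1 and 3 can be illustrated with a small pure-Python model (no Spark needed). This is a toy sketch, not Spark's DAGScheduler: a job is a hypothetical list of operation names ending with an action, WIDE_OPS is an assumed set of shuffle operations, and a new stage begins at every wide operation. Real Spark derives stage boundaries from each RDD's shuffle dependencies, not from operation names.

```python
from dataclasses import dataclass

# Assumed set of shuffle ("wide") operations for this toy model. Real Spark
# decides from each RDD's dependencies, not from operation names.
WIDE_OPS = {"reduceByKey", "groupByKey", "repartition"}


@dataclass
class Stage:
    ops: list[str]
    task_type: str  # "ShuffleMapTask" or "ResultTask"


def split_into_stages(ops: list[str]) -> list[Stage]:
    """Pipeline consecutive narrow ops into one stage; cut before each wide op.

    The last element of `ops` is assumed to be the action. A wide op opens a
    new stage because its shuffled data is read at the start of that stage.
    """
    groups: list[list[str]] = [[]]
    for op in ops:
        if op in WIDE_OPS and groups[-1]:
            groups.append([])  # shuffle boundary: start a new stage
        groups[-1].append(op)
    last = len(groups) - 1
    return [
        # every stage but the last writes shuffle output for the next stage;
        # the last stage contains the action and produces the result
        Stage(group, "ResultTask" if i == last else "ShuffleMapTask")
        for i, group in enumerate(groups)
    ]


if __name__ == "__main__":
    job = ["textFile", "flatMap", "map", "reduceByKey", "map", "collect"]
    for stage in split_into_stages(job):
        print(stage.task_type, stage.ops)
    # ShuffleMapTask ['textFile', 'flatMap', 'map']
    # ResultTask ['reduceByKey', 'map', 'collect']
```

In this model textFile, flatMap and map are pipelined into one stage, and the reduce side of reduceByKey starts the second stage, which also holds the collect action.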
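
The partition rule in item 4 and the task count in item 2 reduce to simple arithmetic. The sketch below is an estimate under stated assumptions: one input split per HDFS block, at least one split per file, compressed files using a non-splittable codec such as gzip, and no minPartitions hint (sc.textFile does pass one to Hadoop, which can raise the count for small files). Hadoop's real split computation has extra rules, for example letting the last split run slightly past a block boundary. The helper names are hypothetical.

```python
import math


def partitions_for_file(size_mb: float, block_size_mb: float = 128.0,
                        splittable: bool = True) -> int:
    """Estimate the input splits (and hence RDD partitions) for one file.

    Sketch assumptions: one split per HDFS block, at least one split per
    file, and no minPartitions hint.
    """
    if not splittable:
        # e.g. a gzip-compressed text file: the whole file is a single split
        return 1
    return max(1, math.ceil(size_mb / block_size_mb))


def first_stage_tasks(files: list[tuple[float, bool]],
                      block_size_mb: float = 128.0) -> int:
    """One task per partition, summed over (size_mb, splittable) input files."""
    return sum(partitions_for_file(size, block_size_mb, splittable)
               for size, splittable in files)


if __name__ == "__main__":
    # three splittable 100 MB files -> 3 partitions -> 3 tasks in the first stage
    print(first_stage_tasks([(100, True), (100, True), (100, True)]))  # 3
    # a 1000 MB file spans 8 blocks of 128 MB, or 16 blocks of 64 MB
    print(partitions_for_file(1000))       # 8
    print(partitions_for_file(1000, 64))   # 16
    # a gzip-compressed file is one partition regardless of its size
    print(partitions_for_file(1000, splittable=False))  # 1
```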
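
The difference between coalesce() and repartition() in item 7 can be modelled with plain Python lists, where each inner list is one partition. These are hypothetical helpers, not Spark's implementation: coalesce merges contiguous runs of whole partitions without moving individual records (a simplification, since Spark's coalescer also considers data locality), while repartition deals every record round-robin into new partitions, standing in for the full shuffle. Spark starts each input partition at a random offset; this sketch uses the partition index instead so the output is deterministic.

```python
from itertools import chain


def coalesce(partitions: list[list[int]], n: int) -> list[list[int]]:
    """Narrow: merge whole input partitions into at most n groups, no shuffle.

    Asking for more partitions than exist keeps the current count, as
    RDD.coalesce does without shuffle. Grouping contiguous runs is a
    simplification; Spark's coalescer also considers data locality.
    """
    if n < 1:
        raise ValueError("n must be positive")
    size = len(partitions)
    n = min(n, size)
    return [
        list(chain.from_iterable(partitions[i * size // n:(i + 1) * size // n]))
        for i in range(n)
    ]


def repartition(partitions: list[list[int]], n: int) -> list[list[int]]:
    """Wide: send every record through a "shuffle" into n new partitions.

    Records are dealt round-robin; input partition i starts at offset i.
    """
    if n < 1:
        raise ValueError("n must be positive")
    out: list[list[int]] = [[] for _ in range(n)]
    for index, part in enumerate(partitions):
        for pos, record in enumerate(part):
            out[(index + pos) % n].append(record)
    return out


if __name__ == "__main__":
    parts = [[1, 2, 3, 4, 5], [6], [7], [8]]
    # uneven result, but whole partitions were merged without a shuffle
    print(coalesce(parts, 2))     # [[1, 2, 3, 4, 5, 6], [7, 8]]
    # coalesce without shuffle cannot increase the partition count
    print(coalesce(parts, 10))    # [[1, 2, 3, 4, 5], [6], [7], [8]]
    # balanced result, but every record was moved
    print(repartition(parts, 2))  # [[1, 3, 5, 7], [2, 4, 6, 8]]
```

The trade-off shows up directly: coalesce is cheap but can leave skewed partitions, while repartition balances the data at the cost of moving every record.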
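
Item 8's fixed 60/40 split describes the legacy memory manager. For Spark 1.6 and later, the region arithmetic of the unified memory manager can be sketched as follows. The defaults are assumptions based on recent releases: 300 MB of reserved memory, spark.memory.fraction = 0.6 (it was 0.75 in Spark 1.6) and spark.memory.storageFraction = 0.5. The function name and region labels are hypothetical, chosen for illustration.

```python
RESERVED_MB = 300  # fixed reserved memory, taken off the heap first


def unified_memory_regions(heap_mb: float,
                           memory_fraction: float = 0.6,
                           storage_fraction: float = 0.5) -> dict[str, float]:
    """Split an executor heap (MB) the way the unified memory manager does.

    memory_fraction mirrors spark.memory.fraction and storage_fraction
    mirrors spark.memory.storageFraction.
    """
    if heap_mb < 1.5 * RESERVED_MB:
        # Spark refuses heaps below 1.5x the reserved memory (450 MB)
        raise ValueError("heap too small")
    usable = heap_mb - RESERVED_MB
    unified = usable * memory_fraction    # shared by execution and storage
    storage = unified * storage_fraction  # cached blocks here are not evicted by execution
    return {
        "reserved": RESERVED_MB,
        "user": usable - unified,          # user data structures, Spark metadata
        "unified": unified,
        "storage_protected": storage,
        "execution": unified - storage,    # nominal share; free space is borrowable
    }


if __name__ == "__main__":
    for name, mb in unified_memory_regions(4096).items():
        print(f"{name}: {mb:.1f} MB")
    # reserved 300.0, user 1518.4, unified 2277.6,
    # storage_protected 1138.8, execution 1138.8
```

Unlike the legacy split, the storage and execution numbers here are only starting points: either side can use the other's free space, and execution can evict cached blocks beyond the protected storage share.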

2. Spark SQL

  1. When working with a HiveContext, DataFrames can also be saved as persistent tables using the saveAsTable command. Unlike the registerTempTable command, saveAsTable materializes the contents of the DataFrame and creates a pointer to the data in the Hive metastore. Persistent tables still exist even after your Spark program has restarted, as long as you maintain your connection to the same metastore. A DataFrame for a persistent table can be created by calling the table method on a SQLContext with the name of the table.

  2. By default saveAsTable creates a “managed table”, meaning that the location of the data is controlled by the metastore. Managed tables also have their data deleted automatically when the table is dropped. Spark can also create a temporary table from a DataFrame using df.registerTempTable("table_name"). Such temporary tables are not managed by the Hive metastore; they exist only for the lifetime of the SQLContext that created them.

  3. Spark SQL also supports reading and writing data stored in Apache Hive. However, since Hive has a large number of dependencies, it is not included in the default Spark assembly. Hive support is enabled by adding the -Phive and -Phive-thriftserver flags to Spark’s build, which produces a new assembly jar that includes Hive. Note that this Hive assembly jar must also be present on all of the worker nodes, as they need access to the Hive serialization and deserialization libraries (SerDes) in order to access data stored in Hive.