val wordOccurrences = distFile
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
Actions
wordOccurrences.collect()
wordOccurrences.saveAsTextFile("...")
wordOccurrences.count()
wordOccurrences.take(4)
scala> wordOccurrences.toDebugString
(4) MapPartitionsRDD[16] at map at <console>:26 []
| ShuffledRDD[15] at reduceByKey at <console>:26 []
+-(4) MapPartitionsRDD[14] at map at <console>:26 []
| MapPartitionsRDD[13] at flatMap at <console>:26 []
| ParallelCollectionRDD[0] at parallelize at <console>:23 []
val result = input.map(x => x*x)
println(result.count())
// RDD will be recomputed here
println(result.collect().mkString(","))
import org.apache.spark.storage.StorageLevel
val result = input.map(x => x * x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
// RDD won't be recomputed here
println(result.collect().mkString(","))
// Use default storage level (MEMORY_ONLY)
result.cache()
// Remove the RDD from the cache once it is no longer needed
result.unpersist()
Level                | Space used | CPU time | RAM  | Disk |
MEMORY_ONLY          | High       | Low      | Y    | N    |
DISK_ONLY            | Low        | High     | N    | Y    |
MEMORY_AND_DISK      | High       | Medium   | Some | Some |
MEMORY_ONLY_SER*     | Low        | High     | Y    | N    |
MEMORY_AND_DISK_SER* | Low        | High     | Some | Some |
(*) : Data is serialized before being persisted (takes less space)
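For example, one of the serialized levels from the table can be selected the same way as DISK_ONLY above (a small sketch reusing the result RDD from the earlier snippet):

import org.apache.spark.storage.StorageLevel

// Serialized storage: takes less space, costs more CPU on access
result.persist(StorageLevel.MEMORY_AND_DISK_SER)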
var counter = 0
var rdd = sc.parallelize(data)
// The closure is shipped to the executors: each task increments
// its own copy of counter, so the driver's value stays at 0
rdd.foreach { rddItem => counter += 1 }
print("Counter value: " + counter)

val hugeArray = ...
var bigRddWithIndex = ...
// hugeArray is captured by the closure and serialized with every task
bigRddWithIndex.map { rddItem => hugeArray(rddItem.key) }
...

Accumulators and Broadcast variables!
Accumulators

Accumulators aggregate values coming from the executors to the driver
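A minimal sketch of how an accumulator replaces the broken counter above (using the classic pre-2.0 accumulator API; newer versions expose sc.longAccumulator instead):

// Create the accumulator on the driver
val counter = sc.accumulator(0)

sc.parallelize(1 to 100)
  .foreach(_ => counter += 1) // executors add to it

// Only the driver can read the aggregated value
println("Counter value: " + counter.value)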
Broadcast variables

Broadcast variables propagate a read-only value to all executors
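A minimal sketch of how a broadcast variable fixes the hugeArray example above (reusing the placeholder names from that snippet, and assuming rddItem.key is an index into the array):

// Ship hugeArray to every executor once, instead of with every task
val hugeArrayBc = sc.broadcast(hugeArray)

// Executors read the shared, read-only copy through .value
val resolved = bigRddWithIndex.map { rddItem => hugeArrayBc.value(rddItem.key) }

// Release the cached copies on the executors when no longer needed
hugeArrayBc.unpersist()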
Accumulators :
Broadcast variables :
Recommendations :
val dataframe = hiveCtx.jsonFile(inputFile)
dataframe.registerTempTable("foobar")
val foobarz = hiveCtx.sql("SELECT * FROM foobar ORDER BY qux LIMIT 10")
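The same query can also be expressed with the DataFrame API instead of SQL (a sketch, assuming the JSON documents expose a qux field):

val foobarz = dataframe.orderBy("qux").limit(10)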
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(conf, Seconds(1))
ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .print()
ssc.start()            // start the computation
ssc.awaitTermination() // wait for it to terminate
Spark ML :
GraphX :
I am a Scala developer at 1Science, but also a mobile dev freelancer. francistoth@coding-hipster.com
Thank you! Questions? http://www.coding-hipster.com/presentations.html