Adding data to a HashMap from an Apache Spark RDD operation (Java)
I've used a map step to create a JavaRDD object containing some objects I need. Based on those objects I want to create a global HashMap containing some stats, but I can't figure out which RDD operation to use. At first I thought reduce would be the solution, but then I saw that you have to return the same type of objects. I'm not interested in reducing the items, but in gathering the stats from all the machines (they can be computed separately and then added up).
For example: I have an RDD of objects containing an integer array among other stuff, and I want to compute how many times each of the integers has appeared in the arrays by putting them in a hashtable. Each machine should compute its own hashtable, and then they should all be merged in one place in the driver.
Often, when you think you want to end up with a Map, you need to transform the records in your RDD into key-value pairs and use reduceByKey.
Your specific example sounds exactly like the famous wordcount example (see the first example here), except that you want to count integers from an array within an object, instead of counting words from a sentence (a String). In Scala, this translates to:
import org.apache.spark.rdd.RDD
import scala.collection.Map

class Example {

  case class MyObj(ints: Array[Int], otherStuff: String)

  def countInts(input: RDD[MyObj]): Map[Int, Int] = {
    input
      .flatMap(_.ints)    // flatMap maps each record into several records - in this case, each int becomes a record
      .map(i => (i, 1))   // turn into key-value pairs, with a preliminary value of 1 for each key
      .reduceByKey(_ + _) // aggregate the values by key
      .collectAsMap()     // collect the data into a Map
  }
}
Generally, you should let Spark perform as much of the operation as possible in a distributed manner, and delay the collection into memory as much as possible - if you collect the values before reducing them, you'll often run out of memory, unless your dataset is small enough to begin with (in which case you don't really need Spark).
Edit: and here's the same code in Java (much longer, but identical...):
static class MyObj implements Serializable {
    Integer[] ints;
    String otherStuff;
}

Map<Integer, Integer> countInts(JavaRDD<MyObj> input) {
    return input
        .flatMap(new FlatMapFunction<MyObj, Integer>() {
            @Override
            public Iterable<Integer> call(MyObj myObj) throws Exception {
                return Arrays.asList(myObj.ints);
            }
        }) // flatMap maps each record into several records - in this case, each int becomes a record
        .mapToPair(new PairFunction<Integer, Integer, Integer>() {
            @Override
            public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
                return new Tuple2<>(integer, 1);
            }
        }) // turn into key-value pairs, with a preliminary value of 1
        .reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }) // aggregate the values by key
        .collectAsMap(); // collect the data into a Map
}
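For comparison, on Java 8 the same pipeline can be written far more compactly with lambdas. A minimal sketch, assuming the Spark 1.x Java API used above (where the function passed to flatMap returns an Iterable; in Spark 2.x and later it must return an Iterator instead):

import java.util.Arrays;
import java.util.Map;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

// Same flatMap / mapToPair / reduceByKey / collectAsMap pipeline as above,
// expressed with Java 8 lambdas (sketch; assumes the Spark 1.x Java API).
Map<Integer, Integer> countInts(JavaRDD<MyObj> input) {
    return input
        .flatMap(obj -> Arrays.asList(obj.ints)) // one record per int
        .mapToPair(i -> new Tuple2<>(i, 1))      // key-value pairs with preliminary value 1
        .reduceByKey((a, b) -> a + b)            // sum the counts per int
        .collectAsMap();                         // bring the (small) result map to the driver
}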
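As an aside, the "each machine computes its own hashtable and the driver merges them" formulation from the question can also be expressed directly with aggregate, which folds records into a partition-local accumulator and then merges the partial results. A hedged sketch under the same assumptions (Java 8 and the MyObj class above; the method name countIntsWithAggregate is just illustrative) - reduceByKey remains the more idiomatic choice:

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.api.java.JavaRDD;

// Sketch of the per-partition hashtable idea using aggregate: seqOp folds one
// record into a partition-local map, combOp merges two partial maps; only the
// (small) merged maps are sent back to the driver.
Map<Integer, Integer> countIntsWithAggregate(JavaRDD<MyObj> input) {
    return input.aggregate(
        new HashMap<Integer, Integer>(), // zero value: an empty map per partition
        (acc, obj) -> {                  // seqOp: count the ints of one record
            for (Integer i : obj.ints) {
                acc.merge(i, 1, Integer::sum);
            }
            return acc;
        },
        (m1, m2) -> {                    // combOp: merge two partial maps
            m2.forEach((k, v) -> m1.merge(k, v, Integer::sum));
            return m1;
        });
}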