Adding data to a HashMap from an apache-spark RDD operation (Java)


I've used a map step to create a JavaRDD containing the objects I need. Based on those objects I want to create a global HashMap containing some stats, but I can't figure out which RDD operation to use. At first I thought reduce was the solution, but then I saw that it has to return the same type of object. I'm not interested in reducing the items, but in gathering stats from all the machines (they can be computed separately and then added up).

For example: I have an RDD of objects that contain an integer array among other stuff, and I want to compute how many times each of the integers has appeared in the arrays by putting them into a hashtable. Each machine should compute its own hashtable, and then they should all be put in one place in the driver.

Often, when you think you want to end up with a Map, you need to transform the records in your RDD into key-value pairs and use reduceByKey.

Your specific example sounds exactly like the famous wordcount example (see the first example here), except that you want to count integers from an array within an object instead of counting words from a sentence (String). In Scala, this would translate to:

    import org.apache.spark.rdd.RDD
    import scala.collection.Map

    class Example {

      case class MyObj(ints: Array[Int], otherStuff: String)

      def countInts(input: RDD[MyObj]): Map[Int, Int] = {
        input
          .flatMap(_.ints)    // flatMap maps each record into several records - in this case, each int becomes a record
          .map(i => (i, 1))   // turn into key-value pairs, with a preliminary value of 1 for each key
          .reduceByKey(_ + _) // aggregate the values by key
          .collectAsMap()     // collect the data into a Map on the driver
      }
    }

Generally, you should let Spark perform as much of the operation as possible in a distributed manner, and delay collecting into memory for as long as possible. If you collect the values before reducing, you'll often run out of memory, unless your dataset is small enough to begin with (in which case you don't really need Spark).
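To make the "compute separately, then add up" idea concrete, here is a minimal sketch in plain Java with no Spark dependency. It only simulates the model: each inner list stands in for one partition, every partition builds its own small map, and the partial maps are then merged the way a driver would combine them. The class and method names (`PartitionCountSketch`, `countPartition`, `mergeCounts`, `countInts`) are made up for illustration and are not part of any Spark API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionCountSketch {

    // Counts the integers of one simulated partition into its own local map.
    static Map<Integer, Integer> countPartition(List<int[]> partition) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (int[] ints : partition) {
            for (int i : ints) {
                counts.merge(i, 1, Integer::sum); // insert 1, or add 1 to the existing count
            }
        }
        return counts;
    }

    // Adds the counts of a partial map into the running total, key by key.
    static Map<Integer, Integer> mergeCounts(Map<Integer, Integer> total,
                                             Map<Integer, Integer> partial) {
        partial.forEach((key, value) -> total.merge(key, value, Integer::sum));
        return total;
    }

    // Simulated "driver": only the small per-partition maps are combined here,
    // never the raw records.
    static Map<Integer, Integer> countInts(List<List<int[]>> partitions) {
        Map<Integer, Integer> total = new HashMap<>();
        for (List<int[]> partition : partitions) {
            mergeCounts(total, countPartition(partition));
        }
        return total;
    }

    public static void main(String[] args) {
        // Two hypothetical partitions holding the int arrays of a few records.
        List<List<int[]>> partitions = Arrays.asList(
                Arrays.asList(new int[]{1, 2, 2}, new int[]{3}),
                Arrays.asList(new int[]{2, 3, 3}));
        System.out.println(countInts(partitions));
    }
}
```

For the sample input, 1 is counted once and 2 and 3 three times each. The point of the sketch is that the merge step only ever sees one entry per distinct key per partition, which is why reducing before collecting keeps the driver's memory use small.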

Edit: and here's the same code in Java (much longer, but identical...):

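    import java.io.Serializable;
    import java.util.Arrays;
    import java.util.Map;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;

    static class MyObj implements Serializable {
        Integer[] ints;
        String otherStuff;
    }

    Map<Integer, Integer> countInts(JavaRDD<MyObj> input) {
        return input
                .flatMap(new FlatMapFunction<MyObj, Integer>() {
                    // Spark 1.x signature; on Spark 2.x and later, call returns
                    // Iterator<Integer>, so return Arrays.asList(myObj.ints).iterator()
                    @Override
                    public Iterable<Integer> call(MyObj myObj) throws Exception {
                        return Arrays.asList(myObj.ints);
                    }
                })    // flatMap maps each record into several records - in this case, each int becomes a record
                .mapToPair(new PairFunction<Integer, Integer, Integer>() {
                    @Override
                    public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
                        return new Tuple2<>(integer, 1);
                    }
                })   // turn into key-value pairs, with a preliminary value of 1
                .reduceByKey(new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }
                }) // aggregate the values by key
                .collectAsMap();     // collect the data into a Map on the driver
    }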
