scala - How to filter from RDDs and DataFrames in Spark?


I have a .tsv file, pageviews_by_second, consisting of the fields timestamp, site, and requests:

    "timestamp"              "site"      "requests"
    "2015-03-16T00:09:55"    "mobile"    1595
    "2015-03-16T00:10:39"    "mobile"    1544
    "2015-03-16T00:19:39"    "desktop"   2460

I want the first row gone, because it leads to errors in the operations I have to perform on the data.

I tried doing it in the following ways:

1. Filtering the RDD before splitting it

    val rdd1 = sc.textFile("pageviews_by_second")
    val top_row = rdd1.first()
    // returns: top_row: String = "timestamp"    "site"  "requests"
    val rdd2 = rdd1.filter(x => x != top_row)
    rdd2.first()
    // returns: "2015-03-16T00:09:55"    "mobile"    1595

2. Filtering the RDD after splitting it

    val rdd1 = sc.textFile("pageviews_by_second").map(_.split("\t"))
    rdd1.first()
    // returns: res0: Array[String] = Array("timestamp", "site", "requests")
    val top_row = rdd1.first()
    val rdd2 = rdd1.filter(x => x != top_row)
    rdd2.first()
    // returns: res1: Array[String] = Array("timestamp", "site", "requests")
    val rdd2 = rdd1.filter(x => x(0) != "timestamp" && x(1) != "site" && x(2) != "requests")
    rdd2.first()
    // returns: res1: Array[String] = Array("timestamp", "site", "requests")

3. Converting to a DataFrame using a case class and then filtering it

    case class Wiki(timestamp: String, site: String, requests: String)
    val df = sc.textFile("pageviews_by_second").map(_.split("\t")).map(w => Wiki(w(0), w(1), w(2))).toDF()
    val top_row = df.first()
    // returns: top_row: org.apache.spark.sql.Row = ["timestamp","site","requests"]
    df.filter(_ => _ != top_row)
    // returns: error: missing parameter type
    //          val df2 = df.filter(_ => _ != top_row2)

Why is the 1st method able to filter out the first row while the other two aren't? In method 3, why do I get an error, and how can I rectify it?

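You first need to understand the data types you are comparing when removing the top row.

In method 1 you are comparing two strings, and that comparison yields true or false based on their contents. Hence it filters out the top row.

In method 2 you are comparing two arrays, and != on arrays checks whether they are the same object, not whether they hold the same elements. Use the deep method of Array for an element-by-element comparison of arrays in Scala.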

    // Method 2
    val rdd1 = sc.textFile("d:\\trial.txt").map(_.split("\t"))
    val top_row = rdd1.first()
    // .deep compares the arrays element by element
    val rdd2 = rdd1.filter(x => x.deep != top_row.deep)
    rdd2.first().foreach(println(_))
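The difference between methods 1 and 2 can be reproduced without Spark. The following is a minimal sketch in plain Scala, assuming a two-line stand-in for the file; the object name `ArrayEqualityDemo` and the sample values are made up for illustration. It uses `sameElements` and `toSeq` as alternatives to `deep`, since `deep` is not available on arrays in Scala 2.13 and later.

```scala
object ArrayEqualityDemo {
  // Hypothetical stand-ins for the header line and one data line of the TSV file.
  val header: String = "\"timestamp\"\t\"site\"\t\"requests\""
  val dataLine: String = "\"2015-03-16T00:09:55\"\t\"mobile\"\t1595"
  val lines: List[String] = List(header, dataLine)

  // Method 1: != on String compares contents, so the header is dropped.
  val keptLines: List[String] = lines.filter(_ != header)

  // Method 2: every split creates a fresh Array, and != on Array compares
  // references, so no row is ever considered equal to topRow.
  val rows: List[Array[String]] = lines.map(_.split("\t"))
  val topRow: Array[String] = header.split("\t")
  val keptByReference: List[Array[String]] = rows.filter(_ != topRow)

  // Content-based comparisons that do drop the header.
  val keptBySameElements: List[Array[String]] =
    rows.filter(r => !r.sameElements(topRow))
  val keptBySeq: List[Array[String]] =
    rows.filter(_.toSeq != topRow.toSeq)

  def main(args: Array[String]): Unit = {
    println(keptLines.size)          // 1: header removed
    println(keptByReference.size)    // 2: header still present
    println(keptBySameElements.size) // 1: header removed
    println(keptBySeq.size)          // 1: header removed
  }
}
```

The same predicates should carry over to `rdd1.filter(...)` in the Spark code above, because the RDD elements there are ordinary `Array[String]` values.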

In method 3 you are comparing two Row objects of the DataFrame. It is better to convert each Row with toSeq followed by toArray, and then use the deep method to filter out the first row of the DataFrame.

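    // Method 3
    // Name the lambda parameter: `_ => _ != top_row` does not compile.
    // On Spark 1.x, where DataFrame.filter only accepts a Column or a SQL string,
    // apply the same predicate through df.rdd.filter(...) instead.
    df.filter(row => row.toSeq.toArray.deep != top_row.toSeq.toArray.deep)

Let me know if this helps. Thanks!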
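As for the "missing parameter type" error itself, it appears to come from the lambda syntax, not from the comparison. Here is a small sketch in plain Scala, using `Seq[String]` as a stand-in for Spark's `Row` (an assumption made only so the example runs without Spark; `LambdaSyntaxDemo` is a made-up name):

```scala
object LambdaSyntaxDemo {
  // Seq[String] stands in for org.apache.spark.sql.Row in this sketch.
  val topRow: Seq[String] = Seq("timestamp", "site", "requests")
  val rows: List[Seq[String]] =
    List(topRow, Seq("2015-03-16T00:09:55", "mobile", "1595"))

  // Does not compile ("missing parameter type"):
  //   rows.filter(_ => _ != topRow)
  // The first `_` is an ignored parameter, and `_ != topRow` then starts a
  // second anonymous function whose parameter type cannot be inferred.

  // Either name the parameter ...
  val keptNamed: List[Seq[String]] = rows.filter(row => row != topRow)

  // ... or use the placeholder once, without `=>`.
  val keptPlaceholder: List[Seq[String]] = rows.filter(_ != topRow)

  def main(args: Array[String]): Unit = {
    println(keptNamed.size)       // 1: header removed
    println(keptPlaceholder.size) // 1: header removed
  }
}
```

In short, `_` can be used either as an ignored parameter name before `=>` or as a placeholder for the argument, but not both in the same function literal.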

