scala - How to filter rows from RDDs and DataFrames in Spark?
I have a .tsv file, pageviews_by_second, consisting of timestamp, site, and requests fields:

    "timestamp"             "site"      "requests"
    "2015-03-16T00:09:55"   "mobile"    1595
    "2015-03-16T00:10:39"   "mobile"    1544
    "2015-03-16T00:19:39"   "desktop"   2460
I want the first row gone, because it leads to errors in the operations I have to perform on the data.

I tried to do it in the following ways:
1. Filtering the RDD before splitting it:

    val rdd1 = sc.textFile("pageviews_by_second")
    val top_row = rdd1.first()
    // returns: top_row: String = "timestamp"  "site"  "requests"
    val rdd2 = rdd1.filter(x => x != top_row)
    rdd2.first()
    // returns: "2015-03-16T00:09:55"  "mobile"  1595
2. Filtering the RDD after splitting it:

    val rdd1 = sc.textFile("pageviews_by_second").map(_.split("\t"))
    rdd1.first()
    // returns: res0: Array[String] = Array("timestamp", "site", "requests")
    val top_row = rdd1.first()
    val rdd2 = rdd1.filter(x => x != top_row)
    rdd2.first()
    // returns: res1: Array[String] = Array("timestamp", "site", "requests")
    val rdd2 = rdd1.filter(x => x(0) != "timestamp" && x(1) != "site" && x(2) != "requests")
    rdd2.first()
    // returns: res1: Array[String] = Array("timestamp", "site", "requests")
3. Converting to a DataFrame using a case class, and filtering it:

    case class Wiki(timestamp: String, site: String, requests: String)
    val df = sc.textFile("pageviews_by_second").map(_.split("\t")).map(w => Wiki(w(0), w(1), w(2))).toDF()
    val top_row = df.first()
    // returns: top_row: org.apache.spark.sql.Row = ["timestamp","site","requests"]
    df.filter(_ => _ != top_row)
    // returns: error: missing parameter type
    val df2 = df.filter(_ => _ != top_row)
Why is the 1st method able to filter out the first row while the other 2 aren't? In method 3, why do I get the error, and how can I rectify it?
You first need to understand the data types you are comparing while removing the top row.

Comparing 2 Strings yields true or false, which is why method 1 filters out the top row.
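The string case can be checked with a quick pure-Scala sketch (no Spark needed; the literals below just mimic the file's tab-separated lines):

```scala
// Strings in Scala compare by value, so != filters the header as expected.
val header = "\"timestamp\"\t\"site\"\t\"requests\""
val data   = "\"2015-03-16T00:09:55\"\t\"mobile\"\t1595"

println(header != data)   // true: different contents, so the filter keeps this line
println(header != header) // false: the header line itself is dropped by the filter
```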
In method 2 you are comparing 2 Arrays. Use the deep method of Array for a deeper (element-wise) comparison of arrays in Scala:
    // method 2
    val rdd1 = sc.textFile("d:\\trial.txt").map(_.split("\t"))
    val top_row = rdd1.first()
    val rdd2 = rdd1.filter(x => x.deep != top_row.deep)
    rdd2.first().foreach(println(_))
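A minimal pure-Scala illustration of why plain != fails for arrays (no Spark needed). Note that `.deep` was removed in Scala 2.13; `sameElements` is the portable element-wise comparison there:

```scala
// Scala Arrays are Java arrays, so == and != compare references, not contents.
val a = Array("timestamp", "site", "requests")
val b = Array("timestamp", "site", "requests")

println(a == b)            // false: two distinct array objects, same contents
println(a.sameElements(b)) // true: element-by-element comparison
```

This is why the filter in method 2 never matched the header: every split line is a fresh Array object, so `x != top_row` is always true and nothing is removed.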
In method 3 you are comparing 2 Row objects of the DataFrame. It is better if you convert the Row to a Seq, followed by toArray, and then use the deep method to filter out the first row of the DataFrame:
    // method 3
    val df2 = df.filter(r => r.toSeq.toArray.deep != top_row.toSeq.toArray.deep)
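As an alternative sketch, a common Spark idiom avoids comparing contents at all and instead drops the first element of partition 0 (where the header of a single text file ends up) via `mapPartitionsWithIndex`. `dropHeader` is a hypothetical helper name, not part of any API:

```scala
// Hypothetical helper: skip the first element of partition 0 only.
def dropHeader[T](idx: Int, iter: Iterator[T]): Iterator[T] =
  if (idx == 0) iter.drop(1) else iter

// With Spark this would be wired up as (sketch, not run here):
// val rdd2 = sc.textFile("pageviews_by_second")
//              .mapPartitionsWithIndex((i, it) => dropHeader(i, it))
```

This also sidesteps the equality question entirely, since no row is ever compared against the header.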
Revert if it helps. Thanks!!!