Commit b70c5c5

Fix the errors that we introduced in the databricks version (seems like maybe a merge issue)

holdenk committed Aug 19, 2015
1 parent 4012338 commit b70c5c5
Showing 1 changed file with 8 additions and 4 deletions.
@@ -27,7 +27,7 @@ object BasicParseJsonWithJackson {
     val outputFile = args(2)
     val sc = new SparkContext(master, "BasicParseJsonWithJackson", System.getenv("SPARK_HOME"))
     val input = sc.textFile(inputFile)
-
+
     // Parse it into a specific case class. We use mapPartitions because:
     // (a) ObjectMapper is not serializable so we either create a singleton object encapsulating ObjectMapper
     //     on the driver and have to send data back to the driver to go through the singleton object.
@@ -44,13 +44,17 @@ object BasicParseJsonWithJackson {
       // list with one element if everything is ok (Some(_)).
       records.flatMap(record => {
         try {
-          Some(mapper.readValue(record, classOf[ioRecord]))
+          Some(mapper.readValue(record, classOf[Person]))
         } catch {
           case e: Exception => None
         }
       })
-      }, true)
-    result.filter(_.lovesPandas).map(mapper.writeValueAsString(_))
+    }, true)
+    result.filter(_.lovesPandas).mapPartitions(records => {
+      val mapper = new ObjectMapper with ScalaObjectMapper
+      mapper.registerModule(DefaultScalaModule)
+      records.map(mapper.writeValueAsString(_))
+    })
       .saveAsTextFile(outputFile)
   }
 }
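
For reference, here is a minimal, self-contained sketch of the pattern this commit restores: building one Jackson ObjectMapper per partition on both the parse and the serialize side, since ObjectMapper is not serializable and is expensive to construct per record. The Person field "name", the object name PerPartitionMapperSketch, the argument handling, and the SparkConf setup are illustrative assumptions, not the repository's exact code; only the mapPartitions pattern itself comes from the diff above.

// Sketch of the per-partition ObjectMapper pattern (assumptions noted above).
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
import org.apache.spark.{SparkConf, SparkContext}

// "lovesPandas" appears in the diff; "name" is an assumed extra field.
case class Person(name: String, lovesPandas: Boolean)

object PerPartitionMapperSketch {
  def main(args: Array[String]): Unit = {
    val Array(inputFile, outputFile) = args
    val sc = new SparkContext(new SparkConf().setAppName("PerPartitionMapperSketch"))

    // Parse: one mapper per partition; malformed records are dropped by
    // returning None from the flatMap body.
    val people = sc.textFile(inputFile).mapPartitions(lines => {
      val mapper = new ObjectMapper with ScalaObjectMapper
      mapper.registerModule(DefaultScalaModule)
      lines.flatMap(line =>
        try Some(mapper.readValue[Person](line))
        catch { case _: Exception => None })
    })

    // Serialize: the write side needs its own per-partition mapper too,
    // because the parse-side mapper never leaves its closure -- exactly the
    // bug this commit fixes.
    people.filter(_.lovesPandas).mapPartitions(records => {
      val mapper = new ObjectMapper with ScalaObjectMapper
      mapper.registerModule(DefaultScalaModule)
      records.map(mapper.writeValueAsString(_))
    }).saveAsTextFile(outputFile)

    sc.stop()
  }
}

The repository's version also passes true as mapPartitions' second argument (preservesPartitioning); that flag only matters when the RDD carries a partitioner, so this sketch leaves it out.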
