scala - Spark Streaming MQTT
I've been using Spark to stream data from Kafka, and it's pretty easy.
I thought using the MQTT utils would be just as easy, but for some reason it is not.
I'm trying to execute the following piece of code.
    val sparkConf = new SparkConf(true).setAppName("amqstream").setMaster("local")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    val actorSystem = ActorSystem()
    implicit val kafkaProducerActor = actorSystem.actorOf(Props[KafkaProducerActor])

    MQTTUtils.createStream(ssc, "tcp://localhost:1883", "akkatest")
      .foreachRDD { rdd =>
        println("got rdd: " + rdd.toString())
        rdd.foreach { msg =>
          println("got msg: " + msg)
        }
      }

    ssc.start()
    ssc.awaitTermination()
The weird thing is that Spark logs every message I send in the console, but the output of my println never shows up.
It logs something like this:
19:38:18.803 [RecurringTimer - BlockGenerator] DEBUG o.a.s.s.receiver.BlockGenerator - last element in input-0-1435790298600 message
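foreach is a distributed action, so your println may be executing on the workers rather than on the driver. If you want to see some of the messages printed out locally, you could use the built-in print function on the DStream or, instead of calling foreach inside your foreachRDD, collect (or take) some of the elements back to the driver and print them there. Hope that helps, and best of luck with Spark Streaming :)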
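The two options mentioned in the answer could look roughly like the sketch below. It reuses the broker URL, topic and app name from the question and assumes the spark-streaming-mqtt artifact is on the classpath. The object name MqttPrintOnDriver and the limit of 10 elements are made up for illustration. The master is set to "local[2]" here, which is an assumption that differs from the question's code: when running locally, a receiver occupies one thread, so at least one more is needed to process the received data.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.mqtt.MQTTUtils

// Hypothetical object name, used only for this sketch.
object MqttPrintOnDriver {
  def main(args: Array[String]): Unit = {
    // "local[2]" is an assumption: one thread for the receiver, one for processing.
    val sparkConf = new SparkConf(true).setAppName("amqstream").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // Broker URL and topic are taken from the question.
    val messages = MQTTUtils.createStream(ssc, "tcp://localhost:1883", "akkatest")

    // Option 1: the built-in print() shows the first few elements
    // of every batch on the driver.
    messages.print()

    // Option 2: bring a bounded number of elements back to the driver
    // with take(), then print them there with a plain Scala foreach.
    messages.foreachRDD { rdd =>
      rdd.take(10).foreach(msg => println("got msg: " + msg))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

In practice you would pick one of the two options. take is used rather than collect so that a large batch does not pull every element into the driver's memory.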