scala - Spark Streaming MQTT


I've been using Spark to stream data from Kafka, and it's pretty easy.

I thought that using the MQTT utils would be just as easy, but for some reason it isn't.

I'm trying to execute the following piece of code.

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.mqtt.MQTTUtils
  import akka.actor.{ActorSystem, Props}

  val sparkConf = new SparkConf(true).setAppName("amqStream").setMaster("local")
  val ssc = new StreamingContext(sparkConf, Seconds(10))

  val actorSystem = ActorSystem()
  implicit val kafkaProducerActor = actorSystem.actorOf(Props[KafkaProducerActor])

  MQTTUtils.createStream(ssc, "tcp://localhost:1883", "akkatest")
    .foreachRDD { rdd =>
      println("Got rdd: " + rdd.toString())
      rdd.foreach { msg =>
        println("Got msg: " + msg)
      }
    }

  ssc.start()
  ssc.awaitTermination()

The weird thing is that Spark logs the msg I sent in the console, but not my println output.

It logs this:

19:38:18.803 [recurringtimer - blockgenerator] debug o.a.s.s.receiver.blockgenerator - last element in input-0-1435790298600 message

foreach is a distributed action, so your println may be executing on the workers. If you want to see some of your messages printed out locally, you could use the built-in print function on the DStream, or, instead of your foreachRDD, collect (or take) some of the elements back to the driver and print them there. Hope that helps, and best of luck with Spark Streaming :)
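For reference, here is a minimal sketch of both suggestions, reusing the broker URI and topic from the question. It assumes the same local MQTT broker on port 1883; the only other change is running with local[2] so the receiver does not occupy the only local thread.

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.mqtt.MQTTUtils

  // At least two local threads: one for the MQTT receiver, one for processing.
  val sparkConf = new SparkConf(true).setAppName("amqStream").setMaster("local[2]")
  val ssc = new StreamingContext(sparkConf, Seconds(10))

  val stream = MQTTUtils.createStream(ssc, "tcp://localhost:1883", "akkatest")

  // Option 1: print() outputs the first elements of each batch on the driver.
  stream.print()

  // Option 2: take a few elements back to the driver and print them there,
  // so the output appears in the driver console instead of the worker logs.
  stream.foreachRDD { rdd =>
    rdd.take(10).foreach(msg => println("Got msg: " + msg))
  }

  ssc.start()
  ssc.awaitTermination()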

