From: Snehal Nagmote <nagmote.snehal-Re5JQEeQqe8AvxtiuMwx3w <at> public.gmane.org>
Subject: [Kafka-Spark-Consumer] Spark-Streaming Job Fails due to Futures timed out
Newsgroups: gmane.comp.lang.scala.spark.user
Date: Monday 8th June 2015 23:44:18 UTC
All,

I am using the Kafka Spark Consumer
(https://github.com/dibbhatt/kafka-spark-consumer)
in a Spark Streaming job.
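For reference, here is a simplified sketch of how the receiver is wired up. Property values are elided, and the entry point (ReceiverLauncher) and property names follow my reading of the library's README, so the exact package path and signatures may differ between releases:

import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only: ReceiverLauncher is the library's entry point per its README;
// the package path may vary across releases of kafka-spark-consumer.
import consumer.kafka.client.ReceiverLauncher

object ConsumerJob {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-spark-consumer-job")
    val ssc = new StreamingContext(conf, Seconds(30))

    // Connection properties read by the library (values elided here).
    val props = new Properties()
    props.put("zookeeper.hosts", "...")   // ZK quorum of the Kafka cluster
    props.put("zookeeper.port", "2181")
    props.put("kafka.topic", "...")       // topic being consumed
    props.put("kafka.consumer.id", "...") // consumer id for offset tracking

    val numberOfReceivers = 8 // illustrative; tuned to partition count

    // Launches the receivers and returns the unioned input stream.
    val unionStream = ReceiverLauncher.launch(
      ssc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY)

    unionStream.foreachRDD { rdd =>
      // processing elided
    }

    ssc.start()
    ssc.awaitTermination()
  }
}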

After the Spark Streaming job runs for a few hours, all executors exit, yet
the Spark UI still shows the application status as running.

Does anyone know the cause of this exception and how to fix it?


WARN  [sparkDriver-akka.actor.default-dispatcher-17:[email protected]] - Error reported by receiver for stream 7: Error While Store for Partition Partition{host=dal-kafka-broker01.bfd.walmart.com:9092, partition=27} - org.apache.spark.SparkException: Error sending message [message = UpdateBlockInfo(BlockManagerId(2, dfw-searcher.com, 33621),input-7-1433793457165,StorageLevel(false, true, false, false, 1),10492,0,0)]
	at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
	at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221)
	at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:384)
	at org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:360)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:812)
	at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
	at org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:71)
	at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:161)
	at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushIterator(ReceiverSupervisorImpl.scala:136)
	at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:152)
	at consumer.kafka.PartitionManager.next(PartitionManager.java:215)
	at consumer.kafka.KafkaConsumer.createStream(KafkaConsumer.java:75)
	at consumer.kafka.KafkaConsumer.run(KafkaConsumer.java:108)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
	at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
	at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
	at scala.concurrent.Await$.result(package.scala:107)
	at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
	... 14 more

WARN  [sparkDriver-akka.actor.default-dispatcher-30:[email protected]] - Error sending message [message = UpdateBlockInfo(BlockManagerId(, dfw-searcher.com, 57286),broadcast_10665_piece0,StorageLevel(false, false, false, false, 1),0,0,0)] in 2 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
	at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
	at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
	at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
	at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
	at scala.concurrent.Await$.result(package.scala:107)
	at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
	at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221)
	at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:384)
	at org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:360)
	at org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1104)
	at org.apache.spark.storage.BlockManager$$anonfun$removeBroadcast$2.apply(BlockManager.scala:1081)
	at org.apache.spark.storage.BlockManager$$anonfun$removeBroadcast$2.apply(BlockManager.scala:1081)
	at scala.collection.immutable.Set$Set2.foreach(Set.scala:94)
	at org.apache.spark.storage.BlockManager.removeBroadcast(BlockManager.scala:1081)
	at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcI$sp(BlockManagerSlaveActor.scala:63)
	at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply(BlockManagerSlaveActor.scala:63)
	at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply(BlockManagerSlaveActor.scala:63)
	at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$1.apply(BlockManagerSlaveActor.scala:76)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
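For what it is worth, the 30-second limit in both traces matches Spark's default Akka ask timeout (spark.akka.askTimeout). One workaround I am considering, though I have not verified that it addresses the root cause, is raising the Akka timeouts when building the SparkConf, roughly:

import org.apache.spark.SparkConf

// Untested sketch: raise the Akka ask timeout (default 30 s) so transient
// driver slowness does not fail the receiver's block reports to the master.
val conf = new SparkConf()
  .setAppName("kafka-spark-consumer-job")
  .set("spark.akka.askTimeout", "120") // seconds; default is 30
  .set("spark.akka.timeout", "300")    // seconds; default is 100

The same settings can be passed on the command line via spark-submit --conf spark.akka.askTimeout=120. My worry is that if the driver is actually wedged (for example, a long GC pause), raising the timeout would only delay the failure, which is why I am asking about the underlying cause.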

Thanks,

Snehal
 