I am writing a library to integrate Apache Spark with a custom environment, and I am implementing a custom streaming source and streaming writer. Some of the resources I am working with are not recoverable after an application crash: if the application restarts, all data has to be reloaded anyway. We would therefore like to spare users from having to set the `checkpointLocation` option explicitly. However, if the option is not provided, I get the following error:

```
org.apache.spark.sql.AnalysisException: checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...);
```

With the console sink, on the other hand, everything works fine without the option. Is there a way to get the same behavior for my writer?

Note: we are using the Spark v2 interfaces for the streaming reader/writer.

Spark log:

```
18/06/29 16:36:48 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/C:/mydir/spark-warehouse/').
18/06/29 16:36:48 INFO SharedState: Warehouse path is 'file:/C:/mydir/spark-warehouse/'.
18/06/29 16:36:48 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
org.apache.spark.sql.AnalysisException: checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...);
	at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$3.apply(StreamingQueryManager.scala:213)
	at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$3.apply(StreamingQueryManager.scala:208)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:207)
	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:299)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:296)
	...
18/06/29 16:36:50 INFO SparkContext: Invoking stop() from shutdown hook
```

This is how I start the streaming job:

```java
spark.readStream().format("mysource").load()
     .writeStream().format("mywriter").outputMode(OutputMode.Append()).start();
```

This fails, whereas everything works fine if I run, for example:

```java
spark.readStream().format("mysource").load()
     .writeStream().format("console").outputMode(OutputMode.Append()).start();
```
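For context, the workaround I am currently considering is to have the library generate a throwaway checkpoint directory itself (since our state is not recoverable anyway, losing it is acceptable) and inject it as the `checkpointLocation` option before starting the query. A minimal sketch, where `tempCheckpointDir` is a hypothetical helper of mine, not part of the Spark API:

```java
import java.nio.file.Files;
import java.nio.file.Path;

public class TempCheckpoint {
    // Create a disposable checkpoint directory so library users don't have to
    // set 'checkpointLocation' themselves. The directory is deleted on JVM exit,
    // mirroring the fact that our source cannot recover across restarts anyway.
    public static String tempCheckpointDir() throws Exception {
        Path dir = Files.createTempDirectory("spark-checkpoint-");
        dir.toFile().deleteOnExit();
        return dir.toString();
    }

    public static void main(String[] args) throws Exception {
        String checkpoint = tempCheckpointDir();
        System.out.println(checkpoint);
        // The library wrapper would then inject it when starting the query, e.g.:
        // df.writeStream().format("mywriter")
        //   .option("checkpointLocation", checkpoint)
        //   .outputMode(OutputMode.Append()).start();
    }
}
```

This avoids the `AnalysisException` but feels like a hack, which is why I am asking whether the console sink's behavior can be obtained directly.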