第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定

Structured Streaming如何實(shí)現(xiàn)Parquet存儲(chǔ)目錄按時(shí)間分區(qū)

標(biāo)簽:
Spark

缘由

StreamingPro现在支持以SQL脚本的形式写Structured Streaming流式程序了:

这种方式的好处就是,删除分区直接删除就可以,坏处是,通过上面的方式,由于Structured Streaming的目录地址是不允许变化的,也就是他拿到一次值之后,后续就固定了,所以数据都会写入到服务启动的那天。

解决方案

解决办法是自己实现一个parquet sink,改造的地方并不多。新添加一个类:

class NewFileStreamSink(
                         sparkSession: SparkSession,
                         _path: String,
                         fileFormat: FileFormat,
                         partitionColumnNames: Seq[String],
                         options: Map[String, String]) extends Sink with Logging { // 使用velocity模板引擎,方便实现复杂的模板渲染
  def evaluate(value: String, context: Map[String, AnyRef]) = {    RenderEngine.render(value, context)
  }// 将路径获取改成一个方法调用,这样每次写入时,都会通过方法调用//从而获得一个新值
  def path = {
    evaluate(_path, Map("date" -> new DateTime()))
  }
-- 这些路径获取都需要变成方法  private def basePath = new Path(path)  private def logPath = new Path(basePath, FileStreamSink.metadataDir)  private def fileLog =    new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toUri.toString)  private val hadoopConf = sparkSession.sessionState.newHadoopConf()  override def addBatch(batchId: Long, data: DataFrame): Unit = {    if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
      logInfo(s"Skipping already committed batch $batchId")
    } else {      val committer = FileCommitProtocol.instantiate(
        className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
        jobId = batchId.toString,
        outputPath = path,
        isAppend = false)

      committer match {        case manifestCommitter: ManifestFileCommitProtocol =>
          manifestCommitter.setupManifestOptions(fileLog, batchId)        case _ => // Do nothing
      }      FileFormatWriter.write(
        sparkSession = sparkSession,
        queryExecution = data.queryExecution,
        fileFormat = fileFormat,
        committer = committer,
        outputSpec = FileFormatWriter.OutputSpec(path, Map.empty),
        hadoopConf = hadoopConf,
        partitionColumnNames = partitionColumnNames,
        bucketSpec = None,
        refreshFunction = _ => (),
        options = options)
    }
  }  override def toString: String = s"FileSink[$path]"}

实现sink之后,我们还需要一个DataSource 以便我们能让这个新的Sink集成进Spark里并被外部使用:

package org.apache.spark.sql.execution.streaming.newfileimport org.apache.spark.sql.{AnalysisException, SQLContext}import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormatimport org.apache.spark.sql.execution.streaming. Sinkimport org.apache.spark.sql.sources.StreamSinkProviderimport org.apache.spark.sql.streaming.OutputModeclass DefaultSource extends StreamSinkProvider {  override def createSink(sqlContext: SQLContext, parameters: Map[String, String], partitionColumns: Seq[String], outputMode: OutputMode): Sink = {    val path = parameters.getOrElse("path", {      throw new IllegalArgumentException("'path' is not specified")
    })    if (outputMode != OutputMode.Append) {      throw new AnalysisException(        s"Data source ${getClass.getCanonicalName} does not support $outputMode output mode")
    }    new NewFileStreamSink(sqlContext.sparkSession, parameters("path"), new ParquetFileFormat(), partitionColumns, parameters)
  }
}

这个是标准的datasource API。 现在使用时可以这样:

save append table21  
-- 使用jodatime的语法as parquet.`/tmp/jack/hp_date=${date.toString("yyyy-MM-dd")}` 
options mode="Append"
and duration="10"-- 指定实现类and implClass="org.apache.spark.sql.execution.streaming.newfile"
and checkpointLocation="/tmp/cpl2";

是不是很方便?

额外的问题

在spark 2.2.0 之后,对meta文件合并,Spark做了些调整,如果合并过程中,发现之前的某个checkpoint点 文件会抛出异常。在spark 2.2.0则不存在这个问题。其实spark团队应该把这个作为可选项比较好,允许抛出或者保持安静。



作者:祝威廉
链接:https://www.jianshu.com/p/0fa3895dcd27


點(diǎn)擊查看更多內(nèi)容
TA 點(diǎn)贊

若覺(jué)得本文不錯(cuò),就分享一下吧!

評(píng)論

作者其他優(yōu)質(zhì)文章

正在加載中
  • 推薦
  • 評(píng)論
  • 收藏
  • 共同學(xué)習(xí),寫(xiě)下你的評(píng)論
感謝您的支持,我會(huì)繼續(xù)努力的~
掃碼打賞,你說(shuō)多少就多少
贊賞金額會(huì)直接到老師賬戶(hù)
支付方式
打開(kāi)微信掃一掃,即可進(jìn)行掃碼打賞哦
今天注冊(cè)有機(jī)會(huì)得

100積分直接送

付費(fèi)專(zhuān)欄免費(fèi)學(xué)

大額優(yōu)惠券免費(fèi)領(lǐng)

立即參與 放棄機(jī)會(huì)
微信客服

購(gòu)課補(bǔ)貼
聯(lián)系客服咨詢(xún)優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)

舉報(bào)

0/150
提交
取消