下面是一段创建structured streaming的Dataset的代码:
val lines = spark.readStream.format("socket") .option("host", "localhost").option("port", 9999).load();
会创建一个socket类型的Source,该name2class的映射由DataSource.lookupDataSource()完成
val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader) ... serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider1)).toList ...
应该是从当前类路径中查找所有的DataSourceRegister,并读取它的shortName(),如果是"socket"就确定了由该DataSourceRegister来创建对应的DataSource
果然,有一个TextSocketSourceProvider
class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegister with Logging { ... override def shortName(): String = "socket" override def createSource( sqlContext: SQLContext, metadataPath: String, schema: Option[StructType], providerName: String, parameters: Map[String, String]): Source = { val host = parameters("host") val port = parameters("port").toInt new TextSocketSource(host, port, parseIncludeTimestamp(parameters), sqlContext) } }
TextSocketSourceProvider的createSource创建一个TextSocketSource
TextSocketSource是一个Source,Source接口如下:
trait Source { def schema: StructType def getOffset: Option[Offset] def getBatch(start: Option[Offset], end: Offset): DataFrame def commit(end: Offset) : Unit = {} def stop(): Unit }
作者:中科院_白乔
链接:https://www.jianshu.com/p/6cdff973d606
點擊查看更多內(nèi)容
為 TA 點贊
評論
評論
共同學(xué)習(xí),寫下你的評論
評論加載中...
作者其他優(yōu)質(zhì)文章
正在加載中
感謝您的支持,我會繼續(xù)努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進(jìn)行掃碼打賞哦