1 回答

TA貢獻(xiàn)1757條經(jīng)驗(yàn) 獲得超7個(gè)贊
兩個(gè)坑, 性能坑和線程坑
DStream是抽象類,它把連續(xù)的數(shù)據(jù)流拆成很多的小RDD數(shù)據(jù)塊, 這叫做“微批次”, spark的流式處理, 都是“微批次處理”。 DStream內(nèi)部實(shí)現(xiàn)上有批次處理時(shí)間間隔,滑動窗口等機(jī)制來保證每個(gè)微批次的時(shí)間間隔里, 數(shù)據(jù)流以RDD的形式發(fā)送給spark做進(jìn)一步處理。因此, 在一個(gè)為批次的處理時(shí)間間隔里, DStream只產(chǎn)生一個(gè)RDD。
可以利用dstream.foreachRDD把數(shù)據(jù)發(fā)送給外部系統(tǒng)。 但是想要正確地, 有效率的使用它, 必須理解一下背后的機(jī)制。通常向外部系統(tǒng)寫數(shù)據(jù)需要一個(gè)Connection對象(通過它與外部服務(wù)器交互)。程序員可能會想當(dāng)然地在spark上創(chuàng)建一個(gè)connection對象, 然后在spark線程里用這個(gè)對象來存RDD。比如下面的程序:
dstream.foreachRDD { rdd =>
val connection = createNewConnection() // executed at the driver
rdd.foreach { record =>
connection.send(record) // executed at the worker
}
}
這個(gè)代碼會產(chǎn)生執(zhí)行錯(cuò)誤, 因?yàn)閞dd是分布式存儲的,它是一個(gè)數(shù)據(jù)結(jié)構(gòu),它是一組指向集群數(shù)據(jù)的指針, rdd.foreach會在集群里的不同機(jī)器上創(chuàng)建spark工作線程, 而connection對象則不會在集群里的各個(gè)機(jī)器之間傳遞, 所以有些spark工作線程就會產(chǎn)生connection對象沒有被初始化的執(zhí)行錯(cuò)誤。 解決的辦法可以是在spark worker里為每一個(gè)worker創(chuàng)建一個(gè)connection對象, 但是如果你這么做, 程序要為每一條record創(chuàng)建一次connection,顯然效率和性能都非常差。
另一種改進(jìn)方法是為每個(gè)spark分區(qū)創(chuàng)建一個(gè)connection對象,同時(shí)維護(hù)一個(gè)全局的靜態(tài)的連接遲對象, 這樣就可以最好的復(fù)用connection。 另外需要注意: 雖然有多個(gè)connection對象, 但在同一時(shí)間只有一個(gè)connection.send(record)執(zhí)行, 因?yàn)樵谕粋€(gè)時(shí)間里, 只有 一個(gè)微批次的RDD產(chǎn)生出來。
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
有人問了個(gè)問題,為什么foreachRDD里有兩層嵌套的foreach? 為什么dstream.foreachRDD里還要再套一層rdd.foreach
可以這么理解, DStream.foreachRDD 是一個(gè)輸出操作符,它返回的不是RDD里的一行數(shù)據(jù), 而是輸出DStream后面的RDD,在一個(gè)時(shí)間間隔里, 只返回一個(gè)RDD的“微批次”, 為了訪問這個(gè)“微批次”RDD里的數(shù)據(jù), 我們還需要在RDD數(shù)據(jù)對象上做進(jìn)一步操作.。 參考下面的代碼實(shí)例, 更容易理解。
給頂一個(gè) RDD [Security, Prices]數(shù)據(jù)結(jié)構(gòu)
dstream.foreachRDD { pricesRDD => // Loop over RDD
val x= pricesRDD.count
if (x > 0) // RDD has data
{
for(line <- pricesRDD.collect.toArray) // Look for each record in the RDD
{
var index = line._2.split(',').view(0).toInt // That is the index
var timestamp = line._2.split(',').view(1).toString // This is the timestamp from source
var security = line._2.split(',').view(12.toString // This is the name of the security
var price = line._2.split(',').view(3).toFloat // This is the price of the security
if (price.toFloat > 90.0)
{
// Do something here
// Sent notification, write to HDFS etc
}
}
}
}
- 1 回答
- 0 關(guān)注
- 1560 瀏覽
添加回答
舉報(bào)