我有以下問題:假設(shè)我有一個(gè)包含壓縮目錄的目錄,該壓縮目錄包含多個(gè)文件,存儲(chǔ)在HDFS上。我想創(chuàng)建一個(gè)包含一些T類型對象的RDD,即:context = new JavaSparkContext(conf);JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);JavaRDD<T> processingFiles = filesRDD.map(fileNameContent -> { // The name of the file String fileName = fileNameContent._1(); // The content of the file String content = fileNameContent._2(); // Class T has a constructor of taking the filename and the content of each // processed file (as two strings) T t = new T(content, fileName); return t;});現(xiàn)在,當(dāng)inputDataPath目錄包含文件時(shí),可以很好地工作,例如,當(dāng)它類似于:String inputDataPath = "hdfs://some_path/*/*/"; // because it contains subfolders但是,當(dāng)一個(gè)tgz包含多個(gè)文件時(shí),文件內(nèi)容(fileNameContent._2())為我提供了一些無用的二進(jìn)制字符串(相當(dāng)不錯(cuò))。我在SO上發(fā)現(xiàn)了類似的問題,但是情況不一樣,因?yàn)榻鉀Q方案是每次壓縮僅包含一個(gè)文件,而在我的情況下,還有許多其他文件需要單獨(dú)讀取為整個(gè)文件。我還發(fā)現(xiàn)了有關(guān)的問題wholeTextFiles,但這在我的情況下不起作用。任何想法如何做到這一點(diǎn)?編輯:我試圖從讀者在這里(試圖從測試的讀者在這里,就像在功能testTarballWithFolders()),但每當(dāng)我打電話TarballReader tarballReader = new TarballReader(fileName);我得到NullPointerException:java.lang.NullPointerException at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:83) at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:77) at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:91) at utils.TarballReader.<init>(TarballReader.java:61) at main.SparkMain.lambda$0(SparkMain.java:105) at main.SparkMain$$Lambda$18/1667100242.call(Unknown Source) at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727)第105行MainSpark是我在帖子編輯中顯示的上方的行,而第61行TarballReader是GZIPInputStream gzip = new GZIPInputStream(in);in上面一行輸入流的值為空:InputStream in = this.getClass().getResourceAsStream(tarball);我在正確的道路上嗎?如果是這樣,我如何繼續(xù)?為什么我得到這個(gè)空值,我該如何解決?
2 回答

守著一只汪
TA貢獻(xiàn)1872條經(jīng)驗(yàn) 獲得超4個(gè)贊
可接受答案的一個(gè)小改進(jìn)是更改
Option(tar.getNextTarEntry)
至
Try(tar.getNextTarEntry).toOption.filter( _ != null)
以.tar.gz健壯的方式應(yīng)對格式錯(cuò)誤/截?cái)嗟摹?/p>
順便說一句,緩沖區(qū)數(shù)組的大小有什么特別之處嗎?如果接近平均文件大?。ㄔ谖业那闆r下可能是500k),平均速度會(huì)更快嗎?我猜是Stream相對于whileJava式的循環(huán)而言,我看到的是速度下降還是更可能是相對而言的開銷。
添加回答
舉報(bào)
0/150
提交
取消