3 Answers

Contributor with 1777 experience points · 3+ upvotes
Use repartition(1) to collapse the DataFrame into a single partition before writing:

df.repartition(1)
  .write.format("com.databricks.spark.csv")
  .option("header", "true")
  .save("mydata.csv")

Or use coalesce(1), which does the same without a full shuffle:

df.coalesce(1)
  .write.format("com.databricks.spark.csv")
  .option("header", "true")
  .save("mydata.csv")

Either way, mydata.csv is a directory, and the actual data lands in mydata.csv/part-00000. If you need one plain file, concatenate the part files with cat, or use hadoop fs -getmerge on HDFS.
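To make the cat/getmerge step concrete, here is a minimal local-filesystem sketch in plain Scala (no Spark or Hadoop needed) that concatenates the part-* files of an output directory into one CSV. The object and method names are hypothetical, and the sample data is fabricated for illustration:

```scala
import java.nio.file.{Files, Path}
import scala.collection.JavaConverters._

object MergeParts {
  // Merge all part-* files in a Spark output directory into a single CSV,
  // mimicking `cat mydata.csv/part-* > out.csv` or `hadoop fs -getmerge`.
  def mergeParts(dir: Path, dest: Path): Unit = {
    val parts = Files.list(dir).iterator().asScala
      .filter(_.getFileName.toString.startsWith("part-"))
      .toSeq
      .sortBy(_.getFileName.toString) // keep part-00000, part-00001, ... order
    val out = Files.newOutputStream(dest)
    try parts.foreach(p => Files.copy(p, out))
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    // Build a fake Spark output directory (hypothetical data, for illustration).
    val dir = Files.createTempDirectory("mydata.csv")
    Files.write(dir.resolve("part-00000"), "a,1\n".getBytes)
    Files.write(dir.resolve("part-00001"), "b,2\n".getBytes)
    val dest = dir.resolve("merged.csv")
    mergeParts(dir, dest)
    print(new String(Files.readAllBytes(dest)))
  }
}
```

Sorting by file name matters: Spark numbers its part files, so lexicographic order preserves the row order of the original partitions.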

Contributor with 1874 experience points · 12+ upvotes
Write the CSV parts to a temporary directory, then merge them into a single file with Hadoop's FileUtil.copyMerge:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

def merge(srcPath: String, dstPath: String): Unit = {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  // the "true" argument deletes the source files once they are merged into the new output
  FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null)
}

val newData = << create your dataframe >>
val outputfile = "/user/feeds/project/outputs/subject"
var filename = "myinsights"
var outputFileName = outputfile + "/temp_" + filename
var mergedFileName = outputfile + "/merged_" + filename
var mergeFindGlob = outputFileName

newData.write
  .format("com.databricks.spark.csv")
  .option("header", "false")
  .mode("overwrite")
  .save(outputFileName)

merge(mergeFindGlob, mergedFileName)
newData.unpersist()
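Note that FileUtil.copyMerge was removed in Hadoop 3.0, so the helper above only compiles against Hadoop 2.x. As a sketch of what copyMerge does, here is a local-filesystem stand-in in plain Scala (a hypothetical helper, not part of any Hadoop API): it concatenates every file under srcDir into dstFile and, when deleteSource is true, removes the sources afterwards, mirroring the `true` argument above:

```scala
import java.nio.file.{Files, Path}
import scala.collection.JavaConverters._

object LocalCopyMerge {
  // Local-filesystem stand-in for FileUtil.copyMerge (removed in Hadoop 3.0):
  // concatenate every regular file under srcDir into dstFile, then optionally
  // delete the sources, like the deleteSource = true argument above.
  def copyMerge(srcDir: Path, dstFile: Path, deleteSource: Boolean): Unit = {
    val parts = Files.list(srcDir).iterator().asScala
      .filter(Files.isRegularFile(_))
      .toSeq
      .sortBy(_.getFileName.toString)
    val out = Files.newOutputStream(dstFile)
    try parts.foreach(p => Files.copy(p, out))
    finally out.close()
    if (deleteSource) {
      parts.foreach(Files.delete) // remove merged inputs ...
      Files.delete(srcDir)        // ... then the now-empty directory
    }
  }
}
```

dstFile must live outside srcDir, otherwise the merged file would be swept up as an input and the source directory could not be deleted.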

Contributor with 1824 experience points · 6+ upvotes
coalesce(1) and repartition(1) both work, but they funnel the entire dataset through a single task on one executor, which can be slow or run out of memory for large outputs. FileUtil.copyMerge() avoids that: the write stays fully parallel across the cluster, and the part files are merged afterwards. So prefer coalesce(1) only for small data, and copyMerge() when the output is large.