I am trying to move data from a table in a PostgreSQL database into a Hive table on HDFS. To do that, I came up with the following code:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf().setAppName("Spark-JDBC")
  .set("spark.executor.heartbeatInterval", "120s")
  .set("spark.network.timeout", "12000s")
  .set("spark.sql.inMemoryColumnarStorage.compressed", "true")
  .set("spark.sql.orc.filterPushdown", "true")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.max", "512m")
  .set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName)
  .set("spark.streaming.stopGracefullyOnShutdown", "true")
  .set("spark.yarn.driver.memoryOverhead", "7168")
  .set("spark.yarn.executor.memoryOverhead", "7168")
  .set("spark.sql.shuffle.partitions", "61")
  .set("spark.default.parallelism", "60")
  .set("spark.memory.storageFraction", "0.5")
  .set("spark.memory.fraction", "0.6")
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "16g")
  .set("spark.dynamicAllocation.enabled", "false")
  .set("spark.dynamicAllocation.enabled", "true")   // note: overrides the "false" set on the previous line
  .set("spark.shuffle.service.enabled", "true")

val spark = SparkSession.builder()
  .config(conf)
  .master("yarn")
  .enableHiveSupport()
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .getOrCreate()

// allColumns and partition_columns are defined elsewhere (not shown in the post)
val colList = allColumns.split(",").toList
val (partCols, npartCols) = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))
    // ... (the rest of the DataFrame preparation is cut off in the post)
    }
    finalDF
}
```

The data is inserted into a Hive table that is dynamically partitioned on prtn_String_columns: source_system_name, period_year, period_num.

The data is not being partitioned evenly: one partition is small while another ends up very large, so there is a skew problem. When inserting the data into the Hive table, the job fails at this line:

```scala
spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF")
```

I understand this happens because of the data skew. I tried increasing the number of executors, the executor memory and the driver memory, and I also tried saving the DataFrame as a CSV file instead of writing it to the Hive table, but nothing changes the outcome; the job still fails with the exception:

java.lang.OutOfMemoryError: GC overhead limit exceeded

Is there anything in the code that I need to correct? Could anyone tell me how to fix this problem?
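For reference, below is a minimal sketch (not part of the original question) of the two levers usually suggested for this kind of job: parallelizing the JDBC read with partitionColumn / lowerBound / upperBound / numPartitions, and clustering rows by the Hive partition columns before the dynamic-partition insert. The connection details (dbhost, sourcedb, schema.sourcetable, user/password, the year bounds and the partition count) are placeholders, and it assumes period_year is a numeric column and that the partition columns are the last columns of the prepared DataFrame:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .enableHiveSupport()
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .getOrCreate()

// 1) Parallelize the JDBC read so the source table is not pulled through a single connection.
//    All connection values below are placeholders.
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://dbhost:5432/sourcedb")
  .option("dbtable", "schema.sourcetable")
  .option("user", "user")
  .option("password", "password")
  .option("partitionColumn", "period_year")   // must be a numeric, date, or timestamp column
  .option("lowerBound", "2010")
  .option("upperBound", "2020")
  .option("numPartitions", "60")
  .load()

// 2) Cluster rows by the Hive partition columns before the dynamic-partition insert,
//    so each write task opens file writers for only a few Hive partitions.
val preparedDF = jdbcDF
  .repartition(col("source_system_name"), col("period_year"), col("period_num"))

preparedDF.createOrReplaceTempView("preparedDF")

// Assumes the partition columns are the last columns of preparedDF,
// as required for a dynamic-partition INSERT ... SELECT *.
spark.sql(
  """INSERT OVERWRITE TABLE schema.hivetable
    |PARTITION(source_system_name, period_year, period_num)
    |SELECT * FROM preparedDF""".stripMargin)
```

Repartitioning by the partition columns is what typically relieves the GC-overhead error during dynamic-partition inserts, because a task no longer keeps one open writer per Hive partition; if a single partition is extremely hot, adding an extra salt column to the repartition expression spreads it across several tasks.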
How to optimize partitioning when migrating data from a JDBC source?
慕虎7371278
2019-11-04 09:38:05