3 回答

TA貢獻(xiàn)1811條經(jīng)驗(yàn) 獲得超4個(gè)贊
假設(shè),數(shù)據(jù)集中的每一行都是 json 字符串格式
import pyspark.sql.functions as F
def drop_null_cols(data):
import json
content = json.loads(data)
for key, value in list(content.items()):
if value is None:
del content[key]
return json.dumps(content)
drop_null_cols_udf = F.udf(drop_null_cols, F.StringType())
df = spark.createDataFrame(
["{\"name\":\"Ranga\", \"age\":25, \"city\":\"Hyderabad\"}",
"{\"name\":\"John\", \"age\":null, \"city\":\"New York\"}",
"{\"name\":null, \"age\":31, \"city\":\"London\"}"],
"string"
).toDF("data")
df.select(
drop_null_cols_udf("data").alias("data")
).show(10,False)
如果輸入數(shù)據(jù)框有 cols 并且輸出只需要不是 null cols json
df = spark.createDataFrame(
[('Ranga', 25, 'Hyderabad'),
('John', None, 'New York'),
(None, 31, 'London'),
],
['name', 'age', 'city']
)
df.withColumn(
"data", F.to_json(F.struct([x for x in df.columns]))
).select(
drop_null_cols_udf("data").alias("data")
).show(10, False)
#df.write.format("csv").save("s3://path/to/file/) -- save to s3
結(jié)果
+-------------------------------------------------+
|data |
+-------------------------------------------------+
|{"name": "Ranga", "age": 25, "city": "Hyderabad"}|
|{"name": "John", "city": "New York"} |
|{"age": 31, "city": "London"} |
+-------------------------------------------------+

TA貢獻(xiàn)1946條經(jīng)驗(yàn) 獲得超3個(gè)贊
我將遵循以下方法(用 scala 編寫(xiě),但可以在 python 中以最小的變化實(shí)現(xiàn))-
找到數(shù)據(jù)集計(jì)數(shù)并將其命名為totalCount
val totalcount = inputDF.count()
查找count(col)所有數(shù)據(jù)框列,并將字段映射到它們的計(jì)數(shù)
這里對(duì)于輸入數(shù)據(jù)框的所有列,計(jì)算計(jì)數(shù)
請(qǐng)注意,count(anycol)返回提供的列全部非空的行數(shù)。例如 - 如果一列有 10 行值,如果說(shuō)有 5 個(gè)值,null則計(jì)數(shù)(列)變?yōu)?5
獲取第一行Map[colName, count(colName)]稱(chēng)為fieldToCount
val cols = inputDF.columns.map { inputCol =>
functions.count(col(inputCol)).as(inputCol)
}
// Returns the number of rows for which the supplied column are all non-null.
// count(null) returns 0
val row = dataset.select(cols: _*).head()
val fieldToCount = row.getValuesMap[Long]($(inputCols))
獲取要?jiǎng)h除的列
在此處使用步驟#2 中創(chuàng)建的 Map,并將計(jì)數(shù)小于 totalCount 的列標(biāo)記為要?jiǎng)h除的列
從輸入數(shù)據(jù)框中選擇所有列,并count == totalCount根據(jù)要求以任何格式將處理后的輸出數(shù)據(jù)框保存在任何地方。
請(qǐng)注意,this approach will remove all the column having at least one null value
val fieldToBool = fieldToCount.mapValues(_ < totalcount)
val processedDF = inputDF.select(fieldToBool.filterNot(_._2).map(_.1) :_*)
// save this processedDF anywhere in any format as per requirement
我相信這種方法會(huì)比您目前使用的方法表現(xiàn)更好

TA貢獻(xiàn)1805條經(jīng)驗(yàn) 獲得超9個(gè)贊
我解決了上面的問(wèn)題。我們可以簡(jiǎn)單地查詢(xún)數(shù)據(jù)框的空值。df = df.filter(df.column.isNotNull()) 從而刪除所有存在 null 的行。所以如果有 n 列,我們需要 2^n 次查詢(xún)來(lái)篩選出所有可能的組合。在我的例子中,有 10 列,所以總共有 1024 個(gè)查詢(xún),這是可以接受的,因?yàn)?sql 查詢(xún)是并行化的。
添加回答
舉報(bào)