2 Answers

As I understand it, you are trying to add an index with consecutive values to a dataframe. Unfortunately, there is no built-in function in Spark that does this. You can only add an increasing (but not necessarily consecutive) index with df.withColumn("index", functions.monotonicallyIncreasingId()).
That said, the RDD API has a zipWithIndex function that does exactly what you need. So we can define a function that converts the dataframe to an RDD, adds the index, and converts it back to a dataframe.
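For reference, zipWithIndex pairs every element with its 0-based position across partitions, which is why the function below adds 1 to get a 1-based index. A minimal sketch of the semantics (sc is an assumed pre-existing JavaSparkContext, not something from the question):

import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

// sc is an assumed JavaSparkContext, e.g. new JavaSparkContext(spark.sparkContext()).
JavaRDD<String> letters = sc.parallelize(Arrays.asList("a", "b", "c"));
// collect() yields [(a,0), (b,1), (c,2)]: each element paired with its 0-based position.
List<Tuple2<String, Long>> indexed = letters.zipWithIndex().collect();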
I'm no expert in Spark with Java (Scala is more compact), so it can probably be done better. Here is how I would do it.
public static Dataset<Row> zipWithIndex(Dataset<Row> df, String name) {
    JavaRDD<Row> rdd = df.javaRDD().zipWithIndex().map(t -> {
        Row r = t._1;
        Long index = t._2 + 1;  // zipWithIndex is 0-based, shift to start at 1
        ArrayList<Object> list = new ArrayList<>();
        // copy the existing row values, then append the new index
        scala.collection.Iterator<Object> it = r.toSeq().iterator();
        while (it.hasNext()) {
            list.add(it.next());
        }
        list.add(index);
        return RowFactory.create(list.toArray());
    });
    StructType newSchema = df.schema()
            .add(new StructField(name, DataTypes.LongType, true, Metadata.empty()));
    return df.sparkSession().createDataFrame(rdd, newSchema);
}
Here is how you would use it. Note the contrast between what the built-in Spark function produces and what our approach produces.
Dataset<Row> df = spark.range(5)
        .withColumn("index1", functions.monotonicallyIncreasingId());
Dataset<Row> result = zipWithIndex(df, "good_index");
// df
+---+-----------+
| id| index1|
+---+-----------+
| 0| 0|
| 1| 8589934592|
| 2|17179869184|
| 3|25769803776|
| 4|25769803777|
+---+-----------+
// result
+---+-----------+----------+
| id| index1|good_index|
+---+-----------+----------+
| 0| 0| 1|
| 1| 8589934592| 2|
| 2|17179869184| 3|
| 3|25769803776| 4|
| 4|25769803777| 5|
+---+-----------+----------+
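For completeness (this is not from the answer above): if the data is small enough, a window function can also produce consecutive values without dropping to the RDD API. Note that Window.orderBy without a partitionBy moves every row into a single partition, so this sketch does not scale the way zipWithIndex does:

import org.apache.spark.sql.expressions.Window;
import static org.apache.spark.sql.functions.monotonically_increasing_id;
import static org.apache.spark.sql.functions.row_number;

// Consecutive 1-based index via a global window; all rows end up in one
// partition, so only use this for small data.
Dataset<Row> windowed = df.withColumn(
        "good_index",
        row_number().over(Window.orderBy(monotonically_increasing_id())));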

The answer above worked for me after a few tweaks. Below is a working IntelliJ scratch file. I'm on Spark 2.3.0:
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.ArrayList;
class Scratch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("_LOCAL")
                .master("local")
                .getOrCreate();
        Dataset<Row> df = spark.range(5)
                .withColumn("index1", functions.monotonicallyIncreasingId());
        Dataset<Row> result = zipWithIndex(df, "good_index");
        result.show();
    }

    public static Dataset<Row> zipWithIndex(Dataset<Row> df, String name) {
        JavaRDD<Row> rdd = df.javaRDD().zipWithIndex().map(t -> {
            Row r = t._1;
            Long index = t._2 + 1;
            ArrayList<Object> list = new ArrayList<>();
            scala.collection.Iterator<Object> iterator = r.toSeq().iterator();
            while (iterator.hasNext()) {
                Object value = iterator.next();
                assert value != null;
                list.add(value);
            }
            list.add(index);
            return RowFactory.create(list.toArray());
        });
        StructType newSchema = df.schema()
                .add(new StructField(name, DataTypes.LongType, true, Metadata.empty()));
        return df.sparkSession().createDataFrame(rdd, newSchema);
    }
}
Output (index1 happens to be consecutive here only because local mode keeps everything in a single partition; on a real cluster monotonicallyIncreasingId leaves gaps, as shown in the first answer):
+---+------+----------+
| id|index1|good_index|
+---+------+----------+
| 0| 0| 1|
| 1| 1| 2|
| 2| 2| 3|
| 3| 3| 4|
| 4| 4| 5|
+---+------+----------+