第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問題,去搜搜看,總會(huì)有你想問的

使用 java 將索引列添加到 apache spark Dataset<Row>

使用 java 將索引列添加到 apache spark Dataset<Row>

手掌心 2022-11-30 13:50:10
下面的問題有 scala 和 pyspark 的解決方案,這個(gè)問題中提供的解決方案不適用于連續(xù)的索引值。Spark Dataframe:如何添加索引列:又名分布式數(shù)據(jù)索引我在 Apache-spark 中有一個(gè)現(xiàn)有數(shù)據(jù)集,我想根據(jù)索引從中選擇一些行。我打算添加一個(gè)索引列,其中包含從 1 開始的唯一值,并根據(jù)該列的值獲取行。我發(fā)現(xiàn)以下方法可以添加使用排序依據(jù)的索引:df.withColumn("index", functions.row_number().over(Window.orderBy("a column")));我不想使用排序依據(jù)。我需要索引的順序與它們?cè)跀?shù)據(jù)集中的順序相同。有什么幫助嗎?
查看完整描述

2 回答

?
阿晨1998

TA貢獻(xiàn)2037條經(jīng)驗(yàn) 獲得超6個(gè)贊

據(jù)我所知,您正在嘗試將索引(具有連續(xù)值)添加到數(shù)據(jù)框。不幸的是,在 Spark 中沒有內(nèi)置函數(shù)可以做到這一點(diǎn)。您只能使用 df.withColumn("index", ) 添加遞增索引(但不一定具有連續(xù)值monotonicallyIncreasingId)。


盡管如此,RDD API 中有一個(gè)zipWithIndex函數(shù)可以完全滿足您的需要。因此,我們可以定義一個(gè)函數(shù),將數(shù)據(jù)幀轉(zhuǎn)換為 RDD,添加索引并將其轉(zhuǎn)換回?cái)?shù)據(jù)幀。


我不是 java 中 spark 的專家(scala 更緊湊)所以可能會(huì)做得更好。這是我會(huì)怎么做。


public static Dataset<Row> zipWithIndex(Dataset<Row> df, String name) {

    JavaRDD<Row> rdd = df.javaRDD().zipWithIndex().map(t -> {

        Row r = t._1;

        Long index = t._2 + 1;

        ArrayList<Object> list = new ArrayList<>();

        r.toSeq().iterator().foreach(x -> list.add(x));

        list.add(index);

        return RowFactory.create(list);

    });

    StructType newSchema = df.schema()

            .add(new StructField(name, DataTypes.LongType, true, null));

    return df.sparkSession().createDataFrame(rdd, newSchema);

}

以下是您將如何使用它。請(qǐng)注意內(nèi)置 spark 函數(shù)的作用與我們的方法的作用形成對(duì)比。


Dataset<Row> df = spark.range(5)

    .withColumn("index1", functions.monotonicallyIncreasingId());

Dataset<Row> result = zipWithIndex(df, "good_index");

// df

+---+-----------+

| id|     index1|

+---+-----------+

|  0|          0|

|  1| 8589934592|

|  2|17179869184|

|  3|25769803776|

|  4|25769803777|

+---+-----------+


// result

+---+-----------+----------+

| id|     index1|good_index|

+---+-----------+----------+

|  0|          0|         1|

|  1| 8589934592|         2|

|  2|17179869184|         3|

|  3|25769803776|         4|

|  4|25769803777|         5|

+---+-----------+----------+


查看完整回答
反對(duì) 回復(fù) 2022-11-30
?
UYOU

TA貢獻(xiàn)1878條經(jīng)驗(yàn) 獲得超4個(gè)贊

上面的答案經(jīng)過一些調(diào)整對(duì)我有用。下面是一個(gè)功能性的 Intellij Scratch 文件。我在 Spark 2.3.0 上:


import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.RowFactory;

import org.apache.spark.sql.SparkSession;

import org.apache.spark.sql.functions;

import org.apache.spark.sql.types.DataTypes;

import org.apache.spark.sql.types.Metadata;

import org.apache.spark.sql.types.StructField;

import org.apache.spark.sql.types.StructType;


import java.util.ArrayList;


class Scratch {

    public static void main(String[] args) {

        SparkSession spark = SparkSession

                    .builder()

                    .appName("_LOCAL")

                    .master("local")

                    .getOrCreate();

        Dataset<Row> df = spark.range(5)

                .withColumn("index1", functions.monotonicallyIncreasingId());

        Dataset<Row> result = zipWithIndex(df, "good_index");

        result.show();

    }

    public static Dataset<Row> zipWithIndex(Dataset<Row> df, String name) {

        JavaRDD<Row> rdd = df.javaRDD().zipWithIndex().map(t -> {

            Row r = t._1;

            Long index = t._2 + 1;

            ArrayList<Object> list = new ArrayList<>();

            scala.collection.Iterator<Object> iterator = r.toSeq().iterator();

            while(iterator.hasNext()) {

                Object value = iterator.next();

                assert value != null;

                list.add(value);

            }

            list.add(index);

            return RowFactory.create(list.toArray());

        });

        StructType newSchema = df.schema()

                .add(new StructField(name, DataTypes.LongType, true, Metadata.empty()));

        return df.sparkSession().createDataFrame(rdd, newSchema);

    }

}

輸出:


+---+------+----------+

| id|index1|good_index|

+---+------+----------+

|  0|     0|         1|

|  1|     1|         2|

|  2|     2|         3|

|  3|     3|         4|

|  4|     4|         5|

+---+------+----------+


查看完整回答
反對(duì) 回復(fù) 2022-11-30
  • 2 回答
  • 0 關(guān)注
  • 151 瀏覽

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)