
Converting a list of maps in Java to a Dataset in Spark


德瑪西亞99 2023-05-17 15:56:48
I have a list of maps in Java that basically represents rows.

List<Map<String, Object>> dataList = new ArrayList<>();
Map<String, Object> row1 = new HashMap<>();
row1.put("fund", "f1");
row1.put("broker", "b1");
row1.put("qty", 100);
Map<String, Object> row2 = new HashMap<>();
row2.put("fund", "f2");
row2.put("broker", "b2");
row2.put("qty", 200);
dataList.add(row1);
dataList.add(row2);

I am trying to create a Spark DataFrame from it. I tried converting it to a JavaRDD<Map<String, Object>> with

JavaRDD<Map<String, Object>> rows = sc.parallelize(dataList);

but I am not sure how to get from there to a Dataset<Row>. I have seen Scala examples, but no Java ones. I also tried converting the list to a JSON string and reading the JSON:

String jsonStr = mapper.writeValueAsString(dataList);

but it seems I would have to write it to a file and then read it back with

Dataset<Row> df = spark.read().json(pathToFile);

I would rather do this in memory, if possible, instead of writing to a file and reading from there. Here is what I have so far:

SparkConf sparkConf = new SparkConf()
        .setAppName("SparkTest")
        .setMaster("local[*]")
        .set("spark.sql.shuffle.partitions", "1");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();

List<Map<String, Object>> dataList = new ArrayList<>();
Map<String, Object> row1 = new HashMap<>();
row1.put("fund", "f1");
row1.put("broker", "b1");
row1.put("qty", 100);
Map<String, Object> row2 = new HashMap<>();
row2.put("fund", "f2");
row2.put("broker", "b2");
row2.put("qty", 200);
dataList.add(row1);
dataList.add(row2);

ObjectMapper mapper = new ObjectMapper();
String jsonStr = mapper.writeValueAsString(dataList);

JavaRDD<Map<String, Object>> rows = sc.parallelize(dataList);
Dataset<Row> data = sparkSession.createDataFrame(rows, Map.class);
data.show();

4 Answers

胡說叔叔


You don't need an RDD at all. What you need to do is extract the desired schema from the list of maps, convert the list of maps into a list of Rows, and then use spark.createDataFrame.

In Java this is a bit painful, especially when creating the Row objects, but it goes like this:


import java.util.*;
import java.util.stream.Collectors;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;
import scala.collection.JavaConverters;

// Column order is taken from the first map; every value is stringified.
List<String> cols = new ArrayList<>(dataList.get(0).keySet());

List<Row> rows = dataList.stream()
    .map(row -> cols.stream().map(c -> (Object) row.get(c).toString()))
    .map(row -> row.collect(Collectors.toList()))
    .map(row -> JavaConverters.asScalaBufferConverter(row).asScala().toSeq())
    .map(Row$.MODULE$::fromSeq)
    .collect(Collectors.toList());

StructType schema = new StructType(
    cols.stream()
        .map(c -> new StructField(c, DataTypes.StringType, true, new Metadata()))
        .collect(Collectors.toList())
        .toArray(new StructField[0])
);

Dataset<Row> result = spark.createDataFrame(rows, schema);
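The Scala-converter plumbing can also be avoided with Spark's RowFactory.create, which accepts an Object[] directly. The map-to-values extraction itself is plain Java; below is a minimal Spark-free sketch of just that step (the class and method names are illustrative, not from the answer). The key point is that every map must be read in the same fixed column order:

```java
import java.util.*;
import java.util.stream.Collectors;

public class RowExtraction {
    // Flatten each map into an Object[] using one fixed column order taken
    // from the first map, so every row lines up with the same schema.
    static List<Object[]> toRowValues(List<Map<String, Object>> dataList) {
        List<String> cols = new ArrayList<>(dataList.get(0).keySet());
        return dataList.stream()
                .map(row -> cols.stream().map(row::get).toArray())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Object> row1 = new LinkedHashMap<>();
        row1.put("fund", "f1");
        row1.put("broker", "b1");
        row1.put("qty", 100);
        // With a LinkedHashMap the column order is insertion order
        System.out.println(Arrays.toString(toRowValues(List.of(row1)).get(0)));
        // prints [f1, b1, 100]
    }
}
```

Each Object[] produced this way could then be passed to RowFactory.create and combined with the StructType above.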


慕桂英546537


The Spark documentation already shows how to load an in-memory JSON string.

Here is the example from https://spark.apache.org/docs/latest/sql-data-sources-json.html:


// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset<String> storing one JSON object per string.
List<String> jsonData = Arrays.asList(
        "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());
Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);
anotherPeople.show();
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
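To use this route, each map must first become one JSON object string. In practice Jackson's writeValueAsString applied per element does this; the sketch below hand-rolls it for flat maps of strings and numbers only (class and method names are illustrative), just to show the shape of the strings Spark expects:

```java
import java.util.*;
import java.util.stream.Collectors;

public class MapToJson {
    // Serialize one flat map to a JSON object string.
    // Handles only string and numeric values, with no quote escaping --
    // enough for the rows in the question, but use Jackson for real data.
    static String toJson(Map<String, Object> row) {
        return row.entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\":" +
                        (e.getValue() instanceof Number
                                ? e.getValue().toString()
                                : "\"" + e.getValue() + "\""))
                .collect(Collectors.joining(",", "{", "}"));
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("fund", "f1");
        row.put("qty", 100);
        System.out.println(toJson(row)); // prints {"fund":"f1","qty":100}
    }
}
```

The resulting list of strings goes straight into spark.createDataset(jsonData, Encoders.STRING()) followed by spark.read().json(...), exactly as in the documentation example, with no file involved.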


慕標(biāo)5832272


First define a JavaBean class whose fields model one row:

public class MyRow implements Serializable {

  private String fund;
  private String broker;
  private int qty;

  public MyRow(String fund, String broker, int qty) {
    super();
    this.fund = fund;
    this.broker = broker;
    this.qty = qty;
  }

  public String getFund() { return fund; }
  public void setFund(String fund) { this.fund = fund; }

  public String getBroker() { return broker; }
  public void setBroker(String broker) { this.broker = broker; }

  public int getQty() { return qty; }
  public void setQty(int qty) { this.qty = qty; }
}

Now create an ArrayList. Each item in this list will serve as a row in the final DataFrame.


MyRow r1 = new MyRow("f1", "b1", 100);
MyRow r2 = new MyRow("f2", "b2", 200);
List<MyRow> dataList = new ArrayList<>();
dataList.add(r1);
dataList.add(r2);

Now we have to convert this list into a Dataset:


Dataset<Row> ds = spark.createDataFrame(dataList, MyRow.class);
ds.show();
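Note that createDataFrame(dataList, MyRow.class) derives the columns by reflecting over the bean's getters, so the columns typically come out in alphabetical order (broker, fund, qty) rather than declaration order. The stdlib Introspector shows which properties a bean exposes; this Spark-free sketch (class and method names are mine, not Spark API) illustrates what the reflection sees:

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.*;
import java.util.stream.Collectors;

public class BeanColumns {
    // A trimmed-down stand-in for the MyRow bean above.
    public static class MyRow {
        public String getFund() { return "f1"; }
        public String getBroker() { return "b1"; }
        public int getQty() { return 100; }
    }

    // List the bean properties (minus the implicit "class" property)
    // in alphabetical order, mirroring the columns a bean-based
    // DataFrame would get.
    static List<String> columns(Class<?> bean) {
        try {
            return Arrays.stream(Introspector.getBeanInfo(bean).getPropertyDescriptors())
                    .map(PropertyDescriptor::getName)
                    .filter(n -> !n.equals("class"))
                    .sorted()
                    .collect(Collectors.toList());
        } catch (IntrospectionException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(columns(MyRow.class)); // prints [broker, fund, qty]
    }
}
```

If the column order matters, select the columns explicitly after creating the DataFrame.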


慕姐4208626


import java.util.*;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;

// Sort each map's keys with a TreeMap so every row's values come out
// in the same (alphabetical) column order.
private static final Function<Map<String, Object>, Row> f =
    strObjMap -> RowFactory.create(
        new TreeMap<String, Object>(strObjMap).values().toArray(new Object[0]));

public void test() {
    // `list` is the List<Map<String, Object>> from the question.
    JavaRDD<Map<String, Object>> rows = sc.parallelize(list);
    JavaRDD<Row> rowRDD = rows.map(f);

    // Build the schema from the first map, sorted the same way as the rows,
    // inferring each column's type from the runtime class of its sample value.
    TreeMap<String, Object> headerMap = new TreeMap<>(list.get(0));
    List<StructField> fields = new ArrayList<>();
    for (Map.Entry<String, Object> entry : headerMap.entrySet()) {
        String key = entry.getKey();
        Object value = entry.getValue();
        StructField field;
        if (value instanceof Integer) {
            field = DataTypes.createStructField(key, DataTypes.IntegerType, true);
        } else if (value instanceof Double) {
            field = DataTypes.createStructField(key, DataTypes.DoubleType, true);
        } else if (value instanceof java.util.Date) {
            // java.sql.Date extends java.util.Date, so one check covers both
            field = DataTypes.createStructField(key, DataTypes.DateType, true);
        } else {
            field = DataTypes.createStructField(key, DataTypes.StringType, true);
        }
        fields.add(field);
    }
    StructType struct = DataTypes.createStructType(fields);
    Dataset<Row> data = this.spark.createDataFrame(rowRDD, struct);
}
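The type-inference part of this answer is plain dispatch on a sample value's runtime class. Factored out as a Spark-free sketch (the method name and string type labels are illustrative; the real code builds StructField objects), it is easy to check in isolation:

```java
public class SchemaInference {
    // Mirror the if/else chain above: map a sample value's runtime class
    // to the name of the Spark SQL type it would be assigned.
    static String sparkTypeOf(Object value) {
        if (value instanceof Integer) return "IntegerType";
        if (value instanceof Double)  return "DoubleType";
        // java.sql.Date extends java.util.Date, so this covers both
        if (value instanceof java.util.Date) return "DateType";
        return "StringType";
    }

    public static void main(String[] args) {
        System.out.println(sparkTypeOf(100));   // prints IntegerType
        System.out.println(sparkTypeOf("f1"));  // prints StringType
    }
}
```

One caveat with this approach: the sample value from the first row decides the type for the whole column, and a null sample would fall through to StringType, so it works best when the first row is fully populated.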


  • 4 Answers
  • 0 Followers
  • 321 Views