4 Answers

You don't need to use an RDD at all. What you need to do is extract the desired schema from the list of maps, convert the list of maps into a list of Rows, and then use spark.createDataFrame.
In Java this is a bit painful, especially when creating the Row objects, but here is how it goes:
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.types.*;
import scala.collection.JavaConverters;

// Use the first map's keys as the column names.
List<String> cols = new ArrayList<>(dataList.get(0).keySet());

// Turn each map into a Row: its values in column order, rendered as strings.
List<Row> rows = dataList
        .stream()
        .map(row -> cols.stream().map(c -> (Object) row.get(c).toString()))
        .map(row -> row.collect(Collectors.toList()))
        .map(row -> JavaConverters.asScalaBufferConverter(row).asScala().toSeq())
        .map(Row$.MODULE$::fromSeq)
        .collect(Collectors.toList());

// Build a matching all-string schema.
StructType schema = new StructType(
        cols.stream()
                .map(c -> new StructField(c, DataTypes.StringType, true, new Metadata()))
                .collect(Collectors.toList())
                .toArray(new StructField[0]));

Dataset<Row> result = spark.createDataFrame(rows, schema);
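For illustration, here is a minimal, hypothetical dataList that could feed the snippet above (the names and values are made up). Note that with a plain HashMap the column order follows the map's key iteration order, so a LinkedHashMap is the safer choice if ordering matters:

import java.util.LinkedHashMap;
import java.util.Map;

List<Map<String, Object>> dataList = new ArrayList<>();
Map<String, Object> m = new LinkedHashMap<>();   // preserves insertion order
m.put("name", "Yin");
m.put("age", 30);
dataList.add(m);
// Running the snippet above then yields a DataFrame with string columns
// "name" and "age"; result.show() prints one row: |Yin| 30|.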

The Spark documentation already shows how to load an in-memory JSON string.
Here is the example from https://spark.apache.org/docs/latest/sql-data-sources-json.html:
// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset<String> storing one JSON object per string.
List<String> jsonData = Arrays.asList(
        "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());
Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);
anotherPeople.show();
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
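To apply this route to the original List<Map<String, Object>> problem, each map can be serialized to a JSON string first. A minimal sketch, assuming Jackson's ObjectMapper is on the classpath and that dataList and spark are the list and SparkSession from the question:

import java.util.List;
import java.util.stream.Collectors;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

ObjectMapper mapper = new ObjectMapper();
// One JSON object per string, as the reader above expects.
List<String> json = dataList.stream()
        .map(m -> {
            try {
                return mapper.writeValueAsString(m);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        })
        .collect(Collectors.toList());
// Spark infers the schema (including value types) from the JSON.
Dataset<Row> df = spark.read().json(spark.createDataset(json, Encoders.STRING()));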

import java.io.Serializable;

// A plain JavaBean; its properties (fund, broker, qty) become the columns
// of the DataFrame created below.
public class MyRow implements Serializable {

    private String fund;
    private String broker;
    private int qty;

    public MyRow(String fund, String broker, int qty) {
        super();
        this.fund = fund;
        this.broker = broker;
        this.qty = qty;
    }

    public String getFund() {
        return fund;
    }

    public void setFund(String fund) {
        this.fund = fund;
    }

    public String getBroker() {
        return broker;
    }

    public void setBroker(String broker) {
        this.broker = broker;
    }

    public int getQty() {
        return qty;
    }

    public void setQty(int qty) {
        this.qty = qty;
    }
}
Now create an ArrayList. Each item in this list will become a row in the final DataFrame.
MyRow r1 = new MyRow("f1", "b1", 100);
MyRow r2 = new MyRow("f2", "b2", 200);
List<MyRow> dataList = new ArrayList<>();
dataList.add(r1);
dataList.add(r2);
Now we have to convert this list into a Dataset:
Dataset<Row> ds = spark.createDataFrame(dataList, MyRow.class);
ds.show();
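The output should look roughly like the following (when Spark infers a schema from a JavaBean, it typically orders the columns alphabetically by property name):

// +------+----+---+
// |broker|fund|qty|
// +------+----+---+
// |    b1|  f1|100|
// |    b2|  f2|200|
// +------+----+---+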

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

private static JavaRDD<Map<String, Object>> rows;

// Wrap each map in a TreeMap so its values come out in sorted key order,
// matching the schema fields built below from the same sorted keys.
private static final Function<Map<String, Object>, Row> f =
        strObjMap -> RowFactory.create(
                new TreeMap<String, Object>(strObjMap).values().toArray(new Object[0]));

public void test() {
    // 'list' (the List<Map<String, Object>>), 'sc' (the JavaSparkContext) and
    // 'spark' (the SparkSession) are fields defined elsewhere in this class.
    rows = sc.parallelize(list);
    JavaRDD<Row> rowRDD = rows.map(f);

    // Infer each column's Spark type from the first map's values.
    Map<String, Object> headMap = list.get(0);
    TreeMap<String, Object> headerMap = new TreeMap<>(headMap);
    List<StructField> fields = new ArrayList<>();
    StructField field;
    for (String key : headerMap.keySet()) {
        Object value = headMap.get(key);
        if (value instanceof Integer) {
            field = DataTypes.createStructField(key, DataTypes.IntegerType, true);
        } else if (value instanceof Double) {
            field = DataTypes.createStructField(key, DataTypes.DoubleType, true);
        } else if (value instanceof java.sql.Date || value instanceof java.util.Date) {
            field = DataTypes.createStructField(key, DataTypes.DateType, true);
        } else {
            field = DataTypes.createStructField(key, DataTypes.StringType, true);
        }
        fields.add(field);
    }
    StructType struct = DataTypes.createStructType(fields);
    Dataset<Row> data = this.spark.createDataFrame(rowRDD, struct);
}
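For completeness, a hypothetical wiring for the fields this snippet assumes (list, sc and spark are never defined in the original answer, so the setup below is an assumption):

import java.util.ArrayList;
import java.util.Map;
import java.util.TreeMap;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("maps-to-df")   // hypothetical app name
        .getOrCreate();
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

List<Map<String, Object>> list = new ArrayList<>();
Map<String, Object> m = new TreeMap<>();
m.put("name", "Yin");
m.put("age", 30);
list.add(m);
// Calling test() then builds a DataFrame with columns "age" (integer) and "name" (string).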