1 Answer

The problem is in how you register the udf in Spark. You should not use UserDefinedAggregateFunction, which is not a udf but a udaf, used for aggregation. Instead, what you should do is:
spark.udf().register("toVector", toVector, new VectorUDT());
然后要使用注冊的函數(shù),請使用:
df3.withColumn("featuresnew", callUDF("toVector", df3.col("feautres")));
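For reference, here is a sketch of the imports the snippets in this answer assume (using the Spark 2.x ml package; if you are on the older mllib API, the linalg imports differ):

import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import scala.collection.Seq;
import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.udf;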
The udf itself should be adjusted slightly, as follows:
UDF1<Seq<Float>, Vector> toVector = new UDF1<Seq<Float>, Vector>() {
    @Override
    public Vector call(Seq<Float> t1) throws Exception {
        // Convert the Scala Seq to a Java List, then copy the values
        // into the double[] that Vectors.dense() expects.
        List<Float> L = scala.collection.JavaConversions.seqAsJavaList(t1);
        double[] DoubleArray = new double[t1.length()];
        for (int i = 0; i < L.size(); i++) {
            DoubleArray[i] = L.get(i);
        }
        return Vectors.dense(DoubleArray);
    }
};
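As an aside, scala.collection.JavaConversions is deprecated in newer Scala versions; an equivalent conversion via JavaConverters (a sketch, assuming a Scala 2.11/2.12 build) would be:

List<Float> L = scala.collection.JavaConverters.seqAsJavaListConverter(t1).asJava();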
Note that in Spark 2.3+, you can create Scala-style udfs that can be invoked directly. From this answer:
UserDefinedFunction toVector = udf(
(Seq<Float> array) -> /* udf code or method to call */, new VectorUDT()
);
df3.withColumn("featuresnew", toVector.apply(col("feautres")));
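The lambda body above is left as a placeholder; a minimal sketch of what it could look like, reusing the same conversion logic as the UDF1 version (the body shown here is an assumption, not part of the original answer):

UserDefinedFunction toVector = udf(
    (Seq<Float> array) -> {
        // Same Seq<Float> -> double[] -> dense Vector conversion as above
        List<Float> values = scala.collection.JavaConversions.seqAsJavaList(array);
        double[] doubles = new double[values.size()];
        for (int i = 0; i < values.size(); i++) {
            doubles[i] = values.get(i);
        }
        return Vectors.dense(doubles);
    }, new VectorUDT()
);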