3 Answers

Contributor: 1951 experience points · 3+ upvotes
Update

This answer is still valid, but things have improved in newer Spark releases (2.2/2.3 and later), which add built-in encoder support for Set, Seq, Map, Date, Timestamp, and BigDecimal through SQLImplicits. If you stick to case classes and the usual Scala types, the implicits from SQLImplicits should be enough; you can see what has been added by searching for @since 2.0.0 in Encoders.scala and SQLImplicits.scala.
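As a rough sketch of what that buys you (assuming a Spark version recent enough to ship these implicits; spark is the usual SparkSession handle):

import spark.implicits._

// These now get encoders automatically, with no kryo and no hand-rolled implicits:
val sets  = spark.createDataset(Seq(Set("a", "b"), Set("c")))               // Dataset[Set[String]]
val maps  = spark.createDataset(Seq(Map("k" -> 1), Map("k" -> 2)))          // Dataset[Map[String,Int]]
val dates = spark.createDataset(Seq(java.sql.Date.valueOf("2020-01-01")))   // Dataset[java.sql.Date]
val money = spark.createDataset(Seq(BigDecimal("1.50"), BigDecimal("2")))   // Dataset[BigDecimal]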
What exactly is the problem?

When you create a Dataset, Spark needs an encoder to convert a JVM object of type T to and from its internal SQL representation. Encoders are normally provided automatically through implicits from a SparkSession, or can be created explicitly by calling the static methods on Encoders (see the documentation for createDataset). An encoder has the type Encoder[T], where T is the type being encoded, which is why the usual first step is import spark.implicits._ to bring the built-in implicit encoders into scope.
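As an aside, here is a minimal sketch of that second, explicit route for a type Spark already knows how to encode (the (Int, String) tuple is just an assumed example):

import org.apache.spark.sql.{Encoder, Encoders}

// Build the encoder by hand instead of relying on implicits ...
val pairEncoder: Encoder[(Int, String)] =
  Encoders.tuple(Encoders.scalaInt, Encoders.STRING)

// ... and pass it to createDataset, whose encoder parameter is normally filled in implicitly.
val pairs = spark.createDataset(Seq((1, "a"), (2, "b")))(pairEncoder)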
However, there is no implicit encoder for an ordinary (non-case) class, so this:

import spark.implicits._

class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1), new MyObj(2), new MyObj(3)))
fails at compile time with:

Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing sqlContext.implicits._  Support for serializing other types will be added in future releases.
However, if you wrap the offending type in something that extends Product (which every case class does), the error only gets delayed to runtime:

import spark.implicits._

case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)), Wrap(new MyObj(2)), Wrap(new MyObj(3))))
This compiles fine, but crashes at runtime with:

java.lang.UnsupportedOperationException: No Encoder found for MyObj
The reason is that the encoders Spark derives from the implicits are only built lazily, via reflection, at runtime. At compile time Spark only checks that the outermost type extends Product (which all case classes do); only at runtime does it discover that it still has no idea what to do with MyObj. The same thing happens if you try to build a Dataset[(Int, MyObj)]: Spark waits until runtime to choke on the nested MyObj.
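For illustration, a minimal sketch of that nested-type failure (same MyObj as above):

import spark.implicits._

class MyObj(val i: Int)

// Compiles, because (Int, MyObj) is a Product (a tuple) ...
val pairs = spark.createDataset(Seq((1, new MyObj(1)), (2, new MyObj(2))))
// ... but blows up at runtime with UnsupportedOperationException: No Encoder found for MyObj.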
Two central problems fall out of this: some classes that extend Product compile even though they will always crash at runtime, and there is no way to pass in a custom encoder for a nested type (there is no way to give Spark an encoder for just MyObj so that it then knows how to encode Wrap[MyObj] or (Int, MyObj)).
Just use kryo

The fix everyone suggests is the kryo encoder:

import spark.implicits._

class MyObj(val i: Int)

implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1), new MyObj(2), new MyObj(3)))
This gets tedious quickly, though, especially once you are mapping, filtering, and joining all kinds of datasets. So why not write one implicit that does it for any type?

import scala.reflect.ClassTag

implicit def kryoEncoder[A](implicit ct: ClassTag[A]) =
  org.apache.spark.sql.Encoders.kryo[A](ct)
With that in scope it seems like almost anything works (the example below will not work in spark-shell, where spark.implicits._ is imported automatically):

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1), new MyObj(2), new MyObj(3)))
val d2 = d1.map(d => (d.i + 1, d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i,     d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!
Or almost. The problem is that the kryo encoder makes Spark store every row of the dataset as one flat binary object. That is enough for map, filter, and foreach, but for an operation like join Spark really needs the data separated into columns. Inspect the schema of d2 or d3 and you see a single binary column:

d2.printSchema
// root
//  |-- value: binary (nullable = true)
Partial solution for tuples

Using Scala's implicit machinery, you can write a family of encoders that do as good a job as possible, at least for tuples, and that still cooperate with the existing implicits:

import org.apache.spark.sql.{Encoder, Encoders}
import scala.reflect.ClassTag
import spark.implicits._  // we can still take advantage of all the old implicits

implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)

implicit def tuple2[A1, A2](
  implicit e1: Encoder[A1],
           e2: Encoder[A2]
): Encoder[(A1, A2)] = Encoders.tuple[A1, A2](e1, e2)

implicit def tuple3[A1, A2, A3](
  implicit e1: Encoder[A1],
           e2: Encoder[A2],
           e3: Encoder[A3]
): Encoder[(A1, A2, A3)] = Encoders.tuple[A1, A2, A3](e1, e2, e3)

// ... you can keep making these
Armed with these implicits, the example above works, albeit with some column renaming:

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1), new MyObj(2), new MyObj(3)))
val d2 = d1.map(d => (d.i + 1, d)).toDF("_1", "_2").as[(Int, MyObj)].alias("d2")
val d3 = d1.map(d => (d.i,     d)).toDF("_1", "_2").as[(Int, MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")
I have not yet figured out how to get the expected tuple column names (_1, _2, ...) by default without renaming them; this is where the name "value" otherwise creeps in. The key point is that there is now a nicely structured schema:

d4.printSchema
// root
//  |-- _1: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)
//  |-- _2: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)
To sum up, this workaround:

- gets us separate columns for tuple elements (so we can join on tuples again);
- lets us rely on the implicits again (no need to pass kryo around everywhere);
- is almost entirely backwards compatible with import spark.implicits._ (with some column renaming involved);
- does not let us join on the kryo-serialized binary columns, let alone on fields those columns may contain;
- has the unpleasant side effect of renaming some tuple columns to "value" (if necessary this can be undone by converting with .toDF, giving new column names, and converting back to a Dataset; the schema names appear to survive joins, which is where they are needed most).
Partial solution for classes in general

This one is less pleasant and has no great solution. But given the tuple encoders above, you can convert a more complex class to a tuple with implicit conversions, create the dataset, and then rename the columns using the DataFrame approach. That way you can join on the fields of your class, which a single flat kryo-serialized binary column would never allow.

Here is an example that does a bit of everything: a class MyObj with fields of type Int, java.util.UUID, and Set[String]. The Int takes care of itself. The UUID could be serialized with kryo, but it is more useful stored as a String, since UUIDs are usually something you want to join on. The Set[String] really does belong in a binary column.
class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])

// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])

// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
  new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)
Creating a dataset with a nice schema then looks like this (the implicit conversion turns each MyObj into a MyObjEncoded tuple):

val d = spark.createDataset(Seq[MyObjEncoded](
  new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
  new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i", "u", "s").as[MyObjEncoded]
And the schema shows columns with the right names, the kind of thing you can join against:

d.printSchema
// root
//  |-- i: integer (nullable = false)
//  |-- u: string (nullable = true)
//  |-- s: binary (nullable = true)
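For instance, a small made-up usage sketch (the lookups table and its columns are assumptions, not part of the original answer): because u is a plain string column, it can be joined on directly:

import spark.implicits._

// A hypothetical lookup table keyed by the same UUID string.
val lookups = Seq((java.util.UUID.randomUUID.toString, "some label")).toDF("u", "label")

// Joining on "u" works because it is a real string column, not an opaque binary blob.
val joined = d.join(lookups, "u")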

Contributor: 2012 experience points · 12+ upvotes
You can also register a UserDefinedType (UDT) for a custom class. Say you want to use a custom enum:
trait CustomEnum { def value: String }
case object Foo extends CustomEnum { val value = "F" }
case object Bar extends CustomEnum { val value = "B" }
object CustomEnum {
  def fromString(str: String) = Seq(Foo, Bar).find(_.value == str).get
}
// First define a UDT class for it (these imports resolve because the file lives under org.apache.spark, see the note below):
import org.apache.spark.sql.types.{DataType, UDTRegistration, UserDefinedType}

class CustomEnumUDT extends UserDefinedType[CustomEnum] {
  override def sqlType: DataType = org.apache.spark.sql.types.StringType

  override def serialize(obj: CustomEnum): Any =
    org.apache.spark.unsafe.types.UTF8String.fromString(obj.value) // note: stored as a UTF8String

  override def deserialize(datum: Any): CustomEnum =
    CustomEnum.fromString(datum.toString)

  override def userClass: Class[CustomEnum] = classOf[CustomEnum]
}

// Then register the UDT class!
// NOTE: you have to put this file into the org.apache.spark package!
UDTRegistration.register(classOf[CustomEnum].getName, classOf[CustomEnumUDT].getName)
Then use it:

case class UsingCustomEnum(id: Int, en: CustomEnum)

val seq = Seq(
  UsingCustomEnum(1, Foo),
  UsingCustomEnum(2, Bar),
  UsingCustomEnum(3, Foo)
).toDS()

seq.filter(_.en == Foo).show()
println(seq.collect())
Say you want to use a polymorphic record:
trait CustomPoly
case class FooPoly(id: Int) extends CustomPoly
case class BarPoly(value: String, secondValue: Long) extends CustomPoly
and eventually want to use it like this (note the dataset is assigned to polySeq so it can be filtered afterwards):

case class UsingPoly(id: Int, poly: CustomPoly)

val polySeq = Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS()

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()
To make that work, write a UDT that encodes everything to bytes (plain Java serialization here; instrumenting Spark's kryo machinery would probably be better):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import org.apache.spark.sql.types.{DataType, UserDefinedType}

class CustomPolyUDT extends UserDefinedType[CustomPoly] {
  override def sqlType: DataType = org.apache.spark.sql.types.BinaryType

  override def serialize(obj: CustomPoly): Any = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)
    bos.toByteArray
  }

  override def deserialize(datum: Any): CustomPoly = {
    val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
    val ois = new ObjectInputStream(bis)
    ois.readObject().asInstanceOf[CustomPoly]
  }

  override def userClass: Class[CustomPoly] = classOf[CustomPoly]
}
// NOTE: The file you do this in has to be inside of the org.apache.spark package!
UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)
Then you can use it:

// As shown above:
case class UsingPoly(id: Int, poly: CustomPoly)

val polySeq = Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS()

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()