第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問題,去搜搜看,總會(huì)有你想問的

如何在數(shù)據(jù)集中存儲(chǔ)自定義對(duì)象?

如何在數(shù)據(jù)集中存儲(chǔ)自定義對(duì)象?

婷婷同學(xué)_ 2019-06-19 15:45:49
如何在數(shù)據(jù)集中存儲(chǔ)自定義對(duì)象?根據(jù)介紹星火數(shù)據(jù)集:在我們期待Spark2.0的同時(shí),我們計(jì)劃對(duì)數(shù)據(jù)集進(jìn)行一些令人興奮的改進(jìn),特別是:.自定義編碼器-盡管我們目前為各種各樣的類型自動(dòng)生成編碼器,但我們希望為自定義對(duì)象打開一個(gè)API。并嘗試將自定義類型存儲(chǔ)在Dataset導(dǎo)致以下錯(cuò)誤:無法找到存儲(chǔ)在數(shù)據(jù)集中的類型的編碼器。導(dǎo)入sqlContext.Inductions支持原始類型(Int、String等)和Producttype(CASE類)。_對(duì)序列化其他類型的支持將在以后的版本中添加?;颍寒惓#何凑业骄幋a器用于.有什么解決辦法嗎?注意,這個(gè)問題僅作為CommunityWiki答案的入口點(diǎn)存在。隨時(shí)更新/改進(jìn)問題和答案。
查看完整描述

3 回答

?
飲歌長(zhǎng)嘯

TA貢獻(xiàn)1951條經(jīng)驗(yàn) 獲得超3個(gè)贊

更新

這個(gè)答案仍然是有效和信息豐富的,盡管現(xiàn)在情況更好,自從2.2/2.3,這增加了內(nèi)置編碼器的支持SetSeqMapDateTimestamp,和BigDecimal..如果您堅(jiān)持只使用case類和通常的Scala類型來創(chuàng)建類型,那么應(yīng)該可以只使用SQLImplicits.


不幸的是,在這方面幾乎沒有增加任何幫助。尋覓@since 2.0.0在……里面Encoders.scalaSQLImplicits.scala查找主要與原始類型有關(guān)的內(nèi)容(以及對(duì)Case類的一些調(diào)整)。所以,首先要說的是:目前對(duì)自定義類編碼器沒有真正好的支持。..這樣的話,下面是一些我們可以期望做得很好的技巧,考慮到我們目前所擁有的一切。作為一個(gè)預(yù)先的免責(zé)聲明:這不會(huì)完美的工作,我會(huì)盡我最大的努力使所有的限制清楚和預(yù)先。

到底是什么問題?

當(dāng)您想創(chuàng)建數(shù)據(jù)集時(shí),SPark“需要一個(gè)編碼器(將T類型的JVM對(duì)象與內(nèi)部SparkSQL表示形式相互轉(zhuǎn)換),該編碼器通常是通過從SparkSession,也可以通過調(diào)用Encoders“(摘自醫(yī)生createDataset)。編碼器將采用以下形式Encoder[T]哪里T是您正在編碼的類型。第一個(gè)建議是增加import spark.implicits._(這給了你這些(第二個(gè)建議是顯式地傳遞隱式編碼器,使用這,這個(gè)一組編碼器相關(guān)功能。

沒有普通類可用的編碼器,所以

import spark.implicits._class MyObj(val i: Int)// ...val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

將給出以下隱式相關(guān)編譯時(shí)錯(cuò)誤:

無法找到存儲(chǔ)在數(shù)據(jù)集中的類型的編碼器。導(dǎo)入sqlContext.Inductions支持原始類型(Int、String等)和Producttype(CASE類)。_對(duì)序列化其他類型的支持將在以后的版本中添加。

但是,如果將剛才用于在某個(gè)類中獲取上述錯(cuò)誤的任何類型包裝,則Product,錯(cuò)誤會(huì)被延遲到運(yùn)行時(shí),所以

import spark.implicits._case class Wrap[T](unwrap: T)class MyObj(val i: Int)
// ...val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))

編譯很好,但是在運(yùn)行時(shí)會(huì)失敗。

未支持的OperationException:沒有為MyObj找到編碼器

這樣做的原因是,實(shí)施者SPark創(chuàng)建的Induces實(shí)際上只在運(yùn)行時(shí)(通過Scala關(guān)系)生成。在本例中,在編譯時(shí)所有的SPark檢查都是最外層的類擴(kuò)展Product(所有的CASE類都這樣做),并且只在運(yùn)行時(shí)才意識(shí)到它仍然不知道如何處理MyObj(如果我試圖創(chuàng)建一個(gè)Dataset[(Int,MyObj)]-星火等待運(yùn)行時(shí)繼續(xù)運(yùn)行MyObj)。這些是迫切需要解決的核心問題:

  • 一些擴(kuò)展的類

    Product

    編譯,盡管在運(yùn)行時(shí)總是崩潰,而且
  • 沒有辦法傳遞嵌套類型的自定義編碼器(我無法僅為

    MyObj

    使它知道如何編碼

    Wrap[MyObj]

    (Int,MyObj)).

只管用kryo

每個(gè)人建議的解決方案是使用kryo編碼器。

import spark.implicits._class MyObj(val i: Int)implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

不過,這很快就會(huì)變得很乏味。特別是當(dāng)您的代碼正在操作各種數(shù)據(jù)集、加入、分組等時(shí)。那么,為什么不直接默示這一切都是自動(dòng)完成的呢?

import scala.reflect.ClassTagimplicit def kryoEncoder[A](implicit ct: ClassTag[A]) = 
  org.apache.spark.sql.Encoders.kryo[A](ct)

現(xiàn)在,我似乎可以做任何我想做的事情(下面的示例在spark-shell哪里spark.implicits._自動(dòng)導(dǎo)入)

class MyObj(val i: Int)val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))val d2 = d1.map(d => (d.i+1,d)).alias("d2")
 // mapping works fine and ..val d3 = d1.map(d => (d.i,  d)).alias("d3") 
 // .. deals with the new typeval d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!

或者幾乎。問題是使用kryo導(dǎo)致SPark只將DataSet中的每一行存儲(chǔ)為平面二進(jìn)制對(duì)象。為mapfilterforeach這就足夠了,但是對(duì)于像join,星火確實(shí)需要將它們分隔成列。檢查架構(gòu)d2d3,您可以看到只有一個(gè)二進(jìn)制列:

d2.printSchema// root//  |-- value: binary (nullable = true)

元組的部分解

因此,在Scala中使用InstitucesinScala的魔力(更多在6.26.3過載分辨率),我可以為自己做一系列能做得盡可能好的事情,至少對(duì)于元組來說是這樣,并且可以很好地與現(xiàn)有的Institutions一起工作:

import org.apache.spark.sql.{Encoder,Encoders}import scala.reflect.ClassTagimport spark.implicits._  
// we can still take advantage of all the old implicitsimplicit def single[A](implicit c: ClassTag[A]):
 Encoder[A] = Encoders.kryo[A](c)implicit def tuple2[A1, A2](
  implicit e1: Encoder[A1],
           e2: Encoder[A2]): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)implicit def tuple3[A1, A2, A3](
  implicit e1: Encoder[A1],
           e2: Encoder[A2],
           e3: Encoder[A3]): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)// ... you can keep making these

然后,帶著這些請(qǐng)求,我可以讓上面的例子起作用,盡管用了一些列重命名。

class MyObj(val i: Int)val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))val d2 = d1.map(d => (d.i+1,d))
.toDF("_1","_2").as[(Int,MyObj)].alias("d2")val d3 = d1.map(d => (d.i  ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")

我還沒有弄清楚如何獲得預(yù)期的元組名稱(_1_2.)默認(rèn)情況下不用重命名-如果有人想玩這個(gè)游戲,這,這個(gè)名字"value"被介紹和這,這個(gè)通常添加元組名稱的位置。但是,關(guān)鍵是我現(xiàn)在有了一個(gè)很好的結(jié)構(gòu)化模式:

d4.printSchema// root//  |-- _1: struct (nullable = false)//  |    |-- _1: integer (nullable = true)//  |   
 |-- _2: binary (nullable = true)//  |-- _2: struct (nullable = false)//  |    |-- _1: integer (nullable = true)//  | 
    |-- _2: binary (nullable = true)

總之,這個(gè)解決辦法是:

  • 允許我們?yōu)樵M獲得單獨(dú)的列(因此我們可以再次加入元組,耶!)
  • 我們可以再一次依賴于請(qǐng)求(所以不需要經(jīng)過。)

    kryo

    (到處都是)
  • 幾乎完全向后兼容

    import spark.implicits._

    (涉及一些重命名)
  • 是嗎?

    讓我們加入

    kyro

    序列化二進(jìn)制列,更不用說那些可能具有
  • 將某些元組列重命名為“value”(如果有必要的話,可以通過轉(zhuǎn)換將其撤消),會(huì)產(chǎn)生令人不快的副作用。

    .toDF

    ,指定新的列名,并將其轉(zhuǎn)換回DataSet-模式名稱似乎通過聯(lián)接(最需要它們的地方)被保留。

一般類的部分解

這個(gè)不太愉快,也沒有很好的解決辦法。但是,現(xiàn)在我們有了上面的元組解決方案,我有一個(gè)預(yù)感-來自另一個(gè)答案的隱式轉(zhuǎn)換解決方案也不會(huì)那么痛苦,因?yàn)槟梢詫⒏鼜?fù)雜的類轉(zhuǎn)換為元組。然后,在創(chuàng)建DataSet之后,您可能會(huì)使用dataframe方法重命名這些列。如果一切順利,這是真的一個(gè)改進(jìn),因?yàn)槲椰F(xiàn)在可以在類的字段上執(zhí)行聯(lián)接。如果我只使用了一個(gè)平面二進(jìn)制kryo序列化程序是不可能的。

下面是一個(gè)做了一些事情的例子:我有一個(gè)類MyObj其中有類型的字段。Intjava.util.UUID,和Set[String]..第一個(gè)照顧好自己。第二個(gè),雖然我可以使用kryo如果存儲(chǔ)為String(自UUID這通常是我想反對(duì)的事情。第三個(gè)真正屬于二進(jìn)制列。

class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])// alias for the type to convert to and fromtype MyObjEncoded = 
(Int, String, Set[String])// implicit conversionsimplicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)implicit 
def fromEncoded(e: MyObjEncoded): MyObj =
  new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)

現(xiàn)在,我可以使用這個(gè)機(jī)器創(chuàng)建一個(gè)具有良好模式的數(shù)據(jù)集:

val d = spark.createDataset(Seq[MyObjEncoded](
  new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
  new MyObj(2, java.util.UUID.randomUUID, Set("bar")))).toDF("i","u","s").as[MyObjEncoded]

模式向我展示了I列的正確名稱和前兩種情況,這兩種情況我都可以使用。

d.printSchema// root//  |-- i: integer (nullable = false)//  |-- u: string (nullable = true)//  |-- s: binary (nullable = true)


查看完整回答
反對(duì) 回復(fù) 2019-06-19
?
繁花如伊

TA貢獻(xiàn)2012條經(jīng)驗(yàn) 獲得超12個(gè)贊

您可以使用UDT注冊(cè),然后使用案例類、元組等.所有正確的工作與您的用戶定義的類型!

假設(shè)您想使用自定義Enum:

trait CustomEnum { def value:String }case object Foo extends CustomEnum  { val value = "F" }case object Bar extends CustomEnum  
{ val value = "B" }object CustomEnum {
  def fromString(str:String) = Seq(Foo, Bar).find(_.value == str).get}

登記如下:

// First define a UDT class for it:class CustomEnumUDT extends UserDefinedType[CustomEnum] {
  override def sqlType: DataType = org.apache.spark.sql.types.StringType
  override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value)
  // Note that this will be a UTF8String type
  override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString)
  override def userClass: Class[CustomEnum] = classOf[CustomEnum]}// Then Register the UDT Class!
  // NOTE: you have to put this file into the org.apache.spark package!UDTRegistration.register(classOf[CustomEnum].
  getName, classOf[CustomEnumUDT].getName)

那就用它!

case class UsingCustomEnum(id:Int, en:CustomEnum)val seq = Seq(
  UsingCustomEnum(1, Foo),
  UsingCustomEnum(2, Bar),
  UsingCustomEnum(3, Foo)).toDS()seq.filter(_.en == Foo).show()println(seq.collect())

假設(shè)您想使用多態(tài)記錄:

trait CustomPolycase class FooPoly(id:Int) extends CustomPolycase class BarPoly(value:String, secondValue:Long) extends CustomPoly

..它的用法是這樣的:

case class UsingPoly(id:Int, poly:CustomPoly)Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false}).show()

您可以編寫一個(gè)自定義的UDT,它將所有內(nèi)容編碼為字節(jié)(我在這里使用java序列化,但更好的方法可能是檢測(cè)SPark的Kryo上下文)。

首先定義UDT類:

class CustomPolyUDT extends UserDefinedType[CustomPoly] {
  val kryo = new Kryo()

  override def sqlType: DataType = org.apache.spark.sql.types.BinaryType
  override def serialize(obj: CustomPoly): Any = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)

    bos.toByteArray  }
  override def deserialize(datum: Any): CustomPoly = {
    val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
    val ois = new ObjectInputStream(bis)
    val obj = ois.readObject()
    obj.asInstanceOf[CustomPoly]
  }

  override def userClass: Class[CustomPoly] = classOf[CustomPoly]}

然后注冊(cè):

// NOTE: The file you do this in has to be inside of the org.apache.spark package!UDTRegistration.register(classOf[CustomPoly].
getName, classOf[CustomPolyUDT].getName)

那你就可以用它了!

// As shown above:case class UsingPoly(id:Int, poly:CustomPoly)Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false}).show()


查看完整回答
反對(duì) 回復(fù) 2019-06-19
  • 3 回答
  • 0 關(guān)注
  • 692 瀏覽
慕課專欄
更多

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購(gòu)課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)