第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問題,去搜搜看,總會(huì)有你想問的

將 JavaRDD<Status> 轉(zhuǎn)換為 JavaRDD<String> 的問題

將 JavaRDD<Status> 轉(zhuǎn)換為 JavaRDD<String> 的問題

人到中年有點(diǎn)甜 2023-06-08 20:51:09
我正在嘗試將推文從 twitter 保存到 MongoDb 數(shù)據(jù)庫。我有RDD<Status>,我正在嘗試借助 ObjectMapper 將其轉(zhuǎn)換為 JSON 格式。但是這種轉(zhuǎn)換存在一些問題(public class Main {    //set system credentials for access to twitter    private static void setTwitterOAuth() {        System.setProperty("twitter4j.oauth.consumerKey", TwitterCredentials.consumerKey);        System.setProperty("twitter4j.oauth.consumerSecret", TwitterCredentials.consumerSecret);        System.setProperty("twitter4j.oauth.accessToken", TwitterCredentials.accessToken);        System.setProperty("twitter4j.oauth.accessTokenSecret", TwitterCredentials.accessTokenSecret);    }    public static void main(String [] args) {        setTwitterOAuth();        SparkConf conf = new SparkConf().setMaster("local[2]")                                        .setAppName("SparkTwitter");        JavaSparkContext sparkContext = new JavaSparkContext(conf);        JavaStreamingContext jssc = new JavaStreamingContext(sparkContext, new Duration(1000));        JavaReceiverInputDStream<Status> twitterStream = TwitterUtils.createStream(jssc);        //Stream that contains just tweets in english        JavaDStream<Status> enTweetsDStream=twitterStream.filter((status) -> "en".equalsIgnoreCase(status.getLang()));        enTweetsDStream.persist(StorageLevel.MEMORY_AND_DISK());        enTweetsDStream.print();        jssc.start();        jssc.awaitTermination();    }    static void saveRawTweetsToMondoDb(JavaRDD<Status> rdd,JavaSparkContext sparkContext) {     try {            ObjectMapper objectMapper = new ObjectMapper();            SQLContext sqlContext = new SQLContext(sparkContext);            JavaRDD<String> tweet =  rdd.map(status -> objectMapper.writeValueAsString(status));            DataFrame dataFrame = sqlContext.read().json(tweet);        } catch (Exception e) {            System.out.println("Error saving to database");        }    }JavaRDD<String> tweet =  rdd.map(status -> objectMapper.writeValueAsString(status));這是一個(gè)問題。需要不兼容的類型JavaRDD<String>,但地圖被推斷為javaRDD<R>
查看完整描述

1 回答

?
慕標(biāo)琳琳

TA貢獻(xiàn)1830條經(jīng)驗(yàn) 獲得超9個(gè)贊

不幸的是,Java 類型推斷并不總是非常聰明,所以我在這些情況下所做的是提取我的 lambda 的所有位作為變量,直到我找到一個(gè) Java 無法為其提供準(zhǔn)確類型的位。然后我給表達(dá)式我認(rèn)為它應(yīng)該具有的類型,看看為什么 Java 會(huì)抱怨它。有時(shí)它只是編譯器的一個(gè)限制,您必須顯式地將表達(dá)式“轉(zhuǎn)換”為所需的類型,有時(shí)您會(huì)發(fā)現(xiàn)代碼存在問題。在你的情況下,代碼對(duì)我來說很好,所以一定有別的東西。

然而,我有一個(gè)評(píng)論:在這里你支付一次 JSON 序列化(從StatusJSON 字符串)然后反序列化(從 JSON 字符串到Row)的成本。另外,您沒有向您提供任何架構(gòu)Dataset,因此它必須兩次傳遞數(shù)據(jù)(或根據(jù)您的配置對(duì)其進(jìn)行采樣)以推斷架構(gòu)。如果數(shù)據(jù)很大,所有這些都可能非常昂貴。如果性能是一個(gè)問題并且相對(duì)簡單,我建議您直接編寫從Status到的轉(zhuǎn)換。RowStatus

另一個(gè)“順便說一句”:您正在隱式序列化您的ObjectMapper,很可能您不想這樣做。看起來該類確實(shí)支持 Java 序列化,但具有特殊的邏輯。由于 Spark 的默認(rèn)配置是使用 Kryo(其性能比 Java 序列化好得多),我懷疑它在使用默認(rèn)FieldSerializer.?您有以下三種選擇:

  • 使對(duì)象映射器靜態(tài)化以避免序列化它

  • 配置您的 Kryo 注冊器以ObjectMapper使用 Java 序列化序列化/反序列化類型的對(duì)象。那會(huì)起作用,但不值得付出努力。

  • 到處使用 Java 序列化而不是 Kryo。餿主意!它很慢并且占用大量空間(內(nèi)存和磁盤取決于序列化對(duì)象的寫入位置)。


查看完整回答
反對(duì) 回復(fù) 2023-06-08
  • 1 回答
  • 0 關(guān)注
  • 147 瀏覽

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)