我正在使用 Apache Beam Go SDK 并且很難以正確的格式獲取PCollection以按鍵進(jìn)行分組/組合。我在 PCollection 的字符串中每個(gè)鍵有多個(gè)記錄,如下所示:Bob, catBob, dogCarla, catCarla, bunnyDoug, horse我想使用GroupByKey和CombinePerKey,這樣我就可以像這樣匯總每個(gè)人的寵物:Bob, [cat, dog]Carla, [cat, bunny]Doug, [horse]如何將 PCollection<string> 轉(zhuǎn)換為 PCollection<KV<string, string>>?他們在這里提到了類似的東西,但不包括聚合字符串值的代碼。我可以使用 ParDo 獲取字符串鍵和字符串值,如下所示,但我不知道如何轉(zhuǎn)換為 GroupPerKey 輸入所需的 KV<string, string> 或 CoGBK<string, string> 格式。pcolOut := beam.ParDo(s, func(line string) (string, string) { cleanString := strings.TrimSpace(line) openingChar := "," iStart := strings.Index(cleanString, openingChar) key := cleanString[0:iStart] value := cleanString[iStart+1:] // How to convert to PCollection<KV<string, string>> before returning? return key, value}, pcolIn)groupedKV := beam.GroupByKey(s, pcolOut) 它失敗并出現(xiàn)以下錯(cuò)誤。有什么建議么?panic: inserting ParDo in scope root creating new DoFn in scope root binding fn main.main.func2 binding params [{Value string} {Value string}] to input CoGBK<string,string>values of CoGBK<string,string> cannot bind to {Value string}
1 回答

汪汪一只貓
TA貢獻(xiàn)1898條經(jīng)驗(yàn) 獲得超8個(gè)贊
要映射到 KV,您可以應(yīng)用 MapElements 并使用 into() 來設(shè)置 KV 類型,并在 via() 邏輯中創(chuàng)建一個(gè)新KV.of(myKey, myValue)的 ,例如,要獲取一個(gè)KV<String,String>,請使用以下內(nèi)容:
PCollection<KV<String, String>> kvPairs = linkpages.apply(MapElements.into(
TypeDescriptors.kvs(
TypeDescriptors.strings(),
TypeDescriptors.strings()))
.via(
linkpage -> KV.of(dataFile, linkpage)));
- 1 回答
- 0 關(guān)注
- 123 瀏覽
添加回答
舉報(bào)
0/150
提交
取消