我正在嘗試使用 JdbcIO.Read 讀取 Java Beam 中的云 SQL 表。我想使用 .withRowMapper(Resultset resultSet) 方法將 Resultset 中的每一行轉換為 GenericData.Record。有沒有辦法可以將 JSON 架構字符串作為 .withRowMapper 方法中的輸入傳遞,例如 ParDo 接受 sideInputs 作為 PCollectionView我嘗試過執(zhí)行這兩種讀取操作(在同一 JdbcIO.Read 轉換中從 information_schema.columns 和 My Table 讀取)。但是,我想先生成 Schema PCollection,然后使用 JdbcIO.Read 讀取表我正在動態(tài)生成表的 Avro 模式,如下所示:PCollection<String> avroSchema= pipeline.apply(JdbcIO.<String>read() .withDataSourceConfiguration(config) .withCoder(StringUtf8Coder.of()) .withQuery("SELECT DISTINCT column_name, data_type \n" + "FROM information_schema.columns\n" + "WHERE table_name = " + "'" + tableName + "'") .withRowMapper((JdbcIO.RowMapper<String>) resultSet -> { // code here to generate avro schema string // this works fine for me}))創(chuàng)建 PCollectionView 它將保存每個表的 json 模式。 PCollectionView<String> s = avroSchema.apply(View.<String>asSingleton());// I want to access this view as side input in next JdbcIO.Read operation// something like this ;pipeline.apply(JdbcIO.<String>read() .withDataSourceConfiguration(config) .withCoder(StringUtf8Coder.of()) .withQuery(queryString) .withRowMapper(new JdbcIO.RowMapper<String>() { @Override public String mapRow(ResultSet resultSet) throws Exception { // access schema here and use it to parse and create //GenericData.Record from ResultSet fields as per schema return null; } })). withSideInputs(My PCollectionView here); // this option is not there right now.有沒有更好的方法來解決這個問題?
1 回答

千巷貓影
TA貢獻1829條經(jīng)驗 獲得超7個贊
此時 IO API 不接受 SideInputs。
在讀取后立即添加 ParDo 并在那里進行映射應該是可行的。ParDo 可以接受側面輸入。
添加回答
舉報
0/150
提交
取消