I am trying to use Apache Beam to read Avro files and transform the data with Beam SQL. I am new to both Beam and Java. Here is my simple code:

    public class BeamSQLReadAvro {
        @SuppressWarnings("serial")
        public static void main(String[] args) throws IOException {
            PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
            Pipeline p = Pipeline.create(options);

            /* Schema definition */
            Schema schema = new Schema.Parser().parse(new File("data/RATE_CODE/RATE_CODE.avsc"));

            /* Create record/row */
            PCollection<GenericRecord> records =
                p.apply(AvroIO.readGenericRecords(schema).from("data/RATE_CODE/*.avro"));

            /* SQL Transform */
            records
                .apply("SQL Transform 01",
                    SqlTransform.query("SELECT RCODE,RNAME,RDESC FROM PCOLLECTION LIMIT 10"))
                /* Print output */
                .apply("Output", MapElements.via(
                    new SimpleFunction<Row, Row>() {
                        @Override
                        public Row apply(Row input) {
                            System.out.println("PCOLLECTION: " + input.getValues());
                            return input;
                        }
                    }));

            p.run().waitUntilFinish();
        }
    }

It fails with this error:

    Exception in thread "main" java.lang.IllegalStateException: Cannot call getSchema when there is no schema

I don't understand why, since I defined a variable named schema. Any pointers?
Querying an Avro schema with Beam SQL
HUH函數(shù)
2024-01-05 16:43:20