2 Answers

With help from the answers above, I arrived at a working example, which is available on GitHub: https://github.com/alexflint/bigquery-storage-api-example
The main code is:
package main

import (
	"context"
	"fmt"
	"log"

	// import paths assume the current v1 client libraries; the linked
	// repository may pin a different API version
	storage "cloud.google.com/go/bigquery/storage/apiv1"
	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
	"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
	"google.golang.org/protobuf/proto"
)

const (
	project = "myproject"
	dataset = "mydataset"
	table   = "mytable"
	trace   = "bigquery-writeclient-example" // identifies this client for bigquery debugging
)

// the data we will stream to bigquery
// (Row is the Go type generated from the protocol buffer definition)
var rows = []*Row{
	{Name: "John Doe", Age: 104},
	{Name: "Jane Doe", Age: 69},
	{Name: "Adam Smith", Age: 33},
}

func main() {
	ctx := context.Background()

	// create the bigquery client
	client, err := storage.NewBigQueryWriteClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// create the write stream
	// a COMMITTED write stream inserts data immediately into bigquery
	resp, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
		Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", project, dataset, table),
		WriteStream: &storagepb.WriteStream{
			Type: storagepb.WriteStream_COMMITTED,
		},
	})
	if err != nil {
		log.Fatal("CreateWriteStream: ", err)
	}

	// get the stream by calling AppendRows
	stream, err := client.AppendRows(ctx)
	if err != nil {
		log.Fatal("AppendRows: ", err)
	}

	// get the protobuf descriptor for our row type
	var row Row
	descriptor, err := adapt.NormalizeDescriptor(row.ProtoReflect().Descriptor())
	if err != nil {
		log.Fatal("NormalizeDescriptor: ", err)
	}

	// serialize the rows
	var opts proto.MarshalOptions
	var data [][]byte
	for _, row := range rows {
		buf, err := opts.Marshal(row)
		if err != nil {
			log.Fatal("protobuf.Marshal: ", err)
		}
		data = append(data, buf)
	}

	// send the rows to bigquery
	err = stream.Send(&storagepb.AppendRowsRequest{
		WriteStream: resp.Name,
		TraceId:     trace, // identifies this client
		Rows: &storagepb.AppendRowsRequest_ProtoRows{
			ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
				// protocol buffer schema
				WriterSchema: &storagepb.ProtoSchema{
					ProtoDescriptor: descriptor,
				},
				// protocol buffer data
				Rows: &storagepb.ProtoRows{
					SerializedRows: data, // serialized protocol buffer data
				},
			},
		},
	})
	if err != nil {
		log.Fatal("AppendRows.Send: ", err)
	}

	// get the response, which will tell us whether it worked;
	// a rejected append is reported inside the response message,
	// not only through the returned error
	r, err := stream.Recv()
	if err != nil {
		log.Fatal("AppendRows.Recv: ", err)
	}
	if rpcErr := r.GetError(); rpcErr != nil {
		log.Fatal("AppendRows response: ", rpcErr.GetMessage())
	}

	log.Println("done")
}
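The Parent field passed to CreateWriteStream has to be the fully qualified table resource name. A minimal sketch of building it in one place is shown here; tablePath is a hypothetical helper (it is not part of the BigQuery client library) that also rejects empty components before any request is made:

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// tablePath is a hypothetical helper, not part of the BigQuery client library.
// It builds the resource name that CreateWriteStream expects in Parent.
func tablePath(project, dataset, table string) (string, error) {
	if project == "" || dataset == "" || table == "" {
		return "", errors.New("project, dataset and table must all be non-empty")
	}
	return fmt.Sprintf("projects/%s/datasets/%s/tables/%s", project, dataset, table), nil
}

func main() {
	parent, err := tablePath("myproject", "mydataset", "mytable")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(parent)
}
```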
The protocol buffer definition for the "Row" struct above is:
syntax = "proto3";
package tutorial;
option go_package = ".;main";

message Row {
  string Name = 1;
  int32 Age = 2;
}
You first need to create a BigQuery dataset and a table whose schema matches the protocol buffer. See the README in the repository linked above for how to do this.
After running the code above, the data shows up in BigQuery as follows:
$ bq query 'select * from mydataset.mytable'
Waiting on bqjob_r1b39442e5474a885_0000017df21f629e_1 ... (0s) Current status: DONE
+------------+-----+
|    name    | age |
+------------+-----+
| John Doe   | 104 |
| Jane Doe   |  69 |
| Adam Smith |  33 |
+------------+-----+
Thanks, everyone, for the help!