2 回答

TA貢獻(xiàn)1824條經(jīng)驗 獲得超8個贊
在與我的同事考慮后,我們決定實施和使用_ingestAPI,而不是在 ES 中創(chuàng)建一個管道,該管道在每個文檔上插入服務(wù)器文檔攝取日期。
腳步:
創(chuàng)建時間戳管道
PUT _ingest/pipeline/timestamp_pipeline
{
? "description" : "Inserts timestamp field for all documents",
? "processors" : [
? ? {
? ? ? "set" : {
? ? ? ? "field": "insert_date",
? ? ? ? "value": "{{_ingest.timestamp}}"
? ? ? }
? ? }
? ]
}
更新索引以添加新的默認(rèn)字段
PUT /*/_settings
{
? "index" : {
? ? "default_pipeline": "timestamp_pipeline"
? }
}
在 Python 中,我會_scroll像這樣使用 API:
? ? es = Elasticsearch(cfg.esUrl, port = cfg.esPort, timeout = 200)
? ? doc = {
? ? ? "query": {
? ? ? ? "range": {
? ? ? ? ? "insert_date": {
? ? ? ? ? ? "gte": lastRowDateOffset
? ? ? ? ? }
? ? ? ? }
? ? ? }
? ? }
? ? res = es.search(
? ? ? ? index = Index,
? ? ? ? sort = "insert_date:asc",
? ? ? ? scroll = "2m",
? ? ? ? size = NumberOfResultsPerPage,
? ? ? ? body = doc
? ? )
lastRowDateOffset最后一次跑步的日期在哪里

TA貢獻(xiàn)1871條經(jīng)驗 獲得超8個贊
我想您正在尋找的是一個流式傳輸/更改 API,@Val 在這里對此進(jìn)行了很好的描述,還有一個開放的功能請求。
與此同時,您不能真正依賴size
和from
參數(shù)——您可能會進(jìn)行冗余查詢并在重復(fù)項到達(dá)您的數(shù)據(jù)倉庫之前對其進(jìn)行處理。
另一種選擇是在這方面跳過 ES 并直接流式傳輸?shù)絺}庫嗎?我的意思是,在給定時間之前拍攝一次 ES 快照(這樣您就可以保留歷史數(shù)據(jù)),將其提供給倉庫,然后直接從您獲取數(shù)據(jù)的地方流式傳輸?shù)絺}庫。
附錄
AFAIK 默認(rèn)排序是按插入日期。但是沒有內(nèi)部_insertTime
或類似的東西。你可以使用游標(biāo)——它被稱為滾動,這是一個 py實現(xiàn)。但這是從“最新”文檔到“第一個”文檔,反之亦然。所以它會給你所有現(xiàn)有的文檔,但我不太確定你滾動時新添加的文檔。然后你想再次運行滾動,這是次優(yōu)的。
您還可以預(yù)先排序您的索引,當(dāng)結(jié)合滾動時,它應(yīng)該非常適合您的用例。
添加回答
舉報