慕工程0101907
2019-03-01 10:58:37
場景描述:
現(xiàn)有許多行日志文本,按天壓縮成一個(gè)個(gè)TB級的gzip文件。
使用流對每個(gè)壓縮文件的數(shù)據(jù)段進(jìn)行傳輸然后解壓,對解壓出的文本分詞并索引
以后查到這個(gè)詞時(shí),定位到這個(gè)詞所在的文件和段,再用流傳輸并解壓
(實(shí)際上是想利用已有的壓縮文件構(gòu)造一個(gè)類似ES的搜索引擎)
現(xiàn)在的問題是,因?yàn)榻邮盏降牟皇峭暾膲嚎s文件而是塊狀二進(jìn)制數(shù)據(jù),所以接收的數(shù)據(jù)由于信息不完全而無法解壓
現(xiàn)在想實(shí)現(xiàn)這樣的功能:首先將接收到的流數(shù)據(jù)解壓還原為完整的數(shù)據(jù)(原始日志數(shù)據(jù)以換行符分隔,能得到每段流數(shù)據(jù)壓縮前的文本和對應(yīng)文件的偏移量就好),然后考慮到傳輸和存儲等過程可能使數(shù)據(jù)出錯(cuò),所以針對每段數(shù)據(jù)流,在出錯(cuò)的情況下解壓出盡可能多的數(shù)據(jù)。
部分相關(guān)代碼如下:(改自https://stackoverflow.com/que...)
import zlib
import traceback
CHUNKSIZE=30
d = zlib.decompressobj(16 + zlib.MAX_WBITS)
f = open('test.py.gz','rb')
buffer = f.read(CHUNKSIZE)
i = 0
while buffer :
i += 1
try:
#skip two chunk
if i < 3 or i > 4:
outstr = d.decompress(buffer)
print('*'*10 + outstr + '#'*10)
except Exception, e:
print(traceback.print_exc())
finally:
buffer = f.read(CHUNKSIZE)
outstr = d.flush()
print(outstr)
f.close()
當(dāng)i>=3以后,每次循環(huán)均報(bào)錯(cuò)我的結(jié)論是 若流不連續(xù)(跳過接收部分?jǐn)?shù)據(jù)),則之后的數(shù)據(jù)都無法解壓。問題1:如果做到能正確的解壓出收到的每部分?jǐn)?shù)據(jù)?(因?yàn)榭赡軤可娴絞zip壓縮的算法和數(shù)據(jù)結(jié)構(gòu),我正在看相關(guān)代碼。如果可以通過追加傳輸頭部的某一chuck或者需要解壓的數(shù)據(jù)的前后某些chuck能解決問題也算可以)問題2:如果不能正確的解壓接收到的每部分?jǐn)?shù)據(jù),那么如何做到解壓出盡可能多的數(shù)據(jù)?
3 回答


慕尼黑5688855
TA貢獻(xiàn)1848條經(jīng)驗(yàn) 獲得超2個(gè)贊
我覺得可以做一個(gè)出錯(cuò)重新續(xù)傳的功能,傳輸前備份當(dāng)前這一段數(shù)據(jù)流,你得判斷出當(dāng)前傳輸?shù)倪@一段數(shù)據(jù)流是否傳輸完整了。這就要求傳送端和接收端之間的傳輸協(xié)議是你能改動(dòng)的,出現(xiàn)錯(cuò)誤就立刻反饋fail給傳輸端,從剛才這段重新續(xù)傳,沒有錯(cuò)誤就反饋OK,繼續(xù)傳輸下一段。這樣就能保證數(shù)據(jù)的完整性。如果文件太大,可以在內(nèi)存中備份多些數(shù)據(jù)段,做些細(xì)節(jié)性的判斷。
添加回答
舉報(bào)
0/150
提交
取消