1 回答

TA貢獻1847條經(jīng)驗 獲得超11個贊
這些參數(shù)決定 將使用多少個線程。這就是為什么默認情況下,可用 CPU 內(nèi)核計數(shù)是:parallelism
ForkJoinPool
parallelism
Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors())
在你的情況下,裝瓶工應該檢查文件是否存在并將其上傳到S3。這里的時間將取決于至少幾個因素:CPU,網(wǎng)卡和驅(qū)動程序,操作系統(tǒng),其他。在您的案例中,S3 網(wǎng)絡操作時間似乎沒有 CPU 限制,因為您正在通過創(chuàng)建更多模擬工作線程來觀察改進,也許網(wǎng)絡請求由操作系統(tǒng)排隊。
的正確值因工作負荷類型而異。由于上下文切換的負面影響,CPU 密集型工作流最好使用默認值等于 CPU 內(nèi)核。像您這樣的非CPU密集型工作負載可以通過更多的工作線程來加速,假設工作負載不會阻塞CPU,例如通過忙于等待。parallelism
parallelism
中沒有一個單一的理想值。parallelism
ForkJoinPool
由于您所有有用的建議和解釋,我設法減少到8秒。
由于瓶頸是上傳到 aws s3,并且您提到了 aws 上的非阻塞 API,因此經(jīng)過一些研究,我發(fā)現(xiàn)類 TransferManager 包含非阻塞上傳。
轉(zhuǎn)移管理器類
因此,我沒有使用分叉連接池來增加線程數(shù),而是保留了簡單的并行流:
dtos.parallelStream().forEach(dto -> {
try {
processDTO(dealerCode, yearPeriod, monthPeriod, dto);
} catch (FileAlreadyExistsInS3Exception e) {
failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
}
});
上傳到S3方法改變了一點,我沒有使用亞馬遜S3,而是使用了轉(zhuǎn)移管理器:
public Upload uploadAsyncFileToS3(String fileName, String fileContent) throws FileAlreadyExistsInS3Exception {
if (s3client.doesObjectExist(bucketName, fileName)) {
throw new FileAlreadyExistsInS3Exception(ErrorMessages.FILE_ALREADY_EXISTS_IN_S3.getMessage());
}
InputStream targetStream = new ByteArrayInputStream(fileContent.getBytes());
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(fileContent.getBytes().length);
return transferManager.upload(bucketName, fileName, targetStream, metadata);
}
這樣,當調(diào)用上載時,它不會等待它完成,從而允許處理另一個 DTO。處理完所有 DTO 后,我會檢查其上傳狀態(tài)以查看可能的錯誤(在第一個 Each 之外)
添加回答
舉報