第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定

Datax3.0中配置解讀(2)

標(biāo)簽:
Java

限速的逻辑


首先我们自己可以想像限速的地方会在什么地方?

当reader读取数据,发送到内存通道的时候可以进行速度的控制。

顺着这个思路,我们开始阅读代码

1-我们知道BufferedRecordExchanger.sendToWriter()方法,是发送数据到内存通道的地方,

里面的flush();方法则是真正执行发送的地方。

@Override
public void flush() {
if(shutdown){
throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
}
this.channel.pushAll(this.buffer);//加入到arrayBlockQueue
this.buffer.clear();
this.bufferIndex = 0;
this.memoryBytes.set(0);
}

2-进入到com.alibaba.datax.core.transport.channel.Channel类的pushAll方法

public void pushAll(final Collection<Record> rs) {
        Validate.notNull(rs);
        Validate.noNullElements(rs);
        this.doPushAll(rs);
        this.statPush(rs.size(), this.getByteSize(rs));//限速在此处
    }

可以看到整个发送数据内存通道的逻辑。

大致分为两步:1-批量发送数据,2-对数据发送速度的控制

3-到此,我们已经找到核心的代码了,继续看是怎么进行限速的

4-在Channel构造函数中会初始化限速的参数

long byteSpeed = configuration.getLong(
                CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE, 1024 * 1024);//channel限制的速度1Mb/S
 long recordSpeed = configuration.getLong(
                CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD, 10000);
 默认情况下core.json里面配置的是:
"speed": {
                    "byte": -1,
                    "record": -1
         }, 
代表不会进行限速的处理

5-我们可以看到在statPush方法中



boolean isChannelByteSpeedLimit = (this.byteSpeed > 0);
        boolean isChannelRecordSpeedLimit = (this.recordSpeed > 0);
        if (!isChannelByteSpeedLimit && !isChannelRecordSpeedLimit) {//判断是否配置了
            return;
        }
这里代表会去判断byte和record是否大于0,然后才可以继续
如果配置了参数则会通过比较和判断计算出需要休眠的时间来达到限速
           // 休眠时间取较大值
            long sleepTime = byteLimitSleepTime < recordLimitSleepTime ?
                    recordLimitSleepTime : byteLimitSleepTime;
            if (sleepTime > 0) {
                try {
                    Thread.sleep(sleepTime);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }


6-我们可以查看日志看到设置是否生效:

Channel set byte_speed_limit to 5242880.

注意channel.byte设置大之后,响应的内存参数也要加大,默认是"-Xms1g -Xmx1g"

到此如果需要对单个channel速度大小进行调整则可以修改此参数

类似:

"transport": {
    "channel": {
        "class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel",
        "speed": {
                "byte": 5242880,
            "record": -1
        },
        "flowControlInterval": 20,
        "capacity": 512,
        "byteCapacity": 67108864
    },
    "exchanger": {
        "class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger",
        "bufferSize": 32
    }
}


后续会继续对几个配置进行解读,来看一下调整参数增加并发处理任务的方式

點(diǎn)擊查看更多內(nèi)容
TA 點(diǎn)贊

若覺(jué)得本文不錯(cuò),就分享一下吧!

評(píng)論

作者其他優(yōu)質(zhì)文章

正在加載中
  • 推薦
  • 評(píng)論
  • 收藏
  • 共同學(xué)習(xí),寫(xiě)下你的評(píng)論
感謝您的支持,我會(huì)繼續(xù)努力的~
掃碼打賞,你說(shuō)多少就多少
贊賞金額會(huì)直接到老師賬戶
支付方式
打開(kāi)微信掃一掃,即可進(jìn)行掃碼打賞哦
今天注冊(cè)有機(jī)會(huì)得

100積分直接送

付費(fèi)專(zhuān)欄免費(fèi)學(xué)

大額優(yōu)惠券免費(fèi)領(lǐng)

立即參與 放棄機(jī)會(huì)
微信客服

購(gòu)課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)

舉報(bào)

0/150
提交
取消