第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

Zookeeper Jute

1. 前言

在我們使用 Zookeeper 客戶端和 Zookeeper 服務(wù)端建立連接,發(fā)送請求時,Zookeeper 客戶端需要把請求協(xié)議進(jìn)行序列化才能進(jìn)行發(fā)送,Zookeeper 服務(wù)端接收到請求,還需要把請求協(xié)議進(jìn)行反序列化才能解析請求,這樣才完成了一次請求的發(fā)送。那么序列化是什么呢?我們?yōu)槭裁匆褂眯蛄谢??Zookeeper 中又是如何使用序列化的呢?我們就帶著這些問題開始本節(jié)的內(nèi)容。

2. Zookeeper 序列化

在 Zookeeper 的通信與會話一節(jié)中,我們學(xué)習(xí)了 Zookeeper 使用的是基于 TCP 的通信協(xié)議,TCP 協(xié)議是一種面向連接的、可靠的、基于字節(jié)流的通信協(xié)議。
我們想要使用 Zookeeper 客戶端向 Zookeeper 服務(wù)端發(fā)送請求,我們就需要把請求發(fā)送的 Java 對象轉(zhuǎn)換為字節(jié)流的形式,這個轉(zhuǎn)換的過程就是序列化。相對來說,把字節(jié)流轉(zhuǎn)換為 Java 對象的過程就是反序列化。
Zookeeper 序列化
那么我們什么時候使用序列化呢?

  1. 對象需要持久化時;
  2. 對象進(jìn)行網(wǎng)絡(luò)傳輸時。

Zookeeper 的應(yīng)用場景就需要大量的網(wǎng)絡(luò)傳輸,所以需要使用序列化來提高 Zookeeper 客戶端與服務(wù)端之間的通信效率。
在 Zookeeper 中,這一過程不需要我們自己去實(shí)現(xiàn),無論是 Zookeeper 客戶端還是 Zookeeper 服務(wù)端,都已經(jīng)把序列化和反序列化的過程進(jìn)行了封裝。那么 Zookeeper 時如何實(shí)現(xiàn)序列化的呢?接下來我們就來介紹 Zookeeper 的序列化實(shí)現(xiàn)方式 Jute。

2.1 Jute 介紹

Jute 前身是 Hadoop Record IO 中的序列化組件,從 Zookeeper 第一個正式版,到目前最新的穩(wěn)定版本 Apache ZooKeeper 3.6.1 都是用的 Jute 作為序列化組件。
為什么 Zookeeper 會一直選擇 Jute 作為它的序列化組件呢?并不是 Jute 的性能比其他的序列化框架好,相反的,現(xiàn)在市面上有許多性能更好的序列化組件,比如 Apache Thrift,Apache Avro 等組件,性能都要優(yōu)于 Jute。之所以還使用 Jute,是因?yàn)榈侥壳盀橹梗琂ute 序列化還不是 Zookeeper 的性能瓶頸,沒有必要強(qiáng)行更換,而且很難避免因?yàn)樘鎿Q基礎(chǔ)組件而帶來的一系列版本兼容的問題。
簡單的介紹了一下 Jute ,那么在 Zookeeper 中,Jute 又是如何實(shí)現(xiàn)的呢?接下來我們就來講解 Jute 在 Zookeeper 中的實(shí)現(xiàn)。

2.2 Jute 實(shí)現(xiàn)

在 Zookeeper 的通信及會話一節(jié)中,我們學(xué)習(xí)了 Zookeeper 的請求協(xié)議和響應(yīng)協(xié)議,并查看了部分源碼。我們可以發(fā)現(xiàn),無論是請求協(xié)議還是響應(yīng)協(xié)議的具體類,都實(shí)現(xiàn)了接口 Record,Record 接口其實(shí)就是 Jute 定義的序列化接口。

package org.apache.jute;

import java.io.IOException;
import org.apache.yetus.audience.InterfaceAudience.Public;

@Public
public interface Record {
    // 序列化
    void serialize(OutputArchive var1, String var2) throws IOException;
	// 反序列化
    void deserialize(InputArchive var1, String var2) throws IOException;
}

我們這里使用請求頭 RequestHeader 作為例子來查看序列化和反序列化的實(shí)現(xiàn):

// 請求頭,實(shí)現(xiàn)了 Record 
public class RequestHeader implements Record  {
    private int xid;
    private int type;
    // 使用 OutputArchive 進(jìn)行序列化,tag:序列化標(biāo)識符
    public void serialize(OutputArchive a_, String tag) throws IOException {
        a_.startRecord(this, tag);
        a_.writeInt(this.xid, "xid");
        a_.writeInt(this.type, "type");
        a_.endRecord(this, tag);
    }
	// 使用 InputArchive 進(jìn)行反序列化,tag 序列化標(biāo)識符
    public void deserialize(InputArchive a_, String tag) throws IOException {
        a_.startRecord(tag);
        this.xid = a_.readInt("xid");
        this.type = a_.readInt("type");
        a_.endRecord(tag);
    }
}

在 serialize 序列化方法中,根據(jù)成員變量的數(shù)據(jù)類型來選擇 OutputArchive 的方法來進(jìn)行序列化操作。在 deserialize 反序列化方法中,根據(jù)成員變量的數(shù)據(jù)類型來選擇 InputArchive 的方法來進(jìn)行反序列化操作。
在這里我們可以發(fā)現(xiàn),Record 只是定義了序列化方法和反序列化方法,真正執(zhí)行序列化操作是 OutputArchive 和 InputArchive 這兩個接口的實(shí)現(xiàn)類。接下來我們講解 OutputArchive 序列化接口的實(shí)現(xiàn)類 BinaryOutputArchive。

2.3 BinaryOutputArchive

BinaryOutputArchive 是二進(jìn)制的序列化方式,這種方式將 Java 對象轉(zhuǎn)化成二進(jìn)制的格式來進(jìn)行數(shù)據(jù)傳輸。在它的具體方法中,使用的是 java.io.DataOutputStream 的方法來完成 Java 對象到二進(jìn)制的轉(zhuǎn)換,我們來看看具體實(shí)現(xiàn):

/**
* 二進(jìn)制的序列化
*/
public class BinaryOutputArchive implements OutputArchive {
    // 定義字節(jié)緩沖區(qū)大小
    private ByteBuffer bb = ByteBuffer.allocate(1024);
    // 數(shù)據(jù)輸出接口 DataOutput 
    private DataOutput out;
	// 使用 OutputStream 初始化 BinaryOutputArchive
    public static BinaryOutputArchive getArchive(OutputStream strm) {
        return new BinaryOutputArchive(new DataOutputStream(strm));
    }
	// 構(gòu)造方法傳入 DataOutput
    public BinaryOutputArchive(DataOutput out) {
        this.out = out;
    }

    // 輸出 byte 類型為二進(jìn)制
    public void writeByte(byte b, String tag) throws IOException {
        this.out.writeByte(b);
    }
	// 輸出 boolean 類型為二進(jìn)制
    public void writeBool(boolean b, String tag) throws IOException {
        this.out.writeBoolean(b);
    }
	// 輸出 int 類型為二進(jìn)制
    public void writeInt(int i, String tag) throws IOException {
        this.out.writeInt(i);
    }
	// 輸出 long 類型為二進(jìn)制
    public void writeLong(long l, String tag) throws IOException {
        this.out.writeLong(l);
    }
	// 輸出 float 類型為二進(jìn)制
    public void writeFloat(float f, String tag) throws IOException {
        this.out.writeFloat(f);
    }
	// 輸出 double 類型為二進(jìn)制
    public void writeDouble(double d, String tag) throws IOException {
        this.out.writeDouble(d);
    }
	// 工具方法:字符串轉(zhuǎn)字節(jié)緩沖區(qū)
    private ByteBuffer stringToByteBuffer(CharSequence s) {
        this.bb.clear();
        int len = s.length();

        for(int i = 0; i < len; ++i) {
            if (this.bb.remaining() < 3) {
                ByteBuffer n = ByteBuffer.allocate(this.bb.capacity() << 1);
                this.bb.flip();
                n.put(this.bb);
                this.bb = n;
            }

            char c = s.charAt(i);
            if (c < 128) {
                this.bb.put((byte)c);
            } else if (c < 2048) {
                this.bb.put((byte)(192 | c >> 6));
                this.bb.put((byte)(128 | c & 63));
            } else {
                this.bb.put((byte)(224 | c >> 12));
                this.bb.put((byte)(128 | c >> 6 & 63));
                this.bb.put((byte)(128 | c & 63));
            }
        }

        this.bb.flip();
        return this.bb;
    }
	// 輸出 String 類型為二進(jìn)制
    public void writeString(String s, String tag) throws IOException {
        if (s == null) {
            this.writeInt(-1, "len");
        } else {
            // String 不為空轉(zhuǎn)字節(jié)緩沖區(qū)
            ByteBuffer bb = this.stringToByteBuffer(s);
            this.writeInt(bb.remaining(), "len");
            // 輸出 字節(jié)數(shù)組
            this.out.write(bb.array(), bb.position(), bb.limit());
        }
    }
	// 輸出 字節(jié)數(shù)組為二進(jìn)制
    public void writeBuffer(byte[] barr, String tag) throws IOException {
        if (barr == null) {
            this.out.writeInt(-1);
        } else {
            this.out.writeInt(barr.length);
            this.out.write(barr);
        }
    }
}

介紹了序列化的具體實(shí)現(xiàn)類,接下來就是反序列化接口 InputArchive 的實(shí)現(xiàn)類 BinaryInputArchive。

2.4 BinaryInputArchive

BinaryInputArchive 是二進(jìn)制的反序列化,也就是把二進(jìn)制格式轉(zhuǎn)換成 Java 對象。在它的具體方法中,使用了 java.io.DataOutputStream 的方法來完成二進(jìn)制格式到 Java 對象的轉(zhuǎn)換,我們來看具體實(shí)現(xiàn):

package org.apache.jute;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
/**
 * 二進(jìn)制的反序列化
 */
public class BinaryInputArchive implements InputArchive {
	// 長度異常的字符串
    public static final String UNREASONBLE_LENGTH = "Unreasonable length = ";

    // 最大緩沖區(qū)
    public static final int maxBuffer = Integer.getInteger("jute.maxbuffer", 0xfffff);
    // 額外的最大緩沖區(qū)
    private static final int extraMaxBuffer;

    static {
        final Integer configuredExtraMaxBuffer =
            Integer.getInteger("zookeeper.jute.maxbuffer.extrasize", maxBuffer);
        if (configuredExtraMaxBuffer < 1024) {
            extraMaxBuffer = 1024;
        } else {
            extraMaxBuffer = configuredExtraMaxBuffer;
        }
    }
	// 數(shù)據(jù)輸入接口 DataInput
    private DataInput in;
    // 最大緩沖區(qū)的大小
    private int maxBufferSize;
    // 額外的最大緩沖區(qū)的大小
    private int extraMaxBufferSize;
	// 使用 InputStream 實(shí)例化 BinaryInputArchive
    public static BinaryInputArchive getArchive(InputStream strm) {
        return new BinaryInputArchive(new DataInputStream(strm));
    }
	
    private static class BinaryIndex implements Index {
        private int nelems;

        BinaryIndex(int nelems) {
            this.nelems = nelems;
        }

        public boolean done() {
            return (nelems <= 0);
        }

        public void incr() {
            nelems--;
        }
    }

    // 構(gòu)造方法
    public BinaryInputArchive(DataInput in) {
        this(in, maxBuffer, extraMaxBuffer);
    }

    // 構(gòu)造方法
    public BinaryInputArchive(DataInput in, int maxBufferSize, int extraMaxBufferSize) {
        this.in = in;
        this.maxBufferSize = maxBufferSize;
        this.extraMaxBufferSize = extraMaxBufferSize;
    }
	// 讀取二進(jìn)制到 Byte 類型
    public byte readByte(String tag) throws IOException {
        return in.readByte();
    }
	// 讀取二進(jìn)制到 Boolean 類型
    public boolean readBool(String tag) throws IOException {
        return in.readBoolean();
    }
	// 讀取二進(jìn)制到 int 類型
    public int readInt(String tag) throws IOException {
        return in.readInt();
    }
	// 讀取二進(jìn)制到 long 類型
    public long readLong(String tag) throws IOException {
        return in.readLong();
    }
	// 讀取二進(jìn)制到 float 類型
    public float readFloat(String tag) throws IOException {
        return in.readFloat();
    }
	// 讀取二進(jìn)制到 double 類型
    public double readDouble(String tag) throws IOException {
        return in.readDouble();
    }
	// 讀取二進(jìn)制到 String 類型
    public String readString(String tag) throws IOException {
        int len = in.readInt();
        if (len == -1) {
            return null;
        }
        checkLength(len);
        byte[] b = new byte[len];
        in.readFully(b);
        return new String(b, StandardCharsets.UTF_8);
    }
	// 讀取二進(jìn)制到字節(jié)數(shù)組
    public byte[] readBuffer(String tag) throws IOException {
        int len = readInt(tag);
        if (len == -1) {
            return null;
        }
        checkLength(len);
        byte[] arr = new byte[len];
        in.readFully(arr);
        return arr;
    }
}

3. 總結(jié)

在本節(jié)內(nèi)容中我們學(xué)習(xí)了什么是序列化,為什么要使用序列化來進(jìn)行數(shù)據(jù)傳輸, 以及 Zookeeper 的序列化方式 Jute 的具體實(shí)現(xiàn)。以下是本節(jié)內(nèi)容總結(jié):以下是本節(jié)內(nèi)容總結(jié):

  1. 什么是序列化。
  2. 什么時候使用序列化。
  3. Zookeeper 的序列化方式 Jute。