第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機(jī)立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

從 kafka 發(fā)送消息到 websocket

從 kafka 發(fā)送消息到 websocket

Go
米脂 2022-11-23 19:48:43
我在 golang 中使用 gin-framework 創(chuàng)建了一個 web 服務(wù)。在這個項(xiàng)目中,我還使用了一些來自特定主題的 kafka 消息。我想要實(shí)現(xiàn)的是將我從主題中收到的消息倒入 websocket 中。所以通信只是一種方式,超過 1 個人可以連接到網(wǎng)絡(luò)套接字并看到傳入的消息。我想使用通道,所以在接收 kafka 消息的函數(shù)中我有這樣的東西:ch <- KafkaMessage在 gin 框架中,我創(chuàng)建了這樣的東西:requestRouterRPCv1.GET("wf-live", wsWorkFlowLive)func wsWorkFlowLive(c *gin.Context) {    ws, err := upGrader.Upgrade(c.Writer, c.Request, nil)    if err != nil {        log.Println("error get connection")        log.Fatal(err)    }    defer ws.Close()    err = ws.WriteJSON(<-ch)    if err != nil {        log.Println("error write message: " + err.Error())    }}var upGrader = websocket.Upgrader{    CheckOrigin: func(r *http.Request) bool {        return true    },}這是我用來測試 websocket 的 html 片段: <!DOCTYPE html>   <html>     <head>       <meta charset="UTF-8" />       <title>index</title>    </head>     <body>       <h1>test websocket</h1>       <p id="message-json"></p>      <p id="message-text"></p>            <script>        function jsonWS() {          var ws = new WebSocket("ws://ws.acme.com/ws/v1/wf-live");          ws.onmessage = function (evt) {            console.log("Received Message: " + evt.data);            document.getElementById("message-json").innerText += evt.data;          };          ws.onclose = function (evt) {            console.log("Connection closed.");          };        }        // Start websocket        jsonWS();      </script>    </body>  </html>但是我想念一些東西,我是個新手,因?yàn)橐坏┦盏降谝粭l kafka 消息,我就會出現(xiàn)以下錯誤行為:僅顯示第一條消息,連接快速關(guān)閉后要查看第二個,我必須刷新頁面,這不是 websocket 方式因?yàn)檫B接關(guān)閉通道它不是紅色的,所以它一直停留在 cosume 函數(shù)中,直到我讀取它。我不能有這種行為為了避免第 3 點(diǎn),我想我必須有一種機(jī)制,只有當(dāng)一個或多個 ws 連接時,我才將消息發(fā)送到通道。
查看完整描述

2 回答

?
FFIVE

TA貢獻(xiàn)1797條經(jīng)驗(yàn) 獲得超6個贊

Gorilla 聊天示例接近您的需要。該示例將從任何客戶端接收到的消息廣播到所有連接的客戶端。執(zhí)行以下操作以使代碼適應(yīng)您的用例:


更改客戶端讀取泵以丟棄收到的消息,而不是將消息發(fā)送到集線器。


func (c *Client) readPump() {

    defer func() {

        c.hub.unregister <- c

        c.conn.Close()

    }()

    c.conn.SetReadLimit(maxMessageSize)

    c.conn.SetReadDeadline(time.Now().Add(pongWait))

    c.conn.SetPongHandler(func(string) error { 

    c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })

    for {

        if _, _, err := c.NextReader(); err != nil {

          break

        }

    }

}

更改您的 Kafka 讀取循環(huán)以將消息發(fā)送到Hub.broadcast通道。


for {

    msg, err := c.ReadMessage(xxx)

    if err != nil {

          // handle error

    }

    hub.broadcast <- msg

}

刪除將客戶端發(fā)送隊(duì)列中的消息合并為單個 websocket 消息的代碼,或者調(diào)整客戶端以在單個 websocket 消息中處理多個 Kafka 消息。


查看完整回答
反對 回復(fù) 2022-11-23
?
翻翻過去那場雪

TA貢獻(xiàn)2065條經(jīng)驗(yàn) 獲得超14個贊

你很接近。只是缺少兩件事。

1- 使用select語句繼續(xù)從 kafka 通道接收新消息。

2- 保持活躍的 websocket 連接。這個答案有更多細(xì)節(jié)

讓我知道這是否適合您。


查看完整回答
反對 回復(fù) 2022-11-23
  • 2 回答
  • 0 關(guān)注
  • 327 瀏覽
慕課專欄
更多

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學(xué)習(xí)伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號