2 回答

TA貢獻1966條經驗 獲得超4個贊
此代碼沒有泄漏。
為了演示,讓我們稍微修改一下代碼**,使帖子中的問題可以重現。
main.go
package main
import (
"bytes"
"crypto/tls"
_ "expvar"
"fmt"
"io"
"io/ioutil"
"log"
"math/rand"
"net"
"net/http"
_ "net/http/pprof"
"os"
"runtime"
"strconv"
"sync"
"time"
"github.com/gorilla/mux"
)
// Client is the shared HTTP client used for every outbound call; it is
// assigned in init.
var Client *http.Client

// Transport is the round-tripper that backs Client; it is assigned in init.
var Transport *http.Transport
// init prepares process-wide state before main runs: it starts the debug
// HTTP listener, builds the shared Transport and Client, applies the CPU
// limit and seeds math/rand.
//
// The first command-line argument, when present, is read as the number of
// CPUs to pass to runtime.GOMAXPROCS; otherwise runtime.NumCPU() is used.
func init() {
	// Serve http.DefaultServeMux on a loopback-only port. The blank
	// net/http/pprof and expvar imports register their handlers on that mux.
	go func() {
		if err := http.ListenAndServe("localhost:6060", nil); err != nil {
			log.Printf("debug server on localhost:6060 stopped: %v", err)
		}
	}()

	//Get Any command line argument passed
	numCPU := runtime.NumCPU()
	// One argument is enough: only args[0] is read.
	if args := os.Args[1:]; len(args) > 0 {
		if n, err := strconv.Atoi(args[0]); err == nil && n > 0 {
			numCPU = n
		} else {
			log.Printf("ignoring invalid CPU count %q; using %d", args[0], numCPU)
		}
	}

	Transport = &http.Transport{
		// NOTE(review): certificate verification is disabled; acceptable for
		// a local demo only.
		TLSClientConfig: &tls.Config{
			InsecureSkipVerify: true,
		},
		DialContext: (&net.Dialer{
			//Timeout: time.Duration() * time.Millisecond,
			KeepAlive: 30 * time.Second,
		}).DialContext,
		//ForceAttemptHTTP2: true,
		DisableKeepAlives: false,
		//MaxIdleConns: 0,
		//IdleConnTimeout: 0,
		//TLSHandshakeTimeout: time.Duration(300) * time.Millisecond,
		//ExpectContinueTimeout: 1 * time.Second,
	}

	// The client timeout is deliberately left unset for the load test.
	Client = &http.Client{
		// Timeout: time.Duration(300) * time.Millisecond,
		Transport: Transport,
	}

	runtime.GOMAXPROCS(numCPU)
	rand.Seed(time.Now().UTC().UnixNano())
}
func main() {
router := mux.NewRouter().StrictSlash(true)
router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
_, _ = fmt.Fprintf(w, "Hello!!!")
})
router.HandleFunc("/{name}", func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
prepareRequest(w, r, vars["name"])
}).Methods("POST", "GET")
// Register pprof handlers
// router.HandleFunc("/debug/pprof/", pprof.Index)
// router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
// router.HandleFunc("/debug/pprof/profile", pprof.Profile)
// router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
// router.HandleFunc("/debug/pprof/trace", pprof.Trace)
routerMiddleWare := http.TimeoutHandler(router, 500*time.Millisecond, "Timeout")
srv := &http.Server{
Addr: "localhost:8080",
/*ReadTimeout: 500 * time.Millisecond,
WriteTimeout: 500 * time.Millisecond,
IdleTimeout: 10 * time.Second,*/
Handler: routerMiddleWare,
}
log.Fatal(srv.ListenAndServe())
}
// prepareRequest calls the two backend URLs through callUrls and passes the
// collected results to finalCall. The r and name arguments are not used.
func prepareRequest(w http.ResponseWriter, r *http.Request, name string) {
	// Uncomment to leak, on every request, one goroutine that blocks forever
	// sending a ~10KB slice on a channel nobody reads:
	// go func() {
	// 	make(chan []byte) <- make([]byte, 10024)
	// }()

	//other part of the code and call to goroutine
	backends := []string{
		"http://localhost:7000/",
		"http://localhost:7000/",
	}
	responses, messages, codes := callUrls(backends)
	finalCall(w, responses, messages, codes)
}
// Response records the outcome of one backend call made by callUrls.
type Response struct {
	Status    int    // 200 on success, 500 on any failure
	Url, Body string // target URL and the response body (empty on failure)
}
// callUrls POSTs a fixed JSON payload to every URL concurrently through the
// package-level Client and collects exactly one result per URL.
//
// It returns three parallel slices: the responses, a message per response
// ("success", "invalid" or the error text) and a status string ("200" or
// "500"). Entries are in completion order, not input order. A failed call
// yields a Response with Status 500 and an empty Body. For an empty urls
// slice it returns empty slices immediately instead of blocking.
func callUrls(urls []string) ([]*Response, []string, []string) {
	// outcome is the result triple produced by one worker.
	type outcome struct {
		resp   *Response
		msg    string
		status string
	}

	// One buffer slot per URL, so each worker's single send never blocks.
	ch := make(chan outcome, len(urls))

	var wg sync.WaitGroup
	wg.Add(len(urls))
	for _, url := range urls {
		go func(url string) {
			defer wg.Done()
			fail := func(msg string) {
				ch <- outcome{&Response{Status: 500, Url: url, Body: ""}, msg, "500"}
			}

			//decide if request is valid for client to make http call using country/os
			isValid := true //assuming url to be called
			if !isValid {
				fail("invalid")
				return
			}

			//make post call
			//request body have many more paramter, just sample included.
			req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(`{"body":"param"}`)))
			if err != nil {
				fail(err.Error())
				return
			}
			req.Header.Set("Content-Type", "application/json")
			req.Header.Set("Connection", "Keep-Alive")

			response, err := Client.Do(req)
			if err != nil {
				fail(err.Error())
				return
			}
			// Close exactly once, on every exit path below.
			defer response.Body.Close()

			body, err := ioutil.ReadAll(response.Body)
			// Drain whatever is left so the transport can reuse the
			// connection; a failure here is not actionable.
			_, _ = io.Copy(ioutil.Discard, response.Body)
			if err != nil {
				// A truncated body must not be reported as a success.
				fail(err.Error())
				return
			}

			//do something with response body replace a few string etc.
			ch <- outcome{&Response{Status: 200, Url: url, Body: string(body)}, "success", "200"}
		}(url)
	}

	// Every worker has sent its result once Wait returns, so the channel can
	// be closed and drained with a loop that always terminates.
	wg.Wait()
	close(ch)

	results := make([]*Response, 0, len(urls))
	msg := make([]string, 0, len(urls))
	status := make([]string, 0, len(urls))
	for o := range ch {
		results = append(results, o.resp)
		msg = append(msg, o.msg)
		status = append(status, o.status)
	}
	return results, msg, status
}
// finalCall prints the collected results, messages and statuses to standard
// output. Nothing is written to w.
func finalCall(w http.ResponseWriter, results []*Response, msg []string, status []string) {
	fields := []interface{}{"response", "response body", results, msg, status}
	fmt.Println(fields...)
}
k/main.go
package main
import "net/http"
// main runs the stub backend used by the demo: it listens on :7000 and
// answers every request with 200 OK and a 100-byte zero-filled body.
func main() {
	payload := make([]byte, 100)
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		// A failed write only means the client went away; nothing to recover.
		_, _ = w.Write(payload)
	})
	// ListenAndServe always returns a non-nil error (for example when the
	// port is already taken). Surface it instead of exiting silently with
	// status 0.
	if err := http.ListenAndServe(":7000", nil); err != nil {
		panic(err)
	}
}
安裝一個額外的可視化工具,並用 ab 模擬一些負載,這樣就足以完成直觀的演示。
go get -u github.com/divan/expvarmon
go run main.go &
go run k/main.go &
ab -n 50000 -c 2500 http://localhost:8080/y
# in a different window, for live preview
expvarmon -ports=6060 -i 500ms
此時查看 expvarmon 的輸出(如果它正在即時運行),你會看到類似下面的情況:
你可以看到各項數值在波動,gc 正在積極工作。
應用程序處於負載之下,正在消耗內存,等待服務器釋放連接以及 gc 清理它們。
您可以看到 memstats.Alloc、memstats.HeapAlloc、memstats.HeapInuse 現在減少了,正如 gc 完成工作並且不存在泄漏時所預期的那樣。
如果在 ab 運行結束後立即檢查 go tool pprof -inuse_space -web http://localhost:6060/debug/pprof/heap,
它表明該應用程序正在使用 177Mb 內存。
其中大部分(102Mb)被 net/http.Transport.getConn 佔用。
您的處理程序佔 1Mb,其餘是各種必需的開銷。
如果在服務器釋放連接並且 gc 完成之後再截圖,您會看到一個小得多的圖表。這裡不再演示。
現在讓我們製造一個泄漏,並再次使用這兩個工具查看它。
在代碼中取消以下注釋:
func prepareRequest(w http.ResponseWriter, r *http.Request, name string) {
go func() {
make(chan []byte) <- make([]byte, 10024)
}()
//...
重新啟動應用程序(在 expvarmon 中按 q 退出,雖然這不是必需的)
go get -u github.com/divan/expvarmon
go run main.go &
go run k/main.go &
ab -n 50000 -c 2500 http://localhost:8080/y
# in a different window, for live preview
expvarmon -ports=6060 -i 500ms
在 expvarmon 中你可以看到相同的行為,只是數字發生了變化;而在靜止狀態下,即使經過 gc,它仍然佔用大量內存,遠多於作為對比基準的空 golang http 服務器。
同樣,對堆進行截圖,它顯示您的處理程序現在消耗了大部分內存(約 450Mb)。注意箭頭:它顯示有 452Mb 來自 10kb 的分配,4.50Mb 來自 96b 的分配。它們分別對應於 []byte 切片和接收它的 chan []byte。
最後,您可以檢查堆棧跟蹤,找出導致內存泄漏的死 goroutine:打開 http://localhost:6060/debug/pprof/goroutine?debug=1
goroutine profile: total 50012
50000 @ 0x43098f 0x4077fa 0x4077d0 0x4074bb 0x76b85d 0x45d281
# 0x76b85c main.prepareRequest.func1+0x4c /home/mh-cbon/gow/src/test/oom/main.go:101
4 @ 0x43098f 0x42c09a 0x42b686 0x4c3a3b 0x4c484b 0x4c482c 0x57d94f 0x590d79 0x6b4c67 0x5397cf 0x53a51d 0x53a754 0x6419ef 0x6af18d 0x6af17f 0x6b5f33 0x6ba4fd 0x45d281
# 0x42b685 internal/poll.runtime_pollWait+0x55 /home/mh-cbon/.gvm/gos/go1.12.7/src/runtime/netpoll.go:182
# 0x4c3a3a internal/poll.(*pollDesc).wait+0x9a /home/mh-cbon/.gvm/gos/go1.12.7/src/internal/poll/fd_poll_runtime.go:87
// more...
它告訴我們程序中存在 50 012 個 goroutine,然後按文件位置分組列出它們;每組的第一個數字是運行實例的計數,本示例第一組為 50 000。緊隨其後的是導致該 goroutine 存在的堆棧跟蹤。
你可以看到其中有一些系統層面的 goroutine,就你的情況而言,不必太擔心它們。
如果你的程序確實按你設想的方式工作,那麼你要找的是那些你認為不應該存在的 goroutine。
但是,總體而言,您的代碼並不令人滿意,很可能應該對其內存分配和整體設計理念進行徹底審查來加以改進。
** 以下是對原始源代碼所做更改的摘要:
- 添加了一個新程序 k/main.go,用作後端服務器。
- 添加了 _ "expvar" 導入語句。
- 在 init 階段啟動標準庫的 HTTP 服務器實例(pprof 會在其上註冊處理程序):go http.ListenAndServe("localhost:6060", nil)
- 禁用了客戶端超時 Timeout: time.Duration(300) * time.Millisecond,否則負載測試不會返回 200。
- 服務器地址設置為 Addr: "localhost:8080"。
- 在 prepareRequest 中創建的 urls 值被設置為 len=2 的靜態列表。
- 為 req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(`{"body":"param"}`))) 添加了錯誤檢查。
- 禁用了 io.Copy(ioutil.Discard, response.Body) 的錯誤檢查。


TA貢獻1845條經驗 獲得超8個贊
我已經通過將 net/http 包替換為 fasthttp 解決了這個問題。早些時候我沒有使用它,是因為我在 fasthttp 客戶端上找不到設置超時的方法;但後來我發現 fasthttp 客戶端確實有一個 DoTimeout 方法,它會在指定的持續時間後使請求超時。
這里更新的代碼:
在vars.go中 ClientFastHttp *fasthttp.Client
main.go
package main
import (
"./common"
"crypto/tls"
"fmt"
"github.com/gorilla/mux"
"github.com/valyala/fasthttp"
"log"
"math/rand"
"net"
"net/http"
"net/http/pprof"
"os"
"runtime"
"strconv"
"sync"
"time"
)
// init prepares process-wide state before main runs: it builds the shared
// common.Transport and common.Client, applies the CPU limit and seeds
// math/rand.
//
// The first command-line argument, when present, is read as the number of
// CPUs to pass to runtime.GOMAXPROCS; otherwise runtime.NumCPU() is used.
func init() {
	//Get Any command line argument passed
	numCPU := runtime.NumCPU()
	// One argument is enough: only args[0] is read.
	if args := os.Args[1:]; len(args) > 0 {
		if n, err := strconv.Atoi(args[0]); err == nil && n > 0 {
			numCPU = n
		} else {
			log.Printf("ignoring invalid CPU count %q; using %d", args[0], numCPU)
		}
	}

	common.Transport = &http.Transport{
		// NOTE(review): certificate verification is disabled; confirm this
		// is intended outside of local testing.
		TLSClientConfig: &tls.Config{
			InsecureSkipVerify: true,
		},
		DialContext: (&net.Dialer{
			//Timeout: time.Duration() * time.Millisecond,
			KeepAlive: 30 * time.Second,
		}).DialContext,
		//ForceAttemptHTTP2: true,
		DisableKeepAlives: false,
		//MaxIdleConns: 0,
		//IdleConnTimeout: 0,
		//TLSHandshakeTimeout: time.Duration(300) * time.Millisecond,
		//ExpectContinueTimeout: 1 * time.Second,
	}

	common.Client = &http.Client{
		Timeout:   time.Duration(300) * time.Millisecond,
		Transport: common.Transport,
	}

	// NOTE(review): common.ClientFastHttp is not assigned here; presumably
	// it is initialised in the common package — confirm.

	runtime.GOMAXPROCS(numCPU)
	rand.Seed(time.Now().UTC().UnixNano())
}
// main serves the application routes on 0.0.0.0:80.
//
// Application routes go through a 500ms http.TimeoutHandler. The pprof
// handlers are mounted outside that wrapper: the CPU profile and trace
// endpoints run for whole seconds by default and would otherwise always be
// cut off with a 503 "Timeout".
func main() {
	router := mux.NewRouter().StrictSlash(true)
	router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		_, _ = fmt.Fprintf(w, "Hello!!!")
	})
	router.HandleFunc("/{name}", func(w http.ResponseWriter, r *http.Request) {
		vars := mux.Vars(r)
		prepareRequest(w, r, vars["name"])
	}).Methods("POST")

	root := http.NewServeMux()

	// Register pprof handlers. The trailing-slash pattern makes pprof.Index
	// serve the named profiles (heap, goroutine, ...) under /debug/pprof/.
	root.HandleFunc("/debug/pprof/", pprof.Index)
	root.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
	root.HandleFunc("/debug/pprof/profile", pprof.Profile)
	root.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
	root.HandleFunc("/debug/pprof/trace", pprof.Trace)

	// Everything else is application traffic and keeps the 500ms budget.
	root.Handle("/", http.TimeoutHandler(router, 500*time.Millisecond, "Timeout"))

	srv := &http.Server{
		Addr: "0.0.0.0:" + "80",
		/*ReadTimeout: 500 * time.Millisecond,
		WriteTimeout: 500 * time.Millisecond,
		IdleTimeout: 10 * time.Second,*/
		Handler: root,
	}
	log.Fatal(srv.ListenAndServe())
}
// prepareRequest hands the target URL list to callUrls and forwards the
// collected results to finalCall. In this sample the list is left empty, and
// the r and name arguments are not used.
func prepareRequest(w http.ResponseWriter, r *http.Request, name string) {
	//other part of the code and call to goroutine
	var targets []string
	responses, messages, codes := callUrls(targets)
	finalCall(w, responses, messages, codes)
}
// Response records the outcome of one backend call made by callUrls.
type Response struct {
	Status    int    // 200 on success, 500 on any failure
	Url, Body string // target URL and the response body (empty on failure)
}
// callUrls POSTs a fixed JSON payload to every URL concurrently through
// common.ClientFastHttp, with a 300ms timeout per request, and collects
// exactly one result per URL.
//
// It returns three parallel slices: the responses, a message per response
// ("success", "error" or "invalid") and a status string ("200" or "500").
// Entries are in completion order, not input order. For an empty urls slice
// it returns empty slices immediately instead of blocking.
func callUrls(urls []string) ([]*Response, []string, []string) {
	// outcome is the result triple produced by one worker.
	type outcome struct {
		resp   *Response
		msg    string
		status string
	}

	// One buffer slot per URL, so each worker's single send never blocks.
	ch := make(chan outcome, len(urls))

	var wg sync.WaitGroup
	wg.Add(len(urls))
	for _, url := range urls {
		go func(url string) {
			defer wg.Done()

			//decide if request is valid for client to make http call using country/os
			isValid := true //assuming url to be called
			if !isValid {
				ch <- outcome{&Response{Status: 500, Url: url, Body: ""}, "invalid", "500"}
				return
			}

			//make post call
			//request body have many more paramter, just sample included.
			req := fasthttp.AcquireRequest()
			defer fasthttp.ReleaseRequest(req) // <- do not forget to release
			resp := fasthttp.AcquireResponse()
			defer fasthttp.ReleaseResponse(resp) // <- do not forget to release

			req.SetRequestURI(url)
			req.Header.Set("Content-Type", "application/json")
			req.Header.Set("Connection", "Keep-Alive")
			req.Header.SetMethod("POST")
			req.SetBody([]byte(`{"body":"param"}`))

			if err := common.ClientFastHttp.DoTimeout(req, resp, 300*time.Millisecond); err != nil {
				ch <- outcome{&Response{Status: 500, Url: url, Body: ""}, "error", "500"}
				return
			}

			// Copy the body into a string now: the slice returned by
			// resp.Body() belongs to the pooled response and must not be
			// read after the deferred ReleaseResponse runs.
			body := string(resp.Body())

			//do something with response body replace a few string etc.
			ch <- outcome{&Response{Status: 200, Url: url, Body: body}, "success", "200"}
		}(url)
	}

	// Every worker has sent its result once Wait returns, so the channel can
	// be closed and drained with a loop that always terminates.
	wg.Wait()
	close(ch)

	results := make([]*Response, 0, len(urls))
	msg := make([]string, 0, len(urls))
	status := make([]string, 0, len(urls))
	for o := range ch {
		results = append(results, o.resp)
		msg = append(msg, o.msg)
		status = append(status, o.status)
	}
	return results, msg, status
}
// finalCall prints the collected results, messages and statuses to standard
// output. Nothing is written to w.
func finalCall(w http.ResponseWriter, results []*Response, msg []string, status []string) {
	fields := []interface{}{"response", "response body", results, msg, status}
	fmt.Println(fields...)
}
- 2 回答
- 0 關注
- 274 瀏覽
添加回答
舉報