<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0"
xmlns:content="http://purl.org/rss/1.0/modules/content/"
xmlns:dc="http://purl.org/dc/elements/1.1/"
xmlns:slash="http://purl.org/rss/1.0/modules/slash/"
xmlns:atom="http://www.w3.org/2005/Atom"
xmlns:wfw="http://wellformedweb.org/CommentAPI/">
<channel>
<title>dwt&#039;s life - go</title>
<link>https://dwt.life/tag/go/</link>
<atom:link href="https://dwt.life/feed/tag/go/" rel="self" type="application/rss+xml" />
<language>zh-CN</language>
<description></description>
<lastBuildDate>Thu, 28 Oct 2021 12:47:00 +0800</lastBuildDate>
<pubDate>Thu, 28 Oct 2021 12:47:00 +0800</pubDate>
<item>
<title>golang socket连接复用 - smux</title>
<link>https://dwt.life/archives/139/</link>
<guid>https://dwt.life/archives/139/</guid>
<pubDate>Thu, 28 Oct 2021 12:47:00 +0800</pubDate>
<dc:creator>Ricky</dc:creator>
<description><![CDATA[今天来介绍一个socket连接复用的包https://github.com/xtaci/smux如图所示，多个channel输入通过smux合并在一个连接中，后端服务将连接中的channel分离...]]></description>
<content:encoded xml:lang="zh-CN"><![CDATA[
<p>今天来介绍一个socket连接复用的包</p><p><a href="https://github.com/xtaci/smux">https://github.com/xtaci/smux</a></p><p>如图所示，多个channel输入通过smux合并在一个连接中，后端服务将连接中的channel分离出来进行处理</p><p><img src="https://pic.8oh.com.cn/cos/2021/10/28/f3626e698b3be_1635396387.png" alt="1635396383182.png" title="1635396383182.png"></p><p>场景分析<br>假设一个简单的使用场景，一个apiservice网关服务对外提供HTTP接口，后面还有一个rand随机数服务，对内提供随机数TCP接口。</p><p>客户端访问apiservice接口，apiservice连接randservice服务获取数据并返回。如果不做多路复用的话，apiservice和randservice之间的连接数就是客户端请求数，这样apiservice和randservice之间连接过多会导致性能问题。</p><pre><code>               n link                   n link
+-----------+          +-------------+           +---------------+
|           &lt;----------&gt;             &lt;-----------&gt;               |
|  client   &lt;----------&gt;  apiservice &lt;-----------&gt;  randservice  |
|           &lt;----------&gt;             &lt;-----------&gt;               |
+-----------+          +-------------+           +---------------+</code></pre><p>经过多路复用后，apiservice和randservice之间只有一个连接，这样无论多少个客户端请求都不会导致连接过多问题。</p><pre><code>               n link                   1 link
+-----------+          +-------------+           +---------------+
|           &lt;----------&gt;             |           |               |
|  client   &lt;----------&gt;  apiservice &lt;-----------&gt;  randservice  |
|           &lt;----------&gt;             |           |               |
+-----------+          +-------------+           +---------------+</code></pre><p>（当然这只是个示例场景而已，生产中apiservice和randservice之间使用RPC框架即可，不用我们手动写socket通信）</p><p>代码示例<br>1.随机数服务 randservice.go</p><pre><code>package main

import (
    &quot;bytes&quot;
    &quot;encoding/binary&quot;
    &quot;fmt&quot;
    &quot;github.com/rs/zerolog&quot;
    &quot;github.com/rs/zerolog/log&quot;
    &quot;github.com/xtaci/smux&quot;
    &quot;math/rand&quot;
    &quot;net&quot;
    &quot;runtime&quot;
    &quot;time&quot;
)

func init() {
    rand.Seed(time.Now().UnixNano())
}

/**
一个生成随机数的tcp服务
客户端发送&#039;R&#039;, &#039;A&#039;, &#039;N&#039;, &#039;D&#039;，服务返回一个随机数
*/
func main() {
    listener, err := net.Listen(&quot;tcp&quot;, &quot;:9000&quot;)
    if err != nil {
        panic(err)
    }
    log.Info().Msg(&quot;随机数服务启动，监听9000端口&quot;)
    defer listener.Close()
    for {
        conn, err := listener.Accept()
        if err != nil {
            fmt.Println(err.Error())
            continue
        }
        go SessionHandler(conn)
    }
}

/**
处理会话
每个tcp连接生成一个会话session
*/
func SessionHandler(conn net.Conn) {
    session, err := smux.Server(conn, nil)
    if err != nil {
        panic(err)
    }
    log.Info().Msgf(&quot;收到客户端连接，创建新会话，对端地址：%s&quot;, session.RemoteAddr().String())

    for !session.IsClosed() {
        stream, err := session.AcceptStream()
        if err != nil {
            fmt.Println(err.Error())
            break
        }
        go StreamHandler(stream)
    }
    log.Info().Msgf(&quot;客户端连接断开，销毁会话，对端地址：%s&quot;, session.RemoteAddr().String())
}

/**
流数据处理
*/
func StreamHandler(stream *smux.Stream) {
    buffer := make([]byte, 1024)
    n, err := stream.Read(buffer)
    if err != nil {
        log.Error().Msgf(&quot;流id：%d，异常信息：%s&quot;, stream.ID(), err.Error())
        stream.Close()
        return
    }
    cmd := buffer[:n]
    if bytes.Equal(cmd, []byte{&#039;R&#039;, &#039;A&#039;, &#039;N&#039;, &#039;D&#039;}) {
        rand := rand.Uint64()
        response := make([]byte, 8)
        binary.BigEndian.PutUint64(response, rand)
        stream.Write(response)
        log.Debug().Msgf(&quot;收到客户端数据，流id：%d，随机数：%d， 响应数据：%v&quot;, stream.ID(), rand, response)
    } else {
        log.Warn().Msgf(&quot;收到未知请求命令，流id：%d，请求命令：%v&quot;, stream.ID(), cmd)
    }
}</code></pre><p>2.api接口服务 apiservice.go</p><pre><code>package main

import (
    &quot;encoding/binary&quot;
    &quot;fmt&quot;
    &quot;github.com/rs/zerolog&quot;
    &quot;github.com/rs/zerolog/log&quot;
    &quot;github.com/xtaci/smux&quot;
    &quot;net&quot;
    &quot;net/http&quot;
    &quot;runtime&quot;
)

/**
随机数服务客户端连接
*/
var randClient *smux.Session

func init() {
    //连接后端随机数服务
    conn, err := net.Dial(&quot;tcp&quot;, &quot;:9000&quot;)
    if err != nil {
        log.Warn().Msg(&quot;随机数服务未启动&quot;)
        panic(err)
    }
    session, err := smux.Client(conn, nil)
    if err != nil {
        log.Error().Msg(&quot;打开会话失败&quot;)
        panic(err)
    }
    randClient = session
}

/**
一个api网关，对外提供api接口
调用随机数服务来获取随机数
*/
func main() {
    defer randClient.Close()
    http.HandleFunc(&quot;/rand&quot;, RandHandler)
    http.ListenAndServe(&quot;:8080&quot;, nil)
}

/**
随机数接口
*/
func RandHandler(w http.ResponseWriter, r *http.Request) {
    stream, err := randClient.OpenStream()
    if err != nil {
        w.WriteHeader(500)
        fmt.Fprint(w, err.Error())
    } else {
        log.Debug().Msgf(&quot;收到请求，打开流成功，流id：%d&quot;, stream.ID())
        defer stream.Close()
        stream.Write([]byte{&#039;R&#039;, &#039;A&#039;, &#039;N&#039;, &#039;D&#039;})
        buffer := make([]byte, 1024)
        n, err := stream.Read(buffer)
        if err != nil {
            w.WriteHeader(500)
            fmt.Fprint(w, err.Error())
        } else {
            response := buffer[:n]
            var rand = binary.BigEndian.Uint64(response)
            log.Debug().Msgf(&quot;收到服务端数据，流id：%d，随机数：%d， 响应数据：%v&quot;, stream.ID(), rand, response)
            fmt.Fprintf(w, &quot;%d&quot;, rand)
        }
    }
}</code></pre><p>原理分析<br>smux将socket连接封装成session，每次请求响应封装成一个stream，通过自定义协议发送数据</p><pre><code>VERSION(1B) | CMD(1B) | LENGTH(2B) | STREAMID(4B) | DATA(LENGTH)  

VALUES FOR LATEST VERSION:
VERSION:
    1/2
    
CMD:
    cmdSYN(0)
    cmdFIN(1)
    cmdPSH(2)
    cmdNOP(3)
    cmdUPD(4)   // only supported on version 2
    
STREAMID:
    client use odd numbers starts from 1
    server use even numbers starts from 0
    
cmdUPD:
    | CONSUMED(4B) | WINDOW(4B) |</code></pre><p>比如我们发送的RAND命令封装成以下数据包发送给服务端，假设请求的STREAMID为11223344</p><pre><code>VERSION(1B) | CMD(1B) | LENGTH(2B) | 11223344 | RAND
VERSION(1B) | CMD(1B) | LENGTH(2B) | 11223344 | 0102030405060708</code></pre><p>扩展优化<br>但是这样又导致了另一个问题，由于apiservice和randservice之间只有一个连接，而这一个连接只能由一个goroutine处理，这样就导致性能低下<br>所以进一步扩展apiservice和randservice之间建立固定数量的连接，如10个连接，用来处理所有的请求，就是通过连接池的方式来性能最大化</p><p>改造后的示意图如下：</p><pre><code>               n link                  10 link
+-----------+          +-------------+           +---------------+
|           &lt;----------&gt;             &lt;-----------&gt;               |
|  client   &lt;----------&gt;  apiservice &lt;-----------&gt;  randservice  |
|           &lt;----------&gt;             &lt;-----------&gt;               |
+-----------+          +-------------+           +---------------+</code></pre><p>连接池版代码 apiservicewithpool.go</p><pre><code>package main

import (
    &quot;context&quot;
    &quot;encoding/binary&quot;
    &quot;fmt&quot;
    cpool &quot;github.com/jolestar/go-commons-pool/v2&quot;
    &quot;github.com/rs/zerolog&quot;
    &quot;github.com/rs/zerolog/log&quot;
    &quot;github.com/xtaci/smux&quot;
    &quot;net&quot;
    &quot;net/http&quot;
    &quot;runtime&quot;
)

var commonPool *cpool.ObjectPool
var ctx = context.Background()

func init() {
    factory := cpool.NewPooledObjectFactorySimple(NewSessionCpool)

    commonPool = cpool.NewObjectPoolWithDefaultConfig(ctx, factory)
    commonPool.Config.MaxTotal = 10
}

/**
连接池生成新会话函数
*/
func NewSessionCpool(ctx context.Context) (interface{}, error) {
    log.Debug().Msg(&quot;连接池中生成一个连接&quot;)
    //连接后端随机数服务
    conn, err := net.Dial(&quot;tcp&quot;, &quot;:9000&quot;)
    if err != nil {
        log.Warn().Msg(&quot;随机数服务未启动&quot;)
        panic(err)
    }
    //随机数服务客户端连接
    session, err := smux.Client(conn, nil)
    if err != nil {
        log.Error().Msg(&quot;打开会话失败&quot;)
        panic(err)
    }
    return session, err
}

/**
一个api网关，对外提供api接口
调用随机数服务来获取随机数

通过sync.Pool实现“连接池” ！！！ 不推荐这种方式，sync.Pool的种种特性不适合作为连接池
*/
func main() {
    http.HandleFunc(&quot;/rand&quot;, CommonPoolRandHandler)
    http.ListenAndServe(&quot;:8080&quot;, nil)
}

/**
随机数接口
*/
func CommonPoolRandHandler(w http.ResponseWriter, r *http.Request) {
    obj, err := commonPool.BorrowObject(ctx)
    if err != nil {
        w.WriteHeader(500)
        fmt.Fprint(w, err.Error())
        return
    }
    client := obj.(*smux.Session)
    stream, err := client.OpenStream()
    if err != nil {
        w.WriteHeader(500)
        fmt.Fprint(w, err.Error())
    } else {
        log.Debug().Msgf(&quot;收到请求，打开流成功，流id：%d&quot;, stream.ID())
        defer stream.Close()
        stream.Write([]byte{&#039;R&#039;, &#039;A&#039;, &#039;N&#039;, &#039;D&#039;})
        buffer := make([]byte, 1024)
        n, err := stream.Read(buffer)
        if err != nil {
            w.WriteHeader(500)
            fmt.Fprint(w, err.Error())
        } else {
            response := buffer[:n]
            var rand = binary.BigEndian.Uint64(response)
            log.Debug().Msgf(&quot;收到服务端数据，流id：%d，随机数：%d， 响应数据：%v&quot;, stream.ID(), rand, response)
            fmt.Fprintf(w, &quot;%d&quot;, rand)
        }
    }
    commonPool.ReturnObject(ctx, obj)
}</code></pre><p>经过连接池改造后的模型就像MySQL或Redis的使用场景，每次请求相当于一个stream，多个stream共用一个session，一个session背后有一个socket连接，程序和MySQL或Redis之间创建多个session放入连接池中，每次请求从连接池中拿出session进行读写操作</p>
]]></content:encoded>
<slash:comments>0</slash:comments>
<comments>https://dwt.life/archives/139/#comments</comments>
<wfw:commentRss>https://dwt.life/feed/tag/go/</wfw:commentRss>
</item>
<item>
<title>Go 信号阻塞</title>
<link>https://dwt.life/archives/138/</link>
<guid>https://dwt.life/archives/138/</guid>
<pubDate>Thu, 21 Oct 2021 23:43:06 +0800</pubDate>
<dc:creator>Ricky</dc:creator>
<description><![CDATA[package mainimport (&quot;fmt&quot;&quot;os&quot;&quot;os/signal&quot;&quot;syscall&quot;)// 监听指定...]]></description>
<content:encoded xml:lang="zh-CN"><![CDATA[
<pre><code class="lang-go">package main

import (
&quot;fmt&quot;
&quot;os&quot;
&quot;os/signal&quot;
&quot;syscall&quot;
)

// 监听指定信号
func main() {
//合建chan
c := make(chan os.Signal)
//监听指定信号 ctrl+c kill
signal.Notify(c, os.Interrupt, os.Kill, syscall.SIGUSR1, syscall.SIGUSR2)
//阻塞直到有信号传入
fmt.Println(&quot;启动&quot;)
//阻塞直至有信号传入
s := &lt;-c
fmt.Println(&quot;退出信号&quot;, s)
}</code></pre>
]]></content:encoded>
<slash:comments>0</slash:comments>
<comments>https://dwt.life/archives/138/#comments</comments>
<wfw:commentRss>https://dwt.life/feed/tag/go/</wfw:commentRss>
</item>
<item>
<title>tcp之间传输文件go实现</title>
<link>https://dwt.life/archives/11/</link>
<guid>https://dwt.life/archives/11/</guid>
<pubDate>Tue, 29 Jun 2021 15:03:00 +0800</pubDate>
<dc:creator>Ricky</dc:creator>
<description><![CDATA[服务端package mainimport (    &quot;io&quot;    &quot;os&quot;    &quot;net&quot;    &quot;fmt&quot;...]]></description>
<content:encoded xml:lang="zh-CN"><![CDATA[
<h1>服务端</h1><pre><code>package main

import (
    &quot;io&quot;
    &quot;os&quot;
    &quot;net&quot;
    &quot;fmt&quot; 
)
func revFile (fileName string, conn net.Conn) {
    defer conn.Close()
    fs,err := os.Create(fileName)
    defer fs.Close()
    if err != nil {
        fmt.Println(&quot;os.Create err =&quot;,err)
        return
    }

    // 拿到数据
    buf := make([]byte ,1024*10)
    for {
        n,err := conn.Read(buf)
        if err != nil {
            fmt.Println(&quot;conn.Read err =&quot;,err)
            if err == io.EOF {
                fmt.Println(&quot;文件结束了&quot;,err)
            }
            return
        }
        if n == 0 {
            fmt.Println(&quot;文件结束了&quot;,err)
            return
        }
         fs.Write(buf[:n])

    }
}
func main ( ) {
    // 创建一个服务器
    Server,err := net.Listen(&quot;tcp&quot;,&quot;192.168.1.11:8000&quot;)
    if err != nil {
        fmt.Println(&quot;net.Listen err =&quot;,err)
        return
    }
    defer Server.Close()
    // 接受文件名
    for {
        conn, err := Server.Accept()
        defer conn.Close()
        if err != nil {
            fmt.Println(&quot;Server.Accept err =&quot;,err)
            return
        }
        buf := make([]byte ,1024)
        n,err1 := conn.Read(buf)
        if err1 != nil {
            fmt.Println(&quot;conn.Read err =&quot;, err1)
            return
        }
        // 拿到了文件的名字
        fileName := string(buf[:n])
        // 返回ok
        conn.Write([]byte (&quot;ok&quot;))
        // 接收文件,
        revFile(fileName,conn)


    }
}</code></pre><h1>客户端</h1><pre><code>package main

import (
    &quot;net&quot;
    &quot;fmt&quot;
    &quot;os&quot;
)
func sendFile (path string,conn net.Conn) {
    defer conn.Close()
     fs,err :=  os.Open(path)
     defer fs.Close()
 if err != nil {
    fmt.Println(&quot;os.Open err = &quot;, err)
    return
 }
    buf := make([]byte,1024*10)
    for{
        //  打开之后读取文件
        n, err1:= fs.Read(buf)
        if err1 != nil {
            fmt.Println(&quot;fs.Open err = &quot;, err1)
            return
        }
    
        //  发送文件
        conn.Write(buf[:n])
    }

}
func main ( ) {
    for {
        
    fmt.Println(&quot;请输入一个全路径的文件,比如,D:\\a.jpg&quot;)
    //  获取命令行参数
    var path string 
    fmt.Scan(&amp;path)
    // 获取文件名, 
    info,err := os.Stat(path);
    if err != nil {
        fmt.Println(&quot;os.Stat err = &quot;,err)
        return
    }
    // 发送文件名
    conn, err1 :=  net.Dial(&quot;tcp&quot;,&quot;192.168.1.11:8000&quot;)
    defer conn.Close()
    if err1 != nil {
        fmt.Println(&quot;net.Dial err = &quot;,err1)
        return
    }
    conn.Write([]byte (info.Name()))
    // 接受到是不是ok
    buf := make([]byte,1024)
    n , err2 := conn.Read(buf)
    if err2 != nil {
        fmt.Println(&quot;conn.Read err = &quot;,err2)
        return
    }
    if &quot;ok&quot; == string(buf[:n]) {
        fmt.Println(&quot;成功&quot;);
        sendFile(path,conn)
    } 
    // 如果是ok,那么开启一个连接,发送文件
    }
}</code></pre>
]]></content:encoded>
<slash:comments>0</slash:comments>
<comments>https://dwt.life/archives/11/#comments</comments>
<wfw:commentRss>https://dwt.life/feed/tag/go/</wfw:commentRss>
</item>
</channel>
</rss>