红茶的个人站点

  • 首页
  • 专栏
  • 开发工具
  • 其它
  • 隐私政策
Awalon
Talk is cheap,show me the code.
  1. 首页
  2. 专栏
  3. Go语言编程笔记
  4. 正文

Go语言编程笔记8:goroutine续

2021年11月24日 1148点热度 0人点赞 0条评论

image-20211108153040805

图源:wallpapercave.com

本篇笔记会讲解Go语言编程笔记7:goroutine和通道中剩余的内容。

文件统计程序

Linux上有一个程序du,可以很方便地计算指定目录的真实大小,《Go程序设计语言》一书中也有一个类似的示例:

package main
​
import (
    "flag"
    "fmt"
    "io/ioutil"
    "log"
    "path"
    "time"
)
​
var paramHuman = flag.Bool("h", false, "human show")
​
func main() {
    start := time.Now()
    flag.Parse()
    dirNames := flag.Args()
    if len(dirNames) == 0 {
        //如果没有指定目录,使用当前目录
        dirNames = []string{"."}
    }
    var allSize int64
    var allFileNums int
    for _, name := range dirNames {
        size, fileNums := getDirSize(name)
        allSize += size
        allFileNums += fileNums
    }
    printResult(allSize, allFileNums, *paramHuman)
    end := time.Now()
    usedTimes := end.Sub(start)
    fmt.Printf("used %.2f s.\n", usedTimes.Seconds())
}
​
func printResult(size int64, fileNums int, human bool) {
    if human {
        humanSize, humanUnit := humanByteSize(size)
        fmt.Printf("total %d %s,%d files.\n", humanSize, humanUnit, fileNums)
        return
    }
    fmt.Printf("total %d byte, %d files.\n", size, fileNums)
}
​
func humanByteSize(byteSize int64) (humanSize int64, humanUnit string) {
    const kb = 1024
    const mb = 1024 * kb
    const gb = 1024 * mb
    if byteSize > gb {
        humanSize = byteSize / gb
        humanUnit = "G"
    } else if byteSize > mb {
        humanSize = byteSize / mb
        humanUnit = "M"
    } else if byteSize > kb {
        humanSize = byteSize / kb
        humanUnit = "K"
    } else {
        humanSize = byteSize
        humanUnit = "B"
    }
    return
}
​
func getDirSize(dir string) (size int64, fileNums int) {
    fis, err := ioutil.ReadDir(dir)
    if err != nil {
        log.Println(err)
    }
    for _, fi := range fis {
        if fi.IsDir() {
            subDirSize, subDirFn := getDirSize(path.Join(dir, fi.Name()))
            size += subDirSize
            fileNums += subDirFn
        } else {
            size += fi.Size()
            fileNums += 1
        }
    }
    return
}

这是我参考《Go程序设计语言》中的示例编写的。

这个示例中利用标准库flag从命令行获取参数,关于该库的详细说明可以阅读Go语言flag包:命令行参数解析。此外还通过ioutil.ReadDir函数读取目录信息。

仿照通常的Linux风格应用,我添加了一个-h参数,用于人性化输出。

可以编译该程序后将du.exe移动到某个目录下进行测试:

❯ .\du.exe -h
total 51 G,171687 files.
used 7.41 s.

下面我们用goroutine来实现并发,提升程序效率:

package main
​
import (
    "flag"
    "fmt"
    "io/ioutil"
    "log"
    "path"
    "sync"
    "time"
)
​
var paramHuman = flag.Bool("h", false, "human show")
​
func main() {
    start := time.Now()
    flag.Parse()
    dirNames := flag.Args()
    if len(dirNames) == 0 {
        //如果没有指定目录,使用当前目录
        dirNames = []string{"."}
    }
    var allSize int64
    var allFileNums int
    var fileScanWG sync.WaitGroup
    sizeChan := make(chan int64)
    gLimit := make(chan struct{}, 20)
    for _, name := range dirNames {
        fileScanWG.Add(1)
        go getDirSize(name, sizeChan, &fileScanWG, gLimit)
    }
    go func() {
        fileScanWG.Wait()
        close(sizeChan)
    }()
    for size := range sizeChan {
        allSize += size
        allFileNums += 1
    }
    printResult(allSize, allFileNums, *paramHuman)
    end := time.Now()
    usedTimes := end.Sub(start)
    fmt.Printf("used %.2f s.\n", usedTimes.Seconds())
}
​
func printResult(size int64, fileNums int, human bool) {
    if human {
        humanSize, humanUnit := humanByteSize(size)
        fmt.Printf("total %d %s,%d files.\n", humanSize, humanUnit, fileNums)
        return
    }
    fmt.Printf("total %d byte, %d files.\n", size, fileNums)
}
​
func humanByteSize(byteSize int64) (humanSize int64, humanUnit string) {
    const kb = 1024
    const mb = 1024 * kb
    const gb = 1024 * mb
    if byteSize > gb {
        humanSize = byteSize / gb
        humanUnit = "G"
    } else if byteSize > mb {
        humanSize = byteSize / mb
        humanUnit = "M"
    } else if byteSize > kb {
        humanSize = byteSize / kb
        humanUnit = "K"
    } else {
        humanSize = byteSize
        humanUnit = "B"
    }
    return
}
​
func getDirSize(dir string, sizeChan chan int64, fileScanWG *sync.WaitGroup, gLimit chan struct{}) {
    defer fileScanWG.Done()
    gLimit <- struct{}{}
    fis, err := ioutil.ReadDir(dir)
    <-gLimit
    if err != nil {
        log.Println(err)
    }
    for _, fi := range fis {
        if fi.IsDir() {
            fileScanWG.Add(1)
            go getDirSize(path.Join(dir, fi.Name()), sizeChan, fileScanWG, gLimit)
        } else {
            sizeChan <- fi.Size()
        }
    }
}

改写过程分为3步:

  1. 改用goroutine调用负责读取目录的递归函数getDirSize,这也正是系统的瓶颈所在,是可以用并发来改写的部分。当然,改写后就无法通过return来返回结果了,改用一个记录文件容量的通道。

  2. 添加一个sync.WaitGroup来追踪goroutine的开启和关闭,并用额外的goroutine调用fileScanWG.wait()来等待所有goroutine调用结束后关闭通道。这样主goroutine就可以通过遍历通道来收集结果。

  3. 当前程序将试目录下子项目的情况,可能瞬间启动上千个goroutine,这本身没有什么问题,Go语言会自行管理并完成goroutine调度。但如果goroutine会操作某些“限制性的资源”,比如网络请求或者打开文件,过多的goroutine并发就会直接导致某些资源被拖垮。所以我们需要进行限制,就像是Python的futures包中使用线程池那样,对限制性资源的使用进行限制。方法也不难想到,就是利用一个有限的缓冲通道,该缓冲通道的容量相当于限制性资源的最大并发使用数目(也可以简单当作是线程池),我们只要在使用限制性资源前尝试往该通道写入数据,使用完资源后从通道中读取即可。这就相当于是获取了一个使用限制性资源的令牌。当然,也可以先初始化用作令牌的通道,填满数据,在使用的时候先读取再写入,效果是一样的,不过多了一步填写数据的操作。

实际测试:

❯ .\du.exe -h
total 51 G,171687 files.
used 3.89 s.

中止goroutine

之前在Go语言编程笔记7:goroutine和通道中的火箭发射倒计时示例程序,我们看到了如何通过一个额外的通道来通知另一个goroutine中止运行。但如果是需要停止多个goroutine,就没法用类似的方式,因为读取通道的行为会从通道中取出数据,这就意味着我们需要往通道中填入正在运行的goroutine数量的数据才能关闭所有goroutine,但是这在某些时候是不可能的,因为就像上面那个文件统计程序那样,核心的并发程序可能是递归调用,goroutine是不断生成的,我们是没法获取当前有多少goroutine正在运行的。

此时我们可以换个思路,同样是通过通道来发送关闭信息,但并不需要写入数据,而是直接关闭。因为通道关闭后,再尝试读取通道就不会阻塞,我们可利用这个特性来判断是否应当关闭goroutine:

package main
​
import (
    "flag"
    "fmt"
    "io/ioutil"
    "log"
    "os"
    "path"
    "sync"
    "time"
)
​
var paramHuman = flag.Bool("h", false, "human show")
​
func main() {
    start := time.Now()
    flag.Parse()
    dirNames := flag.Args()
    if len(dirNames) == 0 {
        //如果没有指定目录,使用当前目录
        dirNames = []string{"."}
    }
    var allSize int64
    var allFileNums int
    var fileScanWG sync.WaitGroup
    var fsCloseChan = make(chan struct{}) //file scan close chan
    sizeChan := make(chan int64)
    gLimit := make(chan struct{}, 20)
    go func() {
        os.Stdin.Read(make([]byte, 1))
        close(fsCloseChan)
    }()
    for _, name := range dirNames {
        fileScanWG.Add(1)
        go getDirSize(name, sizeChan, &fileScanWG, gLimit, fsCloseChan)
    }
    go func() {
        fileScanWG.Wait()
        close(sizeChan)
    }()
loop:
    for {
        select {
        case size, ok := <-sizeChan:
            if !ok {
                break loop
            }
            allSize += size
            allFileNums += 1
        case <-fsCloseChan:
            //消耗正在等待返回结果的goroutine以正常终止
            for range sizeChan {
            }
            fmt.Println("program is closed.")
            return
        }
    }
    printResult(allSize, allFileNums, *paramHuman)
    end := time.Now()
    usedTimes := end.Sub(start)
    fmt.Printf("used %.2f s.\n", usedTimes.Seconds())
}
​
func isFileScanClosed(fsCloseChan chan struct{}) bool {
    select {
    case <-fsCloseChan:
        //通道关闭,表示应当关闭所有并发任务
        return true
    default:
        //通道没有关闭
        return false
    }
}
​
func printResult(size int64, fileNums int, human bool) {
    if human {
        humanSize, humanUnit := humanByteSize(size)
        fmt.Printf("total %d %s,%d files.\n", humanSize, humanUnit, fileNums)
        return
    }
    fmt.Printf("total %d byte, %d files.\n", size, fileNums)
}
​
func humanByteSize(byteSize int64) (humanSize int64, humanUnit string) {
    const kb = 1024
    const mb = 1024 * kb
    const gb = 1024 * mb
    if byteSize > gb {
        humanSize = byteSize / gb
        humanUnit = "G"
    } else if byteSize > mb {
        humanSize = byteSize / mb
        humanUnit = "M"
    } else if byteSize > kb {
        humanSize = byteSize / kb
        humanUnit = "K"
    } else {
        humanSize = byteSize
        humanUnit = "B"
    }
    return
}
​
func getDirSize(dir string, sizeChan chan int64, fileScanWG *sync.WaitGroup, gLimit chan struct{}, fsCloseChan chan struct{}) {
    defer fileScanWG.Done()
    if isFileScanClosed(fsCloseChan) {
        return
    }
    gLimit <- struct{}{}
    fis, err := ioutil.ReadDir(dir)
    <-gLimit
    if err != nil {
        log.Println(err)
        return
    }
    for _, fi := range fis {
        if fi.IsDir() {
            fileScanWG.Add(1)
            go getDirSize(path.Join(dir, fi.Name()), sizeChan, fileScanWG, gLimit, fsCloseChan)
        } else {
            sizeChan <- fi.Size()
        }
    }
}

需要做的修改有以下几处:

  1. 在主goroutine中创建一个用于“广播”程序关闭的通道,并创建一个goroutine来监控键盘输入,以在用户键入字符后关闭该通道来广播关闭信息。

  2. 修改主gouroutine中的结果收集部分,利用多路复用来检查是否有关闭信息产生,如果有,就在消耗已有goroutine后退出程序。

  3. 在主要的并发函数getDirSize中添加对关闭信号通道的检查,如果关闭了,就直接return,不需要继续扫描目录并产生子goroutine。

聊天室

我们可以利用多个通道构建一个多人网络聊天室,《Go程序设计语言》中就有这么一个例子:

package main
​
import (
    "bufio"
    "fmt"
    "log"
    "net"
)
​
func main() {
    listener, err := net.Listen("tcp", ":8000")
    if err != nil {
        log.Println(err)
        return
    }
    go createChatRoom()
    fmt.Println("chat room is created in localhost:8000")
    for {
        conn, err := listener.Accept()
        if err != nil {
            log.Println(err)
            continue
        }
        // fmt.Println("get a request from " + conn.RemoteAddr().String())
        go handleConn(conn)
    }
}
​
type clientChan chan string
​
var enterChan = make(chan clientChan)
var exitChan = make(chan clientChan)
var msgChan = make(chan string)
​
func createChatRoom() {
    clients := make(map[clientChan]bool)
    for {
        select {
        case newClientChan := <-enterChan:
            clients[newClientChan] = true
        case leavedClientChan := <-exitChan:
            delete(clients, leavedClientChan)
            close(leavedClientChan)
        case msg := <-msgChan:
            for cChan := range clients {
                cChan <- msg
            }
        }
    }
}
​
func handleConn(conn net.Conn) {
    defer conn.Close()
    // log.Println("handle " + conn.RemoteAddr().String())
    chatChan := make(clientChan)
    name := conn.RemoteAddr().String()
    msg := name + " is joined this chat rom."
    msgChan <- msg
    log.Println(msg)
    enterChan <- chatChan
    go writeClientChanToClient(conn, chatChan)
    readClientToMsgChan(conn, name)
    exitChan <- chatChan
    msg = name + " is leaved."
    msgChan <- msg
    log.Println(msg)
}
​
func readClientToMsgChan(conn net.Conn, name string) {
    sc := bufio.NewScanner(conn)
    for sc.Scan() {
        msgChan <- name + " said: " + sc.Text()
    }
}
​
func writeClientChanToClient(conn net.Conn, chatChan clientChan) {
    for msg := range chatChan {
        fmt.Fprintln(conn, msg)
    }
}

这里是我参考原书示例代码后实现的版本。该代码的核心内容是,由一个核心goroutine作为管理聊天室的和核心线程,该goroutine利用通道与其它处理客户端请求的goroutine来通信。所以这里核心的概念是代表客户端的通信通道,这里使用type clientChan chan string这个命名类型来表示。这里可以用“注册-注销”的机制来实现,使用两个chan clientChan类型的通道来作为注册和注销的通道,客户端goroutine通过这两个通道来发起“注册”或者“注销”。聊天室goroutine利用一个map来管理活动的客户端的通信通道。

其它部分的代码都很好理解,这里不做过多解释。

实际演示:

❯ .\qq.exe
chat room is created in localhost:8000
2021/11/24 17:05:04 [::1]:8109 is joined this chat rom.
2021/11/24 17:05:34 [::1]:8116 is joined this chat rom.
2021/11/24 17:06:32 [::1]:8116 is leaved.

客户端:

❯ .\netcat.exe
hello
[::1]:8116 said: hello
你好
[::1]:8116 said: 你好
bye
[::1]:8116 said: bye
^Z
2021/11/24 17:06:32 done
  • Windows下的字符终端可以通过ctrl+z表示终止字符,用来关闭通信。

  • 不知道什么缘故,无法使用nc在WSL下连接服务器,只能使用《Go程序设计语言》中的客户端代码编译的应用在Windows下通信,具体代码见netcat3。

就这样吧,谢谢阅读。

往期内容

  • Go语言编程笔记7:goroutine和通道

  • Go语言编程笔记6:接口

  • Go语言编程笔记5:函数

  • Go语言编程笔记4:结构体和切片

  • Go语言编程笔记3:控制流

  • Go语言编程笔记2:变量

  • Go语言编程笔记1:Hello World

本作品采用 知识共享署名 4.0 国际许可协议 进行许可
标签: Go语言
最后更新:2021年11月24日

魔芋红茶

加一点PHP,加一点Go,加一点Python......

点赞
< 上一篇
下一篇 >

文章评论

取消回复

*

code

COPYRIGHT © 2021 icexmoon.cn. ALL RIGHTS RESERVED.
本网站由提供CDN加速/云存储服务

Theme Kratos Made By Seaton Jiang

宁ICP备2021001508号

宁公网安备64040202000141号