图源:
本篇笔记会讲解中剩余的内容。
文件统计程序
Linux上有一个程序du
package main
import (
"flag"
"fmt"
"io/ioutil"
"log"
"path"
"time"
)
var paramHuman = flag.Bool("h", false, "human show")
func main() {
start := time.Now()
flag.Parse()
dirNames := flag.Args()
if len(dirNames) == 0 {
//如果没有指定目录,使用当前目录
dirNames = []string{"."}
}
var allSize int64
var allFileNums int
for _, name := range dirNames {
size, fileNums := getDirSize(name)
allSize += size
allFileNums += fileNums
}
printResult(allSize, allFileNums, *paramHuman)
end := time.Now()
usedTimes := end.Sub(start)
fmt.Printf("used %.2f s.\n", usedTimes.Seconds())
}
func printResult(size int64, fileNums int, human bool) {
if human {
humanSize, humanUnit := humanByteSize(size)
fmt.Printf("total %d %s,%d files.\n", humanSize, humanUnit, fileNums)
return
}
fmt.Printf("total %d byte, %d files.\n", size, fileNums)
}
func humanByteSize(byteSize int64) (humanSize int64, humanUnit string) {
const kb = 1024
const mb = 1024 * kb
const gb = 1024 * mb
if byteSize > gb {
humanSize = byteSize / gb
humanUnit = "G"
} else if byteSize > mb {
humanSize = byteSize / mb
humanUnit = "M"
} else if byteSize > kb {
humanSize = byteSize / kb
humanUnit = "K"
} else {
humanSize = byteSize
humanUnit = "B"
}
return
}
func getDirSize(dir string) (size int64, fileNums int) {
fis, err := ioutil.ReadDir(dir)
if err != nil {
log.Println(err)
}
for _, fi := range fis {
if fi.IsDir() {
subDirSize, subDirFn := getDirSize(path.Join(dir, fi.Name()))
size += subDirSize
fileNums += subDirFn
} else {
size += fi.Size()
fileNums += 1
}
}
return
}
这是我参考《Go程序设计语言》中的示例编写的。
这个示例中利用标准库flag
从命令行获取参数,关于该库的详细说明可以阅读。此外还通过ioutil.ReadDir
函数读取目录信息。
仿照通常的Linux
风格应用,我添加了一个-h
参数,用于人性化输出。
可以编译该程序后将du.exe
移动到某个目录下进行测试:
❯ .\du.exe -h
total 51 G,171687 files.
used 7.41 s.
下面我们用goroutine来实现并发,提升程序效率:
package main
import (
"flag"
"fmt"
"io/ioutil"
"log"
"path"
"sync"
"time"
)
var paramHuman = flag.Bool("h", false, "human show")
func main() {
start := time.Now()
flag.Parse()
dirNames := flag.Args()
if len(dirNames) == 0 {
//如果没有指定目录,使用当前目录
dirNames = []string{"."}
}
var allSize int64
var allFileNums int
var fileScanWG sync.WaitGroup
sizeChan := make(chan int64)
gLimit := make(chan struct{}, 20)
for _, name := range dirNames {
fileScanWG.Add(1)
go getDirSize(name, sizeChan, &fileScanWG, gLimit)
}
go func() {
fileScanWG.Wait()
close(sizeChan)
}()
for size := range sizeChan {
allSize += size
allFileNums += 1
}
printResult(allSize, allFileNums, *paramHuman)
end := time.Now()
usedTimes := end.Sub(start)
fmt.Printf("used %.2f s.\n", usedTimes.Seconds())
}
func printResult(size int64, fileNums int, human bool) {
if human {
humanSize, humanUnit := humanByteSize(size)
fmt.Printf("total %d %s,%d files.\n", humanSize, humanUnit, fileNums)
return
}
fmt.Printf("total %d byte, %d files.\n", size, fileNums)
}
func humanByteSize(byteSize int64) (humanSize int64, humanUnit string) {
const kb = 1024
const mb = 1024 * kb
const gb = 1024 * mb
if byteSize > gb {
humanSize = byteSize / gb
humanUnit = "G"
} else if byteSize > mb {
humanSize = byteSize / mb
humanUnit = "M"
} else if byteSize > kb {
humanSize = byteSize / kb
humanUnit = "K"
} else {
humanSize = byteSize
humanUnit = "B"
}
return
}
func getDirSize(dir string, sizeChan chan int64, fileScanWG *sync.WaitGroup, gLimit chan struct{}) {
defer fileScanWG.Done()
gLimit <- struct{}{}
fis, err := ioutil.ReadDir(dir)
<-gLimit
if err != nil {
log.Println(err)
}
for _, fi := range fis {
if fi.IsDir() {
fileScanWG.Add(1)
go getDirSize(path.Join(dir, fi.Name()), sizeChan, fileScanWG, gLimit)
} else {
sizeChan <- fi.Size()
}
}
}
改写过程分为3步:
-
改用goroutine调用负责读取目录的递归函数
getDirSize
,这也正是系统的瓶颈所在,是可以用并发来改写的部分。当然,改写后就无法通过return
来返回结果了,改用一个记录文件容量的通道。 -
添加一个
sync.WaitGroup
来追踪goroutine的开启和关闭,并用额外的goroutine调用fileScanWG.wait()
来等待所有goroutine调用结束后关闭通道。这样主goroutine就可以通过遍历通道来收集结果。 -
当前程序将试目录下子项目的情况,可能瞬间启动上千个goroutine,这本身没有什么问题,Go语言会自行管理并完成goroutine调度。但如果goroutine会操作某些“限制性的资源”,比如网络请求或者打开文件,过多的goroutine并发就会直接导致某些资源被拖垮。所以我们需要进行限制,就像是Python的futures包中使用线程池那样,对限制性资源的使用进行限制。方法也不难想到,就是利用一个有限的缓冲通道,该缓冲通道的容量相当于限制性资源的最大并发使用数目(也可以简单当作是线程池),我们只要在使用限制性资源前尝试往该通道写入数据,使用完资源后从通道中读取即可。这就相当于是获取了一个使用限制性资源的令牌。当然,也可以先初始化用作令牌的通道,填满数据,在使用的时候先读取再写入,效果是一样的,不过多了一步填写数据的操作。
实际测试:
❯ .\du.exe -h
total 51 G,171687 files.
used 3.89 s.
中止goroutine
之前在中的火箭发射倒计时示例程序,我们看到了如何通过一个额外的通道来通知另一个goroutine中止运行。但如果是需要停止多个goroutine,就没法用类似的方式,因为读取通道的行为会从通道中取出数据,这就意味着我们需要往通道中填入正在运行的goroutine数量的数据才能关闭所有goroutine,但是这在某些时候是不可能的,因为就像上面那个文件统计程序那样,核心的并发程序可能是递归调用,goroutine是不断生成的,我们是没法获取当前有多少goroutine正在运行的。
此时我们可以换个思路,同样是通过通道来发送关闭信息,但并不需要写入数据,而是直接关闭。因为通道关闭后,再尝试读取通道就不会阻塞,我们可利用这个特性来判断是否应当关闭goroutine:
package main
import (
"flag"
"fmt"
"io/ioutil"
"log"
"os"
"path"
"sync"
"time"
)
var paramHuman = flag.Bool("h", false, "human show")
func main() {
start := time.Now()
flag.Parse()
dirNames := flag.Args()
if len(dirNames) == 0 {
//如果没有指定目录,使用当前目录
dirNames = []string{"."}
}
var allSize int64
var allFileNums int
var fileScanWG sync.WaitGroup
var fsCloseChan = make(chan struct{}) //file scan close chan
sizeChan := make(chan int64)
gLimit := make(chan struct{}, 20)
go func() {
os.Stdin.Read(make([]byte, 1))
close(fsCloseChan)
}()
for _, name := range dirNames {
fileScanWG.Add(1)
go getDirSize(name, sizeChan, &fileScanWG, gLimit, fsCloseChan)
}
go func() {
fileScanWG.Wait()
close(sizeChan)
}()
loop:
for {
select {
case size, ok := <-sizeChan:
if !ok {
break loop
}
allSize += size
allFileNums += 1
case <-fsCloseChan:
//消耗正在等待返回结果的goroutine以正常终止
for range sizeChan {
}
fmt.Println("program is closed.")
return
}
}
printResult(allSize, allFileNums, *paramHuman)
end := time.Now()
usedTimes := end.Sub(start)
fmt.Printf("used %.2f s.\n", usedTimes.Seconds())
}
func isFileScanClosed(fsCloseChan chan struct{}) bool {
select {
case <-fsCloseChan:
//通道关闭,表示应当关闭所有并发任务
return true
default:
//通道没有关闭
return false
}
}
func printResult(size int64, fileNums int, human bool) {
if human {
humanSize, humanUnit := humanByteSize(size)
fmt.Printf("total %d %s,%d files.\n", humanSize, humanUnit, fileNums)
return
}
fmt.Printf("total %d byte, %d files.\n", size, fileNums)
}
func humanByteSize(byteSize int64) (humanSize int64, humanUnit string) {
const kb = 1024
const mb = 1024 * kb
const gb = 1024 * mb
if byteSize > gb {
humanSize = byteSize / gb
humanUnit = "G"
} else if byteSize > mb {
humanSize = byteSize / mb
humanUnit = "M"
} else if byteSize > kb {
humanSize = byteSize / kb
humanUnit = "K"
} else {
humanSize = byteSize
humanUnit = "B"
}
return
}
func getDirSize(dir string, sizeChan chan int64, fileScanWG *sync.WaitGroup, gLimit chan struct{}, fsCloseChan chan struct{}) {
defer fileScanWG.Done()
if isFileScanClosed(fsCloseChan) {
return
}
gLimit <- struct{}{}
fis, err := ioutil.ReadDir(dir)
<-gLimit
if err != nil {
log.Println(err)
return
}
for _, fi := range fis {
if fi.IsDir() {
fileScanWG.Add(1)
go getDirSize(path.Join(dir, fi.Name()), sizeChan, fileScanWG, gLimit, fsCloseChan)
} else {
sizeChan <- fi.Size()
}
}
}
需要做的修改有以下几处:
-
在主goroutine中创建一个用于“广播”程序关闭的通道,并创建一个goroutine来监控键盘输入,以在用户键入字符后关闭该通道来广播关闭信息。
-
修改主gouroutine中的结果收集部分,利用多路复用来检查是否有关闭信息产生,如果有,就在消耗已有goroutine后退出程序。
-
在主要的并发函数
getDirSize
中添加对关闭信号通道的检查,如果关闭了,就直接return
,不需要继续扫描目录并产生子goroutine
。
聊天室
我们可以利用多个通道构建一个多人网络聊天室,《Go程序设计语言》中就有这么一个例子:
package main
import (
"bufio"
"fmt"
"log"
"net"
)
func main() {
listener, err := net.Listen("tcp", ":8000")
if err != nil {
log.Println(err)
return
}
go createChatRoom()
fmt.Println("chat room is created in localhost:8000")
for {
conn, err := listener.Accept()
if err != nil {
log.Println(err)
continue
}
// fmt.Println("get a request from " + conn.RemoteAddr().String())
go handleConn(conn)
}
}
type clientChan chan string
var enterChan = make(chan clientChan)
var exitChan = make(chan clientChan)
var msgChan = make(chan string)
func createChatRoom() {
clients := make(map[clientChan]bool)
for {
select {
case newClientChan := <-enterChan:
clients[newClientChan] = true
case leavedClientChan := <-exitChan:
delete(clients, leavedClientChan)
close(leavedClientChan)
case msg := <-msgChan:
for cChan := range clients {
cChan <- msg
}
}
}
}
func handleConn(conn net.Conn) {
defer conn.Close()
// log.Println("handle " + conn.RemoteAddr().String())
chatChan := make(clientChan)
name := conn.RemoteAddr().String()
msg := name + " is joined this chat rom."
msgChan <- msg
log.Println(msg)
enterChan <- chatChan
go writeClientChanToClient(conn, chatChan)
readClientToMsgChan(conn, name)
exitChan <- chatChan
msg = name + " is leaved."
msgChan <- msg
log.Println(msg)
}
func readClientToMsgChan(conn net.Conn, name string) {
sc := bufio.NewScanner(conn)
for sc.Scan() {
msgChan <- name + " said: " + sc.Text()
}
}
func writeClientChanToClient(conn net.Conn, chatChan clientChan) {
for msg := range chatChan {
fmt.Fprintln(conn, msg)
}
}
这里是我参考原书示例代码后实现的版本。该代码的核心内容是,由一个核心goroutine作为管理聊天室的和核心线程,该goroutine利用通道与其它处理客户端请求的goroutine来通信。所以这里核心的概念是代表客户端的通信通道,这里使用type clientChan chan string
这个命名类型来表示。这里可以用“注册-注销”的机制来实现,使用两个chan clientChan
类型的通道来作为注册和注销的通道,客户端goroutine通过这两个通道来发起“注册”或者“注销”。聊天室goroutine利用一个map
来管理活动的客户端的通信通道。
其它部分的代码都很好理解,这里不做过多解释。
实际演示:
❯ .\qq.exe
chat room is created in localhost:8000
2021/11/24 17:05:04 [::1]:8109 is joined this chat rom.
2021/11/24 17:05:34 [::1]:8116 is joined this chat rom.
2021/11/24 17:06:32 [::1]:8116 is leaved.
客户端:
❯ .\netcat.exe
hello
[::1]:8116 said: hello
你好
[::1]:8116 said: 你好
bye
[::1]:8116 said: bye
^Z
2021/11/24 17:06:32 done
Windows
下的字符终端可以通过ctrl+z
表示终止字符,用来关闭通信。不知道什么缘故,无法使用
nc
在WSL下连接服务器,只能使用《Go程序设计语言》中的客户端代码编译的应用在Windows下通信,具体代码见。
就这样吧,谢谢阅读。
文章评论