查看: 69|回复: 0

go源码解析之TCP连接(一)——Listen

[复制链接]

6

主题

8

帖子

21

积分

新手上路

Rank: 1

积分
21
发表于 2023-7-17 11:00:59 | 显示全部楼层 |阅读模式
go源码解析之TCP连接系列基于go源码1.16.5
端口是如何监听的

首先奉上net文档中第一个映入眼帘的example
ln, err := net.Listen("tcp", ":8080")
if err != nil {
        // handle error
}
for {
        conn, err := ln.Accept()
        if err != nil {
                // handle error
        }
        go handleConnection(conn)
}
下面我们通过逐行跟踪源码,来看开启监听的过程:
1. net.Listen

src\net\dial.go
func Listen(network, address string) (Listener, error) {
    var lc ListenConfig
    return lc.Listen(context.Background(), network, address)
}
这个监听方法,其中network可以是tcp、tcp4、tcp6、unix、unixpacket,我们通常传入tcp即代表监听tcp连接,包括ipv4和ipv6,其他类型不在我们的介绍范围,包括udp本文也不讨论。address是监听的地址,ip:port格式,如果不指定port,将由系统自动分配一个端口。
ListenConfig的struct体如下:
src\net\dial.go
type ListenConfig struct {
    Control func(network, address string, c syscall.RawConn) error
    KeepAlive time.Duration
}
其中Control是一个方法变量,根据注释,这个方法会在连接创建之后并将连接绑定到操作系统之前调用,相当于是提供给用户层的一个连接创建的回调方法,至于它的用处和调用时机,随着后续更深层的代码分析再做进一步介绍。
KeepAlive,应该和内核参数/proc/sys/net/ipv4/tcp_keepalive_time、tcp_keepalive_intvl、tcp_keepalive_probes是相同的作用,但是根据注释说明,0是开启,负数是关闭,没有说明正数的作用。后续用到再研究。
2.ListenConfig的Listen方法

src\net\dial.go
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
    addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)
    ...
        sl := &sysListener{
        ListenConfig: *lc,
        network:      network,
        address:      address,
    }
    var l Listener
    la := addrs.first(isIPv4)
    switch la := la.(type) {
    case *TCPAddr:
        l, err = sl.listenTCP(ctx, la)
        ...
    }
    ...
    return l, nil
}
其中…代表省略的一些细节处理或者是无关分支,后续也都会以这种方式贴代码。
ListenConfig的Listen方法同样是传入了network和address,ctx是上层传入的context.Background()。返回值是Listener类型和error,其中的Listener其实是一个接口类型,具体接口定义如下:
src\net\net.go
type Listener interface {
    Accept() (Conn, error) //等待并返回建立成功的连接
    Close() error //关闭监听
    Addr() Addr //监听地址
}
我们再看ListenConfig的Listen方法的逻辑,第一行对传入的地址进行了解析,转换成了下层可用的地址格式。紧接着生成了一个sysListener的变量,sysListener的作用很简单,它的存在就是为了构造各种类型的实现了Listener接口的监听器,因此它的所有的方法都是listenXXX,XXX则代表网络协议类型,例如这里的listenTCP,还有listenUDP等等。
sysListener.listenTCP

继续看代码,下面的switch case我们不管,直接看case是TCPAddr的情况,调用了sysListener的listenTCP方法,方法中代码如下:
src\net\tcpsock_posix.go
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
    fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
    if err != nil {
        return nil, err
    }
    return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}
可见sysListener构造了一个TCPListener并返回,看一下internetSocket,internetSocket的作用是创建一个socket,TCPListener将使用这个socket来监听端口接收连接,下面看具体代码:
src\net\ipsock_posix.go
func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
    if (runtime.GOOS == "aix" || runtime.GOOS == "windows" || runtime.GOOS == "openbsd") && mode == "dial" && raddr.isWildcard() {
        raddr = raddr.toLocal(net)
    }
    family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
    return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)
}
这个方法的参数可真长,我们对照方法调用一个个看一下:

  • 参数1,ctx不说了
  • 参数2,net,是我们最初传入的network,即网络协议类型,tcp、udp等
  • 参数3,laddr是local address的缩写,即本地地址。我们构建Listener需要传入本地地址
  • 参数4,raddr是remoe address的缩写,即远端地址。构建Listener不需要远端地址,当连接到远端时需要raddr
  • 参数5,sotype,传入了syscall.SOCK_STREAM即代表进行tcp监听,与之对应的是SOCK_DGRAM
  • 参数6,proto,默认0。
  • 参数7,mode,传入了listen,代表要建立的socket是监听socket
  • 参数8,ctrlFn,这里就是上面ListenConfig的Controller属性
方法的第一部分还是地址转换,第二部分的favoriteAddrFamily方法则是返回了支持的协议簇(AF_INET或者AF_INET6,代表了ipv4和ipv6),第三部分则是socket方法的调用,它的入参和internetSocket的基本一致,返回值是*netFD,而netFD则是对系统文件描述符(socket也有一个唯一的文件描述符fd与之对应)的包装,下面我们看下socket方法中是怎么创建netFD的:
socket

src\net\sock_posix.go
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
    s, err := sysSocket(family, sotype, proto)
    if err != nil {
        return nil, err
    }
    if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
        poll.CloseFunc(s)
        return nil, err
    }
    if fd, err = newFD(s, family, sotype, net); err != nil {
        poll.CloseFunc(s)
        return nil, err
    }

    if laddr != nil && raddr == nil {
        switch sotype {
        case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
            if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
                fd.Close()
                return nil, err
            }
            return fd, nil
                ...
    }
    if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil {
        fd.Close()
        return nil, err
    }
    return fd, nil
}
我们从上到下介绍每个方法调用的作用:

  • sysSocket,顾名思义,它的作用是创建系统socket
  • setDefaultSockopts,设置了socket的一些属性,例如是否只支持ipv6
  • newFD,对返回的系统fd进行了包装,生成了本方法要返回的netFD
  • if laddr != nil && raddr == nil,如果传入了本地地址,没有传入远端地址,则认为新的socket是用来监听的,调用了netFD的listenStream进行端口绑定,可以看到这里将ctrlFn(ListenConfig的Controller属性)又一次传入,那么ListenConfig的Controller方法属性是在socket创建之后执行的,具体在什么操作之前,还需要进一步跟代码。
  • fd.dial,是传入了远端地址的情况,则认为新的socket是用来connect的,dial进行了连接。
一个tcp的监听socket创建完成、进行了端口绑定,并将此socket的fd包装成了netFD返回给调用者,沿着调用链一直向上返回到sysListener的listenTCP方法,为方便大家查看,将上面贴过的代码再次贴到这里:
src\net\tcpsock_posix.go
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
    fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
    if err != nil {
        return nil, err
    }
    return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}
中场小结

在继续深入sysSocket、setDefaultSockopts、newFD、listenStream几个方法之前,我们现在通过一张图来回顾一下前面的调用过程


到此为止,整个逻辑除了最下层的socket方法中略显复杂,其他每个方法体都很小,但是调用链路还是比较长,我们来简单总结下每一层的代码设计。

  • net.Listen是整个链路的入口方法,它创建了一个空的ListenConfig,并调用了ListenConfig的Listen方法
  • ListenConfig,它目前拥有两个可选配置项:Control和KeepAlive。它将被作为配置数据传递给下游,设计成一个struct可以避免通过传参的方式传递很多配置
  • ListenConfig.Listen方法,将上层传入的字符串类型的address转换成下层使用的Addr数据,并通过判断network的类型调用sysListener的不同的listen方法(listenTCP、listenUDP等)
  • sysListener将ListenConfig、address、network作为自己的属性,并实现了各种network的listen方法
  • sysListener.listenTCP方法,调用internetSocket方法,并使用返回的netFD创建TCPListener
  • internetSocket方法,是一个创建监听socket和connect socket(dial方法主动发起连接)的共用方法
  • socket方法,是unixsock和ipsock的共用方法,它首先创建了socket并为socket设置默认属性,再将返回的fd包装成netFD,最后使用此socket绑定端口或者进行连接。
  • 最终将TCPListener返回给net.Listen的调用者,调用者可以调用TCPListener的Accept方法开始接受连接请求,这一部分将在下一篇中介绍。
下面继续介绍sysSocket、setDefaultSockopts、newFD、listenStream几个方法
sysSocket

老套路,先祭出代码:
src\net\sock_cloexec.go
func sysSocket(family, sotype, proto int) (int, error) {
    s, err := socketFunc(family, sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC, proto)

    ...

    return s, nil
}
中间省略部分是socketFunc报错后的容错处理,老版本内核由于不支持创建socket时设置SOCK_NONBLOCK或者SOCK_CLOEXEC,导致创建失败。省略部分进行了容错,先创建socket,再进行socket属性的设置。
在跟入socketFunc之前先介绍一下它的参数:

  • family是AF_INET或者AF_INET6,即ipv4或者ipv6
  • sotype是SOCK_STREAM或者SOCK_DGRAM,即tcp或者udp
  • SOCK_NONBLOCK是将socket设置为非阻塞
  • SOCK_CLOEXEC是将socket设置为close-on-exec
  • proto默认0
socketFunc是一个全局的方法变量,它的值如下:
src\net\hook_unix.go
var (
    ...

    // Placeholders for socket system calls.
    socketFunc        func(int, int, int) (int, error)  = syscall.Socket
    connectFunc       func(int, syscall.Sockaddr) error = syscall.Connect
    listenFunc        func(int, int) error              = syscall.Listen
    getsockoptIntFunc func(int, int, int) (int, error)  = syscall.GetsockoptInt
)
可见除了socketFunc之外,还有connectFunc、listenFunc、getsockoptIntFunc,它们都是syscall包里的方法。
继续跟入syscall.Socket:
src\syscall\syscall_unix.go
func Socket(domain, typ, proto int) (fd int, err error) {
    if domain == AF_INET6 && SocketDisableIPv6 {
        return -1, EAFNOSUPPORT
    }
    fd, err = socket(domain, typ, proto)
    return
}
src\syscall\zsyscall_linux_amd64.go
// THIS FILE IS GENERATED BY THE COMMAND AT THE TOP; DO NOT EDIT

func socket(domain int, typ int, proto int) (fd int, err error) {
    r0, _, e1 := RawSyscall(SYS_SOCKET, uintptr(domain), uintptr(typ), uintptr(proto))
    fd = int(r0)
    if e1 != 0 {
        err = errnoErr(e1)
    }
    return
}
src\syscall\zsysnum_linux_amd64.go
const {
    ...
    SYS_SOCKET                 = 41
    ...
}
src\syscall\asm_linux_amd64.s
// func RawSyscall(trap, a1, a2, a3 uintptr) (r1, r2, err uintptr)
TEXT ·RawSyscall(SB),NOSPLIT,$0-56
    MOVQ    a1+8(FP), DI
    MOVQ    a2+16(FP), SI
    MOVQ    a3+24(FP), DX
    MOVQ    trap+0(FP), AX  // syscall entry
    SYSCALL
        ...
以上4段代码逻辑都比较简单,就是实现了一个socket的系统调用,最后的rawSyscall是使用汇编实现的一段系统调用方法,创建socket的系统调用号是SYS_SOCKET。
setDefaultSockopts

老规矩,上代码:
src\net\sockopt_linux.go
func setDefaultSockopts(s, family, sotype int, ipv6only bool) error {
    if family == syscall.AF_INET6 && sotype != syscall.SOCK_RAW {
        syscall.SetsockoptInt(s, syscall.IPPROTO_IPV6, syscall.IPV6_V6ONLY, boolint(ipv6only))
    }
    if (sotype == syscall.SOCK_DGRAM || sotype == syscall.SOCK_RAW) && family != syscall.AF_UNIX {
        // Allow broadcast.
        return os.NewSyscallError("setsockopt", syscall.SetsockoptInt(s, syscall.SOL_SOCKET, syscall.SO_BROADCAST, 1))
    }
    return nil
}
可见代码在一定条件下设置了是否只允许ipv6。如果是udp的话,还将socket设置为允许广播。
syscall.SetsockoptInt方法同syscall.Socket方法,都是syscall中的系统调用。
newFD

废话不多说,上代码:
src\net\fd_unix.go
func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
    ret := &netFD{
        pfd: poll.FD{
            Sysfd:         sysfd,
            IsStream:      sotype == syscall.SOCK_STREAM,
            ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW,
        },
        family: family,
        sotype: sotype,
        net:    net,
    }
    return ret, nil
}
newFD方法将创建成功的系统fd包装成了netFD,下面挑选几个netFD的重要方法来了解它:
func (fd *netFD) Read(p []byte) (n int, err error)
func (fd *netFD) Write(p []byte) (nn int, err error)
func (fd *netFD) SetDeadline(t time.Time)
func (fd *netFD) SetReadDeadline(t time.Time)
func (fd *netFD) SetWriteDeadline(t time.Time)
func (fd *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (rsa syscall.Sockaddr, ret error)
func (fd *netFD) accept() (netfd *netFD, err error)
func (fd *netFD) dial(ctx context.Context, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) error
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error
func (fd *netFD) listenDatagram(laddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) error
netFD除了具有读写socket的方法,还实现了listen、accept及dial方法。
fd.listenStream

socket创建成功后,进而就是进行端口绑定和监听,看代码:
src\net\sock_posix.go
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {

        ...

    if ctrlFn != nil {
        c, err := newRawConn(fd)
        if err != nil {
            return err
        }
        if err := ctrlFn(fd.ctrlNetwork(), laddr.String(), c); err != nil {
            return err
        }
    }
    if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
        return os.NewSyscallError("bind", err)
    }
    if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
        return os.NewSyscallError("listen", err)
    }

        ...

    return nil
}
省略去了一些初始化和地址转换的代码。
syscall.Bind又一个系统调用,注意fd.pfd.Sysfd就是我们新创建的socket的fd,lsa则是我们最初传入的ip:port经过转换后的地址,Bind将这个地址绑定到我们创建的socket上。
listenFunc是一个方法变量,存储各种操作系统的Listen方法:
src\net\hook_unix.go
listenFunc        func(int, int) error              = syscall.Listen
经过Listen系统调用,我们的socket就被激活了,内核将接收连接到此socket的连接请求。下一步调用accept就可以取到连接请求的socket了。
呼呼 ,终于把端口绑定和监听的大体代码流程捋完了。看下面这张图,本文对应到了TCP Server的监听socket创建和bind、listen,下一章将继续介绍accept。


最后将开头ListenConfig的Controller属性的调用时机补上,netFD.listenStream方法中的ctrlFn就是这个属性,可见它是在监听socket创建后,bind调用之前被回调的。应该是开放给应用层个性化设置socket的属性的。
最最后再把backlog说一下 ,在netFD.listenStream方法中的listenFunc(fd.pfd.Sysfd, backlog)这一行中的backlog参数控制着待处理连接队列的长度,如果队列已满,新的连接请求将被忽略。backlog的值取自系统参数(linux系统)/proc/sys/net/core/somaxconn,如果读取失败,默认设置为128。如果值超过backlog可以存储的最大值(内核版本4.1以下backlog使用uint16存储,高版本使用uint32存储),将被设置为可存储的最大值。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

快速回复 返回顶部 返回列表