今天延續昨天的項目,我想了解不同語言(go, rust)在ubuntu機器中的轉發器表現。
C:\USERS\ASUS\STEVEN\TCP_FORWARD_GO
│  Dockerfile
│  go.mod
│  main.go
│  README.md
│
├─cmd
│  └─bench_client
│          bench_client.go
│
└─tcpserver
        tcpserver.go
module tcp_forward
go 1.19
package main
import (
	"fmt"
	"net"
	"os"
	"os/signal"
	"strconv"
	"sync/atomic"
	"syscall"
	"tcp_forward/tcpserver"
	"time"
)
func main() {
	recvPort := uint16(7001)
	sendPort := uint16(7002)
	if len(os.Args) >= 2 {
		if p, err := strconv.Atoi(os.Args[1]); err == nil {
			recvPort = uint16(p)
		}
	}
	if len(os.Args) >= 3 {
		if p, err := strconv.Atoi(os.Args[2]); err == nil {
			sendPort = uint16(p)
		}
	}
	recvSrv, err := tcpserver.New(recvPort)
	if err != nil {
		fmt.Printf("Failed to create RecvServer: %v\n", err)
		os.Exit(1)
	}
	sendSrv, err := tcpserver.New(sendPort)
	if err != nil {
		fmt.Printf("Failed to create SendServer: %v\n", err)
		os.Exit(1)
	}
	var recvCount atomic.Uint64
	var sendCount atomic.Uint64
	recvSrv.SetOnOpen(func(addr net.Addr) {
		fmt.Printf("🔗 RecvServer OPEN: %v\n", addr)
	})
	recvSrv.SetOnClose(func(addr net.Addr) {
		fmt.Printf("❌ RecvServer CLOSE: %v\n", addr)
	})
	recvSrv.SetOnMsg(func(msg string, addr net.Addr) {
		recvCount.Add(1)
		line := msg + "\n"
		sentTo := sendSrv.Broadcast(line)
		sendCount.Add(uint64(sentTo))
	})
	sendSrv.SetOnOpen(func(addr net.Addr) {
		fmt.Printf("🔗 SendServer OPEN: %v\n", addr)
	})
	sendSrv.SetOnClose(func(addr net.Addr) {
		fmt.Printf("❌ SendServer CLOSE: %v\n", addr)
	})
	sendSrv.SetOnMsg(func(msg string, addr net.Addr) {
		fmt.Printf("📩 SendServer got (unexpected) from %v: %s\n", addr, msg)
	})
	recvSrv.Start()
	sendSrv.Start()
	// Statistics timer
	stopStats := make(chan struct{})
	go func() {
		ticker := time.NewTicker(60 * time.Second)
		defer ticker.Stop()
		lastTime := time.Now()
		var lastRecv, lastSend uint64
		for {
			select {
			case <-stopStats:
				return
			case <-ticker.C:
				now := time.Now()
				elapsed := now.Sub(lastTime).Seconds()
				lastTime = now
				currRecv := recvCount.Load()
				currSend := sendCount.Load()
				deltaRecv := currRecv - lastRecv
				deltaSend := currSend - lastSend
				lastRecv = currRecv
				lastSend = currSend
				rps := float64(deltaRecv) / elapsed
				sps := float64(deltaSend) / elapsed
				fmt.Printf("⏱️  Interval %.0fs | Recv: %d (%.2f/s), Sent: %d (%.2f/s) | SendConn=%d RecvConn=%d\n",
					elapsed, deltaRecv, rps, deltaSend, sps,
					sendSrv.ConnectionCount(), recvSrv.ConnectionCount())
			}
		}
	}()
	// Signal handling
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	sig := <-sigChan
	fmt.Printf("\nSignal %v received, shutting down now...\n", sig)
	close(stopStats)
	recvSrv.Stop()
	sendSrv.Stop()
	fmt.Println("Bye.")
}
package tcpserver
import (
	"bufio"
	"fmt"
	"io"
	"net"
	"sync"
	"sync/atomic"
)
// TcpServer represents a TCP server that can accept connections and broadcast messages
type TcpServer struct {
	listener   net.Listener
	port       uint16
	sessions   map[*Session]struct{}
	mu         sync.Mutex
	onOpen     func(net.Addr)
	onClose    func(net.Addr)
	onMsg      func(string, net.Addr)
	stopChan   chan struct{}
	wg         sync.WaitGroup
	connCount  atomic.Int64
}
// Session represents a single client connection
type Session struct {
	conn   net.Conn
	server *TcpServer
}
// New creates a new TcpServer
func New(port uint16) (*TcpServer, error) {
	listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
	if err != nil {
		return nil, err
	}
	return &TcpServer{
		listener: listener,
		port:     port,
		sessions: make(map[*Session]struct{}),
		stopChan: make(chan struct{}),
	}, nil
}
// SetOnOpen sets the callback for when a connection is opened
func (s *TcpServer) SetOnOpen(cb func(net.Addr)) {
	s.onOpen = cb
}
// SetOnClose sets the callback for when a connection is closed
func (s *TcpServer) SetOnClose(cb func(net.Addr)) {
	s.onClose = cb
}
// SetOnMsg sets the callback for when a message is received
func (s *TcpServer) SetOnMsg(cb func(string, net.Addr)) {
	s.onMsg = cb
}
// Start starts the server and begins accepting connections
func (s *TcpServer) Start() {
	fmt.Printf("✅ Server listening on port %d\n", s.port)
	s.wg.Add(1)
	go s.acceptLoop()
}
// Stop stops the server and closes all connections
func (s *TcpServer) Stop() {
	close(s.stopChan)
	s.listener.Close()
	s.mu.Lock()
	for sess := range s.sessions {
		sess.conn.Close()
	}
	s.sessions = make(map[*Session]struct{})
	s.mu.Unlock()
	s.wg.Wait()
}
// Broadcast sends a message to all connected clients
// Returns the number of clients the message was sent to
func (s *TcpServer) Broadcast(msg string) int {
	s.mu.Lock()
	snapshot := make([]*Session, 0, len(s.sessions))
	for sess := range s.sessions {
		snapshot = append(snapshot, sess)
	}
	s.mu.Unlock()
	sent := 0
	for _, sess := range snapshot {
		_, err := sess.conn.Write([]byte(msg))
		if err == nil {
			sent++
		}
	}
	return sent
}
// ConnectionCount returns the current number of active connections
func (s *TcpServer) ConnectionCount() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.sessions)
}
func (s *TcpServer) acceptLoop() {
	defer s.wg.Done()
	for {
		conn, err := s.listener.Accept()
		if err != nil {
			select {
			case <-s.stopChan:
				return
			default:
				continue
			}
		}
		sess := &Session{
			conn:   conn,
			server: s,
		}
		s.mu.Lock()
		s.sessions[sess] = struct{}{}
		s.mu.Unlock()
		s.connCount.Add(1)
		if s.onOpen != nil {
			s.onOpen(conn.RemoteAddr())
		}
		s.wg.Add(1)
		go sess.handleConnection()
	}
}
func (sess *Session) handleConnection() {
	defer sess.server.wg.Done()
	defer func() {
		addr := sess.conn.RemoteAddr()
		sess.conn.Close()
		sess.server.mu.Lock()
		delete(sess.server.sessions, sess)
		sess.server.mu.Unlock()
		if sess.server.onClose != nil {
			sess.server.onClose(addr)
		}
	}()
	reader := bufio.NewReader(sess.conn)
	for {
		select {
		case <-sess.server.stopChan:
			return
		default:
		}
		line, err := reader.ReadString('\n')
		if err != nil {
			if err != io.EOF {
				// Connection error
			}
			return
		}
		// Remove the trailing newline
		if len(line) > 0 && line[len(line)-1] == '\n' {
			line = line[:len(line)-1]
		}
		if len(line) > 0 && line[len(line)-1] == '\r' {
			line = line[:len(line)-1]
		}
		if sess.server.onMsg != nil {
			sess.server.onMsg(line, sess.conn.RemoteAddr())
		}
	}
}
package main
import (
	"bufio"
	"context"
	"flag"
	"fmt"
	"io"
	"net"
	"os"
	"os/signal"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"syscall"
	"time"
)
type Args struct {
	PubHost  string
	PubPort  uint16
	SubHost  string
	SubPort  uint16
	Pubs     int
	Subs     int
	Rate     int
	MsgSize  int
	Duration int
}
type LatencyStats struct {
	mu        sync.Mutex
	samplesMs []float64
	cap       int
}
func NewLatencyStats() *LatencyStats {
	return &LatencyStats{
		samplesMs: make([]float64, 0, 200000),
		cap:       200000,
	}
}
func (ls *LatencyStats) Add(ms float64) {
	ls.mu.Lock()
	defer ls.mu.Unlock()
	if len(ls.samplesMs) < ls.cap {
		ls.samplesMs = append(ls.samplesMs, ms)
	}
}
func (ls *LatencyStats) Print() {
	ls.mu.Lock()
	v := make([]float64, len(ls.samplesMs))
	copy(v, ls.samplesMs)
	ls.mu.Unlock()
	if len(v) == 0 {
		fmt.Println("latency: no samples")
		return
	}
	// Calculate average
	sum := 0.0
	for _, x := range v {
		sum += x
	}
	avg := sum / float64(len(v))
	// Calculate percentiles
	sort.Float64s(v)
	p50 := percentile(v, 0.50)
	p90 := percentile(v, 0.90)
	p99 := percentile(v, 0.99)
	fmt.Printf("Latency (ms): avg=%.3f p50=%.3f p90=%.3f p99=%.3f\n", avg, p50, p90, p99)
}
func percentile(sorted []float64, p float64) float64 {
	if len(sorted) == 0 {
		return 0.0
	}
	idx := int(p * float64(len(sorted)-1))
	if idx >= len(sorted) {
		idx = len(sorted) - 1
	}
	return sorted[idx]
}
type Shared struct {
	sent     atomic.Uint64
	received atomic.Uint64
	lat      *LatencyStats
}
func nowNs() uint64 {
	return uint64(time.Now().UnixNano())
}
// Publisher sends messages at a fixed rate
type Publisher struct {
	conn    net.Conn
	id      int
	rate    int
	msgSize int
	seq     uint64
	shared  *Shared
	stop    <-chan struct{}
}
func NewPublisher(host string, port uint16, id, rate, msgSize int, shared *Shared, stop <-chan struct{}) (*Publisher, error) {
	conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", host, port))
	if err != nil {
		return nil, err
	}
	return &Publisher{
		conn:    conn,
		id:      id,
		rate:    rate,
		msgSize: msgSize,
		shared:  shared,
		stop:    stop,
	}, nil
}
func (p *Publisher) Start(ctx context.Context) {
	defer p.conn.Close()
	interval := time.Second / time.Duration(p.rate)
	if p.rate <= 0 {
		interval = 0
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-p.stop:
			return
		case <-ctx.Done():
			return
		case <-ticker.C:
			p.sendOne()
		}
	}
}
func (p *Publisher) sendOne() {
	p.seq++
	ts := nowNs()
	// Format: pubId,seq,ts,<padding>\n
	msg := fmt.Sprintf("%d,%d,%d,", p.id, p.seq, ts)
	if len(msg)+1 < p.msgSize {
		padding := strings.Repeat("x", p.msgSize-len(msg)-1)
		msg += padding
	}
	msg += "\n"
	_, err := p.conn.Write([]byte(msg))
	if err != nil {
		return
	}
	p.shared.sent.Add(1)
}
// Subscriber receives messages and calculates latency
type Subscriber struct {
	conn   net.Conn
	shared *Shared
	stop   <-chan struct{}
}
func NewSubscriber(host string, port uint16, shared *Shared, stop <-chan struct{}) (*Subscriber, error) {
	conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", host, port))
	if err != nil {
		return nil, err
	}
	return &Subscriber{
		conn:   conn,
		shared: shared,
		stop:   stop,
	}, nil
}
func (s *Subscriber) Start(ctx context.Context) {
	defer s.conn.Close()
	reader := bufio.NewReader(s.conn)
	for {
		select {
		case <-s.stop:
			return
		case <-ctx.Done():
			return
		default:
		}
		line, err := reader.ReadString('\n')
		if err != nil {
			if err != io.EOF {
				// Connection error
			}
			return
		}
		// Parse: pubId,seq,ts,...
		parts := strings.Split(line, ",")
		if len(parts) >= 3 {
			if sentNs, err := strconv.ParseUint(parts[2], 10, 64); err == nil && sentNs > 0 {
				recvNs := nowNs()
				ms := float64(recvNs-sentNs) / 1e6
				s.shared.lat.Add(ms)
			}
		}
		s.shared.received.Add(1)
	}
}
func parseArgs() Args {
	args := Args{}
	flag.StringVar(&args.PubHost, "pub-host", "127.0.0.1", "RecvServer host")
	flag.Func("pub-port", "RecvServer port", func(s string) error {
		p, err := strconv.Atoi(s)
		if err != nil {
			return err
		}
		args.PubPort = uint16(p)
		return nil
	})
	flag.StringVar(&args.SubHost, "sub-host", "127.0.0.1", "SendServer host")
	flag.Func("sub-port", "SendServer port", func(s string) error {
		p, err := strconv.Atoi(s)
		if err != nil {
			return err
		}
		args.SubPort = uint16(p)
		return nil
	})
	flag.IntVar(&args.Pubs, "pub", 1, "Publishers count")
	flag.IntVar(&args.Subs, "sub", 1, "Subscribers count")
	flag.IntVar(&args.Rate, "rate", 1000, "Msgs/sec per publisher")
	flag.IntVar(&args.MsgSize, "msg-size", 64, "Bytes per message incl. newline")
	flag.IntVar(&args.Duration, "duration", 10, "Duration in seconds")
	flag.Parse()
	// Defaults
	if args.PubPort == 0 {
		args.PubPort = 7001
	}
	if args.SubPort == 0 {
		args.SubPort = 7002
	}
	if args.MsgSize < 16 {
		args.MsgSize = 16
	}
	return args
}
func main() {
	args := parseArgs()
	fmt.Printf("bench_client start\n")
	fmt.Printf("pubs=%d subs=%d rate=%d/pub msg_size=%d duration=%ds threads=%d\n",
		args.Pubs, args.Subs, args.Rate, args.MsgSize, args.Duration, runtime.NumCPU())
	shared := &Shared{
		lat: NewLatencyStats(),
	}
	stopChan := make(chan struct{})
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(args.Duration)*time.Second)
	defer cancel()
	var wg sync.WaitGroup
	// Create publishers
	for i := 0; i < args.Pubs; i++ {
		pub, err := NewPublisher(args.PubHost, args.PubPort, i, args.Rate, args.MsgSize, shared, stopChan)
		if err != nil {
			fmt.Printf("Failed to create publisher %d: %v\n", i, err)
			continue
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			pub.Start(ctx)
		}()
	}
	// Create subscribers
	for i := 0; i < args.Subs; i++ {
		sub, err := NewSubscriber(args.SubHost, args.SubPort, shared, stopChan)
		if err != nil {
			fmt.Printf("Failed to create subscriber %d: %v\n", i, err)
			continue
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			sub.Start(ctx)
		}()
	}
	// Signal handling
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-ctx.Done():
		// Duration expired
	case <-sigChan:
		// Signal received
	}
	close(stopChan)
	cancel()
	wg.Wait()
	// Print statistics
	sent := shared.sent.Load()
	recv := shared.received.Load()
	secs := float64(args.Duration)
	sendRate := float64(sent) / secs
	recvRate := float64(recv) / secs
	fmt.Println("==== bench result ====")
	fmt.Printf("Sent: %d msgs (%.2f msg/s)\n", sent, sendRate)
	fmt.Printf("Recv: %d msgs (%.2f msg/s)\n", recv, recvRate)
	shared.lat.Print()
	fmt.Println("======================")
}
# Multi-stage build for optimal image size
FROM golang:1.19-bullseye AS builder
WORKDIR /build
# Copy go mod files
COPY go.mod go.sum* ./
RUN go mod download
# Copy source code
COPY . .
# Build main application
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o tcp_forward .
# Build bench_client
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o bench_client ./cmd/bench_client
# Final stage - Ubuntu 22.04
FROM ubuntu:22.04
# Install ca-certificates for HTTPS connections (if needed)
RUN apt-get update && \
    apt-get install -y --no-install-recommends ca-certificates && \
    rm -rf /var/lib/apt/lists/*
WORKDIR /app
# Copy binaries from builder
COPY --from=builder /build/tcp_forward .
COPY --from=builder /build/bench_client .
# Expose default ports
EXPOSE 7001 7002
# Default command runs the main tcp_forward server
CMD ["./tcp_forward"]
docker build -t tcp-forward-go .
docker run -p 7001:7001 -p 7002:7002 tcp-forward-go
# 透過vs code 的dev container進入
cd /app
./bench_client --pub-host 0.0.0.0 --pub 1 --sub 3 --rate 10000 --msg-size 80 --duration 15
# 執行結果
bench_client start
pubs=1 subs=3 rate=10000/pub msg_size=80 duration=15s threads=16
==== bench result ====
Sent: 115613 msgs (7707.53 msg/s)
Recv: 346839 msgs (23122.60 msg/s)
Latency (ms): avg=0.171 p50=0.169 p90=0.220 p99=0.296
======================
意外的發現性能竟然是 go 語言的比較出色,但這部分目前我是沒辦法解釋的。
# C++ 版本的執行結果
bench_client start
pubs=1 subs=3 rate=10000/pub msg_size=80 duration=15s threads=16
==== bench result ====
Sent: 54754 msgs (3650.27 msg/s)
Recv: 164262 msgs (10950.8 msg/s)
Latency (ms): avg=0.257809 p50=0.249847 p90=0.331499 p99=0.437018
======================
有了AI工具,現在換語言的實作真的變超輕鬆,就算對go語言只是懵懵懂懂,也是能順利移植過來,雖然中間會遇到一些問題,但AI都會處理,不過用下來除非本身很懂要AI做的東西是什麼,不然在本身不太懂的情況下使用,真的會有很空虛的現象,變成完全依賴AI才能運作,我想這就是未來工程師真正的考驗吧。