Golang websocket 聊天室

/Users/lizhe/works/aus/alivediochat/go.mod

module alivediochat

 

go 1.14

 

require (

github.com/tidwall/gjson v1.6.0

github.com/tidwall/sjson v1.1.1

golang.org/x/net v0.0.0-20201224014010-6772e930b67b

 

)

 /Users/lizhe/works/aus/alivediochat/main.go

index.html

 

<!DOCTYPE html>

<html lang="en">

 

<head>

<meta charset="UTF-8">

<meta name="viewport" content="width=device-width, initial-scale=1.0">

<meta http-equiv="X-UA-Compatible" content="ie=edge">

<link rel="icon" href="./favicon.ico" type="image/x-icon" />

<title>在线QA</title>

<style type="text/css">

.talk_con {

width: 100%;

height: 100%;

border: 1px solid #666;

margin: 50px auto 0;

background: #f9f9f9;

}

 

.talk_show {

width: 100%;

height: 420px;

border: 1px solid #666;

background: #fff;

margin: 10px auto 0;

overflow: auto;

}

 

.talk_input {

width: 100%;

}

 

.talk_word {

width: 90%;

height: 26px;

float: left;

text-indent: 10px;

margin: 2% 5%;

}

 

.talk_sub {

width: 100%;

height: 30px;

float: left;

}

 

.atalk {

margin: 10px;

}

 

.atalk span {

display: inline-block;

background: #0181cc;

border-radius: 10px;

color: #fff;

padding: 5px 10px;

}

 

.btalk {

margin: 10px;

text-align: right;

}

 

.btalk span {

display: inline-block;

background: #ef8201;

border-radius: 10px;

color: #fff;

padding: 5px 10px;

}

</style>

<script src="https://cdn.bootcss.com/jquery/1.11.3/jquery.min.js"></script>

<script type="text/javascript">

$(function () {
    // Reuse the nickname stored in localStorage; ask for one on the first visit.
    const username = localStorage.getItem("username") || disp_prompt();
    const words = $("#words");
    const talkWords = $("#talkwords");
    const talkSubmit = $("#talksub");

    // WebSocket endpoint of the chat server.
    const wsURL = "ws://alipoc.dev.awspactera.com:8010/webSocket";
    // Kept local instead of leaking as an implicit global; null means "no usable connection".
    let ws = null;

    try {
        // The constructor itself may throw, so the socket is created inside the try block.
        ws = new WebSocket(wsURL);

        // Connection established: announce that this user joined.
        ws.onopen = function () {
            console.log("已连接服务器");
            const request = { "username": username, "message": " 已经加入聊天" };
            this.send(JSON.stringify(request));
        };

        // Connection closed: forget the socket so the send handler can detect it.
        ws.onclose = function () {
            ws = null;
            console.log("关闭服务器连接");
        };

        // Incoming chat message: render it as one row in the message list.
        ws.onmessage = function (result) {
            let data;
            try {
                data = JSON.parse(result.data);
            } catch (e) {
                // Ignore frames that are not valid JSON instead of breaking the handler.
                console.log(e.message);
                return;
            }

            // Own messages go on the right without a name, others on the left with the sender name.
            const isSelf = data.username === username;
            const className = isSelf ? "btalk" : "atalk";
            const user = isSelf ? "" : data.username;

            // Build the row from DOM nodes with .text() so that names and messages coming
            // from the server are shown literally and never interpreted as HTML.
            const row = $("<div>").addClass(className).text(user + ":");
            row.append($("<span>").text(data.message));
            words.append(row);

            // Keep the newest message visible.
            words.scrollTop(words.prop("scrollHeight"));
            console.log("开始监听消息");
        };

        // Connection error: close and forget the socket.
        ws.onerror = function () {
            if (ws) {
                ws.close();
                ws = null;
            }
            console.log("服务器连接失败");
        };
    } catch (e) {
        console.log(e.message);
    }

    // Pressing Enter anywhere on the page sends the message (key code 13 is Enter).
    document.onkeydown = function (event) {
        const e = event || window.event;
        if (e && e.keyCode === 13) {
            talkSubmit.click();
        }
    };

    talkSubmit.click(function () {
        // Read the text box; empty messages are rejected.
        const content = talkWords.val();
        if (content === "") {
            alert("消息不能为空");
            return;
        }

        // Without a connection, tell the user and reload to reconnect.
        if (ws == null) {
            alert("连接服务器失败,请刷新页面");
            window.location.reload();
            return;
        }

        const request = { "username": username, "message": content };
        ws.send(JSON.stringify(request));
        // Clear the text box after sending.
        talkWords.val("");
    });
});

 

/**
 * Asks the user for a nickname until a non-empty one is entered, stores it in
 * localStorage under "username" and returns it.
 *
 * The earlier recursive form dropped the value of the retry call, so the
 * function returned undefined whenever the first answer was empty or cancelled.
 *
 * @returns {string} the nickname that was entered
 */
function disp_prompt() {
    let username = prompt("请输入昵称");
    while (username == null || username === "") {
        username = prompt("请输入昵称");
    }
    localStorage.setItem("username", username);
    return username;
}

</script>

</head>

 

<body>

<h1>在线QA</h1>

<div class="talk_con">

<div class="talk_show" id="words">

 

</div>

<div class="talk_input">

<input type="text" class="talk_word" id="talkwords" placeholder="输入聊天内容">

<input type="button" value="发送" class="talk_sub" id="talksub">

</div>

</div>

</body>

 

</html>

/Users/lizhe/works/aus/alivediochat/Dockerfile

# ---- build stage: compile main.go inside the Go toolchain image ----
FROM golang:alpine AS golang

# output/ is the working directory of the build, so the binary lands there.
RUN mkdir -p /root/vedioChat/output
COPY ./ /root/vedioChat

WORKDIR /root/vedioChat/output
RUN go build /root/vedioChat/main.go

# ---- runtime stage: only the binary and the static assets ----
FROM alpine AS alpine

RUN mkdir -p /root/vedioChat
COPY --from=golang --chown=root:root /root/vedioChat/output /root/vedioChat
COPY --from=golang --chown=root:root /root/vedioChat/static /root/vedioChat/static

WORKDIR /root/vedioChat
# Exec form: the server runs as PID 1 and receives stop signals directly
# instead of being wrapped in "/bin/sh -c".
CMD ["/root/vedioChat/main"]

gRPC 2 kubernetes

这里为了更清楚地看出被访问的目标 pod,我对 server 端代码做了一些修改,所以干脆把所有代码再贴一遍

先看目录结构

/home/lizhe/works/grpc_helloworld_golang/pbfiles/Hi.proto

// Hi.proto declares the request/response messages and the HiService RPC.
syntax="proto3";  // proto3 syntax
package services;
// Go package path used for the generated code.
option go_package = "../services";
// HiRequest is the message sent by the caller.
message HiRequest {
  string say = 1;
}
// HiResponse is the message returned by the service.
message HiResponse {
  string responed = 1;
}
// HiService exposes a single unary RPC.
service HiService {
    // GetHiResponed takes a HiRequest and returns a HiResponse.
    rpc GetHiResponed(HiRequest) returns (HiResponse);
}

生成 proto 代码

cd pbfiles

protoc --go_out=plugins=grpc:. Hi.proto

这里改动比较大,主要是为了输出 pod 的 ip 地址

/home/lizhe/works/grpc_helloworld_golang/services/HiService.go

package services

import (
    "context"
    "errors"
    "fmt"
    "net"
)

// HiService is the receiver type for the GetHiResponed method; it has no fields.
type HiService struct {
}

// externalIP returns the first IPv4 address found on a network interface that
// is up and is not a loopback interface.
//
// It returns an error when the interfaces or their addresses cannot be listed,
// or when no interface yields a usable address.
func externalIP() (net.IP, error) {
    nics, err := net.Interfaces()
    if err != nil {
        return nil, err
    }
    for _, nic := range nics {
        isUp := nic.Flags&net.FlagUp != 0
        isLoopback := nic.Flags&net.FlagLoopback != 0
        if !isUp || isLoopback {
            continue // interface is down or is the loopback device
        }
        candidates, err := nic.Addrs()
        if err != nil {
            return nil, err
        }
        for _, candidate := range candidates {
            if ip := getIpFromAddr(candidate); ip != nil {
                return ip, nil
            }
        }
    }
    return nil, errors.New("connected to the network?")
}

// getIpFromAddr extracts a usable IPv4 address from addr.
//
// Only *net.IPNet and *net.IPAddr values carry an IP here; any other address
// type, a loopback address, or an address without a 4-byte form yields nil.
func getIpFromAddr(addr net.Addr) net.IP {
    var candidate net.IP
    if ipNet, ok := addr.(*net.IPNet); ok {
        candidate = ipNet.IP
    } else if ipAddr, ok := addr.(*net.IPAddr); ok {
        candidate = ipAddr.IP
    }
    if candidate == nil || candidate.IsLoopback() {
        return nil
    }
    // To4 yields nil for anything that is not an IPv4 address.
    return candidate.To4()
}

// GetHiResponed prints the request's Say field and answers with
// " helloworld " followed by this host's external IPv4 address.
//
// When the address cannot be determined the lookup error is printed and the
// reply carries "unknown" in its place; previously the error was discarded and
// the reply contained the text "<nil>". The returned error is always nil.
func (hs HiService) GetHiResponed(ctx context.Context, request *HiRequest) (*HiResponse, error) {
    fmt.Println("Say:" + request.Say)

    host := "unknown"
    if ip, err := externalIP(); err != nil {
        fmt.Println("externalIP:", err)
    } else {
        host = ip.String()
    }

    return &HiResponse{Responed: " helloworld " + host}, nil
}

/home/lizhe/works/grpc_helloworld_golang/client.go 

package main

import (
    "context"
    "fmt"
    "helloworld/services"
    "log"
    "os"

    "google.golang.org/grpc"
)

// main calls HiService.GetHiResponed on the server given on the command line
// and prints the reply.
//
// Usage: client <host> [port]   (port defaults to 30010)
//
// Every failure now terminates the program; before, the error was only logged
// and execution went on to dereference a nil connection or a nil response.
func main() {
    if len(os.Args) < 2 {
        log.Fatalf("usage: %s <host> [port]", os.Args[0])
    }
    host, port := os.Args[1], "30010"
    if len(os.Args) > 2 {
        port = os.Args[2]
    }

    conn, err := grpc.Dial(host+":"+port, grpc.WithInsecure())
    if err != nil {
        log.Fatalln(err)
    }
    defer conn.Close()

    client := services.NewHiServiceClient(conn)
    resp, err := client.GetHiResponed(context.Background(), &services.HiRequest{Say: "Hello"})
    if err != nil {
        // log.Fatalln exits without running deferred calls, so close explicitly first.
        conn.Close()
        log.Fatalln(err)
    }
    fmt.Println(resp.Responed)
}

/home/lizhe/works/grpc_helloworld_golang/server.go

package main

import (
    "helloworld/services"
    "net"

    "google.golang.org/grpc"
)

func main() {
    rpcServer := grpc.NewServer()                                        // 创建grpc服务
    services.RegisterHiServiceServer(rpcServer, new(services.HiService)) // 注册,new方法中的是services包中定义的结构体HiService
    lis, _ := net.Listen("tcp", ":30010")                                // 开启一个监听端口
    rpcServer.Serve(lis)                                                 // 启动服务
}
 

/home/lizhe/works/grpc_helloworld_golang/go.mod

module helloworld

require (
    google.golang.org/grpc v1.29.1
    google.golang.org/protobuf v1.25.0
)

go 1.13
 

/home/lizhe/works/grpc_helloworld_golang/Dockerfile

# ---- build stage: compile server and client ----
FROM golang AS golang

RUN mkdir -p /root/grpc
COPY ./ /root/grpc

# Both binaries are written to the working directory, /root/grpc/output.
WORKDIR /root/grpc/output
RUN go build /root/grpc/server.go
RUN go build /root/grpc/client.go

# ---- runtime stage: sources plus the two binaries under /root/grpc ----
FROM ubuntu AS ubuntu

RUN mkdir -p /root/grpc
COPY --from=golang --chown=root:root /root/grpc /root/grpc
COPY --from=golang --chown=root:root /root/grpc/output /root/grpc

# Exec form starts bash directly instead of through "/bin/sh -c".
CMD ["/bin/bash"]
 

给 server 端和 client 端分别创建多个 pod 

这里实际是为了测试 grpc 在 k8s 上的负载均衡状态

/root/grpc

/bin/bash -c ./server

client使用 go run ./client.go 192.168.204.128 30010

gRPC 1 helloworld go-lang

运行起来的话大概是这样

从零开始

1. 先安装 golang

sudo apt install golang

添加环境变量

export GOROOT=/usr/lib/go
export GOPATH=$HOME/go
export GOBIN=$GOPATH/bin
export PATH=$PATH:$GOROOT/bin:$GOBIN

2. 安装git

sudo apt install git

3. 安装 grpc 依赖

go get -u github.com/golang/protobuf/protoc-gen-go

4. 添加 go.mod 文件

module helloworld

require (
    google.golang.org/grpc v1.29.1
    google.golang.org/protobuf v1.25.0
)

go 1.13
 

5. 安装 protoc 工具

sudo apt-get install autoconf automake libtool curl make g++ unzip

git clone https://github.com/google/protobuf.git

cd protobuf

git submodule update --init --recursive

./autogen.sh

./configure

make

make check

sudo make install

sudo ldconfig

protoc --version

切换到项目工作目录

cd grpc_go/

创建文件/home/lizhe/works/grpc_go/pbfiles/Hi.proto

// Hi.proto declares the request/response messages and the HiService RPC.
syntax="proto3";  // proto3 syntax
package services;
// Go package path used for the generated code.
option go_package = "../services";
// HiRequest is the message sent by the caller.
message HiRequest {
  string say = 1;
}
// HiResponse is the message returned by the service.
message HiResponse {
  string responed = 1;
}
// HiService exposes a single unary RPC.
service HiService {
    // GetHiResponed takes a HiRequest and returns a HiResponse.
    rpc GetHiResponed(HiRequest) returns (HiResponse);
}

切换到 pbfiles 文件夹

cd pbfiles

protoc --go_out=plugins=grpc:. Hi.proto

会生成 /home/lizhe/works/grpc_go/services/Hi.pb.go

创建文件 /home/lizhe/works/grpc_go/services/HiService.go

package services

import (
    "context"
    "fmt"
)

// HiService is the receiver type for the GetHiResponed method; it has no fields.
type HiService struct{}

// GetHiResponed prints the request's Say field and replies with the fixed
// text "helloworld". The returned error is always nil.
func (hs HiService) GetHiResponed(ctx context.Context, request *HiRequest) (*HiResponse, error) {
    fmt.Println("Say:" + request.Say)
    reply := &HiResponse{Responed: "helloworld"}
    return reply, nil
}

创建 server.go

package main

import (
    "helloworld/services"
    "net"

    "google.golang.org/grpc"
)

func main() {
    rpcServer := grpc.NewServer()                                        // 创建grpc服务
    services.RegisterHiServiceServer(rpcServer, new(services.HiService)) // 注册,new方法中的是services包中定义的结构体HiService
    lis, _ := net.Listen("tcp", ":8081")                                 // 开启一个监听端口
    rpcServer.Serve(lis)                                                 // 启动服务
}
 

创建 client.go

package main

import (
    "context"
    "fmt"
    "helloworld/services"
    "log"

    "google.golang.org/grpc"
)

// main calls HiService.GetHiResponed on the local server at :8081 and prints
// the reply.
//
// Every failure now terminates the program; before, the error was only logged
// and execution went on to dereference a nil connection or a nil response.
func main() {
    conn, err := grpc.Dial(":8081", grpc.WithInsecure())
    if err != nil {
        log.Fatalln(err)
    }
    defer conn.Close()

    client := services.NewHiServiceClient(conn)
    resp, err := client.GetHiResponed(context.Background(), &services.HiRequest{Say: "Hello"})
    if err != nil {
        // log.Fatalln exits without running deferred calls, so close explicitly first.
        conn.Close()
        log.Fatalln(err)
    }
    fmt.Println(resp.Responed)
}
 

Go 多线程 (5) RWLock 读写锁

读写锁的读行为不会造成同步

但是写行为会同时禁止 读/写 操作

例子很简单, 懒得写了网上找了一个, 下面可以看到读的时候没有阻塞

package main

import (
    "sync"
    "time"
)

var m *sync.RWMutex

// main starts two readers at the same time to show that read locks do not
// block each other, then waits for both to finish.
//
// A WaitGroup replaces the fixed two-second sleep, so main returns exactly
// when the readers are done, however long they take.
func main() {
    m = new(sync.RWMutex)

    var wg sync.WaitGroup
    for _, id := range []int{1, 2} {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            read(id)
        }(id)
    }
    wg.Wait()
}

// read holds the shared lock m in read mode for one second, printing a line
// before acquiring it, while holding it, and after releasing it.
func read(i int) {
    println(i, "read start")
    func() {
        m.RLock()
        defer m.RUnlock()
        println(i, "reading")
        time.Sleep(time.Second)
    }()
    println(i, "read over")
}

下面是写操作

package main

import (
    "sync"
    "time"
)

var m *sync.RWMutex

// main starts a writer, a reader and another writer to show that a write lock
// excludes everything else, then waits for all three to finish.
//
// A WaitGroup replaces the fixed five-second sleep, so main returns exactly
// when the goroutines are done.
func main() {
    m = new(sync.RWMutex)

    var wg sync.WaitGroup
    start := func(job func(int), id int) {
        wg.Add(1)
        go func() {
            defer wg.Done()
            job(id)
        }()
    }
    start(write, 1)
    start(read, 2)
    start(write, 3)
    wg.Wait()
}

// read holds the shared lock m in read mode for one second, printing a line
// before acquiring it, while holding it, and after releasing it.
func read(i int) {
    println(i, "read start")
    func() {
        m.RLock()
        defer m.RUnlock()
        println(i, "reading")
        time.Sleep(time.Second)
    }()
    println(i, "read over")
}

// write holds the shared lock m in write mode for one second, printing a line
// before acquiring it, while holding it, and after releasing it.
func write(i int) {
    println(i, "write start")
    func() {
        m.Lock()
        defer m.Unlock()
        println(i, "writing")
        time.Sleep(time.Second)
    }()
    println(i, "write over")
}

Go 单元测试

package lizhe

import (
    "errors"
)

// Division returns a divided by b.
// When b is zero it returns 0 and an error instead of dividing.
func Division(a, b float64) (float64, error) {
    if b != 0 {
        return a / b, nil
    }
    return 0, errors.New("除数不能为0")
}

package lizhe

import (
    "testing"
)

// Test_Division_1 checks that Division(6, 2) yields 3 without an error.
func Test_Division_1(t *testing.T) {
    quotient, err := Division(6, 2)
    if err == nil && quotient == 3 {
        t.Log("第一个测试通过了")
        return
    }
    t.Error("除法函数测试没通过")
}

 

// Test_Division_2 always reports a failure; it calls t.Error unconditionally
// and never invokes Division.
func Test_Division_2(t *testing.T) {
    t.Error("就是不通过")
}

Go 反射

package main

import (
    "fmt"
    "reflect"
)

// User is the sample struct inspected by Info in this example.
type User struct {
    Id int
    Name string
    Age int
}

// Hello prints a fixed greeting. It has a value receiver and does not read
// any field of u.
func (u User) Hello() {
    fmt.Println("Hello world!")
}

// Info prints a reflective summary of o to standard output: its type name,
// every struct field as "name:type =value", and every method in the method set
// of the value as passed, as "name:type".
//
// Inputs that used to panic are now handled:
//   - a nil interface prints "Type: <nil>" and nothing else;
//   - a pointer is dereferenced, so a *T reports the fields of T;
//     a nil pointer lists the fields without values;
//   - a non-struct value prints only its type and methods;
//   - unexported fields are listed as "name:type" without a value, because
//     reflection may not read them.
//
// Output for a plain struct value with only exported fields is unchanged.
func Info(o interface{}) {
    t := reflect.TypeOf(o)
    if t == nil {
        fmt.Println("Type:", "<nil>")
        return
    }
    v := reflect.ValueOf(o)
    // Methods are reported for the type as passed; a pointer type's method set
    // also contains the value-receiver methods.
    methods := t

    if t.Kind() == reflect.Ptr {
        t = t.Elem()
        v = v.Elem() // the zero (invalid) Value when the pointer is nil
    }

    fmt.Println("Type:", t.Name())
    if t.Kind() == reflect.Struct {
        fmt.Println("Fields:")
        for i := 0; i < t.NumField(); i++ {
            f := t.Field(i)
            if !v.IsValid() || !v.Field(i).CanInterface() {
                fmt.Printf("%6s:%v\n", f.Name, f.Type)
                continue
            }
            fmt.Printf("%6s:%v =%v\n", f.Name, f.Type, v.Field(i).Interface())
        }
    }
    for i := 0; i < methods.NumMethod(); i++ {
        m := methods.Method(i)
        fmt.Printf("%6s:%v\n", m.Name, m.Type)
    }
}

// main builds a sample User and prints its reflective summary via Info.
func main() {
    jack := User{Id: 1, Name: "Jack", Age: 23}
    Info(jack)
}

Go 多线程 (4) sync.Pool

sync.Pool 用于缓存

1. pool 可以减少GC对高并发的性能影响
2. 提供了对象重用
3. 并发安全
4. 仅受限于内存大小
5. 减少GC
6. 对象被删除时不会通知
7. 动态扩容或者收缩

package main

 

import (

    "log"

    "sync"

)

 

// main puts five integers into a sync.Pool without a New function and then
// logs the result of ten Get calls.
func main() {
    var intPool sync.Pool

    for i := 0; i < 5; i++ {
        intPool.Put(i)
    }

    // Ten Gets against five Puts: Get returns nil once nothing is left.
    for call := 0; call < 10; call++ {
        log.Println(intPool.Get())
    }
}

上面的例子中如果pool被pop空了, 会直接返回nil

可以通过添加New方法来控制这种行为

package main

 

import (

    "log"

    "sync"

)

 

// main puts five integers into a sync.Pool whose New function returns
// "Hello world" and then logs the result of ten Get calls.
func main() {
    intPool := sync.Pool{
        New: func() interface{} { return "Hello world" },
    }

    for i := 0; i < 5; i++ {
        intPool.Put(i)
    }

    // Ten Gets against five Puts: New supplies the value once nothing is left.
    for call := 0; call < 10; call++ {
        log.Println(intPool.Get())
    }
}

interface{}作为Go的重要特性之一,它代表的是一个类似 void* 的指针,可以指向不同类型的数据

Go 多线程 (3) channel 通道

通道

atomic和互斥锁都可以保证线程安全, 不过在go语言里, 还有一种有趣的实现就是通道

package main

 

import (

    "fmt"

)

 

// main sends one string through a buffered channel and prints what it
// receives back.
func main() {
    messages := make(chan string, 10)
    messages <- "hello world"
    fmt.Println(<-messages)
}

当一个资源需要在goroutine之间共享时, 通道在goroutine之间传递数据, 并且还提供了同步交换数据的机制.

在go语言中, 你需要使用 内置函数 make 来创建通道, 创建时需要指定数据类型

无缓冲的整数型通道

unbuffered := make ( chan int )

有缓冲的字符串通道

buffered := make ( chan string , 10 )

无缓冲通道

unbuffered channel 是指在接收前没有能力保存任何值的通道. 这种类型的通道要求发送方(goroutine) 和接收方(goroutine)同时准备好, 才能完成发送和接收操作.
如果两个goroutine没有同时准备好, 通道会导致先执行发送或者接收的goroutine阻塞等待. 这种对通道进行发送和接收的交互行为本身就是同步的.

其中任意一个操作都无法离开另一方操作独立存在

之前的例子里我们写过一个线程不安全的版本

package main

 
import (

    "fmt"

    "runtime"

    "sync"

)

 
var (

    count int32

    wg sync.WaitGroup

)

 
// main runs addValue in two goroutines, waits for both through the
// package-level WaitGroup, and prints the shared counter.
func main() {
    const workers = 2

    wg.Add(workers)
    for started := 0; started < workers; started++ {
        go addValue()
    }
    wg.Wait()

    fmt.Println(count)
}

 
// addValue increments the package-level counter twice and signals wg when done.
//
// This is the deliberately unsafe version of the example: the
// read-modify-write below uses no lock, atomic or channel, so two goroutines
// running it at the same time can overwrite each other's update.
func addValue() {

    defer wg.Done()

    for i := 0; i < 2; i++ {

        // Read the shared counter into a local copy.
        value := count

        // Yield the processor between the read and the write-back.
        runtime.Gosched()

        value++

        // Write the local copy back to the shared counter.
        count = value

    }

}

上面这段代码使用channel的线程安全版本是

package main

import (
    "fmt"
    "runtime"
    "sync"
)

var (
    count int32
    channel chan int32
    wg sync.WaitGroup
)

// main seeds a buffered channel with the initial counter value, runs addValue
// in two goroutines, waits for both, then reads the final value back from the
// channel and prints it.
func main() {
    runtime.GOMAXPROCS(2)

    channel = make(chan int32, 10)
    count = 0
    channel <- count

    const workers = 2
    wg.Add(workers)
    for started := 0; started < workers; started++ {
        go addValue()
    }
    wg.Wait()

    count = <-channel
    fmt.Println(count)
}

// addValue performs 200 rounds of: take the counter value out of the channel,
// yield, and put the value plus one back. It signals wg when done.
func addValue() {
    defer wg.Done()
    for remaining := 200; remaining > 0; remaining-- {
        value := <-channel
        runtime.Gosched()
        channel <- value + 1
    }
}

 

Go 多线程 (2) sync.Mutex

线程不安全的版本

package main

 
import (

    "fmt"

    "runtime"

    "sync"

)

 
var (

    count int32

    wg sync.WaitGroup

)

 
// main runs addValue in two goroutines, waits for both through the
// package-level WaitGroup, and prints the shared counter.
func main() {
    const workers = 2

    wg.Add(workers)
    for started := 0; started < workers; started++ {
        go addValue()
    }
    wg.Wait()

    fmt.Println(count)
}

 
// addValue increments the package-level counter twice and signals wg when done.
//
// This is the deliberately unsafe version of the example: the
// read-modify-write below uses no lock, atomic or channel, so two goroutines
// running it at the same time can overwrite each other's update.
func addValue() {

    defer wg.Done()

    for i := 0; i < 2; i++ {

        // Read the shared counter into a local copy.
        value := count

        // Yield the processor between the read and the write-back.
        runtime.Gosched()

        value++

        // Write the local copy back to the shared counter.
        count = value

    }

}

线程安全的版本

package main

 

import (

    "fmt"

    "runtime"

    "sync"

)

 

var (

    count int32

    wg sync.WaitGroup

    mutex sync.Mutex

)

 

// main runs addValue in two goroutines, waits for both through the
// package-level WaitGroup, and prints the shared counter.
func main() {
    const workers = 2

    wg.Add(workers)
    for started := 0; started < workers; started++ {
        go addValue()
    }
    wg.Wait()

    fmt.Println(count)
}

 

// addValue increments the package-level counter twice, holding mutex around
// each read-yield-write sequence, and signals wg when done.
func addValue() {
    defer wg.Done()

    // bump performs one increment entirely under the mutex.
    bump := func() {
        mutex.Lock()
        defer mutex.Unlock()

        snapshot := count
        runtime.Gosched()
        snapshot++
        count = snapshot
    }

    for round := 0; round < 2; round++ {
        bump()
    }
}

使用atomic

package main

 

import (

    "fmt"

    "sync"

    "sync/atomic"

)

 

var (

    count int32

    wg sync.WaitGroup

    mutex sync.Mutex

)

 

// main runs addValue in two goroutines, waits for both through the
// package-level WaitGroup, and prints the shared counter.
func main() {
    const workers = 2

    wg.Add(workers)
    for started := 0; started < workers; started++ {
        go addValue()
    }
    wg.Wait()

    fmt.Println(count)
}

 

// addValue adds 1 to the package-level counter 200 times using
// atomic.AddInt32 and signals wg when done.
func addValue() {
    defer wg.Done()
    for remaining := 200; remaining > 0; remaining-- {
        atomic.AddInt32(&count, 1)
    }
}

Go 多线程 (1) waitgroup

下面的代码创建了两个匿名的异步函数, 然后通过waitgroup来等待这两个异步方法

package main

 

import (

    "fmt"

    "sync"

)

 

// main starts two goroutines that each print one line, and uses a WaitGroup
// to block until both have finished before printing "done".
func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    fmt.Println("Started...")

    for _, line := range []string{"first thread finished", "second thread finished"} {
        go func(line string) {
            defer wg.Done()
            fmt.Println(line)
        }(line)
    }

    fmt.Println("waiting...")
    wg.Wait()
    fmt.Println("done")
}