Golang Beego框架之WebIM例子分析

beego框架算是golang比较成熟的一个框架了,最近看了下这个框架其中的一个在线聊天室的例子,觉得还是有很多可以学习借鉴的地方,所以就总结下。@H_301_1@

这个例子的源码在这里,该例子配合bee工具可以很简单的进行运行。@H_301_1@

首先看下这个项目的结构:@H_301_1@

标准的beego框架,各个文件夹包含了不同的功能。@H_301_1@

然后我们从main.go(这里是WebIM.go)看起:@H_301_1@

package main

import (
    "github.com/astaxie/beego"
    "github.com/beego/i18n"

    "github.com/beego/samples/WebIM/controllers"
)

const (
    APP_VER = "0.1.1.0227"
)

func main() {
    beego.Info(beego.BConfig.AppName,APP_VER)

    // Register routers.
    beego.Router("/",&controllers.AppController{})
    // Indicate AppController.Join method to handle POST requests.
    beego.Router("/join",&controllers.AppController{},"post:Join")

    // Long polling.
    beego.Router("/lp",&controllers.LongPollingController{},"get:Join")
    beego.Router("/lp/post",&controllers.LongPollingController{})
    beego.Router("/lp/fetch","get:Fetch")

    // WebSocket.
    beego.Router("/ws",&controllers.WebSocketController{})
    beego.Router("/ws/join",&controllers.WebSocketController{},"get:Join")

    // Register template functions.
    beego.AddFuncMap("i18n",i18n.Tr)

    beego.Run()
}

虽然是从该文件看起,但是并非是从该文件开始执行的,而是import中的package中的init方法执行的,有关init方法的执行顺序可以看这里,好了,先收回来看这个文件,WebIM.go中主要是定义了路由,那我们来看下路由的细节:@H_301_1@

请求/跳转controllers.AppController对应的Get方法,该方法直接返回welcome.html页面页面如下:@H_301_1@

然后在此处输入用户名并选择使用的连接技术,用户名很简单就是用户ID,使用的技术这里采用长轮询或者WebSocket的方式,稍后会专门谈谈这两种方式。@H_301_1@

点击‘进入聊天室’后,就会请求/join,同时会携带两个参数,一个是用户名,另外一个参数是连接的方式(技术),该请求为post请求,根据beego.Router("/join","post:Join")可知,该请求会被AppController的Join方法:@H_301_1@

func (this *AppController) Join() {
    // Get form value.
    uname := this.GetString("uname")
    tech := this.GetString("tech")

    // Check valid.
    if len(uname) == 0 {
        this.Redirect("/",302)
        return
    }

    switch tech {
    case "longpolling":
        this.Redirect("/lp?uname="+uname,302)
    case "websocket":
        this.Redirect("/ws?uname="+uname,302)
    default:
        this.Redirect("/",302)
    }

    // Usually put return after redirect.
    return
}

方法首先获取到post请求的两个参数,然后判断用户名是否为空,若为空重新跳回欢迎页面,如不为空,开始判断当前连接方式(技术)是什么,根据不同分别跳转/lp?uname=uname/ws?uname=uname,然后WebIM.go中的路由路由就会根据本次请求的方式分别请求不同的控制器,/lp将会请求controllers.LongPollingController{}Join方法/ws将会请求controllers.WebSocketController{}默认的Get方法,这两种请求的方式返回的前端页面是相同的,但是后台的处理是不同的。@H_301_1@

那么,我们先区分一下这两种获取数据的方式有什么不同,首先是websocket的方式,websocket实现了服务器端主动向客户端浏览器进行消息的推送,实现了消息的同步,实质上是建立了一条socket连接,通过该链接进行通信,而长轮询则依旧是采用http的方式去请求数据,区别于轮询的方式,轮询是客户端浏览器每隔一段时间发起一次http请求,而长轮询则是发起一次请求,然后服务器端将该连接保持住(hold,暂时不拒绝也不响应),等到服务器端到达指定的情况(收到另一个消息)将该消息作为内容进行响应,客户端收到响应后再次发起长轮询请求,这样就能保证每次有新的消息到达的时候都能及时的收到响应。@H_301_1@

在这个WebIM中是通过chan来维持长轮询的,在介绍维持长轮询这块之前,我们按次序来看下在这个例子中使用的数据结构models:@H_301_1@

archive.go@H_301_1@

package models

import (
    "container/list"
)

//用int类型重新定义客户端产生的事件类型
type EventType int

//三种事件类型:加入、离开、消息
const (
    EVENT_JOIN = iota
    EVENT_LEAVE
    EVENT_MESSAGE
)

//定义事件结构(事件类型、用户名、事件、内容
type Event struct {
    Type      EventType // JOIN,LEAVE,MESSAGE
    User      string
    Timestamp int // Unix timestamp (secs)
    Content   string
}

//用来保存服务器上能够保存的消息记录,保存最新的20条
const archiveSize = 20

// 事件归档保存
var archive = list.New()

// 将一个新的事件保存在archive中,若事件的个数已经大于等于20则删除第一个,只保留最新的20个
func NewArchive(event Event) {
    if archive.Len() >= archiveSize {
        archive.Remove(archive.Front())
    }
    archive.PushBack(event)
}

// 根据传过来的时间戳返回该时间戳之后的所有事件消息
func GetEvents(lastReceived int) []Event {
    events := make([]Event, 0,archive.Len())
    for event := archive.Front(); event != nil; event = event.Next() {
        e := event.Value.(Event)
        if e.Timestamp > int(lastReceived) {
            events = append(events,e)
        }
    }
    return events
}

看完了models才想起来上面还说到WebIM.go并不是程序执行的其实,而应该是import包时调用的init方法,由于此处导入了github.com/beego/samples/WebIM/controllerspackage,所以我们来看下controllers中的init方法。@H_301_1@

在app.go中的init方法如下:@H_301_1@

func init() {
    // 从配置文件获取语言类型列表
    langTypes = strings.Split(beego.AppConfig.String("lang_types"),"|")

    // 根据语言类型加载语言环境文件
    for _,lang := range langTypes {
        beego.Trace("Loading language: " + lang)
        if err := i18n.SetMessage(lang,"conf/"+"locale_"+lang+".ini"); err != nil {
            beego.Error("Fail to set message file:",err)
            return
        }
    }
}

在chatroom.go中init方法如下:@H_301_1@

func init() {
    go chatroom()
}

方法直接启动一个goroutine,让其独立运行,下面我们就来看下这个chatroom方法的作用。@H_301_1@

首先在chatroom.go中定义了一系列的变量,如:subscribeunsubscribepublishwaitingListsubscribers,其中subscribeunsubscribepublish都是chan类型,三个变量代表的含义分别是:订阅者,未订阅者,以及要进行发布的消息,之所以设置为缓冲为10的chan,这是为了应对同时多个客户端发起的请求事件。watingList表示的是当前等待长轮询的list,该list中存储的类型为无缓冲的chan类型,也就是通过这样一个无缓冲的chan类型来保证了长轮询的“保持(hold)”:当有长轮询请求到达时,该list添加一个空的chan,然后从该ch中读取数据,由于刚才是添加的空的chan,所以这里直接从chan中读取数据自然不能读取到,那么执行到此处的无缓冲的chan自然就会阻塞,这也就是长轮询可以保持住的原因!@H_301_1@

那么,我们在看下当事件触发的情况下,长轮询返回响应的情况:@H_301_1@

// Notify waiting list.
    for ch := waitingList.Back(); ch != nil; ch = ch.Prev() {
        ch.Value.(chan bool) <- true
        waitingList.Remove(ch)
    }

当有事件触发时,此处循环waitingList,往list中的每个chan填入元素true,那么这时在上面使用chan读取数据来保持长轮询的部分将会被激活(启动),此时也将返回响应信息,完成一次长轮询的过程,进而开始下次的长轮询。此处使用chan来保持长轮询还是设计的很巧妙的!@H_301_1@

我们再看下使用websocket方式来进行通信的场景。@H_301_1@

wobsocket和longpolling共用了较多的数据,如所有的事件存档,当有新的websocket用户加入时,会调用如下函数:@H_301_1@

websocket.go@H_301_1@

// Join method handles WebSocket requests for WebSocketController.
//当有新的用户通过websocket方式加入时,调用执行该函数
func (this *WebSocketController) Join() {
    //获取加入的用户用户名并进行是否为空的校验
    uname := this.GetString("uname")
    if len(uname) == 0 {
        this.Redirect("/",302)
        return
    }

    // 从http请求升级到WebSocket。
    ws,err := websocket.Upgrade(this.Ctx.ResponseWriter,this.Ctx.Request,nil,1024,1024)
    if _,ok := err.(websocket.HandshakeError); ok {
        http.Error(this.Ctx.ResponseWriter,"Not a websocket handshake",400)
        return
    } else if err != nil {
        beego.Error("Cannot setup WebSocket connection:",err)
        return
    }

    // 将该请求转换成的websocket联通用户名一起加入chatroom
    Join(uname,ws)
    defer Leave(uname)

    // 循环从websocket中读取数据,无数据时阻塞,有数据到达时往publish chan中添加事件,从而引起其他事件的响应
    for {
        _,p,err := ws.ReadMessage()
        if err != nil {
            return
        }
        publish <- newEvent(models.EVENT_MESSAGE,uname,string(p))
    }
}

websocket.go中还有如下函数:@H_301_1@

func broadcastWebSocket(event models.Event) {
    //将要进行广播的事件json格式化
    data,err := json.Marshal(event)
    if err != nil {
        beego.Error("Fail to marshal event:",err)
        return
    }

    //循环遍历通过websocket方式加入聊天室的用户,广播该事件(单条)
    for sub := subscribers.Front(); sub != nil; sub = sub.Next() {
        // Immediately send event to WebSocket users.
        ws := sub.Value.(Subscriber).Conn //若是通过longpolling方式加入的,则ws为nil if ws != nil {
            //如下是将事件消息写入websocket中,若写入失败(返回err)则证明客户端已关闭websocket,此时从订阅列表中将该用户删除
            if ws.WriteMessage(websocket.TextMessage,data) != nil {
                // User disconnected.
                unsubscribe <- sub.Value.(Subscriber).Name } } } }

在chatroom.go中源码中有一段如下:@H_301_1@

chatroom.go@H_301_1@

//当有新的事件消息到达时,执行如下
case event := <-publish:
            // Notify waiting list.
            for ch := waitingList.Back(); ch != nil; ch = ch.Prev() {
                ch.Value.(chan bool) <- true
                waitingList.Remove(ch)
            }

            broadcastWebSocket(event)
            models.NewArchive(event)

            if event.Type == models.EVENT_MESSAGE {
                beego.Info("Message from",event.User,";Content:",event.Content)
            }

此时,我发现了如下的问题,这里的chatroom实际上已经是一个单独的goroutine,即就是此处的添加消息事件和长轮询的响应处理函数是两个goroutine,所以这里就存在了同步的问题,我们先来看下这里有的两个goroutine的执行情况:@H_301_1@

从图中可以看出,当chatroom的goroutine中收到事件消息时,首先是激活了长轮询的等待,然后依次是广播事件、添加事件消息到消息归档中,而在另一个goroutine中激活了长轮询的等待之后立刻就会去获取lastReceived之后新的事件消息,这里就有可能产生不同步,即就是事件消息在chatroom的goroutine中还未加入事件消息归档,这里就开始去获取,这样的话当然是不能获取到消息,也就是返回的消息响应为空!虽然这样理解但是并非每次都是这样,而且即便此时此刻返回的响应消息为空,依旧不会影响消息的获取,因为当前的请求返回空之后,立刻回发起一次新的fetch请求,这次请求的lastReceived依旧是上次请求的timestamp,所以消息不会遗漏,但是,消息事件响应的灵敏度就不够高,需要发起两次请求。@H_301_1@

为了测试,我添加了如下的代码,用来模拟chatroom的goroutine运行较慢的情况:@H_301_1@

// Notify waiting list.
            for ch := waitingList.Front(); ch != nil; ch = waitingList.Front() {
                ch.Value.(chan bool) <- true
                waitingList.Remove(ch)
            }
            time.Sleep(time.Second * 2)

            broadcastWebSocket(event)
            models.NewArchive(event)

如下是收到空的事件消息的情况:@H_301_1@

从请求中也可以看出,当返回响应消息为空时,下次请求的lastRecived依旧为上次请求的时间戳,所以消息并不会丢失。@H_301_1@

关于两个goroutine的运行差不多就这样了,在这个例子中,源码的作者有这样一处小失误,代码如下:@H_301_1@

// Notify waiting list.
            for ch := waitingList.Back(); ch != nil; ch = ch.Prev() {
                ch.Value.(chan bool) <- true
                waitingList.Remove(ch)
            }

本来是想要循环遍历所有的长轮询的等待队列,遍历之后删除节点,但是由于循环条件出错,并不能达到预想的情况,而是遗漏了很多的节点。修改如下:@H_301_1@

// Notify waiting list.
            for ch := waitingList.Front(); ch != nil; ch = waitingList.Front() {
                ch.Value.(chan bool) <- true
                waitingList.Remove(ch)
            }

关于前端是如何请求以及后台模板的响应这里并没有介绍,还有前端的websocket以及长轮询自动发起的请求还需要自己去体会。。。@H_301_1@

好了,就这些了,算是对这个例子的一点认识!!!@H_301_1@

相关文章

程序目录结构 简单实现,用户登录后返回一个jwt的token,下次请求带上token请求用户信息接口并返回信息...
本篇博客的主要内容是用go写一个简单的Proof-of-Work共识机制,不涉及到网络通信环节,只是一个本地的简...
简介 默克尔树(MerkleTree)是一种典型的二叉树结构,其主要特点为: 最下面的叶节点包含存储数据或其...
接下来学习并发编程, 并发编程是go语言最有特色的地方, go对并发编程是原生支持. goroutine是go中最近本...
先普及一下, 什么是广度优先搜索 广度优先搜索类似于树的层次遍历。从图中的某一顶点出发,遍历每一个顶...
第一天: 接口的定义和实现 第二天: 一. go语言是面向接口编程. 在学习继承的时候说过, go语言只有封装,...