Akka + Servlet 3 = Comet

23 апреля 2011 г.

Логотип

Вступление

Не так давно моё внимание привлекла библиотека Akka — реализация модели акторов для Scala и Java. Она мне показалась достаточно интересной, тем более до этого с акторами мне ещё не доводилось работать. И вот я наконец победил лень и сел реализовать на Akka что-нибудь простое и бесполезное. Напрмер, асинхронную обработку http-запросов. К тому же уже давно вышла реализация Servlet 3, которую тоже надо посмотреть. Итак, предлагаю вашему вниманию реализацию на Scala простейшего comet-чата.

Что для этого понадобится

  • Scala — отличный язык программирования, особенно если вы уже долго работаете с Java и вам уже стало скучно. Или вы смотрите roadmap развития Java и вам становится грустно.
  • Akka — реализация акторов для Scala и Java. Позволяет разрабатывать параллельные программы на манер Erlang. Согласно заявлением разработчиков и некоторым тестам является лучшей в мире scala
  • Servlet 3 — последняя на данный момент спецификация сервлетов. В частности поддерживает асинхронную обработку запросов и конфигурирование с помощью аннотаций (можно совсем без web.xml). Я использую Tomcat 7

Приступим

На схеме изображена блок схема компонентов сервера.

Схема

Начнём с центральной и самой простой части нашего чата — актора Chat:

class Chat extends Actor{
  
     val subscribers = scala.collection.mutable.ListBuffer.empty[ActorRef]
  
     protected def receive = {
         case Subscribe(a) =>{
             subscribers += a
         }
         case UnSubscribe(a) =>{
             subscribers -= a
         }
         case m @ ChatMessage(_,_)  =>{
             for(s <- subscribers) s ! m
         }
     }
}

Метод recieve является основным в классе актора. Он взвращает partial функцию (PartialFunction[Any, Unit]), которая будет вызываться при поступлении сообщения. Сообщения — это обычно case классы, которые потом легко разделяются с помощью сопоставления с образцом (pattern matching).

Этот актор хранит у себя список сессий пользователей. Он обрабатывает три типа сообщений: Subscribe и UnSubscribe, с помощью которых акторы сессий пользователей (UserSession) подписываются и отписываются от получения сообщений чата; и ChatMessage — сообщение в чате, которое просто ретранслируется всем участникам чата (включая самого отправителя).

Теперь перейдём к другому концу нашего сервера, сервлету:

@WebServlet(urlPatterns = Array("/comet"), asyncSupported = true, loadOnStartup = 1)
class CometServlet extends HttpServlet{
     override def service(req: HttpServletRequest, resp: HttpServletResponse) {
         req.setCharacterEncoding(«UTF-8»)
         resp.setCharacterEncoding(«UTF-8»)
         resp.setContentType(«application/json»)
         val ctx = req.startAsync(req, resp)
         ctx.setTimeout(1000*60*2)
         RequestDispatcher.instance ! Request(new RequestContext(ctx))
     }
}

Настройки сервлета заданы с помощью аннотации (больше никакого xml). Параметр asyncSupported указывает на то, что данный сервлет может обрабатывать запросы асинхронно. Само начало асинхронной обработки происходит при вызове метода startAsync у запроса. После этого вызова запрос не будет закрыт при завершении метода service, а будет ждать вызова метода complete (у класса AsyncContext, объект которого вернул startAsync) или наступления таймаута обработки запроса. При этом поток, в контексте которого происходил вызов метода service сервлета, не будет ждать завершения обработки запроса, а отправится обратно в пул дожидаться нового запроса.

В нашем случае запрос после небольшой предварительной подготовки отправляется актору RequestDispatcher. Оператор ‘!’ посылает сообщение актору асинхронно, то есть не ждёт результата обработки сообщения.

class RequestDispatcher extends Actor{

    self.faultHandler =  OneForOneStrategy(List(classOf[Exception]), 5, 5000)

    val sessions = collection.mutable.HashMap.empty[String, ActorRef]
    protected def receive = {
        case Request(ctx) => {
            var userId = ctx.param("userId")
            if(userId == null) userId = UUID.randomUUID.toString


            val session = sessions.getOrElseUpdate(userId, {
                val a = Actor.actorOf(new UserSession(userId))
                self.startLink(a)
                a
            })

            ctx.ctx.addListener(new AsyncListener(){
                def onStartAsync(p1: AsyncEvent) {}

                def onError(p1: AsyncEvent) {
                    session ! CloseRequest()
                }

                def onTimeout(p1: AsyncEvent) {
                }

                def onComplete(p1: AsyncEvent) {}
            })

            session ! Request(ctx)

        }

        case CloseSession(userId) => {
            for(s <- sessions.get(userId)) {
                self.unlink(s)
                s.stop()
                sessions -= userId
            }
        }
    }
}
class RequestDispatcher extends Actor{
     val sessions = collection.mutable.HashMap.empty[String, ActorRef]
     protected def receive = {
         case Request(ctx) => {
             var userId = ctx.param(«userId»)
             if(userId == null) userId = UUID.randomUUID.toString
             val session = sessions.getOrElseUpdate(userId, {
                 val a = Actor.actorOf(new UserSession(userId))
                 self.startLink(a)
                 a
             })
             session ! Request(ctx)
         }
     }
}
class UserSession(val userId:String) extends Actor{
    implicit val formats = Serialization.formats(NoTypeHints)

    var waitRequest:Option[RequestContext] = None
    var filters:Seq[String] = Nil
    var messagesQueue = List.empty[ChatMessage]
    var lastAccess:Long = System.currentTimeMillis()

    val okResponse = write(Map("ok"->true))
    val timeout = 1000*60*10L

    protected def receive = {
        case Request(ctx) => {
            lastAccess = System.currentTimeMillis()
            val action = ctx.param("action")
            action match {
                case "sendMessage" => {
                    val msg = ctx.param("message")
                    Chat.instance ! ChatMessage(prepareMessage(msg))
                    ctx.writeAndComplete(okResponse)
                }
                case "changeFilter" => {
                    val filterString = ctx.param("filter")
                    val newFilters = filterString.split(' ') .map(_.trim)
                        .filter(s => s.startsWith("#"))
                    filters = newFilters
                    ctx.writeAndComplete(okResponse)
                }
                case "getMessages" =>{
                    closeLastRequest()
                    if(messagesQueue.isEmpty){
                        waitRequest = Some(ctx)
                    } else{
                        sendMessagesToClient(ctx)
                    }
                }
                case "getUserId" => {
                    val json: String = write(Map("ok" -> true, "userId" -> userId))
                    ctx.writeAndComplete(json)
                }
            }
        }

        case msg @ ChatMessage(_,_) => {
            if(checkFilter(msg)){
                messagesQueue = msg :: messagesQueue
                if(waitRequest.isDefined){
                    sendMessagesToClient(waitRequest.get)
                    waitRequest = None
                }
            }
            checkTimeOut()
        }

        case CloseRequest() =>{
            waitRequest = None
        }
    }

    override def preStart {
        Chat.instance ! Subscribe(self)
    }


    override def postStop {
        closeLastRequest()
        Chat.instance ! UnSubscribe(self)
    }


    override def preRestart(reason: Throwable) {
        postStop
    }

    private def closeLastRequest() {
        try {
            waitRequest.foreach(r => {
                r.writeAndComplete(write((Map("ok" -> false))))
            })
        }
        catch {
            case _ => {}
        }
        waitRequest = None
    }

    private def sendMessagesToClient(ctx:RequestContext){
        val msgs =  messagesQueue.reverse.map(m => Map("msg" -> m.msg, "time" -> m.time))
        val response = Map("ok" -> true,
                            "messages"-> msgs)

        var json = write(response)
        ctx.writeAndComplete(json)
        messagesQueue = List.empty[ChatMessage]
    }

    private def prepareMessage(m:String):String = {
        val m1 = m.replaceAll("\n", "\\\\n")
        Jsoup.parse(m1).text().take(200)
    }

    private def checkFilter(msg:ChatMessage) = {
        if(filters.isEmpty) true
        else filters.exists(f => msg.tags.contains(f))
    }

    private def checkTimeOut(){
        if(System.currentTimeMillis - lastAccess > timeout)
            RequestDispatcher.instance ! CloseSession(userId)
    }

}
class UserSession(val userId:String) extends Actor{
     var waitRequest:Option[RequestContext] = None
     var messagesQueue = List.empty[ChatMessage]
  
     protected def receive = {
         case Request(ctx) => {
             val action = ctx.param(«action»)
             action match {
                 case «sendMessage» => {
                     val msg = ctx.param(«message»)
                     Chat.instance ! ChatMessage(prepareMessage(msg))
                     ctx.writeAndComplete(okResponse)
                 }
                 case «getMessages» =>{
                     closeLastRequest()
                     if(messagesQueue.isEmpty){
                         waitRequest = Some(ctx)
                     } else{
                         sendMessagesToClient(ctx)
                     }
                 }
             }
         }
         case msg @ ChatMessage(_,_) => {
                 messagesQueue = msg :: messagesQueue
                 if(waitRequest.isDefined){
                     sendMessagesToClient(waitRequest.get)
                     waitRequest = None
                 }
         }
         case CloseRequest() =>{
             waitRequest = None
         }
     }
     override def preStart {
         Chat.instance ! Subscribe(self)
     }
     override def postStop {
         closeLastRequest()
         Chat.instance ! UnSubscribe(self)
     }
}

Для каждого пользователя создаётся отдельный экземпляр UserSession. Для получения сообщений клиент отправляет запрос getMessages, который, если очередь неотправленных сообщений не пуста, сразу обрабатывается; если сообщений нет, то запрос сохраняется и ожидает их поступления. Таким образом от клиента почти всё время есть ожидающий запрос. Отправка сообщений производится отдельным post-запросом. Данные на клиент передаются в виде json. В качестве серализатора использую lift-json(кусок веб-фреймворка для scala — lift).

На клиенте всё предельно просто. Запрос сообщений с помощью jQuery.ajax происходит примерно так

$.ajax({
     data:{
         userId:userId,
         action:«getMessages»
     },
     success:function(res){
         //показ сообщений
         delayedReceiveMessages(10);
     },
     error:function(){
         delayedReceiveMessages(1000);
     }
})

Результат

Всё что получилось разместил здесь. Развёрнуто на бесплатном микроинстансе Amazon EC2.
Субъективно, разрабатвывать с использованием акторов оказалось довольно приятно. Отсутствие общего состояния очень облегчает жизнь.

Ни на что не претенндующие замеры показали, что на 10 ожидающих соединений расходуется 1 Mb памяти кучи (heap). Эта память почти полностью расходуется на содержание соединения томкатом. Потребность акторов на порядок-два меньше. На мой взгляд для comet-сервера это много.

PS Для заинтересовавшихся Akka и владеющими английским советую посмотреть доклад на Scala Days 2010 и ознакомится с официальным руководством.
Те, кому нужен настоящий comet-сервер для java/scala (а не этот велосипед), предлагаю поискать что-то специализированное, например Atmpsphere.

Теги: рубрика Java