Akka + Servlet 3 = Comet
Вступление
Не так давно моё внимание привлекла библиотека Akka — реализация модели акторов для Scala и Java. Она мне показалась достаточно интересной, тем более до этого с акторами мне ещё не доводилось работать. И вот я наконец победил лень и сел реализовать на Akka что-нибудь простое и бесполезное. Напрмер, асинхронную обработку http-запросов. К тому же уже давно вышла реализация Servlet 3, которую тоже надо посмотреть. Итак, предлагаю вашему вниманию реализацию на Scala простейшего comet-чата.
Что для этого понадобится
- Scala — отличный язык программирования, особенно если вы уже долго работаете с Java и вам уже стало скучно. Или вы смотрите roadmap развития Java и вам становится грустно.
- Akka — реализация акторов для Scala и Java. Позволяет разрабатывать параллельные программы на манер Erlang. Согласно заявлением разработчиков и некоторым тестам является лучшей в мире scala
- Servlet 3 — последняя на данный момент спецификация сервлетов. В частности поддерживает асинхронную обработку запросов и конфигурирование с помощью аннотаций (можно совсем без web.xml). Я использую Tomcat 7
Приступим
На схеме изображена блок схема компонентов сервера.
Начнём с центральной и самой простой части нашего чата — актора Chat:
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 | class Chat extends Actor{ val subscribers = scala.collection.mutable.ListBuffer.empty[ActorRef] protected def receive = { case Subscribe(a) = >{ subscribers + = a } case UnSubscribe(a) = >{ subscribers - = a } case m @ ChatMessage( _ , _ ) = >{ for (s <- subscribers) s ! m } } } |
Метод recieve является основным в классе актора. Он взвращает partial функцию (PartialFunction[Any, Unit]), которая будет вызываться при поступлении сообщения. Сообщения — это обычно case классы, которые потом легко разделяются с помощью сопоставления с образцом (pattern matching).
Этот актор хранит у себя список сессий пользователей. Он обрабатывает три типа сообщений: Subscribe и UnSubscribe, с помощью которых акторы сессий пользователей (UserSession) подписываются и отписываются от получения сообщений чата; и ChatMessage — сообщение в чате, которое просто ретранслируется всем участникам чата (включая самого отправителя).
Теперь перейдём к другому концу нашего сервера, сервлету:
01 02 03 04 05 06 07 08 09 10 11 | @ WebServlet(urlPatterns = Array( "/comet" ), asyncSupported = true , loadOnStartup = 1 ) class CometServlet extends HttpServlet{ override def service(req : HttpServletRequest, resp : HttpServletResponse) { req.setCharacterEncoding(«UTF- 8 ») resp.setCharacterEncoding(«UTF- 8 ») resp.setContentType(«application/json») val ctx = req.startAsync(req, resp) ctx.setTimeout( 1000 * 60 * 2 ) RequestDispatcher.instance ! Request( new RequestContext(ctx)) } } |
Настройки сервлета заданы с помощью аннотации (больше никакого xml). Параметр asyncSupported указывает на то, что данный сервлет может обрабатывать запросы асинхронно. Само начало асинхронной обработки происходит при вызове метода startAsync у запроса. После этого вызова запрос не будет закрыт при завершении метода service, а будет ждать вызова метода complete (у класса AsyncContext, объект которого вернул startAsync) или наступления таймаута обработки запроса. При этом поток, в контексте которого происходил вызов метода service сервлета, не будет ждать завершения обработки запроса, а отправится обратно в пул дожидаться нового запроса.
В нашем случае запрос после небольшой предварительной подготовки отправляется актору RequestDispatcher. Оператор ‘!’ посылает сообщение актору асинхронно, то есть не ждёт результата обработки сообщения.
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 | class RequestDispatcher extends Actor{ self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 5 , 5000 ) val sessions = collection.mutable.HashMap.empty[String, ActorRef] protected def receive = { case Request(ctx) = > { var userId = ctx.param( "userId" ) if (userId == null ) userId = UUID.randomUUID.toString val session = sessions.getOrElseUpdate(userId, { val a = Actor.actorOf( new UserSession(userId)) self.startLink(a) a }) ctx.ctx.addListener( new AsyncListener(){ def onStartAsync(p 1 : AsyncEvent) {} def onError(p 1 : AsyncEvent) { session ! CloseRequest() } def onTimeout(p 1 : AsyncEvent) { } def onComplete(p 1 : AsyncEvent) {} }) session ! Request(ctx) } case CloseSession(userId) = > { for (s <- sessions.get(userId)) { self.unlink(s) s.stop() sessions - = userId } } } } |
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 | class RequestDispatcher extends Actor{ val sessions = collection.mutable.HashMap.empty[String, ActorRef] protected def receive = { case Request(ctx) = > { var userId = ctx.param(«userId») if (userId == null ) userId = UUID.randomUUID.toString val session = sessions.getOrElseUpdate(userId, { val a = Actor.actorOf( new UserSession(userId)) self.startLink(a) a }) session ! Request(ctx) } } } |
001 002 003 004 005 006 007 008 009 010 011 012 013 014 015 016 017 018 019 020 021 022 023 024 025 026 027 028 029 030 031 032 033 034 035 036 037 038 039 040 041 042 043 044 045 046 047 048 049 050 051 052 053 054 055 056 057 058 059 060 061 062 063 064 065 066 067 068 069 070 071 072 073 074 075 076 077 078 079 080 081 082 083 084 085 086 087 088 089 090 091 092 093 094 095 096 097 098 099 100 101 102 103 104 105 106 107 108 109 110 111 112 | class UserSession( val userId : String) extends Actor{ implicit val formats = Serialization.formats(NoTypeHints) var waitRequest : Option[RequestContext] = None var filters : Seq[String] = Nil var messagesQueue = List.empty[ChatMessage] var lastAccess : Long = System.currentTimeMillis() val okResponse = write(Map( "ok" -> true )) val timeout = 1000 * 60 * 10 L protected def receive = { case Request(ctx) = > { lastAccess = System.currentTimeMillis() val action = ctx.param( "action" ) action match { case "sendMessage" = > { val msg = ctx.param( "message" ) Chat.instance ! ChatMessage(prepareMessage(msg)) ctx.writeAndComplete(okResponse) } case "changeFilter" = > { val filterString = ctx.param( "filter" ) val newFilters = filterString.split( ' ' ) .map( _ .trim) .filter(s = > s.startsWith( "#" )) filters = newFilters ctx.writeAndComplete(okResponse) } case "getMessages" = >{ closeLastRequest() if (messagesQueue.isEmpty){ waitRequest = Some(ctx) } else { sendMessagesToClient(ctx) } } case "getUserId" = > { val json : String = write(Map( "ok" -> true , "userId" -> userId)) ctx.writeAndComplete(json) } } } case msg @ ChatMessage( _ , _ ) = > { if (checkFilter(msg)){ messagesQueue = msg :: messagesQueue if (waitRequest.isDefined){ sendMessagesToClient(waitRequest.get) waitRequest = None } } checkTimeOut() } case CloseRequest() = >{ waitRequest = None } } override def preStart { Chat.instance ! Subscribe(self) } override def postStop { closeLastRequest() Chat.instance ! UnSubscribe(self) } override def preRestart(reason : Throwable) { postStop } private def closeLastRequest() { try { waitRequest.foreach(r = > { r.writeAndComplete(write((Map( "ok" -> false )))) }) } catch { case _ = > {} } waitRequest = None } private def sendMessagesToClient(ctx : RequestContext){ val msgs = messagesQueue.reverse.map(m = > Map( "msg" -> m.msg, "time" -> m.time)) val response = Map( "ok" -> true , "messages" -> msgs) var json = write(response) ctx.writeAndComplete(json) messagesQueue = List.empty[ChatMessage] } private def prepareMessage(m : String) : String = { val m 1 = m.replaceAll( "\n" , "\\\\n" ) Jsoup.parse(m 1 ).text().take( 200 ) } private def checkFilter(msg : ChatMessage) = { if (filters.isEmpty) true else filters.exists(f = > msg.tags.contains(f)) } private def checkTimeOut(){ if (System.currentTimeMillis - lastAccess > timeout) RequestDispatcher.instance ! CloseSession(userId) } } |
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 | class UserSession( val userId : String) extends Actor{ var waitRequest : Option[RequestContext] = None var messagesQueue = List.empty[ChatMessage] protected def receive = { case Request(ctx) = > { val action = ctx.param(«action») action match { case «sendMessage» = > { val msg = ctx.param(«message») Chat.instance ! ChatMessage(prepareMessage(msg)) ctx.writeAndComplete(okResponse) } case «getMessages» = >{ closeLastRequest() if (messagesQueue.isEmpty){ waitRequest = Some(ctx) } else { sendMessagesToClient(ctx) } } } } case msg @ ChatMessage( _ , _ ) = > { messagesQueue = msg :: messagesQueue if (waitRequest.isDefined){ sendMessagesToClient(waitRequest.get) waitRequest = None } } case CloseRequest() = >{ waitRequest = None } } override def preStart { Chat.instance ! Subscribe(self) } override def postStop { closeLastRequest() Chat.instance ! UnSubscribe(self) } } |
Для каждого пользователя создаётся отдельный экземпляр UserSession. Для получения сообщений клиент отправляет запрос getMessages, который, если очередь неотправленных сообщений не пуста, сразу обрабатывается; если сообщений нет, то запрос сохраняется и ожидает их поступления. Таким образом от клиента почти всё время есть ожидающий запрос. Отправка сообщений производится отдельным post-запросом. Данные на клиент передаются в виде json. В качестве серализатора использую lift-json(кусок веб-фреймворка для scala — lift).
На клиенте всё предельно просто. Запрос сообщений с помощью jQuery.ajax происходит примерно так
01 02 03 04 05 06 07 08 09 10 11 12 13 | $.ajax({ data : { userId : userId, action : «getMessages» }, success : function(res){ //показ сообщений delayedReceiveMessages( 10 ); }, error : function(){ delayedReceiveMessages( 1000 ); } }) |
Результат
Всё что получилось разместил здесь. Развёрнуто на бесплатном микроинстансе Amazon EC2.
Субъективно, разрабатвывать с использованием акторов оказалось довольно приятно. Отсутствие общего состояния очень облегчает жизнь.
Ни на что не претенндующие замеры показали, что на 10 ожидающих соединений расходуется 1 Mb памяти кучи (heap). Эта память почти полностью расходуется на содержание соединения томкатом. Потребность акторов на порядок-два меньше. На мой взгляд для comet-сервера это много.
PS Для заинтересовавшихся Akka и владеющими английским советую посмотреть доклад на Scala Days 2010 и ознакомится с официальным руководством.
Те, кому нужен настоящий comet-сервер для java/scala (а не этот велосипед), предлагаю поискать что-то специализированное, например Atmpsphere.