Akka + Servlet 3 = Comet
Вступление
Не так давно моё внимание привлекла библиотека Akka — реализация модели акторов для Scala и Java. Она мне показалась достаточно интересной, тем более до этого с акторами мне ещё не доводилось работать. И вот я наконец победил лень и сел реализовать на Akka что-нибудь простое и бесполезное. Напрмер, асинхронную обработку http-запросов. К тому же уже давно вышла реализация Servlet 3, которую тоже надо посмотреть. Итак, предлагаю вашему вниманию реализацию на Scala простейшего comet-чата.
Что для этого понадобится
- Scala — отличный язык программирования, особенно если вы уже долго работаете с Java и вам уже стало скучно. Или вы смотрите roadmap развития Java и вам становится грустно.
- Akka — реализация акторов для Scala и Java. Позволяет разрабатывать параллельные программы на манер Erlang. Согласно заявлением разработчиков и некоторым тестам является лучшей в мире scala
- Servlet 3 — последняя на данный момент спецификация сервлетов. В частности поддерживает асинхронную обработку запросов и конфигурирование с помощью аннотаций (можно совсем без web.xml). Я использую Tomcat 7
Приступим
На схеме изображена блок схема компонентов сервера.
Начнём с центральной и самой простой части нашего чата — актора Chat:
class Chat extends Actor{ val subscribers = scala.collection.mutable.ListBuffer.empty[ActorRef] protected def receive = { case Subscribe(a) =>{ subscribers += a } case UnSubscribe(a) =>{ subscribers -= a } case m @ ChatMessage(_,_) =>{ for(s <- subscribers) s ! m } } }
Метод recieve является основным в классе актора. Он взвращает partial функцию (PartialFunction[Any, Unit]), которая будет вызываться при поступлении сообщения. Сообщения — это обычно case классы, которые потом легко разделяются с помощью сопоставления с образцом (pattern matching).
Этот актор хранит у себя список сессий пользователей. Он обрабатывает три типа сообщений: Subscribe и UnSubscribe, с помощью которых акторы сессий пользователей (UserSession) подписываются и отписываются от получения сообщений чата; и ChatMessage — сообщение в чате, которое просто ретранслируется всем участникам чата (включая самого отправителя).
Теперь перейдём к другому концу нашего сервера, сервлету:
@WebServlet(urlPatterns = Array("/comet"), asyncSupported = true, loadOnStartup = 1) class CometServlet extends HttpServlet{ override def service(req: HttpServletRequest, resp: HttpServletResponse) { req.setCharacterEncoding(«UTF-8») resp.setCharacterEncoding(«UTF-8») resp.setContentType(«application/json») val ctx = req.startAsync(req, resp) ctx.setTimeout(1000*60*2) RequestDispatcher.instance ! Request(new RequestContext(ctx)) } }
Настройки сервлета заданы с помощью аннотации (больше никакого xml). Параметр asyncSupported указывает на то, что данный сервлет может обрабатывать запросы асинхронно. Само начало асинхронной обработки происходит при вызове метода startAsync у запроса. После этого вызова запрос не будет закрыт при завершении метода service, а будет ждать вызова метода complete (у класса AsyncContext, объект которого вернул startAsync) или наступления таймаута обработки запроса. При этом поток, в контексте которого происходил вызов метода service сервлета, не будет ждать завершения обработки запроса, а отправится обратно в пул дожидаться нового запроса.
В нашем случае запрос после небольшой предварительной подготовки отправляется актору RequestDispatcher. Оператор ‘!’ посылает сообщение актору асинхронно, то есть не ждёт результата обработки сообщения.
class RequestDispatcher extends Actor{ self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 5, 5000) val sessions = collection.mutable.HashMap.empty[String, ActorRef] protected def receive = { case Request(ctx) => { var userId = ctx.param("userId") if(userId == null) userId = UUID.randomUUID.toString val session = sessions.getOrElseUpdate(userId, { val a = Actor.actorOf(new UserSession(userId)) self.startLink(a) a }) ctx.ctx.addListener(new AsyncListener(){ def onStartAsync(p1: AsyncEvent) {} def onError(p1: AsyncEvent) { session ! CloseRequest() } def onTimeout(p1: AsyncEvent) { } def onComplete(p1: AsyncEvent) {} }) session ! Request(ctx) } case CloseSession(userId) => { for(s <- sessions.get(userId)) { self.unlink(s) s.stop() sessions -= userId } } } }
class RequestDispatcher extends Actor{ val sessions = collection.mutable.HashMap.empty[String, ActorRef] protected def receive = { case Request(ctx) => { var userId = ctx.param(«userId») if(userId == null) userId = UUID.randomUUID.toString val session = sessions.getOrElseUpdate(userId, { val a = Actor.actorOf(new UserSession(userId)) self.startLink(a) a }) session ! Request(ctx) } } }
class UserSession(val userId:String) extends Actor{ implicit val formats = Serialization.formats(NoTypeHints) var waitRequest:Option[RequestContext] = None var filters:Seq[String] = Nil var messagesQueue = List.empty[ChatMessage] var lastAccess:Long = System.currentTimeMillis() val okResponse = write(Map("ok"->true)) val timeout = 1000*60*10L protected def receive = { case Request(ctx) => { lastAccess = System.currentTimeMillis() val action = ctx.param("action") action match { case "sendMessage" => { val msg = ctx.param("message") Chat.instance ! ChatMessage(prepareMessage(msg)) ctx.writeAndComplete(okResponse) } case "changeFilter" => { val filterString = ctx.param("filter") val newFilters = filterString.split(' ') .map(_.trim) .filter(s => s.startsWith("#")) filters = newFilters ctx.writeAndComplete(okResponse) } case "getMessages" =>{ closeLastRequest() if(messagesQueue.isEmpty){ waitRequest = Some(ctx) } else{ sendMessagesToClient(ctx) } } case "getUserId" => { val json: String = write(Map("ok" -> true, "userId" -> userId)) ctx.writeAndComplete(json) } } } case msg @ ChatMessage(_,_) => { if(checkFilter(msg)){ messagesQueue = msg :: messagesQueue if(waitRequest.isDefined){ sendMessagesToClient(waitRequest.get) waitRequest = None } } checkTimeOut() } case CloseRequest() =>{ waitRequest = None } } override def preStart { Chat.instance ! Subscribe(self) } override def postStop { closeLastRequest() Chat.instance ! UnSubscribe(self) } override def preRestart(reason: Throwable) { postStop } private def closeLastRequest() { try { waitRequest.foreach(r => { r.writeAndComplete(write((Map("ok" -> false)))) }) } catch { case _ => {} } waitRequest = None } private def sendMessagesToClient(ctx:RequestContext){ val msgs = messagesQueue.reverse.map(m => Map("msg" -> m.msg, "time" -> m.time)) val response = Map("ok" -> true, "messages"-> msgs) var json = write(response) ctx.writeAndComplete(json) messagesQueue = List.empty[ChatMessage] } private def prepareMessage(m:String):String = { val m1 = m.replaceAll("\n", "\\\\n") Jsoup.parse(m1).text().take(200) } private def checkFilter(msg:ChatMessage) = { if(filters.isEmpty) true else filters.exists(f => msg.tags.contains(f)) } private def checkTimeOut(){ if(System.currentTimeMillis - lastAccess > timeout) RequestDispatcher.instance ! CloseSession(userId) } }
class UserSession(val userId:String) extends Actor{ var waitRequest:Option[RequestContext] = None var messagesQueue = List.empty[ChatMessage] protected def receive = { case Request(ctx) => { val action = ctx.param(«action») action match { case «sendMessage» => { val msg = ctx.param(«message») Chat.instance ! ChatMessage(prepareMessage(msg)) ctx.writeAndComplete(okResponse) } case «getMessages» =>{ closeLastRequest() if(messagesQueue.isEmpty){ waitRequest = Some(ctx) } else{ sendMessagesToClient(ctx) } } } } case msg @ ChatMessage(_,_) => { messagesQueue = msg :: messagesQueue if(waitRequest.isDefined){ sendMessagesToClient(waitRequest.get) waitRequest = None } } case CloseRequest() =>{ waitRequest = None } } override def preStart { Chat.instance ! Subscribe(self) } override def postStop { closeLastRequest() Chat.instance ! UnSubscribe(self) } }
Для каждого пользователя создаётся отдельный экземпляр UserSession. Для получения сообщений клиент отправляет запрос getMessages, который, если очередь неотправленных сообщений не пуста, сразу обрабатывается; если сообщений нет, то запрос сохраняется и ожидает их поступления. Таким образом от клиента почти всё время есть ожидающий запрос. Отправка сообщений производится отдельным post-запросом. Данные на клиент передаются в виде json. В качестве серализатора использую lift-json(кусок веб-фреймворка для scala — lift).
На клиенте всё предельно просто. Запрос сообщений с помощью jQuery.ajax происходит примерно так
$.ajax({ data:{ userId:userId, action:«getMessages» }, success:function(res){ //показ сообщений delayedReceiveMessages(10); }, error:function(){ delayedReceiveMessages(1000); } })
Результат
Всё что получилось разместил здесь. Развёрнуто на бесплатном микроинстансе Amazon EC2.
Субъективно, разрабатвывать с использованием акторов оказалось довольно приятно. Отсутствие общего состояния очень облегчает жизнь.
Ни на что не претенндующие замеры показали, что на 10 ожидающих соединений расходуется 1 Mb памяти кучи (heap). Эта память почти полностью расходуется на содержание соединения томкатом. Потребность акторов на порядок-два меньше. На мой взгляд для comet-сервера это много.
PS Для заинтересовавшихся Akka и владеющими английским советую посмотреть доклад на Scala Days 2010 и ознакомится с официальным руководством.
Те, кому нужен настоящий comet-сервер для java/scala (а не этот велосипед), предлагаю поискать что-то специализированное, например Atmpsphere.