Akka + Servlet 3 = Comet

Вступление
Не так давно моё внимание привлекла библиотека Akka — реализация модели акторов для Scala и Java. Она мне показалась достаточно интересной, тем более до этого с акторами мне ещё не доводилось работать. И вот я наконец победил лень и сел реализовать на Akka что-нибудь простое и бесполезное. Напрмер, асинхронную обработку http-запросов. К тому же уже давно вышла реализация Servlet 3, которую тоже надо посмотреть. Итак, предлагаю вашему вниманию реализацию на Scala простейшего comet-чата.
Что для этого понадобится
- Scala — отличный язык программирования, особенно если вы уже долго работаете с Java и вам уже стало скучно. Или вы смотрите roadmap развития Java и вам становится грустно.
- Akka — реализация акторов для Scala и Java. Позволяет разрабатывать параллельные программы на манер Erlang. Согласно заявлением разработчиков и некоторым тестам является лучшей в мире scala
- Servlet 3 — последняя на данный момент спецификация сервлетов. В частности поддерживает асинхронную обработку запросов и конфигурирование с помощью аннотаций (можно совсем без web.xml). Я использую Tomcat 7
Приступим
На схеме изображена блок схема компонентов сервера.
Начнём с центральной и самой простой части нашего чата — актора Chat:
class Chat extends Actor{
val subscribers = scala.collection.mutable.ListBuffer.empty[ActorRef]
protected def receive = {
case Subscribe(a) =>{
subscribers += a
}
case UnSubscribe(a) =>{
subscribers -= a
}
case m @ ChatMessage(_,_) =>{
for(s <- subscribers) s ! m
}
}
}
Метод recieve является основным в классе актора. Он взвращает partial функцию (PartialFunction[Any, Unit]), которая будет вызываться при поступлении сообщения. Сообщения — это обычно case классы, которые потом легко разделяются с помощью сопоставления с образцом (pattern matching).
Этот актор хранит у себя список сессий пользователей. Он обрабатывает три типа сообщений: Subscribe и UnSubscribe, с помощью которых акторы сессий пользователей (UserSession) подписываются и отписываются от получения сообщений чата; и ChatMessage — сообщение в чате, которое просто ретранслируется всем участникам чата (включая самого отправителя).
Теперь перейдём к другому концу нашего сервера, сервлету:
@WebServlet(urlPatterns = Array("/comet"), asyncSupported = true, loadOnStartup = 1)
class CometServlet extends HttpServlet{
override def service(req: HttpServletRequest, resp: HttpServletResponse) {
req.setCharacterEncoding(«UTF-8»)
resp.setCharacterEncoding(«UTF-8»)
resp.setContentType(«application/json»)
val ctx = req.startAsync(req, resp)
ctx.setTimeout(1000*60*2)
RequestDispatcher.instance ! Request(new RequestContext(ctx))
}
}
Настройки сервлета заданы с помощью аннотации (больше никакого xml). Параметр asyncSupported указывает на то, что данный сервлет может обрабатывать запросы асинхронно. Само начало асинхронной обработки происходит при вызове метода startAsync у запроса. После этого вызова запрос не будет закрыт при завершении метода service, а будет ждать вызова метода complete (у класса AsyncContext, объект которого вернул startAsync) или наступления таймаута обработки запроса. При этом поток, в контексте которого происходил вызов метода service сервлета, не будет ждать завершения обработки запроса, а отправится обратно в пул дожидаться нового запроса.
В нашем случае запрос после небольшой предварительной подготовки отправляется актору RequestDispatcher. Оператор ‘!’ посылает сообщение актору асинхронно, то есть не ждёт результата обработки сообщения.
class RequestDispatcher extends Actor{
self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 5, 5000)
val sessions = collection.mutable.HashMap.empty[String, ActorRef]
protected def receive = {
case Request(ctx) => {
var userId = ctx.param("userId")
if(userId == null) userId = UUID.randomUUID.toString
val session = sessions.getOrElseUpdate(userId, {
val a = Actor.actorOf(new UserSession(userId))
self.startLink(a)
a
})
ctx.ctx.addListener(new AsyncListener(){
def onStartAsync(p1: AsyncEvent) {}
def onError(p1: AsyncEvent) {
session ! CloseRequest()
}
def onTimeout(p1: AsyncEvent) {
}
def onComplete(p1: AsyncEvent) {}
})
session ! Request(ctx)
}
case CloseSession(userId) => {
for(s <- sessions.get(userId)) {
self.unlink(s)
s.stop()
sessions -= userId
}
}
}
}
class RequestDispatcher extends Actor{
val sessions = collection.mutable.HashMap.empty[String, ActorRef]
protected def receive = {
case Request(ctx) => {
var userId = ctx.param(«userId»)
if(userId == null) userId = UUID.randomUUID.toString
val session = sessions.getOrElseUpdate(userId, {
val a = Actor.actorOf(new UserSession(userId))
self.startLink(a)
a
})
session ! Request(ctx)
}
}
}
class UserSession(val userId:String) extends Actor{
implicit val formats = Serialization.formats(NoTypeHints)
var waitRequest:Option[RequestContext] = None
var filters:Seq[String] = Nil
var messagesQueue = List.empty[ChatMessage]
var lastAccess:Long = System.currentTimeMillis()
val okResponse = write(Map("ok"->true))
val timeout = 1000*60*10L
protected def receive = {
case Request(ctx) => {
lastAccess = System.currentTimeMillis()
val action = ctx.param("action")
action match {
case "sendMessage" => {
val msg = ctx.param("message")
Chat.instance ! ChatMessage(prepareMessage(msg))
ctx.writeAndComplete(okResponse)
}
case "changeFilter" => {
val filterString = ctx.param("filter")
val newFilters = filterString.split(' ') .map(_.trim)
.filter(s => s.startsWith("#"))
filters = newFilters
ctx.writeAndComplete(okResponse)
}
case "getMessages" =>{
closeLastRequest()
if(messagesQueue.isEmpty){
waitRequest = Some(ctx)
} else{
sendMessagesToClient(ctx)
}
}
case "getUserId" => {
val json: String = write(Map("ok" -> true, "userId" -> userId))
ctx.writeAndComplete(json)
}
}
}
case msg @ ChatMessage(_,_) => {
if(checkFilter(msg)){
messagesQueue = msg :: messagesQueue
if(waitRequest.isDefined){
sendMessagesToClient(waitRequest.get)
waitRequest = None
}
}
checkTimeOut()
}
case CloseRequest() =>{
waitRequest = None
}
}
override def preStart {
Chat.instance ! Subscribe(self)
}
override def postStop {
closeLastRequest()
Chat.instance ! UnSubscribe(self)
}
override def preRestart(reason: Throwable) {
postStop
}
private def closeLastRequest() {
try {
waitRequest.foreach(r => {
r.writeAndComplete(write((Map("ok" -> false))))
})
}
catch {
case _ => {}
}
waitRequest = None
}
private def sendMessagesToClient(ctx:RequestContext){
val msgs = messagesQueue.reverse.map(m => Map("msg" -> m.msg, "time" -> m.time))
val response = Map("ok" -> true,
"messages"-> msgs)
var json = write(response)
ctx.writeAndComplete(json)
messagesQueue = List.empty[ChatMessage]
}
private def prepareMessage(m:String):String = {
val m1 = m.replaceAll("\n", "\\\\n")
Jsoup.parse(m1).text().take(200)
}
private def checkFilter(msg:ChatMessage) = {
if(filters.isEmpty) true
else filters.exists(f => msg.tags.contains(f))
}
private def checkTimeOut(){
if(System.currentTimeMillis - lastAccess > timeout)
RequestDispatcher.instance ! CloseSession(userId)
}
}
class UserSession(val userId:String) extends Actor{
var waitRequest:Option[RequestContext] = None
var messagesQueue = List.empty[ChatMessage]
protected def receive = {
case Request(ctx) => {
val action = ctx.param(«action»)
action match {
case «sendMessage» => {
val msg = ctx.param(«message»)
Chat.instance ! ChatMessage(prepareMessage(msg))
ctx.writeAndComplete(okResponse)
}
case «getMessages» =>{
closeLastRequest()
if(messagesQueue.isEmpty){
waitRequest = Some(ctx)
} else{
sendMessagesToClient(ctx)
}
}
}
}
case msg @ ChatMessage(_,_) => {
messagesQueue = msg :: messagesQueue
if(waitRequest.isDefined){
sendMessagesToClient(waitRequest.get)
waitRequest = None
}
}
case CloseRequest() =>{
waitRequest = None
}
}
override def preStart {
Chat.instance ! Subscribe(self)
}
override def postStop {
closeLastRequest()
Chat.instance ! UnSubscribe(self)
}
}
Для каждого пользователя создаётся отдельный экземпляр UserSession. Для получения сообщений клиент отправляет запрос getMessages, который, если очередь неотправленных сообщений не пуста, сразу обрабатывается; если сообщений нет, то запрос сохраняется и ожидает их поступления. Таким образом от клиента почти всё время есть ожидающий запрос. Отправка сообщений производится отдельным post-запросом. Данные на клиент передаются в виде json. В качестве серализатора использую lift-json(кусок веб-фреймворка для scala — lift).
На клиенте всё предельно просто. Запрос сообщений с помощью jQuery.ajax происходит примерно так
$.ajax({
data:{
userId:userId,
action:«getMessages»
},
success:function(res){
//показ сообщений
delayedReceiveMessages(10);
},
error:function(){
delayedReceiveMessages(1000);
}
})
Результат
Всё что получилось разместил здесь. Развёрнуто на бесплатном микроинстансе Amazon EC2.
Субъективно, разрабатвывать с использованием акторов оказалось довольно приятно. Отсутствие общего состояния очень облегчает жизнь.
Ни на что не претенндующие замеры показали, что на 10 ожидающих соединений расходуется 1 Mb памяти кучи (heap). Эта память почти полностью расходуется на содержание соединения томкатом. Потребность акторов на порядок-два меньше. На мой взгляд для comet-сервера это много.
PS Для заинтересовавшихся Akka и владеющими английским советую посмотреть доклад на Scala Days 2010 и ознакомится с официальным руководством.
Те, кому нужен настоящий comet-сервер для java/scala (а не этот велосипед), предлагаю поискать что-то специализированное, например Atmpsphere.