package com.kute.akka.mapreduce
/**
 * Akka MapReduce example.
 *
 * Goal:
 * Send three strings (each one representing a sub-task), then aggregate the results of all
 * tasks and count how many times each word occurs.
 *
 * Flow:
 * 1. Split every message (sub-task) into words and give each word an initial count of 1
 *    (other map-style operations, such as filtering out words that should not be counted, happen here too).
 * 2. For every message (string), sum the word counts, so that words repeated within one
 *    string are accumulated - the per-task reduce.
 * 3. The two steps above yield the word frequencies of a single task; the results of the
 *    three tasks are then merged - the final reduce.
 * 4. Done. That is map-reduce: split the overall job into sub-tasks, map each sub-task,
 *    and finally merge the results.
 *
 * @author kute
 * @time 2016-01-23
 */
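// Message flow between the actors defined below:
//   MasterActor  -- (String, taskCount)     --> MapActor
//   MapActor     -- (MapData, taskCount)    --> ReduceActor
//   ReduceActor  -- (ReduceData, taskCount) --> AggregateActor
//   MasterActor  -- Result                  --> AggregateActor (prints the merged counts)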
import java.util.StringTokenizer

import akka.actor._

import scala.collection.immutable._

// Messages exchanged between the actors.
class Word(val word: String, val count: Int)           // a single word together with a (partial) count
case class Result()                                    // asks AggregateActor to print the final counts
class MapData(val dataList: List[Word])                // output of MapActor: the words of one message, each counted once
class ReduceData(val reduceDataMap: Map[String, Int])  // output of ReduceActor: word frequencies of one message
object MapReduceApplication extends App {

  val _system = ActorSystem("MapReduceApp")
  val master = _system.actorOf(Props[MasterActor], name = "master")

  println("Scala begin!")

  // Send three messages, each one representing a sub-task.
  master ! "The quick brown fox tried to jump over the lazy dog and fell on the dog"
  master ! "Dog is man's best friend"
  master ! "Dog and Fox belong to the same family"

  // Give the actors some time to finish counting before asking for the result.
  Thread.sleep(500)
  master ! new Result

  Thread.sleep(500)
  // Note: ActorSystem.shutdown is deprecated since Akka 2.4; newer versions use _system.terminate().
  _system.shutdown
  println("Scala done!")
}
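// For the three messages above, the final map printed by AggregateActor should contain
// dog -> 4 and fox -> 2, with every other non-stop word mapped to 1
// (the HashMap has no particular ordering when printed).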
class MasterActor extends Actor with ActorLogging {

  val aggregateActor: ActorRef = context.actorOf(Props[AggregateActor], name = "aggregate")
  val reduceActor: ActorRef = context.actorOf(Props(classOf[ReduceActor], aggregateActor), name = "reduce")
  val mapActor: ActorRef = context.actorOf(Props(classOf[MapActor], reduceActor), name = "map")

  // number of sub-task (string) messages received so far; used to tag each task
  var tasksCount: Int = _

  def receive: Receive = {
    case message: String =>
      tasksCount = tasksCount + 1
      log.info(s"map-task-${tasksCount} begin exec msg:[${message}]")
      mapActor ! (message, tasksCount)
    case message: Result =>
      log.info("print result....")
      aggregateActor ! message
  }
}
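// Illustration: the three strings sent by MapReduceApplication reach MapActor as
// ("The quick brown fox ...", 1), ("Dog is man's best friend", 2) and ("Dog and Fox ...", 3).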
/**
 * Filter and map: splits each incoming message into words (dropping stop words)
 * and forwards the result to the reduce step.
 * @param reduceActor the actor that receives the mapped data
 */
class MapActor(reduceActor: ActorRef) extends Actor with ActorLogging {

  // words that are not counted at all
  val STOP_WORDS_LIST = List("a", "am", "an", "and", "are", "as", "at", "be",
    "do", "go", "if", "in", "is", "it", "of", "on", "the", "to")

  def receive: Receive = {
    case (message: String, taskCount: Int) =>
      log.info(s"map-task-${taskCount} begin map....")
      reduceActor ! (evaluateExpression(message), taskCount)
  }
  /**
   * Splits the message into words and initializes each count to 1, skipping stop words.
   * @param line the raw message text
   * @return the mapped words of this message
   */
  def evaluateExpression(line: String): MapData = {
    var dataList = List[Word]()
    val parser: StringTokenizer = new StringTokenizer(line)
    while (parser.hasMoreTokens()) {
      val word: String = parser.nextToken().toLowerCase()
      if (!STOP_WORDS_LIST.contains(word)) {
        // prepend, so the list ends up in reverse token order (irrelevant for counting)
        dataList = new Word(word, 1) :: dataList
      }
    }
    new MapData(dataList)
  }
}
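// Illustration: for the message "Dog and Fox belong to the same family", evaluateExpression
// drops the stop words "and", "to" and "the" and yields Word instances for family, same,
// belong, fox and dog, each with count 1, in reverse token order.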
/**
 * Per-map-task reduce:
 * sums up the counts of words repeated within a single message.
 * @param aggregateActor the actor that merges the per-task results
 */
class ReduceActor(aggregateActor: ActorRef) extends Actor with ActorLogging {

  def receive: Receive = {
    case (message: MapData, taskCount: Int) =>
      log.info(s"map-task-${taskCount} begin reduce....")
      aggregateActor ! (reduce(message.dataList), taskCount)
  }
  def reduce(dataList: List[Word]): ReduceData = {
    var reducedMap = new HashMap[String, Int]
    for (wc: Word <- dataList) {
      val word: String = wc.word
      // add this word's count to whatever has been accumulated for it so far
      reducedMap += word -> (reducedMap.getOrElse(word, 0) + wc.count)
    }
    new ReduceData(reducedMap)
  }
}
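// Illustration: for the first message the reduce step collapses the two occurrences of
// "dog" into dog -> 2, while every other non-stop word of that message maps to 1.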
/**
 * Final reduce: merges the per-task word counts of all map tasks into one map.
 */
class AggregateActor extends Actor with ActorLogging {

  var finalReducedMap = new HashMap[String, Int]

  def receive: Receive = {
    case (message: ReduceData, taskCount: Int) =>
      log.info(s"received reduced data of map-task-${taskCount}, merging it into the final sum")
      if (taskCount == 3) {
        // 3 == number of sub-task messages sent by MapReduceApplication (hard-coded here)
        log.info("last map-task received, the final sum is complete after this merge")
      }
      aggregateInMemoryReduce(message.reduceDataMap)
    case message: Result =>
      println(finalReducedMap.toString())
  }
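  // Merges one task's counts into finalReducedMap in place; mutating the var is safe here
  // because an actor processes only one message at a time, so there is no concurrent access.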
  def aggregateInMemoryReduce(reducedMap: Map[String, Int]): Unit = {
    reducedMap.foreach { entry =>
      finalReducedMap += entry._1 -> (finalReducedMap.getOrElse(entry._1, 0) + entry._2)
    }
  }
}