April 26th, 2017 감정 분석 최근 트윗 정보를 가져와 3가지 파일(긍정적, 부정적, 쓸모없는 단어들)로 구성된 단어를 통해 정제를 하여 정제된(감정적인 단어가 사용된 문장) 정보를 추출 한다. AWS S3 S3를 통해 추츨한 정보를 적재한다. AWS S3 감정적 단어들 감정 분석 코드 import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.twitter.TwitterUtils import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} object SentimentScore extends App{ import Utils._ val conf = new SparkConf() .setAppName("spark-twitter-stream-example") .setMaster("local[*]") val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, batchDuration = Seconds(10)) // DStream[Status] 생성 val tweets = TwitterUtils.createStream(ssc, None) // 각 파일을 읽어 broadcast 변수에 담는다. val uselessWords = sc.broadcast(load("/stop-words.dat")) val positiveWords = sc.broadcast(load("/pos-words.dat")) val negativeWords = sc.broadcast(load("/neg-words.dat")) // 튜플타입으로 text와 공백을 제거한 Sentence 생성 val textAndSentences: DStream[(TweetText, Sentence)] = tweets .map(_.getText) .map(tweetText => (tweetText, wordsOf(tweetText))) // Sentence 데이터 정제 => .dat에 명시된 단어를 얻기 위해서 val textAndMeaningfulSentences: DStream[(TweetText, Sentence)] = textAndSentences .mapValues(toLowercase) .mapValues(keepActualWords) .mapValues(words => keepMeaningfulWords(words, uselessWords.value)) .filter { case (_, sentence) => sentence.length > 0 } // xx.dat에 명시된 positive, negative words에 대해 각 합을 구한다. val textAndNonNeutralScore: DStream[(TweetText, Int)] = textAndMeaningfulSentences .mapValues(sentence => computeScore(sentence, positiveWords.value, negativeWords.value)) .filter { case (_, score) => score != 0 } // textAndNonNeutralScore.map(makeReadable).print // negativeWords의 스코어(합)가 높은 순서대로 정렬을 한다. val tweetscores = textAndNonNeutralScore .map{case (tweetText, score) => (score, tweetText)} .transform(_.sortByKey(true)) .map{case (score, tweetText) => (tweetText, score)} .map(makeReadable) // 출력 tweetscores.print // s3에 저장 tweetscores.saveAsTextFiles("s3n://path") ssc.start() ssc.awaitTermination() } 유틸리티 코드 import twitter4j.Status import scala.io.{AnsiColor, Source} object Utils { // type alias을 통해 타입을 명시해준다. => type safe에 좋다. type Tweet = Status type TweetText = String type Sentence = Seq[String] private def format(n: Int): String = f"$n%2d" private def wrapScore(s: String): String = s"[ $s ] " private def makeReadable(n: Int): String = if (n > 0) s"${AnsiColor.GREEN + format(n) + AnsiColor.RESET}" else if (n < 0) s"${AnsiColor.RED + format(n) + AnsiColor.RESET}" else s"${format(n)}" private def makeReadable(s: String): String = s.takeWhile(_ != '\n').take(80) + "..." def makeReadable(sn: (String, Int)): String = sn match { case (tweetText, score) => s"${wrapScore(makeReadable(score))}${makeReadable(tweetText)}" } def load(resourcePath: String): Set[String] = { val source = Source.fromInputStream(getClass.getResourceAsStream(resourcePath)) val words = source.getLines.toSet source.close() words } def wordsOf(tweet: TweetText): Sentence = tweet.split(" ") def toLowercase(sentence: Sentence): Sentence = sentence.map(_.toLowerCase) def keepActualWords(sentence: Sentence): Sentence = sentence.filter(_.matches("[a-z]+")) def extractWords(sentence: Sentence): Sentence = sentence.map(_.toLowerCase).filter(_.matches("[a-z]+")) def keepMeaningfulWords(sentence: Sentence, uselessWords: Set[String]): Sentence = sentence.filterNot(word => uselessWords.contains(word)) def computeScore(words: Sentence, positiveWords: Set[String], negativeWords: Set[String]): Int = words.map(word => computeWordScore(word, positiveWords, negativeWords)).sum def computeWordScore(word: String, positiveWords: Set[String], negativeWords: Set[String]): Int = if (positiveWords.contains(word)) 1 else if (negativeWords.contains(word)) -1 else 0 } 추출 결과 S3 저장 회고 적재된 S3파일을 활용하여 dataFrame을 통해 분석을 하여 추가적인 분석을 해봐야겠다. general (10) , demo (10) demo (10) , dbyll (10) , dbtek (10) Share Post Twitter Facebook Google+ SoftWare Developer IT 기술 정리 및 공유를 위한 블로그 입니다. ← Previous Next →