본문 바로가기
Big Data

몇 번째 이벤트에 반응했는지 확인하는 Scala 코드 예제

by csk 2017. 2. 12.

오늘은 Spark + Zeppelin 상에서 돌아가는 scala 코드 예제를 좀 보여드리려고 합니다. 

이걸 보시면 map reduce의 개념과 로그 데이터 처리를 어떤 식으로 하는지에 대한 대략의 감을 잡으실 수 있을것 같아서요.

코드가 어렵지 않습니다. 포기하지 말고 읽어보세요. :)



아래와 같이 유저별 이벤트 시간과 반응여부 데이터를 가지고 있을때, 각각의 유저별로 몇번째 이벤트에서 처음 반응(O)을 했는지 알아보려고 합니다. 

val test_rdd = sc.makeRDD(List(

    ("user1","2016-07-01 03:03:00","X")

    ,("user1","2016-07-01 15:15:00","O")

    ,("user1","2016-07-01 16:16:00","X")

    ,("user1","2016-07-01 17:00:00","X")

    ,("user1","2016-07-01 00:00:00","X")

    ,("user2","2016-07-01 00:00:00","X")

    ,("user2","2016-07-01 03:02:00","O")

    ,("user3","2016-07-01 05:02:00","X")

    ,("user3","2016-07-01 04:02:00","O")

    ,("user3","2016-07-01 03:02:00","O")

    ))

이렇게 하면 RDD가 만들어 집니다. sc는 SparkContext로 스팍 쉘이 뜨면 자동 제공되는 변수이고, RDD는...까지 설명하면 너무 길어지니 이건 여러 자료를 통해 확인하시길 바래요. 다만 한가지 꼭 부탁하고 싶은건 이렇게 테스트 데이터를 만들고 이걸 돌려보면서 코딩을 하는 것을 아예 습관으로 만드시라는 겁니다.  

test_rdd: org.apache.spark.rdd.RDD[(String, String, String)] = ParallelCollectionRDD[742] at makeRDD at <console>:30

이건 결과 메시지인데요, zeppelin이 spark에서 받아와서 뿌려주기만 하는 겁니다. 스트링 세개가 들어간 RDD가 생성되었다는 말이네요.


val test_map_rdd = test_rdd.map(r => (r._1, (r._2, r._3)))

이제 map 함수를 씁니다. map은 여러가지 변형을 가할 수 있는 가장 기본적인 함수인데, 기본적으로 (key, value) 형태 이어야 하니까 기준이 될 값인 user id 부분 즉 r._1만 키로 만들고 나머지는 다 괄호로 묶어서 value 로 넣어줍니다. 

이렇게 하고 take를 통해 데이터를 가져와서 확인 해봅니다.

test_map_rdd: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[5] at map at <console>:31
res3: Array[(String, (String, String))] = Array((user1,(2016-07-01 03:03:00,X)), (user1,(2016-07-01 15:15:00,O)), (user1,(2016-07-01 16:16:00,X)))

이벤트 시간과 반응여부가 ()로 묶여서 들어간 것 보이시죠?

val test_group_rdd = test_map_rdd.groupByKey

그리고 나서 키를 기준으로 그룹핑을 했네요.

test_group_rdd: org.apache.spark.rdd.RDD[(String, Iterable[(String, String)])] = ShuffledRDD[8] at groupByKey at <console>:33

res6: Array[(String, Iterable[(String, String)])] = Array((user1,CompactBuffer((2016-07-01 00:00:00,X), (2016-07-01 03:03:00,X), (2016-07-01 17:00:00,X), (2016-07-01 15:15:00,O), (2016-07-01 16:16:00,X))), (user2,CompactBuffer((2016-07-01 03:02:00,O), (2016-07-01 00:00:00,X))), (user3,CompactBuffer((2016-07-01 03:02:00,O), (2016-07-01 05:02:00,X), (2016-07-01 04:02:00,O))))

위 결과에서 유저별로 compact buffer라는 타입으로 value 부분이 묶인것이 보이실거에요.

val test_group_sort_tdd = test_group_rdd.map{r=>

    val newValue = r._2.toList.sortBy(_._1)

    (r._1, newValue)

}

자 이제는 user 별로 그룹핑 된 내용에서 시간순으로 정렬해주어야 하는데요, 여기서 user 내에서! 정렬한다는 것이 중요한거죠. 전체에 대해서 정렬하라고 하면 다시한번 map을 써서 시간 필드를 키로 보낸뒤 sortByKey를 하면 간단하겠지만 user 내에서의 정렬이기 때문에 고민이 필요합니다. 그래서 두번째 줄을 보면 map 안에서 r._2를 List형태로 만든 뒤 그안에서 sortBy를 하게됩니다. 그리고 나서 sort가 된 List를 리턴하구요.


test_group_sort_tdd.take(3)


 잘 되었는지 3개만 가져와서 화면에 뿌려봐서 결과를 확인합니다. 


test_group_rdd: org.apache.spark.rdd.RDD[(String, Iterable[(String, String)])] = ShuffledRDD[754] at groupByKey at <console>:32
test_group_sort_tdd: org.apache.spark.rdd.RDD[(String, List[(String, String)])] = MapPartitionsRDD[755] at map at <console>:34
res32: Array[(String, List[(String, String)])] = Array((user1,List((2016-07-01 00:00:00,X), (2016-07-01 03:03:00,X), (2016-07-01 15:15:00,O), (2016-07-01 16:16:00,X), (2016-07-01 17:00:00,X))), (user2,List((2016-07-01 00:00:00,X), (2016-07-01 03:02:00,O))), (user3,List((2016-07-01 03:02:00,O), (2016-07-01 04:02:00,O), (2016-07-01 05:02:00,X)))) 

결과는 역시 RDD로 나왔고, 내용을 보면 user1,2,3 별로 그룹핑되어 그 안에서 시간순으로 잘 정렬된 것을 확인할 수 있네요. 

이게 바로 interactive coding interface의 장점입니다. 하나하나 진행하면서 원하는대로 되어가는지 눈으로 보면서 진행할 수 있다는 거. 이걸 테스트 데이터로 쭉 따라가며 코딩해놓고 완성되면 실 데이터를 걸어놓고 밥먹으러 가면 되는 겁니다. :)


val refine_map = test_group_sort_tdd.map{r=>

    var index = 0

    var firstO = 0

    var isFirst = true

    for(v <- r._2) {

        index = index + 1 

        if (v._2 == "O" && isFirst) {

            firstO = index

            isFirst = false

        }

    }

    (r._1, firstO)

}

refine_map.take(3)

이제 정렬은 되었으니 첫번째로 반응한 이벤트가 몇 번째에 있는지 확인하면 되는데요. 역시 map 함수 내에서 처리하는데 user 내에서 첫번째  O가 등장한게 몇번째 이벤트인지를 구하는 코드를 for 문으로 구현했습니다. 뭐 로직이 복잡한건 전혀 아니니 읽어보면 아실거에요.  

아까는 맵의 두번째 리스트를 정렬만 해서 그대로 리턴했으나 이번에는 한개 값만 필요하므로 리턴문이 (r._1, firstO)로 단순해졌다는것이 차이점이겠네요. 

refine_map: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[761] at map at <console>:36

res42: Array[(String, Int)] = Array((user1,3), (user2,2), (user3,1))

자 이제 우리가 원했던 각 user가 몇번째의 이벤트에서  반응했는지 하는 숫자가 나왔습니다.


어렵지 않죠? :)

이 코드에 약간씩만 변형을 가하면 필요한 거의 모든 작업을 할 수 있습니다. 그러니까 잘 살펴보시고 이용해 보세요.