#title RDD 연산 [[TableOfContents]] 작성 중.. pyspark를 사용할 때 되도록 built-in 함수를 사용하는 것이 좋다. 그렇지 않으면 JVM을 왔다갔다 하는 오버헤드가 발생할 수 있다. == RDD(Resilient Distributed Dataset) == * 분산되어 있는 변경 불가능(immutable)한 객체 모음 * 클러스터의 여러 노드에 여러 파티션으로 나뉨 == Transformation == RDD를 만든다. 하지만, 즉시 수행되지 않고 "액션"을 수행할 때에 비로소 실행된다. ==== textFile() ==== c:\data\test.txt파일을 2개의 파티션을 나눠 RDD를 만든다. {{{ lines = sc.textFile("c:\\data\\test.txt", 2) lines.collect() }}} ==== map() ==== ==== union() ==== 2개의 RDD 합치기, 아래 예제는 3개의 RDD 합치기 {{{ lines1 = sc.parallelize(['a', 'b', 'c']) lines2 = sc.parallelize(['d', 'e', 'f']) lines3 = sc.parallelize(['g', 'h', 'i']) lines = lines1.union(lines2).union(lines3) for line in lines.collect(): print(line) }}} 결과 {{{ >>> lines1 = sc.parallelize(['a', 'b', 'c']) >>> lines2 = sc.parallelize(['d', 'e', 'f']) >>> lines3 = sc.parallelize(['g', 'h', 'i']) >>> lines = lines1.union(lines2).union(lines3) >>> for line in lines.collect(): ... print(line) ... a b c d e f g h i >>> }}} ==== filter() ==== 필터 {{{ lines = sc.parallelize(['가지', '무', '배추', '상추']) choo = lines.filter(lambda x: "추" in x) choo.collect() }}} 결과 {{{ >>> lines = sc.parallelize(['가지', '무', '배추', '상추']) >>> choo = lines.filter(lambda x: "추" in x) >>> choo.collect() ['배추', '상추'] >>> }}} == Action == 액션은 RDD에 어떤 처리를 하여 결과를 리턴한다. 리턴은 드라이버 프로그램이나 HDFS같은 외부 스토리지에 저장한다. ==== take() ==== take(2)는 RDD에서 2개 불러오라는 것 {{{ lines = sc.parallelize(['a', 'b', 'c']) for line in lines.take(2): print(line) }}} 결과 {{{ >>> lines = sc.parallelize(['a', 'b', 'c']) >>> for line in lines.take(2): ... print(line) ... a b >>> }}} ==== collection() ==== 전체 불러오기 {{{ lines = sc.parallelize(['a', 'b', 'c']) for line in lines.collect(): print(line) }}} 결과 {{{ >>> lines = sc.parallelize(['a', 'b', 'c']) >>> for line in lines.collect(): ... print(line) ... a b c }}}