반응형

설명

  • 전체 정렬이란?
    • 정렬된 전체 데이터를 파일로 만들 때 사용하는 방법
    • 하나의 파티션만 사용하므로 리듀스 태스크를 실행하는 데이터노드에 부하가 집중되는 현상이 발생할 수 있음(?)
    • 입력 파일과 출력 파일 모두 시퀀스 파일로 구성
  • 분산 처리의 장점을 살리면서 전체 정렬을 하는 방법
    • 입력 데이터를 샘플링해서 데이터의 분포도를 조사
    • 데이터의 분포도에 맞게 파티션 정보를 미리 생성
    • 미리 생성한 파티션 정보에 맞게 출력 데이터를 생성
    • 각 출력 데이터를 병합
  • 예제 설명
    • 입력 파일로 시퀀스 파일이 필요하므로 부분 정렬 예제 부분에서 시퀀스 파일을 생성하는 부분까지 수행 후 해당 실습 

시퀀스 파일 생성 및 확인

SequenceFileTotalSort 코드 작성

package com.airline.totalsort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.InputSampler;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.net.URI;

public class SequenceFileTotalSort extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new SequenceFileTotalSort(), args);
        System.out.println("## RESULT : " + res);
    }

    @Override
    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), SequenceFileTotalSort.class);

        // 입출력 경로 설정
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        conf.setJobName(SequenceFileTotalSort.class.getName());
        conf.setInputFormat(SequenceFileInputFormat.class);
        conf.setOutputFormat(SequenceFileOutputFormat.class);
        conf.setOutputKeyClass(IntWritable.class);

        // 파티션 정보를 직접 설정하기 위한 설정
        conf.setPartitionerClass(TotalOrderPartitioner.class);

        // 시퀀스 파일 압축 포맷 설정
        SequenceFileOutputFormat.setCompressOutput(conf, true);
        SequenceFileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(conf, SequenceFile.CompressionType.BLOCK);

        // 파티션 경로 설정
        // 경로 : 입력 데이터_partitions
        Path inputDir = FileInputFormat.getInputPaths(conf)[0];
        inputDir = inputDir.makeQualified(inputDir.getFileSystem(conf));
        Path partitionFile = new Path(inputDir, "_partitions");
        TotalOrderPartitioner.setPartitionFile(conf, partitionFile);

        // 샘플 데이터 추출
        // 10개의 입력 스플릿에서 0.1%의 확률로 1000건의 데이터를 샘플링
        InputSampler.Sampler<IntWritable, Text> sampler = new InputSampler.RandomSampler<IntWritable, Text>(0.1, 1000, 10);
        InputSampler.writePartitionFile(conf, sampler);

        // 각 task가 파티션 정보를 참조할 수 있도록 분산 캐시에 파티션 정보를 등록
        URI partitionUri = new URI(partitionFile.toString() + "#_partitions");
        DistributedCache.addCacheFile(partitionUri, conf);
        DistributedCache.createSymlink(conf);

        // 실행
        JobClient.runJob(conf);

        return 0;
    }
}

전체 정렬 파일 생성 및 확인

  • hadoop jar hadoop-airline.jar com.airline.totalsort.SequenceFileTotalSort /airline/output/sequence /airline/output/totalSort
  • hdfs dfs -text /airline/output/totalSort/part-00000 | head -10
반응형

'Development > Hadoop' 카테고리의 다른 글

[Hadoop] 부분 정렬(Partial Sort)  (0) 2018.10.14
[Hadoop] MultipleOutputs  (0) 2018.10.13
[Hadoop] 카운터 사용하기  (0) 2018.10.13
[Hadoop] 예제  (0) 2018.09.02
[Hadoop] 용어  (0) 2018.09.01

+ Recent posts