반응형
설명
- 전체 정렬이란?
- 정렬된 전체 데이터를 파일로 만들 때 사용하는 방법
- 하나의 파티션만 사용하므로 리듀스 태스크를 실행하는 데이터노드에 부하가 집중되는 현상이 발생할 수 있음(?)
- 입력 파일과 출력 파일 모두 시퀀스 파일로 구성
- 분산 처리의 장점을 살리면서 전체 정렬을 하는 방법
- 입력 데이터를 샘플링해서 데이터의 분포도를 조사
- 데이터의 분포도에 맞게 파티션 정보를 미리 생성
- 미리 생성한 파티션 정보에 맞게 출력 데이터를 생성
- 각 출력 데이터를 병합
- 예제 설명
- 입력 파일로 시퀀스 파일이 필요하므로 부분 정렬 예제 부분에서 시퀀스 파일을 생성하는 부분까지 수행 후 해당 실습
시퀀스 파일 생성 및 확인
- [예제] 부분 정렬에서 시퀀스 파일 생성 및 확인 부분을 진행
SequenceFileTotalSort 코드 작성
package com.airline.totalsort;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.InputSampler;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.net.URI;
public class SequenceFileTotalSort extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new SequenceFileTotalSort(), args);
System.out.println("## RESULT : " + res);
}
@Override
public int run(String[] args) throws Exception {
JobConf conf = new JobConf(getConf(), SequenceFileTotalSort.class);
// 입출력 경로 설정
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setJobName(SequenceFileTotalSort.class.getName());
conf.setInputFormat(SequenceFileInputFormat.class);
conf.setOutputFormat(SequenceFileOutputFormat.class);
conf.setOutputKeyClass(IntWritable.class);
// 파티션 정보를 직접 설정하기 위한 설정
conf.setPartitionerClass(TotalOrderPartitioner.class);
// 시퀀스 파일 압축 포맷 설정
SequenceFileOutputFormat.setCompressOutput(conf, true);
SequenceFileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(conf, SequenceFile.CompressionType.BLOCK);
// 파티션 경로 설정
// 경로 : 입력 데이터_partitions
Path inputDir = FileInputFormat.getInputPaths(conf)[0];
inputDir = inputDir.makeQualified(inputDir.getFileSystem(conf));
Path partitionFile = new Path(inputDir, "_partitions");
TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
// 샘플 데이터 추출
// 10개의 입력 스플릿에서 0.1%의 확률로 1000건의 데이터를 샘플링
InputSampler.Sampler<IntWritable, Text> sampler = new InputSampler.RandomSampler<IntWritable, Text>(0.1, 1000, 10);
InputSampler.writePartitionFile(conf, sampler);
// 각 task가 파티션 정보를 참조할 수 있도록 분산 캐시에 파티션 정보를 등록
URI partitionUri = new URI(partitionFile.toString() + "#_partitions");
DistributedCache.addCacheFile(partitionUri, conf);
DistributedCache.createSymlink(conf);
// 실행
JobClient.runJob(conf);
return 0;
}
}
전체 정렬 파일 생성 및 확인
- hadoop jar hadoop-airline.jar com.airline.totalsort.SequenceFileTotalSort /airline/output/sequence /airline/output/totalSort
- hdfs dfs -text /airline/output/totalSort/part-00000 | head -10
반응형
'Development > Hadoop' 카테고리의 다른 글
[Hadoop] 부분 정렬(Partial Sort) (0) | 2018.10.14 |
---|---|
[Hadoop] MultipleOutputs (0) | 2018.10.13 |
[Hadoop] 카운터 사용하기 (0) | 2018.10.13 |
[Hadoop] 예제 (0) | 2018.09.02 |
[Hadoop] 용어 (0) | 2018.09.01 |