반응형
설명
- 부분 정렬이란?
- 말이 정렬이지 실제 정렬보다는 데이터 검색에서 사용되는 방법을 말함
- 정렬된 결과에서 데이터를 검색할 때 사용하는 방식
- 부분 정렬 수행 단계
- 입력 데이터 -> 시퀀스 파일 생성
- 시퀀스 파일 -> 맵 파일 생성
- 맵 파일 -> 데이터 검색
- 예제 설명
- 항공운항데이터 중에서 운항거리를 키값으로 정렬하고 검색하는 예제를 실습
AirlineData 코드 작성
package com.airline.common.model;
import lombok.Data;
@Data
public class AirlineData {
private static final int ARRIVAL_INDEX = 14;
private static final int DEPARTURE_INDEX = 15;
private static final int DISTANCE_INDEX = 18;
private int year;
private int month;
private Integer arrivalDelayTime;
private Integer departureDelayTime;
private Integer distance;
public AirlineData(String[] columns) {
year = Integer.parseInt(columns[0]);
month = Integer.parseInt(columns[1]);
if (!columns[ARRIVAL_INDEX].equals("NA")) {
arrivalDelayTime = Integer.parseInt(columns[ARRIVAL_INDEX]);
}
if (!columns[DEPARTURE_INDEX].equals("NA")) {
departureDelayTime = Integer.parseInt(columns[DEPARTURE_INDEX]);
}
if (!columns[DISTANCE_INDEX].equals("NA")) {
distance = Integer.parseInt(columns[DISTANCE_INDEX]);
}
}
}
SequenceFileCreator 코드 작성
package com.airline.partialsort;
import com.airline.common.model.AirlineData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/***
* 입력 데이터 -> 시퀀스 파일
*/
public class SequenceFileCreator extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new SequenceFileCreator(), args);
System.out.println("## RESULT : " + res);
}
@Override
public int run(String[] args) throws Exception {
JobConf conf = new JobConf(SequenceFileCreator.class);
conf.setJobName(SequenceFileCreator.class.getName());
conf.setMapperClass(DistanceMapper.class);
// 리듀서가 필요없으므로 0
conf.setNumReduceTasks(0);
// 출력 포맷을 SequenceFile로 설정
conf.setOutputFormat(SequenceFileOutputFormat.class);
// 출력 키를 항공운항거리 타입(정수)으로 설정
conf.setOutputKeyClass(IntWritable.class);
// 출력 값을 라인 타입(문자열)으로 설정
conf.setOutputValueClass(Text.class);
// 시퀀스 파일 압축 포맷 설정
SequenceFileOutputFormat.setCompressOutput(conf, true);
SequenceFileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(conf, SequenceFile.CompressionType.BLOCK);
// 입출력 경로 설정
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
// 실행
JobClient.runJob(conf);
return 0;
}
private static class DistanceMapper extends MapReduceBase implements Mapper<LongWritable, org.apache.hadoop.io.Text, IntWritable, org.apache.hadoop.io.Text> {
private static final IntWritable OUTPUT_KEY = new IntWritable();
@Override
public void map(LongWritable key, org.apache.hadoop.io.Text value, OutputCollector<IntWritable, org.apache.hadoop.io.Text> output, Reporter reporter) throws IOException {
if (key.get() <= 0) {
return;
}
AirlineData airlineData = new AirlineData(value.toString().split(","));
if (airlineData.getDistance() != null) {
OUTPUT_KEY.set(airlineData.getDistance());
output.collect(OUTPUT_KEY, value);
}
}
}
}
MapFileCreator 코드 작성
package com.airline.partialsort;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/***
* 시퀀스 파일 -> 맵 파일
*/
public class MapFileCreator extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new MapFileCreator(), args);
System.out.println("## RESULT : " + res);
}
@Override
public int run(String[] args) throws Exception {
JobConf conf = new JobConf(MapFileCreator.class);
conf.setJobName(MapFileCreator.class.getName());
// 입력 데이터를 SequenceFile로 설정
conf.setInputFormat(SequenceFileInputFormat.class);
// 출력 데이터를 MapFile로 설정
conf.setOutputFormat(MapFileOutputFormat.class);
// 출력 데이터의 키를 항공운항거리 타입(정수)으로 설정
conf.setOutputKeyClass(IntWritable.class);
// 시퀀스 파일 압축 포맷 설정
SequenceFileOutputFormat.setCompressOutput(conf, true);
SequenceFileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(conf, SequenceFile.CompressionType.BLOCK);
// 입출력 경로 설정
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
// 실행
JobClient.runJob(conf);
return 0;
}
}
MapFileValueSearcher 코드 작성
package com.airline.partialsort;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.MapFileOutputFormat;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/***
* 맵 파일에서 검색
*/
public class MapFileValueSearcher extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new MapFileValueSearcher(), args);
System.out.println("## RESULT : " + res);
}
@Override
public int run(String[] args) throws Exception {
Path path = new Path(args[0]);
FileSystem fileSystem = path.getFileSystem(getConf());
// MapFile 조회
MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fileSystem, path, getConf());
// 검색 키를 저장할 객체 선언
IntWritable key = new IntWritable();
key.set(Integer.parseInt(args[1]));
// 검색 값을 저장할 객체를 선언
Text value = new Text();
// 파티셔너를 이용해 검색 키가 저장된 MapFile 조회
Partitioner<IntWritable, Text> partitioner = new HashPartitioner<>();
MapFile.Reader reader = readers[partitioner.getPartition(key, value, readers.length)];
// 검색 결과 확인
Writable entry = reader.get(key, value);
if (entry == null) {
System.out.println("The requested key was not found.");
}
// MapFile을 순회하며 키와 값을 출력
IntWritable nextKey = new IntWritable();
do {
System.out.println(value.toString());
} while (reader.next(nextKey, value) && key.equals(nextKey));
return 0;
}
}
시퀀스 파일 생성 및 확인
- hadoop jar hadoop-airline.jar com.airline.partialsort.SequenceFileCreator /airline/input /airline/output/sequence
- hdfs dfs -text /airline/output/sequence/part-00000 | head -15
맵 파일 생성 및 확인
- hadoop jar hadoop-airline.jar com.airline.partialsort.MapFileCreator /airline/output/sequence /airline/output/map
- hdfs dfs -text /airline/output/map/part-00000/data | head -10
운항거리 데이터 검색
- hadoop jar hadoop-airline.jar com.airline.partialsort.MapFileValueSearcher /airline/output/map 100
반응형
'Development > Hadoop' 카테고리의 다른 글
[Hadoop] 전체 정렬(Total Sort) (0) | 2018.10.14 |
---|---|
[Hadoop] MultipleOutputs (0) | 2018.10.13 |
[Hadoop] 카운터 사용하기 (0) | 2018.10.13 |
[Hadoop] 예제 (0) | 2018.09.02 |
[Hadoop] 용어 (0) | 2018.09.01 |