반응형

설명

  • 부분 정렬이란?
    • 말이 정렬이지 실제 정렬보다는 데이터 검색에서 사용되는 방법을 말함
    • 정렬된 결과에서 데이터를 검색할 때 사용하는 방식
  • 부분 정렬 수행 단계
    • 입력 데이터 -> 시퀀스 파일 생성
    • 시퀀스 파일 -> 맵 파일 생성
    • 맵 파일 -> 데이터 검색
  • 예제 설명
    • 항공운항데이터 중에서 운항거리를 키값으로 정렬하고 검색하는 예제를 실습

AirlineData 코드 작성

package com.airline.common.model;

import lombok.Data;

@Data
public class AirlineData {
    private static final int ARRIVAL_INDEX = 14;
    private static final int DEPARTURE_INDEX = 15;
    private static final int DISTANCE_INDEX = 18;

    private int year;
    private int month;
    private Integer arrivalDelayTime;
    private Integer departureDelayTime;
    private Integer distance;

    public AirlineData(String[] columns) {
        year = Integer.parseInt(columns[0]);
        month = Integer.parseInt(columns[1]);

        if (!columns[ARRIVAL_INDEX].equals("NA")) {
            arrivalDelayTime = Integer.parseInt(columns[ARRIVAL_INDEX]);
        }

        if (!columns[DEPARTURE_INDEX].equals("NA")) {
            departureDelayTime = Integer.parseInt(columns[DEPARTURE_INDEX]);
        }

        if (!columns[DISTANCE_INDEX].equals("NA")) {
            distance = Integer.parseInt(columns[DISTANCE_INDEX]);
        }
    }
}

SequenceFileCreator 코드 작성

package com.airline.partialsort;

import com.airline.common.model.AirlineData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/***
 * 입력 데이터 -> 시퀀스 파일
 */
public class SequenceFileCreator extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new SequenceFileCreator(), args);
        System.out.println("## RESULT : " + res);
    }

    @Override
    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(SequenceFileCreator.class);
        conf.setJobName(SequenceFileCreator.class.getName());
        conf.setMapperClass(DistanceMapper.class);

        // 리듀서가 필요없으므로 0
        conf.setNumReduceTasks(0);
        // 출력 포맷을 SequenceFile로 설정
        conf.setOutputFormat(SequenceFileOutputFormat.class);
        // 출력 키를 항공운항거리 타입(정수)으로 설정
        conf.setOutputKeyClass(IntWritable.class);
        // 출력 값을 라인 타입(문자열)으로 설정
        conf.setOutputValueClass(Text.class);

        // 시퀀스 파일 압축 포맷 설정
        SequenceFileOutputFormat.setCompressOutput(conf, true);
        SequenceFileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(conf, SequenceFile.CompressionType.BLOCK);

        // 입출력 경로 설정
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        // 실행
        JobClient.runJob(conf);

        return 0;
    }

    private static class DistanceMapper extends MapReduceBase implements Mapper<LongWritable, org.apache.hadoop.io.Text, IntWritable, org.apache.hadoop.io.Text> {
        private static final IntWritable OUTPUT_KEY = new IntWritable();

        @Override
        public void map(LongWritable key, org.apache.hadoop.io.Text value, OutputCollector<IntWritable, org.apache.hadoop.io.Text> output, Reporter reporter) throws IOException {
            if (key.get() <= 0) {
                return;
            }

            AirlineData airlineData = new AirlineData(value.toString().split(","));

            if (airlineData.getDistance() != null) {
                OUTPUT_KEY.set(airlineData.getDistance());
                output.collect(OUTPUT_KEY, value);
            }
        }
    }
}

MapFileCreator 코드 작성

package com.airline.partialsort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/***
 * 시퀀스 파일 -> 맵 파일
 */
public class MapFileCreator extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MapFileCreator(), args);
        System.out.println("## RESULT : " + res);
    }

    @Override
    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(MapFileCreator.class);
        conf.setJobName(MapFileCreator.class.getName());

        // 입력 데이터를 SequenceFile로 설정
        conf.setInputFormat(SequenceFileInputFormat.class);
        // 출력 데이터를 MapFile로 설정
        conf.setOutputFormat(MapFileOutputFormat.class);
        // 출력 데이터의 키를 항공운항거리 타입(정수)으로 설정
        conf.setOutputKeyClass(IntWritable.class);

        // 시퀀스 파일 압축 포맷 설정
        SequenceFileOutputFormat.setCompressOutput(conf, true);
        SequenceFileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(conf, SequenceFile.CompressionType.BLOCK);

        // 입출력 경로 설정
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        // 실행
        JobClient.runJob(conf);

        return 0;
    }
}

MapFileValueSearcher 코드 작성

package com.airline.partialsort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.MapFileOutputFormat;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/***
 * 맵 파일에서 검색
 */
public class MapFileValueSearcher extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MapFileValueSearcher(), args);
        System.out.println("## RESULT : " + res);
    }

    @Override
    public int run(String[] args) throws Exception {
        Path path = new Path(args[0]);
        FileSystem fileSystem = path.getFileSystem(getConf());

        // MapFile 조회
        MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fileSystem, path, getConf());

        // 검색 키를 저장할 객체 선언
        IntWritable key = new IntWritable();
        key.set(Integer.parseInt(args[1]));

        // 검색 값을 저장할 객체를 선언
        Text value = new Text();

        // 파티셔너를 이용해 검색 키가 저장된 MapFile 조회
        Partitioner<IntWritable, Text> partitioner = new HashPartitioner<>();
        MapFile.Reader reader = readers[partitioner.getPartition(key, value, readers.length)];

        // 검색 결과 확인
        Writable entry = reader.get(key, value);
        if (entry == null) {
            System.out.println("The requested key was not found.");
        }

        // MapFile을 순회하며 키와 값을 출력
        IntWritable nextKey = new IntWritable();
        do {
            System.out.println(value.toString());
        } while (reader.next(nextKey, value) && key.equals(nextKey));

        return 0;
    }
}

시퀀스 파일 생성 및 확인

  • hadoop jar hadoop-airline.jar com.airline.partialsort.SequenceFileCreator /airline/input /airline/output/sequence
  • hdfs dfs -text /airline/output/sequence/part-00000 | head -15

맵 파일 생성 및 확인

  • hadoop jar hadoop-airline.jar com.airline.partialsort.MapFileCreator /airline/output/sequence /airline/output/map
  • hdfs dfs -text /airline/output/map/part-00000/data | head -10

운항거리 데이터 검색

  • hadoop jar hadoop-airline.jar com.airline.partialsort.MapFileValueSearcher /airline/output/map 100
반응형

'Development > Hadoop' 카테고리의 다른 글

[Hadoop] 전체 정렬(Total Sort)  (0) 2018.10.14
[Hadoop] MultipleOutputs  (0) 2018.10.13
[Hadoop] 카운터 사용하기  (0) 2018.10.13
[Hadoop] 예제  (0) 2018.09.02
[Hadoop] 용어  (0) 2018.09.01

+ Recent posts