반응형

설명

  • 앞 예제에서는 출발, 도착 지연시간을 각각(SingleOutput) 서로 다른 잡에서 분석을 수행
  • MultipleOutputs은 한 Job에서 동시에 출발, 도착 지연시간을 분석하고 분석된 데이터를 각각 별도의 파일로 남기는 것이 가능

DelayType 코드 수정

package com.airline.delay.type;

import org.apache.commons.lang3.StringUtils;

/**
 * Kind of flight delay being counted.
 *
 * <p>Each constant carries a string label that is exposed through {@link #getType()}
 * and can be mapped back to the constant with {@link #getDelayType(String)}.
 */
public enum DelayType {
    ARRIVAL("ARRIVAL"),
    DEPARTURE("DEPARTURE");

    // The label is assigned once in the constructor and never changes, so it is final.
    private final String type;

    DelayType(String type) {
        this.type = type;
    }

    /**
     * Returns the string label of this delay type.
     *
     * @return the label passed to the constant's constructor
     */
    public String getType() {
        return type;
    }

    /**
     * Looks up the constant whose label equals {@code type} exactly (case-sensitive).
     *
     * @param type label to look up; may be {@code null}
     * @return the matching constant, or {@code null} when no constant has that label
     */
    public static DelayType getDelayType(String type) {
        for (DelayType delayType : values()) {
            // Every label is a non-null literal, so equals() on it is safe even for a null argument.
            if (delayType.type.equals(type)) {
                return delayType;
            }
        }

        return null;
    }
}

Mapper 코드 수정

package com.airline.delay.mapreduce;

import com.airline.delay.model.AirlineData;
import com.airline.delay.type.DelayType;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/***
 * Emits a count of 1 for every record that has a positive arrival and/or departure delay.
 *
 * [Input] (illustrative) comma-separated lines, parsed by AirlineData
 * year,month,day,arrival delay,departure delay
 * 2008,1,1,0,10
 * 2008,1,2,0,10
 *
 * [Map output]
 * key = "<delay type>,<year>,<month>", value = 1
 * DEPARTURE,2008,1,<1, 1>
 *
 */
public class DelayCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable OUTPUT_VALUE = new IntWritable(1);
    private static final Text OUTPUT_KEY = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Records whose input key is not positive are ignored
        // (presumably the key is the line's byte offset, so this drops the first/header line - confirm).
        if (key.get() > 0) {
            AirlineData record = new AirlineData(value.toString().split(","));

            // A delay counts only when the field is present and strictly positive.
            if (record.getArrivalDelayTime() != null && record.getArrivalDelayTime() > 0) {
                emit(DelayType.ARRIVAL, record, context);
            }

            if (record.getDepartureDelayTime() != null && record.getDepartureDelayTime() > 0) {
                emit(DelayType.DEPARTURE, record, context);
            }
        }
    }

    /** Writes (type,year,month) -> 1 for the given record, reusing the shared Writable instances. */
    private void emit(DelayType delayType, AirlineData record, Context context) throws IOException, InterruptedException {
        String compositeKey = delayType.getType() + "," + record.getYear() + "," + record.getMonth();

        OUTPUT_KEY.set(compositeKey);
        context.write(OUTPUT_KEY, OUTPUT_VALUE);
    }
}

Reducer 코드 수정

package com.airline.delay.mapreduce;

import com.airline.delay.type.DelayType;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

import java.io.IOException;

/***
 * Sums the counts for each key and writes the total to the named output of the key's delay type.
 *
 * [Reduce input]
 * key = "<delay type>,<year>,<month>", values = counts
 * ARRIVAL,2008,1,<1, 1>
 * ARRIVAL,2008,2,<1>
 *
 * [Result] (written to the named output for the delay type)
 * key = "<year>,<month>", value = number of delays
 * 2008,1,2
 * 2008,2,1
 */
public class DelayCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private static final IntWritable RESULT = new IntWritable();
    private static final Text OUTPUT_KEY = new Text();

    private MultipleOutputs<Text, IntWritable> multipleOutputs;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        this.multipleOutputs = new MultipleOutputs<>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Key layout: [0] = delay type label, [1] = year, [2] = month.
        String[] parts = key.toString().split(",");
        DelayType delayType = DelayType.getDelayType(parts[0]);

        int total = sumOf(values);

        String yearAndMonth = parts[1] + "," + parts[2];
        OUTPUT_KEY.set(yearAndMonth);
        RESULT.set(total);

        // The delay type label doubles as the named output, so result files start with ARRIVAL/DEPARTURE.
        String namedOutput = delayType.getType();
        multipleOutputs.write(namedOutput, OUTPUT_KEY, RESULT);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        this.multipleOutputs.close();
    }

    /** Adds up every count in {@code counts}. */
    private static int sumOf(Iterable<IntWritable> counts) {
        int total = 0;

        for (IntWritable count : counts) {
            total = total + count.get();
        }

        return total;
    }
}

Driver 코드 수정

package com.airline.delay;

import com.airline.delay.mapreduce.DelayCountMapper;
import com.airline.delay.mapreduce.DelayCountReducer;
import com.airline.delay.type.DelayType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Configures and runs the "DelayCount" MapReduce job.
 *
 * <p>The job reads text input, runs {@link DelayCountMapper} and {@link DelayCountReducer},
 * and registers one named output per {@link DelayType} so the reducer can write arrival and
 * departure results to separate files.
 */
public class DelayCountDriver extends Configured implements Tool {
    /**
     * Command-line entry point.
     *
     * @param args generic Hadoop options followed by {@code <inputDirectory> <outputDirectory>}
     * @throws Exception if the job cannot be configured or run
     */
    public static void main(String[] args) throws Exception {
        // Propagate the tool's result as the process exit status so callers/scripts can detect failure.
        int exitCode = ToolRunner.run(new Configuration(), new DelayCountDriver(), args);
        System.exit(exitCode);
    }

    /**
     * Builds the job from the given arguments and waits for it to finish.
     *
     * @param args arguments; after generic options are removed exactly two must remain:
     *             the input path and the output directory
     * @return 0 when the job succeeds, 1 on wrong usage or when the job fails
     * @throws Exception if the job cannot be configured or run
     */
    @Override
    public int run(String[] args) throws Exception {
        // otherArgs: plain parameters, i.e. everything that is not a generic option such as -D or -f
        String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();

        if (otherArgs.length != 2) {
            System.err.println("Usage : DelayCount <inputDirectory> <outputDirectory>");
            // Report the error through the return value instead of killing the JVM from inside the tool.
            return 1;
        }

        // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
        Job job = Job.getInstance(getConf(), "DelayCount");

        // Classes responsible for the job
        job.setJarByClass(DelayCountDriver.class);
        job.setMapperClass(DelayCountMapper.class);
        job.setReducerClass(DelayCountReducer.class);

        // Input/output formats
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Input path and output directory
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        // MultipleOutputs: one named output per delay type
        MultipleOutputs.addNamedOutput(job, DelayType.ARRIVAL.getType(), TextOutputFormat.class, Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, DelayType.DEPARTURE.getType(), TextOutputFormat.class, Text.class, IntWritable.class);

        // Run the job and surface its success or failure to the caller.
        boolean succeeded = job.waitForCompletion(true);

        return succeeded ? 0 : 1;
    }
}

jar 파일 생성

  • mvn clean compile jar:jar

masternode에 jar파일 업로드 후 실행

  • hadoop jar hadoop-airline-delay.jar com.airline.delay.DelayCountDriver /airlineDelay/input/* /airlineDelay/output/

결과파일 확인

  • hdfs dfs -cat /airlineDelay/output/ARRIVAL-*
  • hdfs dfs -cat /airlineDelay/output/DEPARTURE-*
반응형

'Development > Hadoop' 카테고리의 다른 글

[Hadoop] 전체 정렬(Total Sort)  (0) 2018.10.14
[Hadoop] 부분 정렬(Partial Sort)  (0) 2018.10.14
[Hadoop] 카운터 사용하기  (0) 2018.10.13
[Hadoop] 예제  (0) 2018.09.02
[Hadoop] 용어  (0) 2018.09.01

+ Recent posts