Description

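  • A hands-on exercise that sets up counters so that their values are printed in the log once the MapReduce job finishes
  • Counters are incremented for each state (success, failure, valid, and so on) to check how many records fell into each one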

Add the DelayTimeStatus code

package com.airline.delay.type;

public enum DelayTimeStatus {
    // Delay time is NA
    NOT_AVAILABLE {
        @Override
        boolean checkCondition(Integer delayTime) {
            return delayTime == null;
        }
    },

    // Arrived/departed earlier than scheduled
    EARLY {
        @Override
        boolean checkCondition(Integer delayTime) {
            return delayTime < 0;
        }
    },

    // Delayed
    DELAY {
        @Override
        boolean checkCondition(Integer delayTime) {
            return delayTime > 0;
        }
    },

    // Arrived/departed on schedule
    SCHEDULED {
        @Override
        boolean checkCondition(Integer delayTime) {
            return delayTime == 0;
        }
    };

    abstract boolean checkCondition(Integer delayTime);

    public static DelayTimeStatus getStatus(Integer delayTime) {
        for (DelayTimeStatus type : values()) {
            if (type.checkCondition(delayTime)) {
                return type;
            }
        }

        throw new IllegalArgumentException(delayTime + " is not a valid argument.");
    }
}
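
The enum above depends on declaration order: `NOT_AVAILABLE` is checked first, so the comparisons in `EARLY`, `DELAY`, and `SCHEDULED` never unbox a `null`. The following is a minimal stand-alone sketch that makes that null check explicit. `StatusSketch` and `classify` are hypothetical names used only for illustration and are not part of the project.

```java
// Hypothetical stand-alone sketch; this is not the project's DelayTimeStatus.
class StatusSketch {
    enum Status { NOT_AVAILABLE, EARLY, DELAY, SCHEDULED }

    // Classifies a delay value; null stands for a missing ("NA") field.
    static Status classify(Integer delayTime) {
        if (delayTime == null) {
            return Status.NOT_AVAILABLE; // handle null before any unboxing comparison
        }
        if (delayTime < 0) {
            return Status.EARLY;
        }
        return delayTime > 0 ? Status.DELAY : Status.SCHEDULED;
    }

    public static void main(String[] args) {
        Integer[] samples = {null, -5, 0, 12};
        for (Integer sample : samples) {
            System.out.println(sample + " -> " + classify(sample));
        }
    }
}
```

If `NOT_AVAILABLE` were moved below the other constants in the original enum, `getStatus(null)` would throw a `NullPointerException` during unboxing, so the order is worth keeping in mind when adding new states.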

Modify the Mapper code

package com.airline.delay.mapreduce;

import com.airline.delay.model.AirlineData;
import com.airline.delay.type.DelayTimeStatus;
import com.airline.delay.type.DelayType;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * [Input]
 * year,month,day,arrival delay,departure delay
 * 2008,1,1,0,10
 * 2008,1,2,0,10
 * 2008,2,0,10
 * 2008,2,0,0
 *
 * [Map output]
 * key,delay count
 * 2008,1,<1, 1>
 * 2008,2,<1>
 *
 */
public class DelayCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable OUTPUT_VALUE = new IntWritable(1);
    private static final Text OUTPUT_KEY = new Text();

    private String delayType;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        delayType = context.getConfiguration().get("delayType");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        DelayType type = DelayType.getDelayType(delayType);

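        // Skip the record at byte offset 0 (the CSV header line, assuming the default
        // TextInputFormat) and any unrecognized delay type.
        if (key.get() <= 0 || type == null) {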
            return;
        }

        AirlineData delayData = new AirlineData(value.toString().split(","));
        Integer delayTime = type.getDelayTime(delayData);

        // Count via counters whether the delay time is NA, early, on schedule, or delayed
        context.getCounter(DelayTimeStatus.getStatus(delayTime)).increment(1);

        if (delayTime != null && delayTime > 0) {
            OUTPUT_KEY.set(delayData.getYear() + "," + delayData.getMonth());
            context.write(OUTPUT_KEY, OUTPUT_VALUE);
        }
    }
}
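
To see what the mapper counts without running a cluster, the same logic can be simulated in plain Java. This is only a rough sketch under stated assumptions: `CounterSimulation` is a hypothetical class, an `EnumMap` stands in for Hadoop's counters, the column order follows the sample input in the Javadoc above, the departure delay column is used, and a literal `NA` is assumed to mark a missing value. The real parsing lives in `AirlineData` and `DelayType`, which are not shown in this post.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local simulation; no Hadoop classes are involved.
class CounterSimulation {
    enum Status { NOT_AVAILABLE, EARLY, DELAY, SCHEDULED }

    // Stand-in for the job counters, keyed by status.
    final Map<Status, Long> counters = new EnumMap<>(Status.class);
    // Stand-in for the mapper output after summing: "year,month" -> delay count.
    final Map<String, Integer> delayCounts = new TreeMap<>();

    // Assumed parsing rule: the literal "NA" means the value is missing.
    static Integer parseDelay(String field) {
        return "NA".equals(field) ? null : Integer.valueOf(field);
    }

    static Status statusOf(Integer delayTime) {
        if (delayTime == null) return Status.NOT_AVAILABLE;
        if (delayTime < 0) return Status.EARLY;
        return delayTime > 0 ? Status.DELAY : Status.SCHEDULED;
    }

    // Mirrors map(): count the status of every record, emit a key only for delays.
    void map(String line) {
        String[] columns = line.split(",");
        Integer delayTime = parseDelay(columns[4]); // assumed: departure delay column
        counters.merge(statusOf(delayTime), 1L, Long::sum);
        if (delayTime != null && delayTime > 0) {
            delayCounts.merge(columns[0] + "," + columns[1], 1, Integer::sum);
        }
    }

    public static void main(String[] args) {
        CounterSimulation simulation = new CounterSimulation();
        String[] lines = {"2008,1,1,0,10", "2008,1,2,0,10", "2008,2,1,-3,NA", "2008,2,2,5,0"};
        for (String line : lines) {
            simulation.map(line);
        }
        System.out.println(simulation.counters);
        System.out.println(simulation.delayCounts);
    }
}
```

Every record increments exactly one counter, while only delayed records produce output, which is why the counter totals cover the whole input and the result files cover only the delays.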

Build the jar file

  • mvn clean compile jar:jar

Upload the jar file to the master node and run it

  • hadoop jar hadoop-airline-delay.jar com.airline.delay.DelayCountDriver -DdelayType=arrival /airlineDelay/input/* /airlineDelay/output/arrival
  • hadoop jar hadoop-airline-delay.jar com.airline.delay.DelayCountDriver -DdelayType=departure /airlineDelay/input/* /airlineDelay/output/departure

Check the counter log

com.airline.delay.type.DelayTimeStatus
    DELAY=2979504
    EARLY=3690606
    NOT_AVAILABLE=154699
    SCHEDULED=184919
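
The four counters above add up to 7,009,728 records, of which roughly 42.5% are delays. As a small worked example, the sketch below parses the logged `NAME=value` lines and prints each status as a share of the total. `CounterShares` is a hypothetical helper written for this post and is not part of Hadoop or of the project.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for reading the counter log shown above.
class CounterShares {
    // Parses "NAME=value" lines; lines without exactly one '=' (such as the group name) are skipped.
    static Map<String, Long> parse(String log) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : log.split("\n")) {
            String[] parts = line.trim().split("=");
            if (parts.length == 2) {
                counts.put(parts[0], Long.parseLong(parts[1]));
            }
        }
        return counts;
    }

    // Sums all counter values to get the number of records seen by the mappers.
    static long sum(Map<String, Long> counts) {
        long result = 0;
        for (long value : counts.values()) {
            result += value;
        }
        return result;
    }

    public static void main(String[] args) {
        String log = "com.airline.delay.type.DelayTimeStatus\n"
                + "    DELAY=2979504\n"
                + "    EARLY=3690606\n"
                + "    NOT_AVAILABLE=154699\n"
                + "    SCHEDULED=184919";
        Map<String, Long> counts = parse(log);
        long total = sum(counts);
        System.out.println("total=" + total);
        counts.forEach((name, value) ->
                System.out.printf("%s: %.2f%%%n", name, 100.0 * value / total));
    }
}
```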

Check the result files

  • hdfs dfs -cat /airlineDelay/output/arrival/part-*
  • hdfs dfs -cat /airlineDelay/output/departure/part-*