반응형
설명
- 앞 예제에서는 출발 지연시간과 도착 지연시간을 서로 다른 잡(Job)에서 각각 분석 수행 (잡마다 단일 출력, SingleOutput)
- MultipleOutputs를 사용하면 하나의 잡에서 출발·도착 지연시간을 동시에 분석하고, 분석 결과를 지연 유형별로 별도의 파일에 저장 가능
DelayType 코드 수정
package com.airline.delay.type;
import org.apache.commons.lang3.StringUtils;
/**
 * Kind of flight delay being counted.
 *
 * <p>The {@link #getType() type} label is used by the MapReduce job both as the prefix of the
 * mapper's output key and as the MultipleOutputs named-output name, so it must stay a plain
 * alphanumeric string.
 */
public enum DelayType {
    ARRIVAL("ARRIVAL"),
    DEPARTURE("DEPARTURE");

    /** Text label of this constant; never null because it is only set from the constants above. */
    private final String type;

    DelayType(String type) {
        this.type = type;
    }

    /**
     * Returns the text label of this delay type.
     *
     * @return the label, e.g. {@code "ARRIVAL"}
     */
    public String getType() {
        return type;
    }

    /**
     * Looks up the constant whose label equals {@code type} (case-sensitive).
     *
     * @param type label to look up; may be {@code null}
     * @return the matching constant, or {@code null} when no constant has that label
     */
    public static DelayType getDelayType(String type) {
        for (DelayType delayType : values()) {
            // delayType.type is never null, so equals() is null-safe for the argument.
            if (delayType.type.equals(type)) {
                return delayType;
            }
        }
        return null;
    }
}
Mapper 코드 수정
package com.airline.delay.mapreduce;
import com.airline.delay.model.AirlineData;
import com.airline.delay.type.DelayType;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Emits a count of 1 for every delayed arrival and every delayed departure, keyed by
 * {@code "<DelayType>,<year>,<month>"}.
 *
 * <p>Illustrative example (column layout simplified; the real layout is whatever
 * {@code AirlineData} parses — TODO confirm):
 *
 * <pre>
 * [input]  year,month,day,arrivalDelay,departureDelay
 * 2008,1,1,0,10
 * 2008,1,2,0,10
 * 2008,2,1,0,10
 * 2008,2,2,0,0
 *
 * [map output]  key, counts
 * DEPARTURE,2008,1  &lt;1, 1&gt;
 * DEPARTURE,2008,2  &lt;1&gt;
 * </pre>
 */
public class DelayCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    /** Constant count emitted for each delayed flight. */
    private static final IntWritable OUTPUT_VALUE = new IntWritable(1);

    /** Reused output key; per-instance so that concurrently running mappers never share it. */
    private final Text outputKey = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // With TextInputFormat the key is the line's byte offset; the line at offset 0 is skipped
        // (presumably the CSV header of each input file).
        if (key.get() <= 0) {
            return;
        }
        AirlineData delayData = new AirlineData(value.toString().split(","));
        if (delayData.getArrivalDelayTime() != null && delayData.getArrivalDelayTime() > 0) {
            emit(DelayType.ARRIVAL, delayData, context);
        }
        if (delayData.getDepartureDelayTime() != null && delayData.getDepartureDelayTime() > 0) {
            emit(DelayType.DEPARTURE, delayData, context);
        }
    }

    /** Writes {@code "<delayType>,<year>,<month>" -> 1} for one delayed flight. */
    private void emit(DelayType delayType, AirlineData delayData, Context context)
            throws IOException, InterruptedException {
        outputKey.set(delayType.getType() + "," + delayData.getYear() + "," + delayData.getMonth());
        context.write(outputKey, OUTPUT_VALUE);
    }
}
Reducer 코드 수정
package com.airline.delay.mapreduce;
import com.airline.delay.type.DelayType;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import java.io.IOException;
/**
 * Sums the delay counts for each {@code "<DelayType>,<year>,<month>"} key and writes the total,
 * keyed by {@code "<year>,<month>"}, to the MultipleOutputs named output of that delay type.
 * The output files are therefore named after the delay type (e.g. {@code DEPARTURE-*}).
 *
 * <pre>
 * [map output]  key, counts
 * DEPARTURE,2008,1  &lt;1, 1&gt;
 * DEPARTURE,2008,2  &lt;1&gt;
 *
 * [result in DEPARTURE-*]  key, total
 * 2008,1  2
 * 2008,2  1
 * </pre>
 */
public class DelayCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    /** Reused output value holding the summed count (per-instance, not shared across reducers). */
    private final IntWritable result = new IntWritable();
    /** Reused output key holding {@code "<year>,<month>"}. */
    private final Text outputKey = new Text();
    private MultipleOutputs<Text, IntWritable> multipleOutputs;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        multipleOutputs = new MultipleOutputs<>(context);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // MultipleOutputs must be closed to flush and close its record writers.
        if (multipleOutputs != null) {
            multipleOutputs.close();
        }
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Key layout produced by the mapper: "<DelayType>,<year>,<month>".
        String[] columns = key.toString().split(",");
        DelayType delayType = columns.length < 3 ? null : DelayType.getDelayType(columns[0]);
        if (delayType == null) {
            // Fail with a descriptive message instead of an NPE on delayType.getType() below.
            throw new IOException("Unexpected reducer key (expected TYPE,year,month): " + key);
        }
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        outputKey.set(columns[1] + "," + columns[2]);
        result.set(sum);
        // The named output chosen here determines the output file prefix (ARRIVAL-* / DEPARTURE-*).
        multipleOutputs.write(delayType.getType(), outputKey, result);
    }
}
Driver 코드 수정
package com.airline.delay;
import com.airline.delay.mapreduce.DelayCountMapper;
import com.airline.delay.mapreduce.DelayCountReducer;
import com.airline.delay.type.DelayType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Driver for a single MapReduce job that counts arrival and departure delays per (year, month)
 * and writes each delay type to its own MultipleOutputs named output.
 */
public class DelayCountDriver extends Configured implements Tool {
    /**
     * Runs the job through {@link ToolRunner} and uses its status as the process exit code.
     *
     * @param args {@code <inputDirectory> <outputDirectory>}, optionally preceded by generic options
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new DelayCountDriver(), args);
        System.exit(exitCode);
    }

    /**
     * Configures and runs the delay-count job.
     *
     * @param args {@code <inputDirectory> <outputDirectory>}
     * @return 0 on success, 1 on wrong usage or when the job fails
     */
    @Override
    public int run(String[] args) throws Exception {
        // otherArgs: plain arguments left after removing generic options such as -D.
        String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage : DelayCount <inputDirectory> <outputDirectory>");
            // Return a status instead of calling System.exit so callers of run() keep control.
            return 1;
        }

        // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
        Job job = Job.getInstance(getConf(), "DelayCount");

        // Classes responsible for each phase
        job.setJarByClass(DelayCountDriver.class);
        job.setMapperClass(DelayCountMapper.class);
        job.setReducerClass(DelayCountReducer.class);

        // Input/output formats
        // NOTE(review): with MultipleOutputs the default output format may still create empty
        // part-r-* files; LazyOutputFormat could suppress them — consider if they are unwanted.
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Input path and output directory
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        // One named output per delay type; the reducer writes to them by DelayType.getType().
        MultipleOutputs.addNamedOutput(job, DelayType.ARRIVAL.getType(), TextOutputFormat.class, Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, DelayType.DEPARTURE.getType(), TextOutputFormat.class, Text.class, IntWritable.class);

        // Run the job and report its outcome instead of always returning success.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
jar 파일 생성
- mvn clean compile jar:jar
masternode에 jar 파일 업로드 후 실행 (출력 디렉토리는 실행 전에 존재하지 않아야 함)
- hadoop jar hadoop-airline-delay.jar com.airline.delay.DelayCountDriver '/airlineDelay/input/*' /airlineDelay/output/
- 입력 경로의 `*`는 로컬 셸이 아닌 HDFS 경로에 대해 확장되도록 따옴표로 감쌈
결과파일 확인
- hdfs dfs -cat /airlineDelay/output/ARRIVAL-*
- hdfs dfs -cat /airlineDelay/output/DEPARTURE-*
반응형
'Development > Hadoop' 카테고리의 다른 글
[Hadoop] 전체 정렬(Total Sort) (0) | 2018.10.14 |
---|---|
[Hadoop] 부분 정렬(Partial Sort) (0) | 2018.10.14 |
[Hadoop] 카운터 사용하기 (0) | 2018.10.13 |
[Hadoop] 예제 (0) | 2018.09.02 |
[Hadoop] 용어 (0) | 2018.09.01 |