반응형
항공 운항 데이터 분석 예제
설명
- 항공운항데이터를 다운로드하여 지연시간을 분석하는 실습
- 출발 지연과 도착 지연 횟수를 분석하기 위해 파라미터로 어떤 지연인지 입력하면 해당하는 지연시간을 분석하는 프로그램 실습
항공 운항 데이터 다운로드
- URL
- 2006 ~ 2008년치 데이터로 테스트 진행
namenode로 데이터 업로드
- 경로 : /home/ubuntu/airlineDelay
HDFS에 업로드
- hdfs dfs -mkdir -p /airlineDelay/input
- hdfs dfs -put /home/ubuntu/airlineDelay/* /airlineDelay/input
프로젝트 생성
- 프로젝트명 : hadoop-airline-delay
- 패키지 : com.airline.delay
pom.xml 파일 수정
<?xml version="1.0" encoding="UTF-8"?>
<!-- Build file for the airline delay-count MapReduce example (Hadoop 2.9). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.airline.delay</groupId>
    <artifactId>hadoop-airline-delay</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <!-- Produce hadoop-airline-delay.jar without a version suffix so the
             `hadoop jar` command line stays stable across versions.
             NOTE: the bare ${artifactId} expression is deprecated and triggers
             a Maven warning; ${project.artifactId} is the supported form. -->
        <finalName>${project.artifactId}</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!-- Hadoop client-side APIs; provided on the cluster at runtime. -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.9.0</version>
        </dependency>
        <!-- Compile-time only: generates getters/setters for AirlineData. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.0</version>
        </dependency>
    </dependencies>
</project>
AirlineData 코드 작성
package com.airline.delay.model;
import lombok.Data;
@Data
public class AirlineData {
    // Column positions of the delay fields in the airline on-time CSV record.
    private static final int ARRIVAL_INDEX = 14;
    private static final int DEPARTURE_INDEX = 15;

    private int year;
    private int month;
    // null when the source field is the dataset's "NA" placeholder.
    private Integer arrivalDelayTime;
    private Integer departureDelayTime;

    /**
     * Builds a record from one already-split CSV line of the airline
     * on-time dataset (year at index 0, month at index 1, arrival and
     * departure delay minutes at indices 14 and 15).
     */
    public AirlineData(String[] columns) {
        this.year = Integer.parseInt(columns[0]);
        this.month = Integer.parseInt(columns[1]);
        this.arrivalDelayTime = parseDelay(columns[ARRIVAL_INDEX]);
        this.departureDelayTime = parseDelay(columns[DEPARTURE_INDEX]);
    }

    // Maps the dataset's "NA" marker to null, otherwise parses the minutes.
    private static Integer parseDelay(String field) {
        return field.equals("NA") ? null : Integer.valueOf(field);
    }
}
Mapper 코드 작성
package com.airline.delay.mapreduce;
import com.airline.delay.model.AirlineData;
import com.airline.delay.type.DelayTimeStatus;
import com.airline.delay.type.DelayType;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Emits one count per delayed flight, keyed by "year,month".
 *
 * [Input] one CSV record of the airline on-time dataset per call; the
 * configured "delayType" parameter selects arrival vs departure delay.
 *
 * [Map output] ("2008,1", 1) for every record whose selected delay is
 * positive; the reducer sums these into a per-month delay count.
 */
public class DelayCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);

    // Reused per-record output key; safe because a Mapper instance is
    // single-threaded.
    private final Text outputKey = new Text();
    // Resolved once in setup() instead of once per record (loop-invariant);
    // null when the "delayType" parameter is missing or unrecognized.
    private DelayType delayType;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        delayType = DelayType.getDelayType(context.getConfiguration().get("delayType"));
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Skip the CSV header (the line at byte offset 0) and bail out
        // entirely when no valid delay type was configured.
        if (key.get() <= 0 || delayType == null) {
            return;
        }
        AirlineData delayData = new AirlineData(value.toString().split(","));
        Integer delayTime = delayType.getDelayTime(delayData);
        // Count only actual delays; null means the field was "NA".
        if (delayTime != null && delayTime > 0) {
            outputKey.set(delayData.getYear() + "," + delayData.getMonth());
            context.write(outputKey, ONE);
        }
    }
}
Reducer 코드 작성
package com.airline.delay.mapreduce;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Sums the per-flight counts emitted by the mapper into one total per
 * "year,month" key.
 *
 * [Map output]          [Result]
 * ("2008,1", [1, 1]) -> ("2008,1", 2)
 * ("2008,2", [1])    -> ("2008,2", 1)
 */
public class DelayCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    // Reused output value; safe because a Reducer instance is single-threaded.
    private final IntWritable totalCount = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int total = 0;
        for (IntWritable count : values) {
            total += count.get();
        }
        totalCount.set(total);
        context.write(key, totalCount);
    }
}
Main 코드 작성
package com.airline.delay;
import com.airline.delay.mapreduce.DelayCountMapper;
import com.airline.delay.mapreduce.DelayCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Job driver for the airline delay-count MapReduce.
 *
 * Usage: DelayCount &lt;inputDirectory&gt; &lt;outputDirectory&gt;
 * (plus generic options such as -DdelayType=arrival|departure)
 */
public class DelayCountDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        // Propagate the job result as the process exit code; the original
        // discarded ToolRunner's return value and always exited 0 even when
        // the job failed.
        int exitCode = ToolRunner.run(new Configuration(), new DelayCountDriver(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {
        // otherArgs: plain arguments remaining after GenericOptionsParser
        // consumes generic options such as -D key=value or -files.
        String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage : DelayCount <inputDirectory> <outputDirectory>");
            return 1;
        }

        // Job.getInstance replaces the constructor deprecated in Hadoop 2.x.
        Job job = Job.getInstance(getConf(), "DelayCount");

        // Classes implementing the job.
        job.setJarByClass(DelayCountDriver.class);
        job.setMapperClass(DelayCountMapper.class);
        job.setReducerClass(DelayCountReducer.class);

        // Input/output data formats.
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Output key/value types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Input files and output directory.
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        // Run the job and report failure through the exit code; the original
        // ignored waitForCompletion's result and always returned 0.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
jar 파일 생성
mvn clean compile jar:jar
masternode에 jar파일 업로드 후 실행
hadoop jar hadoop-airline-delay.jar com.airline.delay.DelayCountDriver -DdelayType=arrival /airlineDelay/input/* /airlineDelay/output/arrival
hadoop jar hadoop-airline-delay.jar com.airline.delay.DelayCountDriver -DdelayType=departure /airlineDelay/input/* /airlineDelay/output/departure
결과 파일 확인
hdfs dfs -cat /airlineDelay/output/arrival/part-*
hdfs dfs -cat /airlineDelay/output/departure/part-*
기후 데이터 분석 예제(맵리듀스)
분석 데이터
- 기후 데이터 센터
- 테스트 샘플 데이터
- 2003년도의 각 날마다 최고 기온을 구하는 태스크 예제
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Build file for the NCDC max-temperature MapReduce example. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>hadoop-example</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<!-- Skip tests during packaging; compile source/bytecode for Java 11. -->
<maven.test.skip>true</maven.test.skip>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
<build>
<!-- Produce hadoop-example.jar without a version suffix so the
     `hadoop jar hadoop-example.jar ...` command stays stable. -->
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<archive>
<manifest>
<!-- Class containing public static void main(); recorded in the manifest
     so `hadoop jar` can run the jar without an explicit class name. -->
<mainClass>org.example.hadoop.example.MaxTemperature</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!-- Legacy Hadoop 1.x all-in-one artifact; bundles both the common and
     MapReduce APIs used by this example. -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
</dependency>
</dependencies>
</project>
Mapper
package org.example.hadoop.example;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Extracts (date, temperature) pairs from fixed-width climate records.
 */
public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    /** Sentinel the dataset writes when a temperature reading is missing. */
    private static final String MISSING = "****";
    // Fixed-width field positions. NOTE(review): offsets assumed to match the
    // 3505dat.txt sample layout — confirm against the data dictionary.
    private static final int DATE_BEGIN = 13;
    private static final int DATE_END = 21;
    private static final int TEMP_BEGIN = 68;
    private static final int TEMP_END = 72;

    // Reused output objects; avoids one allocation pair per input record.
    private final Text date = new Text();
    private final IntWritable temperature = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Skip the line at byte offset 0 (the file's header line).
        if (key.get() == 0) {
            return;
        }
        String line = value.toString();
        // Guard against blank or truncated lines; the original substring call
        // threw StringIndexOutOfBoundsException and failed the whole task.
        if (line.length() < TEMP_END) {
            return;
        }
        String temp = line.substring(TEMP_BEGIN, TEMP_END);
        if (!temp.equals(MISSING)) {
            date.set(line.substring(DATE_BEGIN, DATE_END));
            temperature.set(Integer.parseInt(temp.trim()));
            context.write(date, temperature);
        }
    }
}
Reducer
package org.example.hadoop.example;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Reduces all temperature readings for one date key to the maximum value.
 */
public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Start below any possible reading so the first value always wins.
        int best = Integer.MIN_VALUE;
        for (IntWritable reading : values) {
            int current = reading.get();
            if (current > best) {
                best = current;
            }
        }
        context.write(key, new IntWritable(best));
    }
}
Main
package org.example.hadoop.example;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class MaxTemperature {
    /**
     * Configures and submits the max-temperature job, then exits with 0 on
     * success or 1 on failure.
     *
     * Usage: MaxTemperature &lt;input path&gt; &lt;output path&gt;
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }

        Job job = new Job();
        job.setJarByClass(MaxTemperature.class);
        job.setJobName("Max temperature");

        // Wire up input/output locations, map/reduce classes, and output types.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
태스크 jar 파일 생성
mvn clean compile package
태스크 jar 파일 실행
- 샘플 데이터 파일과 태스크 jar 파일을 Hadoop이 설치된 서버에 업로드 후 아래 명령어 실행
hadoop jar hadoop-example.jar 3505dat.txt result
주의사항
- result 디렉토리가 이미 존재하는 디렉토리일 경우 제거 후 다시 테스트해야함
반응형
'Development > Hadoop' 카테고리의 다른 글
[Hadoop] MultipleOutputs (0) | 2018.10.13 |
---|---|
[Hadoop] 카운터 사용하기 (0) | 2018.10.13 |
[Hadoop] 용어 (0) | 2018.09.01 |
[Hadoop] 이슈 (0) | 2018.09.01 |
[Hadoop] 명령어 (0) | 2018.09.01 |