
Airline Flight Data Analysis Example

Description

  • Download the airline on-time performance data and analyze delay times
  • The program takes a parameter specifying which delay to analyze (arrival or departure) and counts the matching delays per year and month

Downloading the Airline Data

Upload the data to the namenode

  • Path: /home/ubuntu/airlineDelay

Upload to HDFS

  • hdfs dfs -mkdir -p /airlineDelay/input
  • hdfs dfs -put /home/ubuntu/airlineDelay/* /airlineDelay/input
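To verify the upload, the input directory can be listed:

  • hdfs dfs -ls /airlineDelay/input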

Creating the Project

  • Project name: hadoop-airline-delay
  • Package: com.airline.delay

Editing pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.airline.delay</groupId>
    <artifactId>hadoop-airline-delay</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <finalName>${project.artifactId}</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.0</version>
        </dependency>
    </dependencies>
</project>

Writing the AirlineData Class

package com.airline.delay.model;

import lombok.Data;

@Data
public class AirlineData {
    // 0-based column indices of the ArrDelay and DepDelay fields in the airline on-time dataset
    private static final int ARRIVAL_INDEX = 14;
    private static final int DEPARTURE_INDEX = 15;

    private int year;
    private int month;
    private Integer arrivalDelayTime;
    private Integer departureDelayTime;

    public AirlineData(String[] columns) {
        year = Integer.parseInt(columns[0]);
        month = Integer.parseInt(columns[1]);

        // "NA" marks a missing value; leave the corresponding field null in that case
        if (!columns[ARRIVAL_INDEX].equals("NA")) {
            arrivalDelayTime = Integer.parseInt(columns[ARRIVAL_INDEX]);
        }

        if (!columns[DEPARTURE_INDEX].equals("NA")) {
            departureDelayTime = Integer.parseInt(columns[DEPARTURE_INDEX]);
        }
    }
}

Writing the Mapper

package com.airline.delay.mapreduce;

import com.airline.delay.model.AirlineData;
import com.airline.delay.type.DelayType;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * [Input]
 * year,month,day,arrival delay,departure delay
 * 2008,1,1,0,10
 * 2008,1,2,0,10
 * 2008,2,1,0,10
 * 2008,2,2,0,0
 *
 * [Map output]
 * key -> delay counts
 * (2008,1) -> <1, 1>
 * (2008,2) -> <1>
 */
public class DelayCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable OUTPUT_VALUE = new IntWritable(1);
    private static final Text OUTPUT_KEY = new Text();

    private String delayType;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Read the delay type passed on the command line via -DdelayType
        delayType = context.getConfiguration().get("delayType");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        DelayType type = DelayType.getDelayType(delayType);

        // Skip the header line (byte offset 0) and bail out on an unknown delay type
        if (key.get() <= 0 || type == null) {
            return;
        }

        AirlineData delayData = new AirlineData(value.toString().split(","));
        Integer delayTime = type.getDelayTime(delayData);

        // Count only records that were actually delayed
        if (delayTime != null && delayTime > 0) {
            OUTPUT_KEY.set(delayData.getYear() + "," + delayData.getMonth());
            context.write(OUTPUT_KEY, OUTPUT_VALUE);
        }
    }
}
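
The DelayType Enum (sketch)

The Mapper above imports com.airline.delay.type.DelayType, whose source is not included in the post. A minimal sketch, assuming only what the Mapper and the run commands below imply (a lookup by the -DdelayType string and a per-type accessor on AirlineData), might look like this:

package com.airline.delay.type;

import com.airline.delay.model.AirlineData;

public enum DelayType {
    ARRIVAL("arrival") {
        @Override
        public Integer getDelayTime(AirlineData data) {
            return data.getArrivalDelayTime();
        }
    },
    DEPARTURE("departure") {
        @Override
        public Integer getDelayTime(AirlineData data) {
            return data.getDepartureDelayTime();
        }
    };

    private final String typeName;

    DelayType(String typeName) {
        this.typeName = typeName;
    }

    // Returns the constant matching the -DdelayType value, or null for an unknown type
    public static DelayType getDelayType(String typeName) {
        for (DelayType type : values()) {
            if (type.typeName.equals(typeName)) {
                return type;
            }
        }
        return null;
    }

    // Returns the delay field this type selects (null when the source column was "NA")
    public abstract Integer getDelayTime(AirlineData data);
}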

Writing the Reducer

package com.airline.delay.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * [Map output]
 * key -> delay counts
 * (2008,1) -> <1, 1>
 * (2008,2) -> <1>
 *
 * [Result]
 * key, total delay count
 * 2008,1  2
 * 2008,2  1
 */
public class DelayCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private static final IntWritable RESULT = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;

        for (IntWritable value : values) {
            sum += value.get();
        }

        RESULT.set(sum);
        context.write(key, RESULT);
    }
}

Writing the Driver

package com.airline.delay;

import com.airline.delay.mapreduce.DelayCountMapper;
import com.airline.delay.mapreduce.DelayCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DelayCountDriver extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new DelayCountDriver(), args));
    }

    @Override
    public int run(String[] args) throws Exception {
        // otherArgs: the plain arguments left over after generic options such as -D or -files are parsed out
        String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();

        if (otherArgs.length != 2) {
            System.err.println("Usage : DelayCount <inputDirectory> <outputDirectory>");
            System.exit(1);
        }

        Job job = Job.getInstance(getConf(), "DelayCount");

        // Set the job, mapper, and reducer classes
        job.setJarByClass(DelayCountDriver.class);
        job.setMapperClass(DelayCountMapper.class);
        job.setReducerClass(DelayCountReducer.class);

        // Set the input/output formats
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Set the output key and value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Set the input files and the output directory
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        // Run the job and propagate its success or failure as the exit code
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
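
Since the reducer only sums counts (an associative, commutative operation whose input and output types match), it could also be registered as a combiner in run() to cut shuffle traffic; a one-line addition not in the original post:

        job.setCombinerClass(DelayCountReducer.class);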

Building the jar

mvn clean compile jar:jar

Upload the jar to the masternode and run it

The -DdelayType=... flag is a generic option: GenericOptionsParser moves it into the job Configuration, which is where the Mapper's setup() reads it.

hadoop jar hadoop-airline-delay.jar com.airline.delay.DelayCountDriver -DdelayType=arrival /airlineDelay/input/* /airlineDelay/output/arrival
hadoop jar hadoop-airline-delay.jar com.airline.delay.DelayCountDriver -DdelayType=departure /airlineDelay/input/* /airlineDelay/output/departure

Checking the result files

hdfs dfs -cat /airlineDelay/output/arrival/part-*
hdfs dfs -cat /airlineDelay/output/departure/part-*

Climate Data Analysis Example (MapReduce)

Analysis Data

  • Sample file: 3505dat.txt, a fixed-width weather data file (the date and temperature column offsets are hard-coded in the Mapper below)

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>hadoop-example</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.test.skip>true</maven.test.skip>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>

    <build>
        <finalName>${project.artifactId}</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <archive>
                        <manifest>
                            <!-- Class containing the public static void main() entry point -->
                            <mainClass>org.example.hadoop.example.MaxTemperature</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>1.2.1</version>
        </dependency>
    </dependencies>
</project>

Mapper

package org.example.hadoop.example;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final String MISSING = "****";

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Skip the header line at byte offset 0
        if (key.get() == 0) {
            return;
        }

        String line = value.toString();
        // The input is fixed-width: the date and temperature fields sit at fixed column offsets
        String date = line.substring(13, 21);
        String temp = line.substring(68, 72);

        // "****" marks a missing temperature reading
        if (!temp.equals(MISSING)) {
            context.write(new Text(date), new IntWritable(Integer.parseInt(temp.trim())));
        }
    }
    }
}

Reducer

package org.example.hadoop.example;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int maxValue = Integer.MIN_VALUE;

        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }

        context.write(key, new IntWritable(maxValue));
    }
}
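
Because max is likewise associative and commutative, this reducer can safely double as a combiner (a one-line addition to the driver, not in the original post):

        job.setCombinerClass(MaxTemperatureReducer.class);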

Main

package org.example.hadoop.example;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MaxTemperature {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }

        Job job = new Job();
        job.setJarByClass(MaxTemperature.class);
        job.setJobName("Max temperature");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Building the task jar

mvn clean compile package

Running the task jar

  • Upload the sample data file and the task jar to a server with Hadoop installed, then run the command below (see the note after it about putting the data file into HDFS)
hadoop jar hadoop-example.jar 3505dat.txt result
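
If the cluster's default filesystem is HDFS, the relative paths 3505dat.txt and result resolve against the user's HDFS home directory, so the data file needs to be put into HDFS first:

hdfs dfs -put 3505dat.txt .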

Caution

  • If the result directory already exists, the job will fail to start (MapReduce refuses to overwrite an existing output directory); remove it before re-running the test, as shown below
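For example, when the output directory is on HDFS:

hdfs dfs -rm -r result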