Wednesday, April 29, 2015

6. Start Hadoop, Parse code by Musthafa + Kiran Kumar ...

Follow these steps to load sample data into Hadoop and run a MapReduce job on it.

1. Create a folder named user in the Hadoop file system
          hdfs dfs -mkdir /user

2. Inside the user folder, create a folder whose name is the same as your username
          hdfs dfs -mkdir /user/<username>

3. Keep all the sample data files in one folder on the local file system
           e.g. /home/<username>/input/input1.dat, /home/<username>/input/input2.dat, ...

4. Copy the input folder into the Hadoop file system
         hdfs dfs -put /home/<username>/input
    Note: when no destination is given, the files and folders are copied into the
    user's HDFS home directory, hdfs://host:port/user/<username>/
    So the command above copies the input folder to hdfs://host:port/user/<username>/input

5. To check that the input files were copied, use the following commands
    hdfs dfs -ls /user/<username>
    result : input
    hdfs dfs -ls /user/<username>/input
    result : input1.dat, input2.dat, .....

6. Write the Java code for the map and reduce steps (this includes the business logic). In our example the source code is:
  
import java.io.BufferedReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class ByteCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    // Counter name is kept from the WordCount example; here it accumulates bytes.
    static enum CountersEnum { INPUT_WORDS }

    // Compile the patterns once instead of once per input line.
    private static final Pattern DST_PATTERN = Pattern.compile(" dst=([^ \t]*)");
    private static final Pattern RCVD_PATTERN = Pattern.compile(" rcvd=([^ \t]*)");

    // Output objects are reused across map() calls.
    private final Text word = new Text();
    private final IntWritable bytecounter = new IntWritable();

    private Configuration conf;

    @Override
    public void setup(Context context) throws IOException,
        InterruptedException {
      conf = context.getConfiguration();
    }

    @Override
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      String line = value.toString();
      Matcher ipMatcher = DST_PATTERN.matcher(line);
      Matcher bytesMatcher = RCVD_PATTERN.matcher(line);

      // Skip lines that lack either field instead of emitting an empty key.
      if (!ipMatcher.find() || !bytesMatcher.find()) {
        return;
      }
      String ip = ipMatcher.group(1);
      String bytes = bytesMatcher.group(1).trim();

      int tmp = 0;
      try {
        if (!bytes.isEmpty()) {
          tmp = Integer.parseInt(bytes);
        }
      } catch (NumberFormatException e) {
        // Non-numeric rcvd value: count it as 0 bytes instead of failing the task.
        tmp = 0;
      }

      word.set(ip);
      bytecounter.set(tmp);
      context.write(word, bytecounter);
      Counter counter = context.getCounter(CountersEnum.class.getName(),
          CountersEnum.INPUT_WORDS.toString());
      counter.increment(tmp);
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
    String[] remainingArgs = optionParser.getRemainingArgs();
    // Expect either <in> <out> or <in> <out> -skip <file>.
    if (remainingArgs.length != 2 && remainingArgs.length != 4) {
      System.err.println("Usage: bytecount <in> <out> [-skip skipPatternFile]");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "byte count");
    job.setJarByClass(ByteCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    List<String> otherArgs = new ArrayList<String>();
    for (int i=0; i < remainingArgs.length; ++i) {
      if ("-skip".equals(remainingArgs[i])) {
        job.addCacheFile(new Path(remainingArgs[++i]).toUri());
        job.getConfiguration().setBoolean("bytecount.skip.patterns", true);
      } else {
        otherArgs.add(remainingArgs[i]);
      }
    }
    FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
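
The parsing and summing logic can be tried without a cluster. The following is a minimal sketch in plain Java, not part of the job itself: it applies the same two regular expressions as the mapper and then sums the rcvd values per dst address, which is what the reducer does. The class name ByteCountSketch, the method name and the sample log lines are hypothetical, since the real log format is not shown here; the sketch also assumes that lines missing either field should be skipped and that a non-numeric rcvd value counts as 0.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ByteCountSketch {
    // Same patterns as in ByteCount: capture the token after " dst=" and " rcvd=".
    private static final Pattern DST = Pattern.compile(" dst=([^ \t]*)");
    private static final Pattern RCVD = Pattern.compile(" rcvd=([^ \t]*)");

    /** Sums rcvd bytes per dst address; lines lacking either field are skipped. */
    public static Map<String, Integer> sumBytesByDestination(List<String> lines) {
        Map<String, Integer> totals = new TreeMap<>(); // sorted by key, like the job output
        for (String line : lines) {
            Matcher dst = DST.matcher(line);
            Matcher rcvd = RCVD.matcher(line);
            if (!dst.find() || !rcvd.find()) {
                continue; // assumption: ignore lines without both fields
            }
            int bytes;
            try {
                bytes = Integer.parseInt(rcvd.group(1).trim());
            } catch (NumberFormatException e) {
                bytes = 0; // assumption: treat a non-numeric value as 0 bytes
            }
            totals.merge(dst.group(1), bytes, Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Hypothetical log lines; only the " dst=" and " rcvd=" fields matter.
        List<String> lines = Arrays.asList(
            "2015-04-29 src=10.0.0.5 dst=103.1.124.7 sent=100 rcvd=1000",
            "2015-04-29 src=10.0.0.6 dst=103.1.124.7 sent=50 rcvd=234",
            "2015-04-29 src=10.0.0.5 dst=103.1.124.8 sent=10 rcvd=314",
            "a line without the expected fields");
        sumBytesByDestination(lines)
            .forEach((ip, total) -> System.out.println(ip + "\t" + total));
    }
}
```

Running main prints one line per destination address with its total, in the same key-then-value layout as the job output.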


       
7. Compile the source.
    hadoop com.sun.tools.javac.Main ByteCount.java
    Note: this needs HADOOP_CLASSPATH to include ${JAVA_HOME}/lib/tools.jar

8. Make a jar file from all the class files generated by the above command.
    jar cf wc.jar ByteCount*.class

9. Run the Hadoop job
    hadoop jar wc.jar ByteCount /user/<username>/input/ output

    Note: a relative output path is resolved against the HDFS home directory,
    hdfs:/user/<username>/, so in this case the output folder is
    hdfs:/user/<username>/output
    The job fails if the output folder already exists.

10. Copy the output folder to the local file system
    hdfs dfs -get output ./hadoop_output

11. Display the output
    cat ./hadoop_output/*

Result :

103.1.124.7    1234
103.1.124.8    314
103.1.124.9    3893
103.20.92.129    337565
103.229.206.84    134
103.243.222.155    5773
103.243.222.32    17127
103.243.222.41    5677
103.243.222.51    2330
103.243.222.81    4334
103.243.222.85    5703
103.243.222.93    1133
103.243.222.95    19137
103.245.222.134    12489
103.245.222.143    109683
103.245.222.166    1718
103.245.222.175    64680
103.245.222.249    53584
103.30.235.115    243
103.31.6.35    21930
104.155.232.138    11925
104.156.81.217    11026
104.156.85.134    8513
104.156.85.217    11289
104.16.12.13    20912
104.16.13.13    2101
....
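
The output is sorted by key (the IP address), not by byte count. As a possible follow-up, here is a small sketch in plain Java that reads lines in the format shown above (address, whitespace, total) and orders them by total, largest first. The class name TopReceivers and its helper are hypothetical, and the three sample lines are taken from the result above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class TopReceivers {
    /** One output record: destination IP and its summed byte count. */
    public static final class Entry {
        public final String ip;
        public final long bytes;

        Entry(String ip, long bytes) {
            this.ip = ip;
            this.bytes = bytes;
        }
    }

    /** Parses "ip<whitespace>bytes" lines and sorts them by bytes, largest first. */
    public static List<Entry> sortByBytesDescending(List<String> lines) {
        List<Entry> entries = new ArrayList<>();
        for (String line : lines) {
            String[] parts = line.trim().split("\\s+");
            if (parts.length != 2) {
                continue; // skip blank or malformed lines
            }
            try {
                entries.add(new Entry(parts[0], Long.parseLong(parts[1])));
            } catch (NumberFormatException e) {
                // skip lines whose second column is not a number
            }
        }
        entries.sort(Comparator.comparingLong((Entry e) -> e.bytes).reversed());
        return entries;
    }

    public static void main(String[] args) {
        // Three lines copied from the result listing above.
        List<String> sample = Arrays.asList(
            "103.1.124.7\t1234",
            "103.1.124.8\t314",
            "103.20.92.129\t337565");
        for (Entry e : sortByBytesDescending(sample)) {
            System.out.println(e.ip + "\t" + e.bytes);
        }
    }
}
```

To use it on the real output, the lines of the files under ./hadoop_output could be read with java.nio.file.Files.readAllLines and passed to the same method.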


Thanks and regards
Kirankumar  and Musthafa
