大数据Hadoop实践

发布于 2024-10-09  1315 次阅读


词频统计代码

// 用于统计文本文件中每个单词出现的次数
package WC;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.util.StringTokenizer;

public class WordCount {

  // 定义Mapper类
  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    
    // 一个IntWritable实例,表示计数1
    private final static IntWritable one = new IntWritable(1);
    // 用于存储当前处理的单词
    private Text word = new Text();
      
    // map方法:将输入的一行文本拆分成单词,并为每个单词生成一个键值对
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
      // 使用空格作为分隔符将输入行拆分为单词
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        // 设置当前单词
        word.set(itr.nextToken());
        // 输出键值对 (单词, 1)
        context.write(word, one);
      }
    }
  }
  
  // 定义Reducer类
  public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    // 用于存储单词的总出现次数
    private IntWritable result = new IntWritable();

    // reduce方法:汇总相同单词的所有计数值
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        // 累加所有相同的单词的计数值
        sum += val.get();
      }
      // 设置结果计数值
      result.set(sum);
      // 输出最终的键值对 (单词, 总计数)
      context.write(key, result);
    }
  }

  // 主方法
  public static void main(String[] args) throws Exception {
    // 创建配置对象
    Configuration conf = new Configuration();
    // 解析命令行参数,获取Hadoop通用选项(如输入输出路径)
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    // 检查是否提供了足够的参数
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    // 创建Job实例
    Job job = Job.getInstance(conf, "word count");
    // 设置主类
    job.setJarByClass(WordCount.class);
    // 设置Mapper类
    job.setMapperClass(TokenizerMapper.class);
    // 可选:设置Combiner类(这里被注释掉了)
    // job.setCombinerClass(IntSumReducer.class);
    // 设置Reducer类
    job.setReducerClass(IntSumReducer.class);
    // 设置输出键的类型
    job.setOutputKeyClass(Text.class);
    // 设置输出值的类型
    job.setOutputValueClass(IntWritable.class);
    // 添加输入路径
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    // 设置输出路径
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
    // 提交作业并等待完成
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

倒排索引代码

// 统计每个单词在哪些文件中出现以及具体的行偏移量。

package II;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit; // 使用 mapreduce API 中的 FileSplit
import java.util.StringTokenizer;

public class Inverted {

  // 定义Mapper类
  public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, Text> {
      
    // map方法:将输入的一行文本拆分成单词,并为每个单词生成一个键值对
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      // 获取当前处理的文件分片
      FileSplit fileSplit = (FileSplit) context.getInputSplit();
      String fileName = fileSplit.getPath().getName(); // 获取文件名
      long offset = key.get(); // 获取行偏移量
      
      // 使用空格作为分隔符将输入行拆分为单词
      StringTokenizer itr = new StringTokenizer(value.toString());
      
      while (itr.hasMoreTokens()) {
        String word = itr.nextToken(); // 获取当前单词
        // 将文件名和行偏移量组合成一个字符串
        String fileName_offset = fileName + ":" + offset;
        // 输出键值对 (单词, 文件名:行偏移量)
        context.write(new Text(word), new Text(fileName_offset));
      }
    }
  }
  
  // 定义Reducer类
  public static class IndexReducer extends Reducer<Text, Text, Text, Text> {
    private Text result = new Text();

    // reduce方法:汇总相同单词的所有文件名和行偏移量
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
      StringBuilder nameList = new StringBuilder();
      for (Text val : values) {
        if (nameList.length() > 0) {
          nameList.append(";"); // 用分号分隔不同的文件名和行偏移量
        }
        nameList.append(val.toString()); // 添加当前的文件名和行偏移量
      }
      result.set(nameList.toString()); // 设置结果字符串
      context.write(key, result); // 输出最终的键值对 (单词, 文件名1:行偏移量1;文件名2:行偏移量2;...)
    }
  }

  // 主方法
  public static void main(String[] args) throws Exception {
    // 创建配置对象
    Configuration conf = new Configuration();
    // 解析命令行参数,获取Hadoop通用选项(如输入输出路径)
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    // 检查是否提供了足够的参数
    if (otherArgs.length < 2) {
      System.err.println("Usage: inverted index <in> [<in>...] <out>");
      System.exit(2);
    }
    // 创建Job实例
    Job job = Job.getInstance(conf, "inverted index");
    // 设置主类
    job.setJarByClass(Inverted.class);
    // 设置Mapper类
    job.setMapperClass(TokenizerMapper.class);
    // 设置Reducer类
    job.setReducerClass(IndexReducer.class);
    // 设置输出键的类型
    job.setOutputKeyClass(Text.class);
    // 设置输出值的类型
    job.setOutputValueClass(Text.class);

    // 添加多个输入路径
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }

    // 设置输出路径
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));

    // 提交作业并等待完成
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

(该功能存在待解决问题,如分词未清洗,同一文章同一单词未去重等)

专利引用

package pt;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Patent {

  // 定义Mapper类
  public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, Text> {
    // 用于存储引用专利
    private Text Citing = new Text();
    // 用于存储被引用专利
    private Text Cited = new Text();

    // map方法:将输入的一行文本拆分成引用专利和被引用专利,并生成键值对
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      // 使用逗号作为分隔符将输入行拆分为两个部分
      StringTokenizer itr = new StringTokenizer(value.toString(), ",");
      
      while (itr.hasMoreTokens()) {
        if (itr.hasMoreTokens()) {
          Citing.set(itr.nextToken()); // 获取引用专利
        }
        if (itr.hasMoreTokens()) {
          Cited.set(itr.nextToken()); // 获取被引用专利
        }
        // 输出键值对 (被引用专利, 引用专利)
        context.write(new Text(Cited), new Text(Citing));
      }
    }
  }

  // 定义Reducer类
  public static class IndexReducer extends Reducer<Text, Text, Text, Text> {
    // 用于存储结果字符串
    private Text result = new Text();

    // reduce方法:汇总相同被引用专利的所有引用专利,并在最后加上引用次数
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
      StringBuilder nameList = new StringBuilder();
      long num = 0; // 用于计数引用次数

      for (Text val : values) {
        if (nameList.length() > 0) {
          nameList.append(","); // 用逗号分隔不同的引用专利
        }
        nameList.append(val.toString()); // 添加当前的引用专利
        num++; // 增加引用次数
      }

      // 追加引用次数
      nameList.append(" (").append(num).append(")");
      result.set(nameList.toString()); // 设置结果字符串
      context.write(key, result); // 输出最终的键值对 (被引用专利, 引用专利1,引用专利2,... (引用次数))
    }
  }

  // 主方法
  public static void main(String[] args) throws Exception {
    // 创建配置对象
    Configuration conf = new Configuration();
    // 解析命令行参数,获取Hadoop通用选项(如输入输出路径)
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    // 检查是否提供了足够的参数
    if (otherArgs.length < 2) {
      System.err.println("Usage: patent index <in> [<in>...] <out>");
      System.exit(2);
    }
    // 创建Job实例
    Job job = Job.getInstance(conf, "Cited   Citing");
    // 设置主类
    job.setJarByClass(Patent.class);
    // 设置Mapper类
    job.setMapperClass(TokenizerMapper.class);
    // 设置Reducer类
    job.setReducerClass(IndexReducer.class);
    // 设置输出键的类型
    job.setOutputKeyClass(Text.class);
    // 设置输出值的类型
    job.setOutputValueClass(Text.class);

    // 添加多个输入路径
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }

    // 设置输出路径
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));

    // 提交作业并等待完成
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

专利描述数据集分析

一共有三个代码,分别实现:

年份专利数统计、国家专利数统计、每年得到美国授权的国家统计

年份专利数统计

package PP;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Patent_Plus {

  /**
   * Mapper: reads one CSV record of the patent description data set and emits
   * (grant year, 1). Field index 1 is assumed to be GYEAR (grant year) —
   * TODO confirm against the data set header.
   */
  public static class TokenizerMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int MAX_FIELDS = 23; // records have at most 23 columns
    private static final int YEAR_FIELD = 1;  // column holding the grant year

    private static final IntWritable ONE = new IntWritable(1);
    // Reused output key; context.write() serializes immediately.
    private final Text year = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      // limit -1 keeps trailing empty fields, as the original comment intended
      // (the original actually omitted it).
      String[] fields = value.toString().split(",", -1);

      // The original buffered fields in a reused Text[] and could emit a stale
      // value when a record had fewer than 2 fields; guard the index instead.
      if (fields.length > YEAR_FIELD && fields.length <= MAX_FIELDS) {
        year.set(fields[YEAR_FIELD]);
        context.write(year, ONE);
      } else {
        // Malformed record — log and skip.
        System.err.println("Invalid record: " + fields.length);
      }
    }
  }

  /** Reducer: sums the per-year ones and emits (year, patent count). */
  public static class IndexReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {
    // Reused output value for the summed count.
    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  /**
   * Configures and submits the patents-per-year job.
   * Usage: patent year count &lt;in&gt; [&lt;in&gt;...] &lt;out&gt;
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      // The original printed "inverted index" here, copied from another job.
      System.err.println("Usage: patent year count <in> [<in>...] <out>");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "patent count per year");
    job.setJarByClass(Patent_Plus.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setReducerClass(IndexReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // Every argument except the last is an input path.
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }

    // The last argument is the output directory.
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}


国家专利数统计

package PP;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Patent_Country {

  /**
   * Mapper: reads one CSV record of the patent description data set and emits
   * (country, 1). Field index 4 is assumed to be COUNTRY — TODO confirm
   * against the data set header.
   */
  public static class TokenizerMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int MAX_FIELDS = 23;    // records have at most 23 columns
    private static final int COUNTRY_FIELD = 4;  // column holding the country code

    private static final IntWritable ONE = new IntWritable(1);
    // Reused output key; context.write() serializes immediately.
    private final Text country = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      // limit -1 keeps trailing empty fields, as the original comment intended
      // (the original actually omitted it).
      String[] fields = value.toString().split(",", -1);

      // The original buffered fields in a reused Text[] and could emit a stale
      // value when a record had fewer than 5 fields; guard the index instead.
      if (fields.length > COUNTRY_FIELD && fields.length <= MAX_FIELDS) {
        country.set(fields[COUNTRY_FIELD]);
        context.write(country, ONE);
      } else {
        // Malformed record — log and skip.
        System.err.println("Invalid record: " + fields.length);
      }
    }
  }

  /** Reducer: sums the per-country ones and emits (country, patent count). */
  public static class IndexReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {
    // Reused output value for the summed count.
    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  /**
   * Configures and submits the patents-per-country job.
   * Usage: patent country count &lt;in&gt; [&lt;in&gt;...] &lt;out&gt;
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      // The original printed "inverted index" here, copied from another job.
      System.err.println("Usage: patent country count <in> [<in>...] <out>");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "patent count per country");
    job.setJarByClass(Patent_Country.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setReducerClass(IndexReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // Every argument except the last is an input path.
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }

    // The last argument is the output directory.
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

每年得到美国授权的国家统计

package PP;

import java.io.IOException;
import java.util.StringTokenizer;

import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Patent_US {

  /**
   * Mapper: emits (grant year, country) for each record so the reducer can
   * count, per year, how many distinct countries received US patent grants.
   * Field 1 is assumed to be GYEAR and field 4 COUNTRY — TODO confirm
   * against the data set header.
   */
  public static class TokenizerMapper
      extends Mapper<LongWritable, Text, Text, Text> {

    private static final int MAX_FIELDS = 23;    // records have at most 23 columns
    private static final int YEAR_FIELD = 1;     // column holding the grant year
    private static final int COUNTRY_FIELD = 4;  // column holding the country code

    // Reused output key/value; context.write() serializes immediately.
    private final Text year = new Text();
    private final Text country = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      // limit -1 keeps trailing empty fields, as the original comment intended
      // (the original actually omitted it).
      String[] fields = value.toString().split(",", -1);

      // The original buffered fields in a reused Text[] and could emit stale
      // values for short records; guard the indices instead.
      if (fields.length > COUNTRY_FIELD && fields.length <= MAX_FIELDS) {
        year.set(fields[YEAR_FIELD]);
        country.set(fields[COUNTRY_FIELD]);
        context.write(year, country);
      } else {
        // Malformed record — log and skip.
        System.err.println("Invalid record: " + fields.length);
      }
    }
  }

  /**
   * Reducer: counts the number of distinct countries seen for each year and
   * emits (year, distinct country count).
   */
  public static class IndexReducer
      extends Reducer<Text, Text, Text, IntWritable> {

    // Reused output value for the distinct-country count.
    private IntWritable count = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      // Copy each value to a String: Hadoop reuses the SAME Text instance for
      // every element of the iterator, so adding the Text itself (as the
      // original did) collapses the set to at most one element and the
      // distinct count comes out wrong.
      Set<String> uniqueValues = new HashSet<>();
      for (Text value : values) {
        uniqueValues.add(value.toString());
      }
      count.set(uniqueValues.size());
      context.write(key, count);
    }
  }

  /**
   * Configures and submits the distinct-countries-per-year job.
   * Usage: patent US grants &lt;in&gt; [&lt;in&gt;...] &lt;out&gt;
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      // The original printed "inverted index" here, copied from another job.
      System.err.println("Usage: patent US grants <in> [<in>...] <out>");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "countries granted US patents per year");
    job.setJarByClass(Patent_US.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setReducerClass(IndexReducer.class);
    job.setOutputKeyClass(Text.class);
    // Map output value (Text) differs from reduce output value (IntWritable).
    // The original declared Text for both, which makes the job fail at
    // runtime with a type mismatch; declare both explicitly.
    job.setMapOutputValueClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // Every argument except the last is an input path.
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }

    // The last argument is the output directory.
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

词频统计并去除停用词

分布式缓存分发

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package WP;

import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.BufferedReader;



public class WordCount_1 {

  /**
   * Mapper: strips non-letter characters from each token, lowercases it, and
   * emits (word, 1) unless the cleaned word is empty or a stop word. The
   * stop-word list is read from the first distributed-cache file in setup().
   */
  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    // Stop words loaded from the distributed cache, one per line.
    protected List<String> stops = new ArrayList<>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      // NOTE(review): DistributedCache is deprecated; context.getCacheFiles()
      // is the modern replacement. Kept as-is to match the job setup in main().
      Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
      // try-with-resources closes the reader even if a read throws
      // (the original leaked the reader on exception).
      try (BufferedReader reader =
               new BufferedReader(new FileReader(paths[0].toUri().getPath()))) {
        String line;
        while ((line = reader.readLine()) != null) {
          stops.add(line.trim()); // one stop word per line
        }
      }
    }

    @Override
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        String cleanedWord = itr.nextToken().replaceAll("[^a-zA-Z]", "").toLowerCase();
        // Skip stop words AND tokens that became empty after cleaning; the
        // original emitted "" as a word for purely non-alphabetic tokens.
        if (!cleanedWord.isEmpty() && !stops.contains(cleanedWord)) {
          word.set(cleanedWord);
          context.write(word, one);
        }
      }
    }
  }

  /**
   * Reducer: sums the counts per word and emits only words whose total count
   * is strictly greater than the configured threshold.
   */
  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();
    private int k; // frequency threshold read from the configuration

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      k = context.getConfiguration().getInt("wordcount.frequency.threshold", 10);
    }

    @Override
    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      // The original called context.write() unconditionally, emitting a stale
      // (or zero) count for every word at or below the threshold. Emit only
      // when the threshold is exceeded.
      if (sum > k) {
        result.set(sum);
        context.write(key, result);
      }
    }
  }

  /**
   * Configures and submits the stop-word-filtered word-count job.
   * Usage: wordcount &lt;in&gt; [&lt;in&gt;...] &lt;out&gt;
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    int frequencyThreshold = 10; // minimum count a word must exceed to be emitted
    conf.setInt("wordcount.frequency.threshold", frequencyThreshold);

    Job job = Job.getInstance(conf, "word count(plus)");
    job.setJarByClass(WordCount_1.class);

    job.setMapperClass(TokenizerMapper.class);
    // Do NOT reuse IntSumReducer as a combiner: it filters by the threshold,
    // so partial sums at or below k would be silently dropped before reaching
    // the reducer and the final counts would be wrong.
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // Every argument except the last is an input path.
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));
    // Ship the stop-word file to every task via the distributed cache.
    job.addCacheFile(new URI("stopwords.txt"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

HDFS共享

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package WP;

import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;
import java.net.URL;
import java.net.URI;
import java.io.InputStream;
import java.io.InputStreamReader;

import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.BufferedReader;



public class WordCount_2 {

  /**
   * Mapper: strips non-letter characters from each token, lowercases it, and
   * emits (word, 1) unless the cleaned word is empty or a stop word. The
   * stop-word list is read directly from HDFS in setup().
   */
  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    // Instance (not static) list: a static list keeps growing with duplicate
    // entries if the JVM is reused across tasks and setup() runs again.
    protected List<String> stops = new ArrayList<>();

    // Register the HDFS URL handler once per JVM so new URL("hdfs://...") works.
    static{URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());}

    @Override
    protected void setup(Context context)throws IOException,InterruptedException{
      // NOTE(review): the stop-word location is hard-coded; consider passing
      // it via the job configuration instead.
      InputStream in = new URL("hdfs://localhost:9000/stop/stopwords.txt").openStream();
      // try-with-resources closes the reader even if a read throws
      // (the original leaked it on exception).
      try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
        String line;
        while ((line = reader.readLine()) != null) {
          stops.add(line.trim()); // one stop word per line
        }
      }
    }

    @Override
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        String cleanedWord = itr.nextToken().replaceAll("[^a-zA-Z]", "").toLowerCase();
        // Skip stop words AND tokens that became empty after cleaning; the
        // original emitted "" as a word for purely non-alphabetic tokens.
        if (!cleanedWord.isEmpty() && !stops.contains(cleanedWord)) {
          word.set(cleanedWord);
          context.write(word, one);
        }
      }
    }
  }

  /**
   * Reducer: sums the counts per word and emits only words whose total count
   * is strictly greater than the configured threshold.
   */
  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();
    private int k; // frequency threshold read from the configuration

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      k = context.getConfiguration().getInt("wordcount.frequency.threshold", 10);
    }

    @Override
    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      // The original called context.write() unconditionally, emitting a stale
      // (or zero) count for every word at or below the threshold. Emit only
      // when the threshold is exceeded.
      if (sum > k) {
        result.set(sum);
        context.write(key, result);
      }
    }
  }

  /**
   * Configures and submits the HDFS-stop-word word-count job.
   * Usage: wordcount &lt;in&gt; [&lt;in&gt;...] &lt;out&gt;
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    int frequencyThreshold = 10; // minimum count a word must exceed to be emitted
    conf.setInt("wordcount.frequency.threshold", frequencyThreshold);

    Job job = Job.getInstance(conf, "word count(plus)");
    job.setJarByClass(WordCount_2.class);

    job.setMapperClass(TokenizerMapper.class);
    // Do NOT reuse IntSumReducer as a combiner: it filters by the threshold,
    // so partial sums at or below k would be silently dropped before reaching
    // the reducer and the final counts would be wrong.
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // Every argument except the last is an input path.
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

最终效果如下:

KNN鸢尾花分类(两种实现方法)

训练集较大情况

package KNN;

import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;
import java.net.URL;
import java.net.URI;
import java.io.InputStream;
import java.io.InputStreamReader;

import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.BufferedReader;

public class KNN {

  /**
   * Mapper for KNN with a large TRAINING set: the training data streams in as
   * job input, while the (small) test set is loaded into memory once per task
   * in setup(). For every training record it emits
   * (testSampleIndex, "trainLabel,distance") so the reducer can gather all
   * train-to-test distances per test sample.
   */
  public static class TokenizerMapper
       extends Mapper<Object, Text, IntWritable, Text> {

    // In-memory copy of the test feature vectors (4 numeric features each).
    private final ArrayList<ArrayList<Double>> test = new ArrayList<>();

    // Register the HDFS stream handler so new URL("hdfs://...") works.
    static { URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); }

    /**
     * Loads the test feature vectors from HDFS before any map() call.
     * NOTE(review): the HDFS address is hard-coded; consider passing it
     * through the job Configuration or the distributed cache instead.
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      InputStream in = new URL("hdfs://localhost:9000/data/iris_test_data.csv").openStream();
      BufferedReader reader = new BufferedReader(new InputStreamReader(in));
      String line;
      while ((line = reader.readLine()) != null) {
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
          String[] fields = itr.nextToken().split(",");
          ArrayList<Double> features = new ArrayList<>();
          for (int i = 0; i < 4; ++i) {
            // convert the string feature to a double
            features.add(Double.parseDouble(fields[i]));
          }
          test.add(features);
        }
      }
      reader.close();
    }

    /**
     * For each training record (4 features, label in column 5), computes the
     * Euclidean distance to every test sample and emits
     * (test index, "label,distance").
     */
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        String[] fields = itr.nextToken().split(",");
        String label = fields[4];
        ArrayList<Double> train = new ArrayList<>();
        for (int i = 0; i < 4; ++i) {
          train.add(Double.parseDouble(fields[i]));
        }

        for (int i = 0; i < test.size(); i++) {
          ArrayList<Double> sample = test.get(i);
          double dis = 0;
          for (int j = 0; j < 4; ++j) {
            dis += Math.pow(train.get(j) - sample.get(j), 2);
          }
          dis = Math.sqrt(dis);

          context.write(new IntWritable(i), new Text(label + "," + dis));
        }
      }
    }
  }

  /**
   * Reducer: for one test sample (the key) it receives "label,distance"
   * pairs, keeps the k nearest training samples and predicts the majority
   * label among them, writing both the prediction and the ground truth.
   */
  public static class IntSumReducer
       extends Reducer<IntWritable, Text, IntWritable, Text> {

    // Ground-truth labels of the test samples, indexed by test-sample id.
    protected static List<String> labels = new ArrayList<>();
    int k;

    /** Loads the ground-truth test labels and the neighbour count k. */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      // Reset first: this list is static, so a reused JVM running setup()
      // again would otherwise append duplicates and shift every index.
      labels.clear();
      InputStream in = new URL("hdfs://localhost:9000/data/iris_test_label.csv").openStream();
      BufferedReader reader = new BufferedReader(new InputStreamReader(in));
      String line;
      while ((line = reader.readLine()) != null) {
        // one ground-truth label per line
        labels.add(line.trim());
      }
      reader.close();

      Configuration conf = context.getConfiguration();
      k = conf.getInt("k", 4);
    }

    public void reduce(IntWritable key, Iterable<Text> values,
                       Context context
                       ) throws IOException, InterruptedException {
      ArrayList<String> candidates = new ArrayList<>();
      for (Text val : values) {
        candidates.add(val.toString());
      }
      if (candidates.isEmpty()) {
        return; // nothing to vote on for this key
      }

      // Sort candidates by ascending distance (field 1 of "label,distance").
      Collections.sort(candidates, new Comparator<String>() {
        public int compare(String s1, String s2) {
          double d1 = Double.parseDouble(s1.split(",")[1]);
          double d2 = Double.parseDouble(s2.split(",")[1]);
          return Double.compare(d1, d2);
        }
      });

      // Keep the labels of the k nearest neighbours; bound by list size so
      // fewer than k candidates no longer throws IndexOutOfBoundsException.
      int limit = Math.min(k, candidates.size());
      ArrayList<String> nearest = new ArrayList<>();
      for (int i = 0; i < limit; ++i) {
        nearest.add(candidates.get(i).split(",")[0]);
      }

      // Majority vote over the distinct labels (arrays are zero-initialised
      // by Java, so no manual reset loop is needed).
      List<String> labelset = new ArrayList<>(new LinkedHashSet<>(nearest));
      int[] count = new int[labelset.size()];
      for (int i = 0; i < labelset.size(); i++) {
        for (int j = 0; j < nearest.size(); j++) {
          if (labelset.get(i).equals(nearest.get(j))) {
            count[i] += 1;
          }
        }
      }

      int max = 0;
      for (int i = 0; i < count.length; i++) {
        if (count[i] > count[max]) {
          max = i;
        }
      }

      context.write(key, new Text("pre_label:" + labelset.get(max)
          + "\t" + "real_label:" + labels.get(key.get())));
    }
  }

  /**
   * Combiner: per mapper, forwards only the k nearest candidates for each
   * test-sample key, cutting shuffle volume without changing the final
   * k-nearest result (a global k-nearest element is k-nearest locally too).
   */
  public static class KNNCombiner extends Reducer<IntWritable, Text, IntWritable, Text> {
    private int k;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      Configuration conf = context.getConfiguration();
      k = conf.getInt("k", 4);
    }

    @Override
    public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
      ArrayList<String> candidates = new ArrayList<>();
      for (Text val : values) {
        String valueStr = val.toString();
        // Only well-formed "label,distance" pairs participate.
        if (valueStr.contains(",")) {
          candidates.add(valueStr);
        }
      }

      if (candidates.size() < k) {
        // Fewer than k locally: forward everything unsorted.
        for (String val : candidates) {
          context.write(key, new Text(val));
        }
      } else {
        Collections.sort(candidates, new Comparator<String>() {
          public int compare(String s1, String s2) {
            double d1 = Double.parseDouble(s1.split(",")[1]);
            double d2 = Double.parseDouble(s2.split(",")[1]);
            return Double.compare(d1, d2);
          }
        });

        for (int i = 0; i < k; ++i) {
          context.write(key, new Text(candidates.get(i)));
        }
      }
    }
  }

  /** Job driver: inputs are training-data paths, last argument is output. */
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: KNN <in> [<in>...] <out>");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "KNN Classify");
    job.setJarByClass(KNN.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(KNNCombiner.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

测试集较大情况

package KNN;

import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;
import java.net.URL;
import java.net.URI;
import java.io.InputStream;
import java.io.InputStreamReader;

import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.BufferedReader;

public class KNN2 {

  /**
   * Mapper for KNN with a large TEST set: the test data streams in as job
   * input (keyed by byte offset), while the training set — features plus
   * label — is loaded into memory once per task in setup(). For every test
   * record it emits (byteOffset, "trainLabel,distance").
   */
  public static class TokenizerMapper
       extends Mapper<LongWritable, Text, LongWritable, Text> {

    // In-memory training feature vectors (4 numeric features each) and the
    // training label of each row, kept index-aligned with `train`.
    private final ArrayList<ArrayList<Double>> train = new ArrayList<>();
    private final ArrayList<String> labels = new ArrayList<>();

    // Register the HDFS stream handler so new URL("hdfs://...") works.
    static { URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); }

    /**
     * Loads the training data (features + label) from HDFS before map().
     * NOTE(review): the HDFS address is hard-coded; consider passing it
     * through the job Configuration or the distributed cache instead.
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      InputStream in = new URL("hdfs://localhost:9000/data/iris_train.csv").openStream();
      BufferedReader reader = new BufferedReader(new InputStreamReader(in));
      String line;
      while ((line = reader.readLine()) != null) {
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
          String[] fields = itr.nextToken().split(",");
          ArrayList<Double> features = new ArrayList<>();
          for (int i = 0; i < 4; ++i) {
            // convert the string feature to a double
            features.add(Double.parseDouble(fields[i]));
          }
          labels.add(fields[4]);
          train.add(features);
        }
      }
      reader.close();
    }

    /**
     * For each test record (4 features), computes the Euclidean distance to
     * every training sample and emits (test byte offset, "label,distance").
     */
    public void map(LongWritable key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        String[] fields = itr.nextToken().split(",");
        ArrayList<Double> testSample = new ArrayList<>();
        for (int i = 0; i < 4; ++i) {
          testSample.add(Double.parseDouble(fields[i]));
        }

        for (int i = 0; i < train.size(); i++) {
          ArrayList<Double> trainSample = train.get(i);
          String label = labels.get(i);
          double dis = 0;
          for (int j = 0; j < 4; ++j) {
            dis += Math.pow(testSample.get(j) - trainSample.get(j), 2);
          }
          dis = Math.sqrt(dis);

          context.write(key, new Text(label + "," + dis));
        }
      }
    }
  }

  /**
   * Reducer: for one test record (keyed by its input byte offset) it receives
   * "label,distance" pairs, keeps the k nearest training samples and predicts
   * the majority label, writing both the prediction and the ground truth.
   */
  public static class IntSumReducer
       extends Reducer<LongWritable, Text, LongWritable, Text> {

    // Ground-truth labels of the test samples, indexed by test line number.
    protected static List<String> labels = new ArrayList<>();
    int k;

    /** Loads the ground-truth test labels and the neighbour count k. */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      // Reset first: this list is static, so a reused JVM running setup()
      // again would otherwise append duplicates and shift every index.
      labels.clear();
      InputStream in = new URL("hdfs://localhost:9000/data/iris_test_label.csv").openStream();
      BufferedReader reader = new BufferedReader(new InputStreamReader(in));
      String line;
      while ((line = reader.readLine()) != null) {
        // one ground-truth label per line
        labels.add(line.trim());
      }
      reader.close();

      Configuration conf = context.getConfiguration();
      k = conf.getInt("k", 4);
    }

    public void reduce(LongWritable key, Iterable<Text> values,
                       Context context
                       ) throws IOException, InterruptedException {
      ArrayList<String> candidates = new ArrayList<>();
      for (Text val : values) {
        candidates.add(val.toString());
      }
      if (candidates.isEmpty()) {
        return; // nothing to vote on for this key
      }

      // Sort candidates by ascending distance (field 1 of "label,distance").
      Collections.sort(candidates, new Comparator<String>() {
        public int compare(String s1, String s2) {
          double d1 = Double.parseDouble(s1.split(",")[1]);
          double d2 = Double.parseDouble(s2.split(",")[1]);
          return Double.compare(d1, d2);
        }
      });

      // Keep the labels of the k nearest neighbours; bound by list size so
      // fewer than k candidates no longer throws IndexOutOfBoundsException.
      int limit = Math.min(k, candidates.size());
      ArrayList<String> nearest = new ArrayList<>();
      for (int i = 0; i < limit; ++i) {
        nearest.add(candidates.get(i).split(",")[0]);
      }

      // Majority vote over the distinct labels (arrays are zero-initialised
      // by Java, so no manual reset loop is needed).
      List<String> labelset = new ArrayList<>(new LinkedHashSet<>(nearest));
      int[] count = new int[labelset.size()];
      for (int i = 0; i < labelset.size(); i++) {
        for (int j = 0; j < nearest.size(); j++) {
          if (labelset.get(i).equals(nearest.get(j))) {
            count[i] += 1;
          }
        }
      }

      int max = 0;
      for (int i = 0; i < count.length; i++) {
        if (count[i] > count[max]) {
          max = i;
        }
      }

      // NOTE(review): byteOffset / 16 assumes every test CSV line occupies
      // exactly 16 bytes, so the offset maps to a line number — fragile;
      // confirm against the actual file, or emit an explicit record index
      // from the mapper instead.
      int index = (int) key.get();
      index /= 16;
      if (index < labels.size()) {
        context.write(key, new Text("pre_label:" + labelset.get(max)
            + "\t" + "real_label:" + labels.get(index)));
      } else {
        context.write(key, new Text("pre_label:" + labelset.get(max)
            + "\t" + "real_label: unknown"));
      }
    }
  }

  /**
   * Combiner: per mapper, forwards only the k nearest candidates for each
   * test-record key, cutting shuffle volume without changing the final
   * k-nearest result (a global k-nearest element is k-nearest locally too).
   */
  public static class KNNCombiner extends Reducer<LongWritable, Text, LongWritable, Text> {
    private int k;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      Configuration conf = context.getConfiguration();
      k = conf.getInt("k", 4);
    }

    @Override
    public void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
      ArrayList<String> candidates = new ArrayList<>();
      for (Text val : values) {
        String valueStr = val.toString();
        // Only well-formed "label,distance" pairs participate.
        if (valueStr.contains(",")) {
          candidates.add(valueStr);
        }
      }

      if (candidates.size() < k) {
        // Fewer than k locally: forward everything unsorted.
        for (String val : candidates) {
          context.write(key, new Text(val));
        }
      } else {
        Collections.sort(candidates, new Comparator<String>() {
          public int compare(String s1, String s2) {
            double d1 = Double.parseDouble(s1.split(",")[1]);
            double d2 = Double.parseDouble(s2.split(",")[1]);
            return Double.compare(d1, d2);
          }
        });

        for (int i = 0; i < k; ++i) {
          context.write(key, new Text(candidates.get(i)));
        }
      }
    }
  }

  /** Job driver: inputs are test-data paths, last argument is output. */
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: KNN2 <in> [<in>...] <out>");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "KNN Classify");
    job.setJarByClass(KNN2.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(KNNCombiner.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

人生苦难处,正是修行时