你好,游客 登录
背景:
阅读新闻

使用hadoop实现关联商品统计

[日期:2014-10-20] 来源:CSDN  作者:xiaojimanman [字体: ]

    转载请注明出处:http://blog.csdn.net/xiaojimanman/article/details/40184581

    最近几天一直在看hadoop相关的书籍,目前稍微有点感觉,自己就仿照着WordCount程序自己编写了一个统计关联商品。

需求描述:

    根据超市的销售清单,计算商品之间的关联程度(即统计同时买A商品和B商品的次数)。

数据格式:

    超市销售清单简化为如下格式:一行表示一个清单,每个商品采用 "," 分割,如下图所示:

 

需求分析:

    采用hadoop中的mapreduce对该需求进行计算。

    map函数主要拆分出关联的商品,输出结果为 key为商品A,value为商品B,对于第一条三条结果拆分结果如下图所示:

    这里为了统计出和A、B两件商品想关联的商品,所以商品A、B之间的关系输出两条结果即 A-B、B-A。

    reduce函数分别对和商品A相关的商品进行分组统计,即分别求value中的各个商品出现的次数,输出结果为key为商品A|商品B,value为该组合出现的次数。针对上面提到的5条记录,对map输出中key值为R的做下分析:

    通过map函数的处理,得到如下图所示的记录:

 

    reduce中对map输出的value值进行分组计数,得到的结果如下图所示

    将商品A B作为key,组合个数作为value输出,输出结果如下图所示:

 

    对于需求的实现过程的分析到目前就结束了,下面就看下具体的代码实现

代码实现:

    关于代码就不做详细的介绍,具体参照代码之中的注释吧。

[java] view plaincopy在CODE上查看代码片派生到我的代码片
  1. package com; 
  2.  
  3. import java.io.IOException; 
  4. import java.util.HashMap; 
  5. import java.util.Map.Entry; 
  6.  
  7. import org.apache.hadoop.conf.Configuration; 
  8. import org.apache.hadoop.conf.Configured; 
  9. import org.apache.hadoop.fs.Path; 
  10. import org.apache.hadoop.io.IntWritable; 
  11. import org.apache.hadoop.io.LongWritable; 
  12. import org.apache.hadoop.io.Text; 
  13. import org.apache.hadoop.mapreduce.Job; 
  14. import org.apache.hadoop.mapreduce.Mapper; 
  15. import org.apache.hadoop.mapreduce.Reducer; 
  16. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
  17. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
  18. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
  19. import org.apache.hadoop.util.Tool; 
  20. import org.apache.hadoop.util.ToolRunner; 
  21.  
  22. public class Test extends Configured implements Tool{ 
  23.  
  24.     /** 
  25.      * map类,实现数据的预处理 
  26.      * 输出结果key为商品A value为关联商品B 
  27.      * @author lulei 
  28.      */ 
  29.     public static class MapT extends Mapper<LongWritable, Text, Text, Text> { 
  30.         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ 
  31.             String line = value.toString(); 
  32.             if (!(line == null || "".equals(line))) { 
  33.                 //分割商品 
  34.                 String []vs = line.split(","); 
  35.                 //两两组合,构成一条记录 
  36.                 for (int i = 0; i < (vs.length - 1); i++) { 
  37.                     if ("".equals(vs[i])) {//排除空记录 
  38.                         continue
  39.                     } 
  40.                     for (int j = i+1; j < vs.length; j++) { 
  41.                         if ("".equals(vs[j])) { 
  42.                             continue
  43.                         } 
  44.                         //输出结果 
  45.                         context.write(new Text(vs[i]), new Text(vs[j])); 
  46.                         context.write(new Text(vs[j]), new Text(vs[i])); 
  47.                     } 
  48.                 } 
  49.             }   
  50.         } 
  51.     } 
  52.      
  53.     /** 
  54.      * reduce类,实现数据的计数 
  55.      * 输出结果key 为商品A|B value为该关联次数 
  56.      * @author lulei 
  57.      */ 
  58.     public static class ReduceT extends Reducer<Text, Text, Text, IntWritable> { 
  59.         private int count; 
  60.          
  61.         /** 
  62.          * 初始化 
  63.          */ 
  64.         public void setup(Context context) { 
  65.             //从参数中获取最小记录个数 
  66.             String countStr = context.getConfiguration().get("count"); 
  67.             try { 
  68.                 this.count = Integer.parseInt(countStr); 
  69.             } catch (Exception e) { 
  70.                 this.count = 0
  71.             } 
  72.         } 
  73.         public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{ 
  74.             String keyStr = key.toString(); 
  75.             HashMap<String, Integer> hashMap = new HashMap<String, Integer>(); 
  76.             //利用hash统计B商品的次数 
  77.             for (Text value : values) { 
  78.                 String valueStr = value.toString(); 
  79.                 if (hashMap.containsKey(valueStr)) { 
  80.                     hashMap.put(valueStr, hashMap.get(valueStr) + 1); 
  81.                 } else { 
  82.                     hashMap.put(valueStr, 1); 
  83.                 } 
  84.             } 
  85.             //将结果输出 
  86.             for (Entry<String, Integer> entry : hashMap.entrySet()) { 
  87.                 if (entry.getValue() >= this.count) {//只输出次数不小于最小值的 
  88.                     context.write(new Text(keyStr + "|" + entry.getKey()), new IntWritable(entry.getValue())); 
  89.                 } 
  90.             } 
  91.         } 
  92.     } 
  93.      
  94.     @Override 
  95.     public int run(String[] arg0) throws Exception { 
  96.         // TODO Auto-generated method stub 
  97.         Configuration conf = getConf(); 
  98.         conf.set("count", arg0[2]); 
  99.          
  100.         Job job = new Job(conf); 
  101.         job.setJobName("jobtest"); 
  102.          
  103.         job.setOutputFormatClass(TextOutputFormat.class); 
  104.         job.setOutputKeyClass(Text.class); 
  105.         job.setOutputValueClass(Text.class); 
  106.          
  107.         job.setMapperClass(MapT.class); 
  108.         job.setReducerClass(ReduceT.class); 
  109.          
  110.         FileInputFormat.addInputPath(job, new Path(arg0[0])); 
  111.         FileOutputFormat.setOutputPath(job, new Path(arg0[1])); 
  112.          
  113.         job.waitForCompletion(true); 
  114.          
  115.         return job.isSuccessful() ? 0 : 1
  116.          
  117.     } 
  118.      
  119.     /** 
  120.      * @param args 
  121.      */ 
  122.     public static void main(String[] args) { 
  123.         // TODO Auto-generated method stub 
  124.         if (args.length != 3) { 
  125.             System.exit(-1); 
  126.         } 
  127.         try { 
  128.             int res = ToolRunner.run(new Configuration(), new Test(), args); 
  129.             System.exit(res); 
  130.         } catch (Exception e) { 
  131.             // TODO Auto-generated catch block 
  132.             e.printStackTrace(); 
  133.         } 
  134.     } 
  135.  


上传运行:

    将程序打包成jar文件,上传到机群之中。将测试数据也上传到HDFS分布式文件系统中。

    命令运行截图如下图所示:

    运行结束后查看相应的HDFS文件系统,如下图所示:

    到此一个完整的mapreduce程序就完成了,关于hadoop的学习,自己还将继续~





收藏 推荐 打印 | 录入: | 阅读:
本文评论   查看全部评论 (0)
表情: 表情 姓名: 字数
点评:
       
评论声明
  • 尊重网上道德,遵守中华人民共和国的各项有关法律法规
  • 承担一切因您的行为而直接或间接导致的民事或刑事法律责任
  • 本站管理人员有权保留或删除其管辖留言中的任意内容
  • 本站有权在网站内转载或引用您的评论
  • 参与本评论即表明您已经阅读并接受上述条款