Java8 stream reduce

Submitted by Lizhe on Thu, 04/13/2017 - 08:39

 

虽然上次我说filter方法有点类似于hadoop的Mapreduce,不过实际上stream提供了一个自己的方法也叫 reduce

通常情况下这个方法接受两个参数, 一个初始值,一个Lambda表达式

下面是一个求和的例子

 int total = students.stream().map(Student::getScore).reduce(0,(a,b)->a+b);

上面这个等价于 Optional<Integer> sum = students.stream().map(Student::getScore).reduce(Integer::sum);

首先我们把Students的list用map方法取出score, 然后初始值设置成0, 第一个值是0+80=80, 第二个值是80+81=161... 以此类推

取乘积的话可以用         int product = students.stream().map(Student::getScore).reduce(1,(a,b)->a*b);

这个方法还有一个不需要提供初始值的版本

Java8 stream api

Submitted by Lizhe on Thu, 04/13/2017 - 04:16

实际上java8的stream指导思想几乎跟hadoop的Mapreduce如出一辙, 

都是基于一个数据流,然后做reduce,最后进行一次聚合的操作,这种模型完美的匹配了多线程

import static静态导入是JDK1.5中的新特性。一般我们导入一个类都用 import com.....ClassName;而静态导入是这样:import static com.....ClassName.*;这里的多了个static,还有就是类名ClassName后面多了个 .* ,意思是导入这个类里的静态方法。当然,也可以只导入某个静态方法,只要把 .* 换成静态方法名就行了。然后在这个类中,就可以直接用方法名调用静态方法,而不必用ClassName.方法名 的方式来调用

下面的例子如果想使用多线程只需要改成

List<Student> stus3 = students.parallelStream().filter(s->s.getScore()>90).collect(toList());

Vagrant不能挂在共享文件夹

Submitted by Lizhe on Wed, 04/12/2017 - 05:52

 

vagrant突然不能挂在共享文件夹了, 提示我版本不匹配,这些文件夹全部不能映射

 config.vm.synced_folder "ansible", "/home/vagrant/ansible", owner: "vagrant", group: "vagrant"
 config.vm.synced_folder "deployment", "/home/vagrant/deployment", owner: "vagrant", group: "vagrant"
 config.vm.synced_folder "preparation", "/home/vagrant/preparation", owner: "vagrant", group: "vagrant"
 config.vm.synced_folder "C:/Users/Lizhe/.m2", "/root/.m2", owner: "vagrant", group: "vagrant"

安装增强插件以后好了

vagrant plugin install vagrant-vbguest

FSDataInputStream seek

Submitted by Lizhe on Tue, 04/11/2017 - 14:13

 

package hadoop.cat;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

使用 FileSystem 读取文件

Submitted by Lizhe on Tue, 04/11/2017 - 08:35

不能使用URLStreamHandlerFactory时,可以通过FileSystem来读取文件的输入流

public static FileSystem get(Configuration conf) throws IOException 
public static FileSystem get(URI uri, Configuration conf) throws IOException 
public static FileSystem get(URI uri, Configuration conf, String user) throws IOException

conf对象用于封装服务器或者客户端信息,它需要使用 core-site.xml来指定

如果没有指定相关的配置,默认会使用本地文件系统, 下面是我本地的配置

 

$ vim ~/hadoop/etc/hadoop/core-site.xml

使用Java Hadoop URL 接口读取数据

Submitted by Lizhe on Tue, 04/11/2017 - 03:21

 

要从Hadoop文件系统读取文件,最简单的方法是使用java.net.URL对象打开数据流

不过使用这种方式需要先让虚拟机识别hadoop的URL数据流, 通过以下静态块实现

不过这种方式也有局限性, 每个java虚拟机只能调用一次这个方法, 因此通常在静态方法中调用

如果你醒目中的其他模块也需要使用这一方法,这种方式可能并不可取

package hadoop.cat;

import java.io.IOException;
import java.io.InputStream;
import java.net.URL;

import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;

public class Cat {

Hadoop hdfs 文件系统

Submitted by Lizhe on Sun, 04/09/2017 - 07:06

Hadoop在50070端口上提供了一个web页面

15

创建文件夹

[root@lzvm bin]# ./hadoop fs -mkdir /books
[root@lzvm bin]# ./hadoop fs -ls /

14

拷贝本地文件到hdfs

这里要使用name node的端口,我配置的是9000,有的博客里写的是8020, 其实默认也是8020如果你不明确写端口的话

Combiner 函数

Submitted by Lizhe on Sat, 04/08/2017 - 08:01

在讨论Combiner函数之前我们先了解一下Mapreduce的数据流

每个作业(Job) 由两个task构成, 一个是map一个是reduce

Hadoop会为每个分片构建一个map任务 (一般分片大小为64M)

map任务将其输出写入本地硬盘,而不是HDFS

数据在reduce端合并, 也就是说,当Mapreduce运行时,一般情况你会拥有若干个map任务和一个reduce任务

这个reduce任务的输入为所有map任务的输出

reduce任务的数量并非由输入数据的大小决定(map是这样的), 而事实上是独立指定的.

如果有多个reduce任务,每个map任务就会针对输出进行分区(partition)

即为每个reduce任务创建一个分区

每个分区有许多个key和其对应的值 ( 一个key和多个value,在一个分区内)

分区由用户定义的partition函数控制,默认是哈希函数

在第二个例子里, eclipse环境生成了一个分区,但是linux命令行环境下生成了两个分区

 

试想如果数据量特别大的情况, 应该尽量避免map与reduce之间的数据传递

Mapreduce java 例子(2)

Submitted by Lizhe on Fri, 04/07/2017 - 14:18

首先目录结构是这样的

8

然后是3个java类

package hadoop.max;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;