欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Windows环境下大数据处理的构想(一)

程序员文章站 2022-05-10 09:35:03
为什么不呢?我们有了RPC/RMI和MAP,为什么不能在windows环境下处理大数据呢?windows是迄今为止最普及的操作系统,据市调公司NetMarketShare最新(2019年5月)统计数据,在桌面操作系统方面,目前Windows 10的市场占有率已达45.73%。而Windows 7的市 ......

为什么不呢?我们有了rpc/rmi和map,为什么不能在windows环境下处理大数据呢?windows是迄今为止最普及的操作系统,据市调公司netmarketshare最新(2019年5月)统计数据,在桌面操作系统方面,目前windows 10的市场占有率已达45.73%。而windows 7的市场占有率为35.44%。排在第三位的是windows 8.1,市场份额为3.97%。这三个版本的windows市场占有率之和为:85.14%。可以说windows占据了绝大多数用户的心。这与windows界面友好统一、易操作、功能丰富、软硬件兼容性都高密不可分。我们或绝大多数人(目前非公司)在windows上产生的数据要远远多于在linux上产生的数据,如果能直接在windows处理这些数据,那就太方便太好了。
 既然windows这么普及,又这么方便实用,而且个人在windows上产生的数据又那么多(如果在windows上能直接处理大数据的话,以后windows服务器的数量肯定会超过linux服务器),那为什么从大数据处理技术诞生之日起都是基于linux的,从最早的hadoop到国人主导的kylin再到较新的flink,莫不如是?这要从linux和windows的文件系统和目录结构说起,linux操作系统中有一个重要的概念:一切皆文件。linux将独立的文件系统组合成了一个层次化的树形结构,并且由一个统一的、虚拟的根目录代表整个文件系统。linux将新的文件系统通过一个称为“挂装”或“挂上”的操作将其挂装到根目录的某个子目录上,从而让不同的文件系统结合成为一个“整体”。这个“整体”即linux文件系统,实际上是每个实际文件系统从操作系统和系统服务中分离出来,通过一个接口层:虚拟文件系统或vfs来通讯。虚拟文件系统既没有文件,也不直接管理文件,它只是用户与实际文件系统之间的接口。我们操作的linux目录或文件被虚拟文件系统映射到真实的磁盘存储区域。vfs使得linux可以支持多个不同的文件系统,每个表示一个vfs的通用接口。linux支持的文件系统包括: fat、vfat、fat32、minix 等不同类型的文件系统,但extn才是linux的“原配”文件系统。由于软件将linux文件系统的所有细节进行了转换,所以linux核心的其它部分及系统中运行的程序将看到统一的文件系统。linux的虚拟文件系统允许用户同时能透明地安装许多不同的文件系统,而不需要具体地加以区别,即用户和进程不需要知道文件所在的文件系统类型,而只需要像使用extn文件系统中的文件一样使用它们。这一点正是windows所不具备的。
 windows采用地是多根目录文件系统,即一棵独立或多棵并列(根据磁盘分区的多少,一个分区对应一棵目录树)的树形结构;而linux采用地是统一根目录结构,只有一棵巨大的树结构。如果用windows作为服务器来处理大数据,每台服务器的磁盘分区数根据自身磁盘容量、使用者分区习惯大概率不相同,导致盘符参差不齐。数据就有可能在这台服务器存储在d盘,在另一台存储在e盘等等,很难统一管理。而linux就不存在这样的问题,因为它的文件系统利用vfs屏蔽了分区细节,可以在每台linux服务器上根目录下或根目录的某个子目录下建立一致的数据存储目录。但我们的命题就是要在windows上处理大数据,如何解决windows数据统一存储管理问题呢?暂时想到两个办法:一是约定俗成,约定同一个集群的windows服务器分区数相同,盘符相同,或至少都有某一个分区,数据存放在该分区;二是由客户端向集群提交任务时,附带提交一个数据存储地址/目录表,表里记录得是待处理分析的数据具体在某台服务器或节点上的某个具体位置,比如说:某节点:port/某分区/某目录/某文件.xxx(如果整个目录都是,就不需要具体到文件),这样就可以将数据存储到具体服务器不同的盘符下。第二种方法灵活,易于在现实中的机器上搭建集群,但麻烦,需要占用额外的存储,可以用map等集合类对象存储,至于具体怎样实现,等到“构想”完集群、模拟提交任务时再说。
 大数据,故名思义,数据量很大,单结点(主机)的存储容量有限,需要多结点、分布式地存储。而且任一个节点的数据量也往往远远大于要分析处理数据的应用代码量,所以计算向数据移动,可以大大减少io(包括磁盘io和网络io),从时间上和空间上降低成本。无论是将代码发往数据节点,还是shuffle阶段从map端向reduce端拉取数据,都需要数据迁移。而数据迁移就要用到rpc(remote procedure call)——远程过程调用,在百度百科中是这样解释的:它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。rpc协议假定某些传输协议的存在,如tcp或udp,为通信程序之间携带信息数据。在osi网络通信模型中,rpc跨越了传输层和应用层。rpc使得开发包括网络分布式多程序在内的应用程序更加容易。rpc采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。有多种 rpc模式和执行。最初由 sun 公司提出。ietf onc 宪章重新修订了 sun 版本,使得 onc rpc 协议成为 ietf 标准协议。现在使用最普遍的模式和执行是开放式软件基础的分布式计算环境(dce)。dce也是大数据框架执行的基础。在java中叫rmi。无论hadoop还是spark底层都是先搭建一个rpc框架,各个角色对象在此基础上进行通信和数据传输。
 有了rpc/rmi,我们就可以在windows服务器间传输各种数据包括代码数据和待分析处理数据了。当然还有很多问题需要解决,等遇到时再说。我们先从最简单的说起,rpc/rmi可以在windows下轻而易举地编程、实现、应用。举例如下,wordcount在本地是这样实现的:
public class stringvaluememstore  implements  serializable {

 /**
  * xxxxx
  */
 private static final long serialversionuid = -4505251514307025804l;
 ......(省略)
 /**
  * 重点是下面这个方法
  */
 public  static  map<string, integer>  wordcount(string  filepath, string  separator) {
  map<string, integer>  map = new  hashmap<>();
  int  one = 1;
  int  val;
  
  path  path = paths.get(filepath);
  
  if (files.notexists(path)) {
   system.err.println(path + " dose not exist.");
            return  null;
  }
  
  if (files.isdirectory(path)) {
   system.err.println(path + " is not a file.");
   return  null;
  }
  
  reader  fin = null;
  bufferedreader  in = null;
  try {
   fin = files.newbufferedreader(path, charset.forname("utf-8"));
   in = new  bufferedreader(fin);
   string  line;
   
   while((line = in.readline()) != null) {
    for (string  key : line.split(separator)) {
     if(map.get(key) == null) {
      map.put(key, one);
     } else {
      val = map.get(key) + 1;
      map2.put(key, val);
     }
    }
   }
  } catch (ioexception e) {
   e.printstacktrace();
   return  null;
  } finally {
   try {
    if(fin != null) {
     fin.close();
    }
    if(in != null) {
     in.close();
    }
   } catch (ioexception e) {
    e.printstacktrace();
   }
  }
  
  return  map;
 }
}
调用如下:
map<string, integer>  map = stringvaluememstore.wordcount("./data/words.txt",
" ");
for(string  k : map.keyset()) {
 system.out.println(k + "\t" + map.get(k));
}
加上rmi实现最简分布式是这样实现的:
public interface wcservice extends remote {
 map<string, integer>  wordcount(string  filepath, string  separator) throws remoteexception;
}

public class wcserviceimpl extends unicastremoteobject implements wcservice {

 /**
  *
  */
 private static final long serialversionuid = 4478936029983919271l;

 protected wcserviceimpl() throws remoteexception {
 }

 @override
 public map<string, integer> wordcount(string filepath, string separator)  throws remoteexception {
  map<string, integer>  map = new  hashmap<>();
  int  val;
  
  path  path = paths.get(filepath);
  
  if (files.notexists(path)) {
   system.err.println(path + " dose not exist.");
            return  null;
  }
  
  if (files.isdirectory(path)) {
   system.err.println(path + " is not a file.");
   return  null;
  }
  
  reader  fin = null;
  bufferedreader  in = null;
  try {
   fin = files.newbufferedreader(path, charset.forname("utf-8"));
   in = new  bufferedreader(fin);
   string  line;
   
   while((line = in.readline()) != null) {
    for (string  key : line.split(separator)) {
     if(map.get(key) == null) {
      map.put(key, constants.one);
     } else {
      val = map.get(key) + 1;
      map.put(key, val);
     }
    }
   }
  } catch (ioexception e) {
   e.printstacktrace();
   return  null;
  } finally {
   try {
    if(fin != null) {
     fin.close();
    }
    if(in != null) {
     in.close();
    }
   } catch (ioexception e) {
    e.printstacktrace();
   }
  }
  
  return  map;
 }

}

public class wcserver {

 public static void main(string[] args) throws remoteexception, malformedurlexception {
  // 注册端口
 locateregistry.createregistry(constants.port);
/*
 constants.wccount_rmi = "rmi://某节点(这里同服务器节点):" + port + "/net.xxx.xxx.wordcount.xxx.wcserviceimpl";
*/
// 将服务对象绑定url
  naming.rebind(constants.wccount_rmi, new  wcserviceimpl());
 }

}

public class wcclient {

 public static void main(string[] args) throws malformedurlexception, remoteexception, notboundexception {
  // 在注册的服务中根据url寻找服务
  wcservice  wcservice = (wcservice) naming.lookup(constants.wccount_rmi);
  map<string, integer> wordcount = wcservice.wordcount("x:/xxx/words.txt", " ");
  for(string  k : wordcount.keyset()) {
   system.out.println(k + "\t" + wordcount.get(k));
  }
 }
}
当然这里为了简单起见,我们把代码写死了,在实际的大数据处理框架中处理逻辑是由用户完成的。我们只需要提供接口,而且第一步先进行数据映射,即map阶段,第二步才是聚合统计reduce/aggregate,中间还有shuffle,最开始还有split。这里只是为了说明rmi在windows中可以轻易、方便地使用。有了rmi,我们一开始的数据存储和处理逻辑就可以分布在分布式环境中了;有了rmi,我们的数据和处理逻辑就可以“*”地在节点间“旅行”了。这构成了大数据处理框架的基础。千里之行,始于足下。借鉴hadoop和spark,我们应该先搭建rpc环境。rmi使用起来太麻烦,而且功能有限,现实中的大数据框架从未真正使用过rmi,而是使用对它的封装和改良,比如说netty。下一节我们介绍netty,并逐渐用netty搭建起rpc环境。学识有限,描述错误、不周的地方,还望各位技术大佬批评指正。这一节主要论述了在windows环境下处理大数据的可能性。