
MapReduce Implementation of Shortest Path

- A map task receives
  - Key: node n
  - Value: D (distance from the start node); points-to (list of nodes reachable from n)
- For each p ∈ points-to: emit (p, D+1)
- The reduce task gathers the possible distances to a given node p and selects the minimum one
The input file format is: <node id> <distance to the start node> <[node ids it can reach]:>

In the map phase, each node whose distance is not inf emits a "VALUE" key-value pair [(reachable node id): (distance + 1)] for every node it can reach; if the distance is inf, these "VALUE" pairs are skipped. The mapper also re-emits the node's own adjacency information as a "NODES" pair: <node id>: <[node ids it can reach]:>.

The reduce phase receives <node id> [VALUE distance] / [NODES node ids it can reach:]. It takes the minimum over the VALUE list and outputs <node id> <min distance> <node ids it can reach>, the same format as the input file, so the output can feed the next round.
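For concreteness, here is a small hypothetical input (not from the original post): node 1 is the start node, and inf is encoded as Integer.MAX_VALUE (2147483647), which is what the mapper's MAXDIS check below expects. Note the code assumes every node appears on its own line with a non-empty adjacency field.

1 0 2:3:
2 2147483647 3:4:
3 2147483647 4:
4 2147483647 1: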
Mapper code:
package com.hadoop.dijkstra;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DjksMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

	// String form of "infinity": nodes at this distance are still unreachable
	private static final String MAXDIS = Integer.MAX_VALUE + "";

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// From slide 20 of Graph Algorithms with MapReduce (by Jimmy Lin, Univ
		// @ Maryland)
		// Key is node n
		// Value is D, Points-To
		// For every point (or key), look at everything it points to.
		// Emit or write to the points to variable with the current distance + 1
		Text word = new Text();
		String line = value.toString();// looks like 1 0 2:3:
		String[] sp = line.split(" ");// splits on space
		if (sp[1].compareTo(MAXDIS) != 0) { // skip neighbor updates for nodes
											// still at MAX ("infinite") distance
			int distanceadd = Integer.parseInt(sp[1]) + 1;
			String[] PointsTo = sp[2].split(":");
			for (int i = 0; i < PointsTo.length; i++) {
				// candidate distance for each reachable neighbor
				word.set("VALUE " + distanceadd);
				context.write(new LongWritable(Integer.parseInt(PointsTo[i])),
						word);
				word.clear();
			}
		}
		// re-emit the node's own current distance so the reducer can include
		// it when taking the minimum
		word.set("VALUE " + sp[1]);
		context.write(new LongWritable(Integer.parseInt(sp[0])), word);
		word.clear();

		// re-emit the adjacency list so the reducer can reproduce the input
		// format for the next round
		word.set("NODES " + sp[2]);
		context.write(new LongWritable(Integer.parseInt(sp[0])), word);
		word.clear();

	}
}
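As a quick trace (hypothetical, using the sample input above): for the line 1 0 2:3:, the mapper emits

(2, "VALUE 1")
(3, "VALUE 1")
(1, "VALUE 0")
(1, "NODES 2:3:")

The inf lines for nodes 2, 3, and 4 contribute only their own "VALUE 2147483647" and "NODES" pairs.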
Reducer code:
package com.hadoop.dijkstra;

import java.io.IOException;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;

public class DjksReducer extends
		Reducer<LongWritable, Text, LongWritable, Text> {

	public void reduce(LongWritable key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		// From slide 20 of Graph Algorithms with MapReduce (by Jimmy Lin, Univ
		// @ Maryland)
		// The key is the current point
		// The values are all the possible distances to this point
		// we simply emit the point and the minimum distance value

		String nodes = "UNMODED";
		Text word = new Text();
		int lowest = Integer.MAX_VALUE;// start at infinity

		for (Text val : values) {// each value is "VALUE <distance>" or
									// "NODES <adjacency>"; the first token is
									// the record type
			String[] sp = val.toString().split(" ");// splits on space
			// look at first value
			if (sp[0].equalsIgnoreCase("NODES")) {
				nodes = sp[1];
			} else if (sp[0].equalsIgnoreCase("VALUE")) {
				int distance = Integer.parseInt(sp[1]);
				lowest = Math.min(distance, lowest);
			}
		}
		word.set(lowest + " " + nodes);
		context.write(key, word);
		word.clear();
	}
}
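Continuing the hypothetical trace, in the first round the reducer for key 3 receives VALUE 1 (from node 1), VALUE 2147483647 (node 3's own distance), and NODES 4:, so it outputs

3 1 4:

which matches the input file format and can be fed directly into the next round.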
 
The algorithm iterates until every vertex's distance to the start vertex stops changing. Each round is one MapReduce job: the previous round's output is the next round's input, and iteration ends when a round's output is identical to the previous round's.
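On the hypothetical four-node sample, the per-round outputs would evolve as follows (illustrative, not from the original post):

round 1 output:
1 0 2:3:
2 1 3:4:
3 1 4:
4 2147483647 1:

round 2 output:
1 0 2:3:
2 1 3:4:
3 1 4:
4 2 1:

round 3 output: identical to round 2, so the driver detects convergence and stops.

Driver code: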
package com.hadoop.dijkstra;

import java.io.*;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Djkstra {
	public static String OUT = "outfile";
	public static String IN = "inputlarger";

	public static void main(String args[]) throws IOException,
			InterruptedException, ClassNotFoundException {

		// set in and out to args.
		IN = args[0];
		OUT = args[1];

		String infile = IN;
		String outputfile = OUT + System.nanoTime();

		boolean isdone = false;
		@SuppressWarnings("unused")
		boolean success = false;

		HashMap<Integer, Integer> _map = new HashMap<Integer, Integer>();

		while (isdone == false) {
			Configuration conf = new Configuration();
			// make the key -> value output space separated so each round's
			// output parses as the next round's input; newer Hadoop releases
			// name this property mapreduce.output.textoutputformat.separator
			conf.set("mapred.textoutputformat.separator", " ");
			Job job = new Job(conf);
			job.setJarByClass(Djkstra.class);
			job.setJobName("Dijkstra");

			job.setMapperClass(DjksMapper.class);
			job.setReducerClass(DjksReducer.class);

			FileInputFormat.addInputPath(job, new Path(infile));
			FileOutputFormat.setOutputPath(job, new Path(outputfile));

			job.setMapOutputKeyClass(LongWritable.class);
			job.setMapOutputValueClass(Text.class);
			job.setOutputKeyClass(LongWritable.class);
			job.setOutputValueClass(Text.class);

			success = job.waitForCompletion(true);

			// remove the input file
			if (!infile.equals(IN)) {
				String indir = infile.replace("part-r-00000", "");
				Path ddir = new Path(indir);
				FileSystem dfs = FileSystem.get(conf);
				dfs.delete(ddir, true);
			}

			// output path as the input path for next round
			// TODO: what if there are more than one reducers?
			infile = outputfile + "/part-r-00000";
			outputfile = OUT + System.nanoTime();

			// do we need to re-run the job with the new input file??
			isdone = true;// set the job to NOT run again!
			Path ofile = new Path(infile);
			FileSystem fs = FileSystem.get(new Configuration());
			BufferedReader br = new BufferedReader(new InputStreamReader(
					fs.open(ofile)));

			HashMap<Integer, Integer> imap = new HashMap<Integer, Integer>();
			String line = br.readLine();
			while (line != null) {
				// each line looks like 0 1 2:3:
				// we need to verify node -> distance doesn't change
				String[] sp = line.split(" ");
				int node = Integer.parseInt(sp[0]);
				int distance = Integer.parseInt(sp[1]);
				imap.put(node, distance);
				line = br.readLine();
			}
			br.close();
			if (_map.isEmpty()) {
				// first iteration... must do a second iteration regardless!
				isdone = false;
			} else {
				Iterator<Integer> itr = imap.keySet().iterator();
				while (itr.hasNext()) {
					int key = itr.next();
					int val = imap.get(key);
					if (_map.get(key) != val) {
						// values aren't the same... we aren't at convergence
						// yet (iterate until results are the same as last round)
						isdone = false;
					}
				}
			}
			if (isdone == false) {
				_map.putAll(imap);// copy imap to _map for the next iteration
									// (if required)
			}
		}

	}
}
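The TODO above notes that hard-coding part-r-00000 breaks as soon as the job runs with more than one reducer. A minimal sketch of one way around it, assuming it replaces the single-file read (before outputfile is reassigned) and reuses the driver's fs and imap; FileSystem.globStatus and Path are standard Hadoop APIs:

// Sketch: read every reducer output file instead of only part-r-00000.
for (org.apache.hadoop.fs.FileStatus part : fs.globStatus(new Path(outputfile, "part-r-*"))) {
	BufferedReader r = new BufferedReader(new InputStreamReader(fs.open(part.getPath())));
	String ln;
	while ((ln = r.readLine()) != null) {
		String[] sp = ln.split(" ");
		imap.put(Integer.parseInt(sp[0]), Integer.parseInt(sp[1]));
	}
	r.close();
}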