A MapReduce Implementation of Shortest Path
A map task receives:
- Key: node n
- Value: D (distance from start); points-to (list of nodes reachable from n)

For each p in points-to, it emits (p, D + 1). The reduce task gathers the possible distances to a given p and selects the minimum one.
The input file format is: <node id> <distance to the start node> <[node ids it can reach]:>

In the map phase, for every node the mapper uses the node's current distance to emit one "VALUE"-type key-value pair per reachable node: [(node id it can reach): (distance + 1)]. If the distance is inf, no such "VALUE" pairs are emitted. The mapper also re-emits the node's own current distance as a "VALUE" pair, and the node's adjacency information as a "NODES"-type pair: <node id>: <[node ids it can reach]:>, so that the graph structure is carried into the next round.

The reduce input is therefore <node id> together with a list of [VALUE distance] and [NODES node ids it can reach:] values. The reducer computes the minimum of the VALUE list and outputs <node id> <min distance> <node ids it can reach> (the same format as the input file).
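For concreteness, here is a small hypothetical input file for a four-node graph (node 1 is the start node; 2147483647, i.e. Integer.MAX_VALUE, plays the role of inf, since the code below parses the distance as an int):

1 0 2:3:
2 2147483647 3:4:
3 2147483647 4:
4 2147483647 1:

With this graph the distances converge to 0, 1, 1 and 2 after a few MapReduce rounds.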
Mapper code:
package com.hadoop.dijkstra;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DjksMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    private static final String MAXDIS = Integer.MAX_VALUE + "";

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // From slide 20 of "Graph Algorithms with MapReduce" (Jimmy Lin, Univ. of Maryland):
        // the key is node n, the value is "D points-to"; for every node this line points to,
        // emit (that node, D + 1).
        Text word = new Text();
        String line = value.toString();     // looks like "1 0 2:3:"
        String[] sp = line.split(" ");      // split on space
        if (sp[1].compareTo(MAXDIS) != 0) { // skip lines whose distance is still "infinite"
            int distanceadd = Integer.parseInt(sp[1]) + 1;
            String[] PointsTo = sp[2].split(":");
            for (int i = 0; i < PointsTo.length; i++) {
                // candidate distance for the neighbour: current distance + 1
                word.set("VALUE " + distanceadd);
                context.write(new LongWritable(Integer.parseInt(PointsTo[i])), word);
                word.clear();
            }
        }
        // re-emit the node's own current distance (the reducer keeps the minimum)
        word.set("VALUE " + sp[1]);
        context.write(new LongWritable(Integer.parseInt(sp[0])), word);
        word.clear();
        // re-emit the node's adjacency list so it is carried into the next round
        word.set("NODES " + sp[2]);
        context.write(new LongWritable(Integer.parseInt(sp[0])), word);
        word.clear();
    }
}
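As a quick sanity check against the format above, the hypothetical line "2 1 3:4:" would make this mapper emit four key-value pairs:

3 VALUE 2
4 VALUE 2
2 VALUE 1
2 NODES 3:4: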
Reducer code:
package com.hadoop.dijkstra;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DjksReducer extends Reducer<LongWritable, Text, LongWritable, Text> {

    @Override
    public void reduce(LongWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // From slide 20 of "Graph Algorithms with MapReduce" (Jimmy Lin, Univ. of Maryland):
        // the key is the current node, the values are all candidate distances to it;
        // emit the node together with the minimum distance.
        String nodes = "UNMODED";           // sentinel in case no NODES record arrives
        Text word = new Text();
        int lowest = Integer.MAX_VALUE;     // start at "infinity"
        for (Text val : values) {           // each value looks like "VALUE 1" or "NODES 2:3:"
            String[] sp = val.toString().split(" ");
            if (sp[0].equalsIgnoreCase("NODES")) {
                nodes = sp[1];              // the node's adjacency list
            } else if (sp[0].equalsIgnoreCase("VALUE")) {
                int distance = Integer.parseInt(sp[1]);
                lowest = Math.min(distance, lowest);
            }
        }
        // output matches the input file format: <node id> <min distance> <adjacency list>
        word.set(lowest + " " + nodes);
        context.write(key, word);
        word.clear();
    }
}
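Continuing the hypothetical example, if the reducer receives key 4 with the values {VALUE 2, VALUE 2, VALUE 2147483647, NODES 1:}, it keeps the minimum distance and writes the line:

4 2 1: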
The algorithm has to be iterated several times before every vertex's shortest distance from the start vertex is final. Each round is one MapReduce job: the output of one round becomes the input of the next, and iteration stops once the result is identical to that of the previous round. Driver code:
package com.hadoop.dijkstra;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Djkstra {
    public static String OUT = "outfile";
    public static String IN = "inputlarger";

    public static void main(String args[]) throws IOException,
            InterruptedException, ClassNotFoundException {
        // input path and output path prefix come from the command line
        IN = args[0];
        OUT = args[1];
        String infile = IN;
        String outputfile = OUT + System.nanoTime();
        boolean isdone = false;
        // node -> distance from the previous round, used for the convergence check
        HashMap<Integer, Integer> _map = new HashMap<Integer, Integer>();

        while (!isdone) {
            Configuration conf = new Configuration();
            // make key and value space separated so the output can be fed into the next round
            conf.set("mapred.textoutputformat.separator", " ");

            // new Job(conf) is deprecated in newer Hadoop; Job.getInstance(conf) is the modern equivalent
            Job job = new Job(conf);
            job.setJarByClass(Djkstra.class);
            job.setJobName("Dijkstra");
            job.setMapperClass(DjksMapper.class);
            job.setReducerClass(DjksReducer.class);
            FileInputFormat.addInputPath(job, new Path(infile));
            FileOutputFormat.setOutputPath(job, new Path(outputfile));
            job.setMapOutputKeyClass(LongWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(LongWritable.class);
            job.setOutputValueClass(Text.class);
            job.waitForCompletion(true);

            // remove this round's intermediate input directory (but never the original input)
            if (!infile.equals(IN)) {
                String indir = infile.replace("part-r-00000", "");
                Path ddir = new Path(indir);
                FileSystem dfs = FileSystem.get(conf);
                dfs.delete(ddir, true);
            }

            // this round's output becomes the next round's input
            // TODO: what if there is more than one reducer?
            infile = outputfile + "/part-r-00000";
            outputfile = OUT + System.nanoTime();

            // read the output back and compare it with the previous round;
            // if nothing changed we have converged and can stop
            isdone = true;
            Path ofile = new Path(infile);
            FileSystem fs = FileSystem.get(new Configuration());
            BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(ofile)));
            HashMap<Integer, Integer> imap = new HashMap<Integer, Integer>();
            String line = br.readLine();
            while (line != null) {
                // each line looks like "0 1 2:3:"; record node -> distance
                String[] sp = line.split(" ");
                int node = Integer.parseInt(sp[0]);
                int distance = Integer.parseInt(sp[1]);
                imap.put(node, distance);
                line = br.readLine();
            }
            br.close();

            if (_map.isEmpty()) {
                // first iteration: a second iteration is always required
                isdone = false;
            } else {
                for (int node : imap.keySet()) {
                    int val = imap.get(node);
                    Integer prev = _map.get(node);
                    if (prev == null || prev.intValue() != val) {
                        // a distance changed, so we have not converged yet
                        isdone = false;
                    }
                }
            }
            if (!isdone) {
                // remember this round's distances for the next comparison
                _map.putAll(imap);
            }
        }
    }
}
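Assuming the three classes are packed into a jar called dijkstra.jar (the name is arbitrary), the driver takes the input file and an output path prefix as its two command-line arguments, e.g.:

hadoop jar dijkstra.jar com.hadoop.dijkstra.Djkstra /user/hadoop/graph.txt /user/hadoop/shortestpath

Each round writes to a fresh directory named <prefix><nanoTime>, and intermediate directories are deleted as the iteration proceeds.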