Algorithm flow:
Spark PageRank. Initialization: we use pages (a pair RDD) to record the links from each page to its neighboring pages, and ranks (a pair RDD) to record each page's rank, initialized to 1.0. Then, in each iteration:
1. For each page p, send each of its neighbors a contribution of rank(p) / numNeighbors(p).
2. Sum the contributions received by each page into contributionsReceived.
3. Set each page's rank to 0.15 + 0.85 * contributionsReceived.
Implementation of the algorithm:
public static JavaPairRDD<Integer, ArrayList<Integer>> run_page(){
    JavaPairRDD<Integer, ArrayList<Integer>> res = sc.textFile(
            "/home/liang/workspace/learnSpark/pagerank.txt"
    ).mapToPair(new PairFunction<String, Integer, ArrayList<Integer>>() {
        @Override
        public Tuple2<Integer, ArrayList<Integer>> call(String s) throws Exception {
            // Each line has the form "pageId neighbor1,neighbor2,..."
            String key = s.split(" ")[0];
            String values = s.split(" ")[1];
            ArrayList<Integer> values_integer = new ArrayList<Integer>();
            for(String str : values.split(",")){
                values_integer.add(Integer.parseInt(str));
            }
            return new Tuple2<Integer, ArrayList<Integer>>(Integer.parseInt(key), values_integer);
        }
    });
    return res;
}
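For reference, both loaders (run_page above and run_rank below) read the same file: each line of pagerank.txt is expected to contain a page id, a space, and a comma-separated list of the ids it links to. A hypothetical example (not from the original source):

1 2,3
2 1,3
3 1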
public static JavaPairRDD<Integer, Double> run_rank(){
    JavaPairRDD<Integer, Double> res = sc.textFile(
            "/home/liang/workspace/learnSpark/pagerank.txt"
    ).mapToPair(
            line -> new Tuple2<Integer, Double>(Integer.parseInt(line.split(" ")[0]), 1.0)
    );
    return res;
}
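The two loaders only build the pages and ranks pair RDDs; the iteration step described above still has to run on top of them. Below is a minimal sketch of that loop, assuming the Spark 2.x Java API (where flatMapToPair returns an Iterator), the same sc and the two loaders above; the method name runPageRank and the numIterations parameter are illustrative and not part of the original code.

// Assumed imports, in addition to those already used by the loaders:
// import java.util.ArrayList;
// import java.util.List;
// import scala.Tuple2;
// import org.apache.spark.api.java.JavaPairRDD;
public static JavaPairRDD<Integer, Double> runPageRank(int numIterations){
    JavaPairRDD<Integer, ArrayList<Integer>> pages = run_page().cache();
    JavaPairRDD<Integer, Double> ranks = run_rank();

    for(int i = 0; i < numIterations; i++){
        // For each page p, emit rank(p) / numNeighbors(p) to every neighbor of p.
        JavaPairRDD<Integer, Double> contributions = pages.join(ranks)
                .flatMapToPair(entry -> {
                    ArrayList<Integer> neighbors = entry._2()._1();
                    Double rank = entry._2()._2();
                    List<Tuple2<Integer, Double>> out = new ArrayList<>();
                    for(Integer neighbor : neighbors){
                        out.add(new Tuple2<Integer, Double>(neighbor, rank / neighbors.size()));
                    }
                    return out.iterator();
                });
        // Sum the contributions received by each page, then apply 0.15 + 0.85 * contributionsReceived.
        ranks = contributions
                .reduceByKey((a, b) -> a + b)
                .mapValues(sum -> 0.15 + 0.85 * sum);
    }
    return ranks;
}

Calling runPageRank(10), for example, would perform ten iterations and return the final ranks pair RDD, which can then be collected or saved.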