Building an Inverted Index with MapReduce and Chinese Word Segmentation
Steps (a runnable in-memory sketch of all three follows the list):
1) map: segment each item's text into weighted tokens
item1 --> [(token1,weight1),(token2,weight2),(token3,weight3)]
item2 --> [(token1,weight1),(token4,weight4),(token5,weight5)]
2) map_inverted: re-key the output so that each token becomes the key
token1 --> (item1,weight1)
token2 --> (item1,weight2)
token3 --> (item1,weight3)
token1 --> (item2,weight1)
token4 --> (item2,weight4)
token5 --> (item2,weight5)
3) reduce_inverted: aggregate the postings by key
token1 --> [(item1,weight1),(item2,weight1)]
token2 --> [(item1,weight2)]
token3 --> [(item1,weight3)]
token4 --> [(item2,weight4)]
token5 --> [(item2,weight5)]
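Before wiring this into Hadoop Streaming, the whole transformation can be sanity-checked in plain Python. Below is a minimal in-memory sketch of the three steps on the toy data above (no Hadoop, no jieba; the weights are placeholders):

from collections import defaultdict

# step 1 output (forward index): item -> [(token, weight), ...]
forward = {
    'item1': [('token1', 0.9), ('token2', 0.5), ('token3', 0.3)],
    'item2': [('token1', 0.9), ('token4', 0.7), ('token5', 0.2)],
}

# step 2: re-key every (token, weight) pair by its token
inverted = defaultdict(list)
for item, token_weights in forward.items():
    for token, weight in token_weights:
        inverted[token].append((item, weight))

# step 3: aggregate per token, highest weight first
for token in sorted(inverted):
    postings = sorted(inverted[token], key=lambda p: p[1], reverse=True)
    print token, '-->', postings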
Test data: music_meta.data (the file referenced in run.sh below), one record per line in the form music_id \t music_name.
1)map.py
#! /usr/bin/python
import os
import sys

# unpack the jieba dictionary shipped to the task via -file
os.system('tar zxvf jieba.tgz > /dev/null')

reload(sys)
sys.setdefaultencoding('utf-8')
sys.path.append('./')

import jieba
import jieba.posseg
import jieba.analyse

for line in sys.stdin:
    ss = line.strip().split('\t')
    if len(ss) != 2:
        continue
    music_id = ss[0].strip()
    music_name = ss[1].strip()

    # TF-IDF keyword extraction: [(token, weight), ...]
    tmp_list = []
    for x, w in jieba.analyse.extract_tags(music_name, withWeight=True):
        tmp_list.append((x, float(w)))
    final_token_score_sorted = sorted(tmp_list, key=lambda x: x[1], reverse=True)

    # output: music_id \t music_name \t token^Bweight^Atoken^Bweight...
    # ('^A' / '^B' are the literal two-character separators here, not \x01/\x02)
    print '\t'.join([music_id, music_name,
                     '^A'.join(['^B'.join([token_score[0], str(token_score[1])])
                                for token_score in final_token_score_sorted])])
Test: cat music_meta.data | python map.py > result1.local
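Each record yields one output line with three tab-separated fields: music_id, music_name, then the ^A-joined token^Bweight list. A hypothetical record would look roughly like this (weights are illustrative; real values come from jieba's TF-IDF model):

input:   0001    夜空中最亮的星
output:  0001    夜空中最亮的星    夜空^B1.13^A最亮^B0.99^A星^B0.87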
2)map_inverted.py
#! /usr/bin/python
import os
import sys

os.system('tar zxvf jieba.tgz > /dev/null')

reload(sys)
sys.setdefaultencoding('utf-8')
sys.path.append('./')

import jieba
import jieba.posseg
import jieba.analyse

for line in sys.stdin:
    ss = line.strip().split('\t')
    if len(ss) != 3:
        continue
    music_id = ss[0].strip()
    music_name = ss[1].strip()
    music_fealist = ss[2].strip()

    # emit one line per token, re-keyed: token \t music_name \t weight
    for fea in music_fealist.split('^A'):
        token, weight = fea.strip().split('^B')
        print '\t'.join([token, music_name, weight])
Test: cat result1.local | python map_inverted.py > result2.local
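Each input line fans out into one output line per token, now keyed by the token. Continuing the hypothetical record above:

夜空    夜空中最亮的星    1.13
最亮    夜空中最亮的星    0.99
星      夜空中最亮的星    0.87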
Note the weights after segmentation: because they come from the same dictionary, identical tokens are assigned identical weights.
3)reduce_inverted.py
#! /usr/bin/python
import sys
import os

os.system('tar zxvf jieba.tgz > /dev/null')

reload(sys)
sys.setdefaultencoding('utf-8')
sys.path.append('./')

import jieba
import jieba.posseg
import jieba.analyse

cur_token = None
m_list = []

for line in sys.stdin:
    ss = line.strip().split('\t')
    if len(ss) != 3:
        continue
    token = ss[0].strip()
    name = ss[1].strip()
    weight = float(ss[2].strip())

    if cur_token is None:
        cur_token = token

    # key changed: flush the postings collected for the previous token
    if cur_token != token:
        final_list = sorted(m_list, key=lambda x: x[1], reverse=True)
        print '\t'.join([cur_token,
                         '^A'.join(['^B'.join([name_weight[0], str(name_weight[1])])
                                    for name_weight in final_list])])
        cur_token = token
        m_list = []

    m_list.append((name, weight))

# flush the last token
final_list = sorted(m_list, key=lambda x: x[1], reverse=True)
print '\t'.join([cur_token,
                 '^A'.join(['^B'.join([name_weight[0], str(name_weight[1])])
                            for name_weight in final_list])])
Test: cat result2.local | sort -k1,1 | python reduce_inverted.py > result3.local (the sort stands in for the MR shuffle, which delivers records grouped by key; without it the reducer would flush the same token several times)
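For comparison, the flush-on-key-change bookkeeping above can also be written with itertools.groupby, which the sorted input stream makes safe. This is an alternative sketch, not the script used in this pipeline:

#! /usr/bin/python
import sys
from itertools import groupby

def parse(stream):
    # yield (token, music_name, weight) for each well-formed line
    for line in stream:
        ss = line.strip().split('\t')
        if len(ss) == 3:
            yield ss[0], ss[1], float(ss[2])

# groupby relies on the shuffle (or a local sort) having grouped equal tokens
for token, group in groupby(parse(sys.stdin), key=lambda t: t[0]):
    postings = sorted(((name, w) for _, name, w in group),
                      key=lambda p: p[1], reverse=True)
    print '\t'.join([token, '^A'.join(['^B'.join([n, str(w)]) for n, w in postings])])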
This completes the inverted index.
4)run.sh
HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"
INPUT_FILE_PATH_A="/Test/fenci_mr_daopai/music_meta.data"
OUTPUT_PATH_A="/Test/fenci_mr_daopai/result1"
OUTPUT_PATH_B="/Test/fenci_mr_daopai/result2"
# clear previous outputs (the rm fails harmlessly on the first run)
$HADOOP_CMD fs -rm -r -skipTrash $OUTPUT_PATH_A
$HADOOP_CMD fs -rm -r -skipTrash $OUTPUT_PATH_B
# step 1
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_A \
-output $OUTPUT_PATH_A \
-mapper "python map.py" \
-file ./map.py \
-file ./jieba.tgz \
-jobconf mapred.job.name="fenci_mr" \
-jobconf mapred.reduce.tasks=0
# step 2
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $OUTPUT_PATH_A \
-output $OUTPUT_PATH_B \
-mapper "python map_inverted.py" \
-reducer "python reduce_inverted.py" \
-file ./jieba.tgz \
-file ./map_inverted.py \
-file ./reduce_inverted.py \
-jobconf mapred.job.name="fenci_mr_2" \
-jobconf mapred.reduce.tasks=2
Note: the jieba package must be distributed in both jobs (hence -file ./jieba.tgz appears twice in run.sh), since every mapper and reducer script unpacks and imports it.
Run the MR jobs with: bash run.sh
Results: the final inverted index lands in $OUTPUT_PATH_B on HDFS (and in result3.local for the local run), one token per line followed by its ^A-separated music_name^Bweight posting list.
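The merged output can then be loaded and queried locally. A minimal lookup sketch, assuming the part files were merged into result3.local as in the local test ('夜空' is just the hypothetical token from the earlier example):

#! /usr/bin/python
# load the reducer output into a dict: token -> [(music_name, weight), ...]
index = {}
for line in open('result3.local'):
    token, fealist = line.strip().split('\t')
    index[token] = [tuple(f.split('^B')) for f in fealist.split('^A')]

# look up a token and print its postings, best match first
for name, weight in index.get('夜空', []):
    print name, weight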