
Building an Inverted Index with MapReduce (MR) and Chinese Word Segmentation

程序员文章站 2022-04-28 18:10:41

Steps

1. map: tokenize each item

item1 --> [(token1,weight1),(token2,weight2),(token3,weight3)]

item2 --> [(token1,weight1),(token4,weight4),(token5,weight5)]

2. map_inverted: re-key each record by token

token1 --> (item1,weight1)

token2 --> (item1,weight2)

token3 --> (item1,weight3)

token1 --> (item2,weight1)

token4 --> (item2,weight4)

token5 --> (item2,weight5)

3. reduce_inverted: aggregate by key

token1 --> [(item1,weight1),(item2,weight1)]

token2 --> [(item1,weight2)]

token3 --> [(item1,weight3)]

token4 --> [(item2,weight4)]

token5 --> [(item2,weight5)]
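
The three steps above can be sketched in memory before any Hadoop or jieba code is involved. This is only an illustration under assumptions: the item names, tokens and weights below are made up, and the function names simply mirror the step names.

```python
from collections import defaultdict

# Hypothetical output of step 1: item -> [(token, weight), ...]
tokenized = {
    "item1": [("token1", 0.9), ("token2", 0.5), ("token3", 0.2)],
    "item2": [("token1", 0.9), ("token4", 0.7), ("token5", 0.1)],
}


def map_inverted(tokenized_items):
    """Step 2: emit one (token, (item, weight)) pair per token occurrence."""
    for item, token_weights in tokenized_items.items():
        for token, weight in token_weights:
            yield token, (item, weight)


def reduce_inverted(pairs):
    """Step 3: group the pairs by token and sort each group by weight."""
    index = defaultdict(list)
    for token, posting in pairs:
        index[token].append(posting)
    return {
        token: sorted(postings, key=lambda p: p[1], reverse=True)
        for token, postings in index.items()
    }


inverted_index = reduce_inverted(map_inverted(tokenized))
```

Here `inverted_index["token1"]` holds both items, while every other token maps to a single item, matching the listing in step 3.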

Test data: music_meta.data, one record per line in the form music_id<TAB>music_name.

1) map.py

#! /usr/bin/python

import os
import sys

# Unpack the jieba package shipped with the job before importing it.
os.system('tar zxvf jieba.tgz > /dev/null')
reload(sys)
sys.setdefaultencoding('utf-8')
sys.path.append('./')

import jieba
import jieba.posseg
import jieba.analyse

for line in sys.stdin:
    # Input: music_id <TAB> music_name
    ss = line.strip().split('\t')
    if len(ss) != 2:
        continue
    music_id = ss[0].strip()
    music_name = ss[1].strip()

    tmp_list = []
    for x, w in jieba.analyse.extract_tags(music_name, withWeight=True):
        tmp_list.append((x, float(w)))

    # Sort tokens by weight, highest first.
    final_token_score_sorted = sorted(tmp_list, key=lambda x: x[1], reverse=True)
    # Output: music_id <TAB> music_name <TAB> token^Bweight^Atoken^Bweight...
    print '\t'.join([music_id, music_name, '^A'.join(['^B'.join([token_score[0], str(token_score[1])]) for token_score in final_token_score_sorted])])

Test: cat music_meta.data | python map.py > result1.local
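
The third column written by map.py packs every (token, weight) pair into one field. As a rough sketch of that encoding and its inverse, assuming the separators are the literal strings '^A' and '^B' as they appear in the listing, and using a made-up record (the helper names are hypothetical):

```python
TOKEN_SEP = '^A'   # separates (token, weight) pairs, as in map.py
WEIGHT_SEP = '^B'  # separates a token from its weight


def encode_features(token_weights):
    """Join (token, weight) pairs into a single tab-safe field."""
    return TOKEN_SEP.join(
        WEIGHT_SEP.join([token, str(weight)]) for token, weight in token_weights
    )


def decode_features(field):
    """Split the field back into a list of (token, weight) pairs."""
    if not field:
        return []
    pairs = []
    for fea in field.split(TOKEN_SEP):
        token, weight = fea.split(WEIGHT_SEP)
        pairs.append((token, float(weight)))
    return pairs


# Hypothetical record in the three-column layout of result1.local.
line = '\t'.join(['1001', 'some song',
                  encode_features([('some', 1.5), ('song', 0.5)])])
music_id, music_name, fealist = line.split('\t')
features = decode_features(fealist)
```

Keeping the pairs in one field means each record stays on a single line, which is what Hadoop Streaming passes between stages.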


2) map_inverted.py

#! /usr/bin/python

import os
import sys

os.system('tar zxvf jieba.tgz > /dev/null')
reload(sys)
sys.setdefaultencoding('utf-8')
sys.path.append('./')


import jieba
import jieba.posseg
import jieba.analyse

for line in sys.stdin:
    # Input: music_id <TAB> music_name <TAB> token^Bweight^Atoken^Bweight...
    ss = line.strip().split('\t')
    if len(ss) != 3:
        continue
    music_id = ss[0].strip()
    music_name = ss[1].strip()
    music_fealist = ss[2].strip()

    # Emit one record per token: token <TAB> music_name <TAB> weight
    for fea in music_fealist.split('^A'):
        token, weight = fea.strip().split('^B')
        print '\t'.join([token, music_name, weight])

Test: cat result1.local | python map_inverted.py > result2.local


Note the weights after segmentation: because of the dictionary, identical tokens come out with the same weight.

3) reduce_inverted.py

#! /usr/bin/python

import sys
import os

os.system('tar zxvf jieba.tgz > /dev/null')
reload(sys)
sys.setdefaultencoding('utf-8')
sys.path.append('./')

import jieba
import jieba.posseg
import jieba.analyse

cur_token = None
m_list = []

# Input: token <TAB> music_name <TAB> weight, grouped by token.
for line in sys.stdin:
    ss = line.strip().split('\t')
    if len(ss) != 3:
        continue
    token = ss[0].strip()
    name = ss[1].strip()
    weight = float(ss[2].strip())

    if cur_token is None:
        cur_token = token

    # A new token starts: emit the postings collected for the previous one.
    if cur_token != token:
        final_list = sorted(m_list, key=lambda x: x[1], reverse=True)
        print '\t'.join([cur_token, '^A'.join(['^B'.join([name_weight[0], str(name_weight[1])]) for name_weight in final_list])])
        cur_token = token
        m_list = []

    m_list.append((name, weight))

# Emit the last token, unless the input was empty.
if cur_token is not None:
    final_list = sorted(m_list, key=lambda x: x[1], reverse=True)
    print '\t'.join([cur_token, '^A'.join(['^B'.join([name_weight[0], str(name_weight[1])]) for name_weight in final_list])])

Test: cat result2.local | python reduce_inverted.py > result3.local
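
The reducer only compares each token with the previous one, so it relies on records for the same token arriving next to each other. Hadoop's shuffle guarantees this on the cluster, but a plain local pipe does not. The sketch below, with made-up records and hypothetical helper names, shows the difference between grouping unsorted and sorted input:

```python
from itertools import groupby

# Hypothetical, unsorted map_inverted.py output: token <TAB> name <TAB> weight
mapped_lines = [
    "token1\titem1\t0.9",
    "token2\titem1\t0.5",
    "token1\titem2\t0.9",
]


def parse(line):
    token, name, weight = line.split('\t')
    return token, name, float(weight)


def reduce_sorted(lines):
    """Group adjacent records by token; correct only for sorted input."""
    result = []
    for token, group in groupby(map(parse, lines), key=lambda r: r[0]):
        postings = sorted(((name, weight) for _, name, weight in group),
                          key=lambda p: p[1], reverse=True)
        result.append((token, postings))
    return result


unsorted_result = reduce_sorted(mapped_lines)        # token1 is emitted twice
sorted_result = reduce_sorted(sorted(mapped_lines))  # one entry per token
```

For the local test, this suggests sorting on the token column before the reducer, for example by piping result2.local through the standard sort utility first.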


The inverted index is now complete.
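
To show how such an index might be used, here is a minimal lookup sketch. It assumes the reducer's line format (token, then name^Bweight entries joined by ^A); the sample lines, `load_index` and `search` are hypothetical and not part of the original scripts.

```python
def parse_index_line(line):
    """Parse 'token<TAB>name^Bweight^Aname^Bweight...' into (token, postings)."""
    token, posting_field = line.rstrip('\n').split('\t')
    postings = []
    for entry in posting_field.split('^A'):
        name, weight = entry.split('^B')
        postings.append((name, float(weight)))
    return token, postings


def load_index(lines):
    """Build a token -> postings dictionary from reducer output lines."""
    return dict(parse_index_line(line) for line in lines if line.strip())


def search(index, token, top_n=10):
    """Return up to top_n (name, weight) postings for a token."""
    return index.get(token, [])[:top_n]


# Hypothetical reducer output; a real run would read result3.local instead.
sample_output = [
    "token1\titem1^B0.9^Aitem2^B0.9\n",
    "token2\titem1^B0.5\n",
]
index = load_index(sample_output)
hits = search(index, "token1")
```

Because the reducer already sorts postings by weight, taking the first `top_n` entries returns the highest-weighted names for a token.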

4) run.sh

HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"

INPUT_FILE_PATH_A="/Test/fenci_mr_daopai/music_meta.data"

OUTPUT_PATH_A="/Test/fenci_mr_daopai/result1"
OUTPUT_PATH_B="/Test/fenci_mr_daopai/result2"


$HADOOP_CMD fs -rm -r -skipTrash $OUTPUT_PATH_A
$HADOOP_CMD fs -rm -r -skipTrash $OUTPUT_PATH_B

# step 1
$HADOOP_CMD jar $STREAM_JAR_PATH \
        -input $INPUT_FILE_PATH_A \
        -output $OUTPUT_PATH_A \
        -mapper "python map.py" \
        -file ./map.py \
        -file ./jieba.tgz \
        -jobconf mapred.job.name="fenci_mr" \
        -jobconf mapred.reduce.tasks=0

# step 2
$HADOOP_CMD jar $STREAM_JAR_PATH \
        -input $OUTPUT_PATH_A \
        -output $OUTPUT_PATH_B \
        -mapper "python map_inverted.py" \
        -reducer "python reduce_inverted.py" \
        -file ./jieba.tgz \
        -file ./map_inverted.py \
        -file ./reduce_inverted.py \
        -jobconf mapred.job.name="fenci_mr_2" \
        -jobconf mapred.reduce.tasks=2

Note: the jieba package must be distributed with both jobs, because the map script in each one unpacks and imports it.

Run the MR jobs with: bash run.sh

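Result: the inverted index is written to /Test/fenci_mr_daopai/result2 (OUTPUT_PATH_B), split across two part files because mapred.reduce.tasks=2.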