word2vec 代码

程序员文章站 2022-06-15 14:33:41


# _*_ encoding=utf-8 _*_
import collections
import tensorflow as tf
import os
import math
import random
import zipfile
import sys
import numpy as np
import urllib.request
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE

url = 'http://mattmahoney.net/dc/'
vocabulary_size =50000
batch_size = 128
embedding_size =128
skip_window =1
num_skips =2

valid_size = 16  # 抽取的验证单词数
valid_window = 100  # 只从频数最高的100个单词中抽取
valid_examples = np.random.choice(valid_window, valid_size, replace=False)  # 验证数据
num_sampled = 64  # 训练时用来负样本的噪声单词的数量

# 定义下载文本数据的函数
# 下载数据的安装包并且核对文件的大小
def maybe_download(filename, expected_bytes,progress =None):
    if not os.path.exists(filename):
        filename, _ = urllib.request.urlretrieve(url+ filename,filename,progress)  # 将URL表示的网络对象复制到本地文件

    return filename

filename = maybe_download('text8.zip', 28844032)
# unzip data
def read_data(filename):
    with zipfile.ZipFile(filename) as f:
        data = tf.compat.as_str_any(f.read(f.namelist()[0])).split()   #  read first dir,返回是list
    return data

words = read_data(filename)
print('Data size',len(words))

# 创建vocabulary词汇表 collections.Counter 统计单词列表中的单词的频数。
# 然后使用most_common 方法来获取top 50000频数的单词作为vocabulary词汇表
# dict 的查询复杂度为O(1)
def build_dataset(words):
    count =[['UNK',-1]]
    count.extend(collections.Counter(words).most_common(vocabulary_size-1))  # collections.Counter是对字典的补充,用于追踪值的出现次数。
    # most_common 表示返回TopN的排序,返回类型是列表,里面的元素是tuple,如:[('a', 5), ('r', 2), ('b', 2), ('c', 1), ('d', 1)]
    # 将元组加到列表当中,extend方法可以在列表的末尾一次性的追加另一个序列的多个值
    # c = Counter("abcdcba")
    # Counter({'a': 2, 'c': 2, 'b': 2, 'd': 1})

    # 获得dictionary,是给前50000的单词进行分类
    dictionary =dict()
    for word,_ in count:
        # print(word)
        # print(len(dictionary))
        dictionary[word] =len(dictionary)   # 相当于给dictionary分类了,给每个word进行从0 - num(count)

    data = list()
    unk_count =0
    # print(words)
    for word in words:
        if word in dictionary:
            index = dictionary[word]  # 将所有的单词通过dictionary词汇表找到相应的类别放在data中
            index = 0
            unk_count +=1
    count[0][1]=unk_count  # 相当于将不在字典里的单词个数返回给第一个元组
    reverse_dictionary =dict(zip(dictionary.values(),dictionary.keys()))  # 将可迭代的对象作为参数,将对应的元素打包成一个元组,然后返回这些元组组成的列表
    # 然后强制转化成字典结构
    return data, count, dictionary, reverse_dictionary

data, count, dictionary, reverse_dictionary = build_dataset(words)
print(data[:10])  #[5237, 3083, 12, 6, 195, 2, 3137, 46, 59, 156]

del words
print('Most common words (+UNK)',count[:5])
print('Sample data',data[:10],[reverse_dictionary[i] for i in data[:10]])

# 现在生成Word2vec的训练样本
data_index =0
# num_skips 是单个周期
def generate_batch(batch_size, num_skips, skip_window):
    global data_index
    assert batch_size %num_skips ==0   # 用来让程序测试这个condition,如果condition为false,那么raise一个AssertionError出来
    # if not condition:
    #     raise AssertionError()
    assert num_skips<=2 *skip_window
    batch = np.ndarray(shape=(batch_size), dtype=np.int32)
    labels = np.ndarray(shape=(batch_size, 1),dtype=np.int32)
    span = 2* skip_window +1   #某个单词创建相关的样本时覆盖的单词数的范围
    buffer = collections.deque(maxlen=span)   # 定义一个空的双向队列

    # 把span个单词顺序读入buffer
    for _ in range(span):   # 此时的span是3
        data_index =(data_index +1) %len(data)

        # 此时的buffer已经满了,相当于初始化结束了
    for i in range(batch_size // num_skips):  # num_skips 是每个单词生成的样本个数
        target = skip_window  # buffer中第skip_window个变量为目标单词
        target_to_avoid = [skip_window]
        for j in range(num_skips):
            while target in target_to_avoid:
                target = random.randint(0, span -1)
            batch[i*num_skips +j] = buffer[skip_window]
            labels[i*num_skips +j, 0] = buffer[target]
        data_index = (data_index+1)%len(data)
    return batch, labels

batch, labels = generate_batch(batch_size=8, num_skips= 2, skip_window= 1)
for i in range(8):
    print(batch[i], reverse_dictionary[batch[i]], '->',labels[i,0],

def plot_with_labels(low_dim_embs, labels, filename ='tsne.png'):
    assert low_dim_embs.shape[0]>=len(labels),"More labels than embeddings"
    plt.figure(figsize=(18, 18))
    for i,label in enumerate(labels):
        x, y = low_dim_embs[i,:]
        plt.scatter(x, y)
                     textcoords = 'offset points',
                     ha ='right',
                     va ='bottom')

graph = tf.Graph()
with graph.as_default():
    train_inputs = tf.placeholder(tf.int32, shape =[batch_size])
    train_labels = tf.placeholder(tf.int32, shape=[batch_size, 1])
    valid_dataset = tf.constant(valid_examples, dtype=tf.int32)  # 验证数据

    with tf.device('/cpu:0'):
        embeddings = tf.Variable(tf.random_uniform([vocabulary_size, embedding_size], -1.0, 1.0))
        embed = tf.nn.embedding_lookup(embeddings, train_inputs)
        nce_weights = tf.Variable(
            tf.truncated_normal([vocabulary_size, embedding_size],
                                stddev=1.0 / math.sqrt(embedding_size)))
        nce_biases = tf.Variable(tf.zeros([vocabulary_size]))
    loss = tf.reduce_mean(tf.nn.nce_loss(
        labels = train_labels,
        inputs =embed,
        num_sampled =num_sampled,

    optimizer = tf.train.GradientDescentOptimizer(1.0).minimize(loss)
    norm = tf.sqrt(tf.reduce_sum(tf.square(embeddings),1,keep_dims=True))
    normalized_embeddings = embeddings / norm
    valid_embeddings = tf.nn.embedding_lookup(
        normalized_embeddings, valid_dataset)
    similarity = tf.matmul(valid_embeddings, normalized_embeddings, transpose_b=True)
    init = tf.global_variables_initializer()

    num_steps =100001

    with tf.Session(graph=graph) as sess:

        average_loss =0
        for step in range(num_steps):
            batch_inputs, batch_labels = generate_batch(batch_size, num_skips, skip_window)
            feed_dict = {train_inputs:batch_inputs,train_labels:batch_labels}
            _, loss_val = sess.run([optimizer, loss], feed_dict=feed_dict)
            average_loss += loss_val

            if step %2000 ==0:
               if step >0:
                 average_loss /=2000
               print("Average loss at step",step,": ", average_loss)
               average_loss =0

            if step%10000==0:
                sim = similarity.eval()
                for i in range(valid_size):
                    valid_word = reverse_dictionary[valid_examples[i]]
                    top_k = 8
                    nearest = (-sim[i,:]).argsort()[1:top_k+1]
                    log_str = "Nearest to %s :" %valid_word
                    for k in range(top_k):
                        close_word = reverse_dictionary[nearest[k]]
                        log_str = "%s %s,"%(log_str, close_word)
        final_embedding = normalized_embeddings.eval()

tsne = TSNE(perplexity=30, n_components=2, init='pca', n_iter=5000)
plot_only = 100
low_dim_embs = tsne.fit_transform(final_embedding[:plot_only, :])
labels = [reverse_dictionary[i] for i in range(plot_only)]
plot_with_labels(low_dim_embs, labels)


