word2vec Code
GitHub: https://github.com/hacker-wei/tensorflow-tutorial/blob/master/Word2vec
# -*- coding: utf-8 -*-
import collections
import math
import os
import random
import zipfile
import urllib.request

import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
url = 'http://mattmahoney.net/dc/'
vocabulary_size = 50000
batch_size = 128
embedding_size = 128
skip_window = 1
num_skips = 2
valid_size = 16     # number of validation words to draw
valid_window = 100  # draw validation words only from the 100 most frequent words
valid_examples = np.random.choice(valid_window, valid_size, replace=False)  # validation word ids
num_sampled = 64    # number of negative (noise) words sampled for the loss during training
# Download the data archive if it is not present, then verify the file size.
def maybe_download(filename, expected_bytes, progress=None):
    if not os.path.exists(filename):
        # Copy the network object denoted by the URL to a local file.
        filename, _ = urllib.request.urlretrieve(url + filename, filename, progress)
    statinfo = os.stat(filename)
    if statinfo.st_size != expected_bytes:
        print('Warning: %s has size %d, expected %d' % (filename, statinfo.st_size, expected_bytes))
    return filename
filename = maybe_download('text8.zip', 31344016)
# Unzip the data and return the corpus as a list of word strings.
def read_data(filename):
    with zipfile.ZipFile(filename) as f:
        # Read the first file in the archive and split it into words.
        data = tf.compat.as_str_any(f.read(f.namelist()[0])).split()
    return data

words = read_data(filename)
print('Data size', len(words))
# Build the vocabulary. collections.Counter counts how often each word occurs,
# and most_common() returns the top (vocabulary_size - 1) words; everything
# else is mapped to 'UNK'. dict lookups are O(1).
def build_dataset(words):
    count = [['UNK', -1]]
    # most_common returns the top-N (word, frequency) tuples as a list, e.g.
    # [('a', 5), ('r', 2), ('b', 2), ('c', 1), ('d', 1)]; extend appends all
    # of them to the count list in one call.
    count.extend(collections.Counter(words).most_common(vocabulary_size - 1))
    # Assign every word an integer id equal to its position in count, i.e. its
    # frequency rank: 0 for 'UNK', 1 for the most frequent word, and so on.
    dictionary = dict()
    for word, _ in count:
        dictionary[word] = len(dictionary)
    data = list()
    unk_count = 0
    for word in words:
        if word in dictionary:
            index = dictionary[word]  # map the word to its id via the dictionary
        else:
            index = 0  # out-of-vocabulary words all map to 'UNK'
            unk_count += 1
        data.append(index)
    count[0][1] = unk_count  # store the number of out-of-vocabulary words in the first entry
    # zip pairs up ids and words; dict() turns those pairs into an
    # id -> word mapping, the inverse of dictionary.
    reverse_dictionary = dict(zip(dictionary.values(), dictionary.keys()))
    return data, count, dictionary, reverse_dictionary

data, count, dictionary, reverse_dictionary = build_dataset(words)
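# Toy illustration (not part of the original script): ids are frequency ranks,
# with id 0 reserved for 'UNK', so the most frequent word gets id 1.
toy_data, _, toy_dict, _ = build_dataset(['the', 'quick', 'the'])
assert toy_data == [1, 2, 1] and toy_dict['UNK'] == 0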
print(data[:10])  # e.g. [5237, 3083, 12, 6, 195, 2, 3137, 46, 59, 156]
del words  # the raw word list is no longer needed; keep only the id sequence
print('Most common words (+UNK)', count[:5])
print('Sample data', data[:10], [reverse_dictionary[i] for i in data[:10]])
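# Sanity check (a sketch): dictionary and reverse_dictionary are inverse
# mappings, and 'UNK' always holds id 0.
assert reverse_dictionary[dictionary['the']] == 'the'
assert dictionary['UNK'] == 0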
# Generate Word2Vec training samples (skip-gram pairs).
data_index = 0

# num_skips is the number of (input, label) pairs generated from each target word.
def generate_batch(batch_size, num_skips, skip_window):
    global data_index
    # assert raises an AssertionError if its condition is false.
    assert batch_size % num_skips == 0  # each target word contributes exactly num_skips samples
    assert num_skips <= 2 * skip_window  # cannot draw more context words than the window holds
    batch = np.ndarray(shape=(batch_size), dtype=np.int32)
    labels = np.ndarray(shape=(batch_size, 1), dtype=np.int32)
    span = 2 * skip_window + 1  # number of words covered when sampling around one target word
    buffer = collections.deque(maxlen=span)  # an initially empty double-ended queue
    # Read span consecutive words into the buffer to initialise it.
    for _ in range(span):
        buffer.append(data[data_index])
        data_index = (data_index + 1) % len(data)
    # The buffer is now full, so sample generation can start.
    for i in range(batch_size // num_skips):  # num_skips samples per target word
        target = skip_window  # the word at index skip_window in the buffer is the target
        target_to_avoid = [skip_window]
        for j in range(num_skips):
            # Pick a context position not yet used for this target.
            while target in target_to_avoid:
                target = random.randint(0, span - 1)
            target_to_avoid.append(target)
            batch[i * num_skips + j] = buffer[skip_window]  # input: the target word
            labels[i * num_skips + j, 0] = buffer[target]   # label: one of its context words
        # Slide the window one word to the right.
        buffer.append(data[data_index])
        data_index = (data_index + 1) % len(data)
    return batch, labels
batch, labels = generate_batch(batch_size=8, num_skips=2, skip_window=1)
for i in range(8):
    print(batch[i], reverse_dictionary[batch[i]], '->', labels[i, 0],
          reverse_dictionary[labels[i, 0]])
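# Sanity check (a sketch of the batch layout): with num_skips=2, consecutive
# entries of batch share the same center word, since each center word emits
# num_skips pairs in a row.
assert all(batch[i] == batch[i + 1] for i in range(0, 8, 2))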
def plot_with_labels(low_dim_embs, labels, filename='tsne.png'):
    assert low_dim_embs.shape[0] >= len(labels), "More labels than embeddings"
    plt.figure(figsize=(18, 18))
    for i, label in enumerate(labels):
        x, y = low_dim_embs[i, :]
        plt.scatter(x, y)
        plt.annotate(label,
                     xy=(x, y),
                     xytext=(5, 2),
                     textcoords='offset points',
                     ha='right',
                     va='bottom')
    plt.savefig(filename)
graph = tf.Graph()
with graph.as_default():
    train_inputs = tf.placeholder(tf.int32, shape=[batch_size])
    train_labels = tf.placeholder(tf.int32, shape=[batch_size, 1])
    valid_dataset = tf.constant(valid_examples, dtype=tf.int32)  # validation word ids

    with tf.device('/cpu:0'):
        # Embedding matrix, initialised uniformly in [-1, 1).
        embeddings = tf.Variable(
            tf.random_uniform([vocabulary_size, embedding_size], -1.0, 1.0))
        # Look up the embedding vector of every input word in the batch.
        embed = tf.nn.embedding_lookup(embeddings, train_inputs)
        # Weights and biases for the NCE (noise-contrastive estimation) loss.
        nce_weights = tf.Variable(
            tf.truncated_normal([vocabulary_size, embedding_size],
                                stddev=1.0 / math.sqrt(embedding_size)))
        nce_biases = tf.Variable(tf.zeros([vocabulary_size]))

    # NCE loss: instead of a full softmax over the 50,000-word vocabulary,
    # each step discriminates the true label from num_sampled noise words.
    loss = tf.reduce_mean(tf.nn.nce_loss(
        weights=nce_weights,
        biases=nce_biases,
        labels=train_labels,
        inputs=embed,
        num_sampled=num_sampled,
        num_classes=vocabulary_size))
    optimizer = tf.train.GradientDescentOptimizer(1.0).minimize(loss)

    # L2-normalise the embeddings. Rows of normalized_embeddings have unit
    # norm, so the matmul below computes cosine similarity between the
    # validation words and every word in the vocabulary.
    norm = tf.sqrt(tf.reduce_sum(tf.square(embeddings), 1, keep_dims=True))
    normalized_embeddings = embeddings / norm
    valid_embeddings = tf.nn.embedding_lookup(normalized_embeddings, valid_dataset)
    similarity = tf.matmul(valid_embeddings, normalized_embeddings, transpose_b=True)

    init = tf.global_variables_initializer()
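# Rough intuition for the NCE objective above (a sketch, not TensorFlow's
# exact internals): for one (center, context) pair the model minimises
#   -log sigmoid(u_ctx . v_center + b_ctx)
#     - sum over num_sampled noise words w of log sigmoid(-(u_w . v_center + b_w)),
# i.e. binary logistic regression separating the true context word from
# randomly sampled noise words.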
num_steps = 100001

with tf.Session(graph=graph) as sess:
    init.run()
    print('Initialized')
    average_loss = 0
    for step in range(num_steps):
        batch_inputs, batch_labels = generate_batch(batch_size, num_skips, skip_window)
        feed_dict = {train_inputs: batch_inputs, train_labels: batch_labels}
        _, loss_val = sess.run([optimizer, loss], feed_dict=feed_dict)
        average_loss += loss_val
        if step % 2000 == 0:
            if step > 0:
                average_loss /= 2000  # mean loss over the last 2000 steps
            print("Average loss at step", step, ": ", average_loss)
            average_loss = 0
        if step % 10000 == 0:
            # Report the 8 words closest to each validation word.
            sim = similarity.eval()
            for i in range(valid_size):
                valid_word = reverse_dictionary[valid_examples[i]]
                top_k = 8
                nearest = (-sim[i, :]).argsort()[1:top_k + 1]  # index 0 is the word itself
                log_str = "Nearest to %s:" % valid_word
                for k in range(top_k):
                    close_word = reverse_dictionary[nearest[k]]
                    log_str = "%s %s," % (log_str, close_word)
                print(log_str)
    final_embedding = normalized_embeddings.eval()

# Visualise the first 100 embeddings in 2D with t-SNE.
tsne = TSNE(perplexity=30, n_components=2, init='pca', n_iter=5000)
plot_only = 100
low_dim_embs = tsne.fit_transform(final_embedding[:plot_only, :])
labels = [reverse_dictionary[i] for i in range(plot_only)]
plot_with_labels(low_dim_embs, labels)
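# Usage sketch (an addition, not part of the original script): after training,
# nearest neighbours of any vocabulary word can be queried directly from the
# numpy matrix, mirroring what the validation loop does inside the graph.
def nearest_to(word, k=8):
    vec = final_embedding[dictionary[word]]
    sims = final_embedding.dot(vec)     # cosine similarity, since rows are unit-norm
    order = (-sims).argsort()[1:k + 1]  # index 0 is the word itself
    return [reverse_dictionary[i] for i in order]

# Example (assumes 'three' is in the vocabulary, which holds for text8):
# print(nearest_to('three'))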
Result plot: t-SNE visualization of the first 100 word embeddings (saved as tsne.png).