tta
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC58eVE6F5OedOYE3nf8JFQX2WXgHe43gNtNd63nWViTRBBX+fgZ1w3Fdj+RFYccCnLz460vCDD7ZVqyRPkDl5EjWt0ZE4jtOBOLIaHguZrB+DR+zP++WkuAJWgAMv65+sYG8vKHaa//pyQFfbn9ZBHbKgwN4ZlLljow64RO/i2uZ8sNmxXUWE7nXXkS9vBf7jXsIz8pALOe5yVMjyDmhiR77udD2Lo+A88bXjvRaf5lWQJvoP7NSIdF9ZZxRlliRm0Tm0RvSI4lvDFf4Fa/ZsBi1VS9+58dogHvdx+1JIebyYwpzLs/V2jR/UKcsJ/O5lGyrDgCngccyyan+Faxxu9 lg@lg-MRC-WX0
from config import path_dir
import os
import time
import json
from google.cloud import bigtable
# from google.cloud import happybase
import pandas as pd
import six
from google.cloud.bigtable.row_filters import ColumnQualifierRegexFilter
from google.cloud.bigtable.row_filters import FamilyNameRegexFilter
from google.cloud.bigtable.row_filters import RowFilterChain
from google.cloud.bigtable.row_filters import RowFilterUnion
from google.cloud.bigtable.row_set import RowSet
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = path_dir() + "/train_data/p.json"
# os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/Users/jlh/Desktop/bigdata-group.json"
client = bigtable.Client(project="heidao-market", admin=True)
instance = client.instance("yotta-bigtable")
# connection = happybase.Connection(instance=instance)
# connection = bigtable.table.Table(instance
# , app_profile_id='copy-in-tw'
# )
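# Note: the commented-out happybase lines above are one way to get a table
# handle. With the google-cloud-bigtable client used in this script, the
# handle can also come from the instance itself; a minimal sketch (assuming
# the Instance.table() helper; table name taken from get_exposure_data below):
#
#     table = instance.table("mafia1_ods.sdk_gift_bag_tracking",
#                            app_profile_id='copy-in-tw')
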
def get_data_by_list(rows):
    # Convert the raw Bigtable row dicts returned by scan() into a typed
    # pandas DataFrame.
    if len(rows) <= 0:
        return pd.DataFrame()
    data = pd.DataFrame(rows)
    for col in data.columns:
        if col == b'game:value_1' or col == b'game:position':
            # String-valued columns: drop missing cells, then decode the bytes.
            data = data[~data[col].isnull()]
            data[col] = data[col].apply(lambda x: x.decode())
        else:
            # Remaining columns are stored as big-endian signed integers.
            data[col] = data[col].apply(
                lambda x: int.from_bytes(x, byteorder='big', signed=True)
            )
    # Column keys look like b'game:player_id'; keep only the qualifier part.
    data = data.rename(columns=lambda x: x.decode().split(':')[1])
    data['timestamp'] = pd.to_datetime(data['timestamp'], unit='us')
    return data

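# Worked example of the decoding above (the byte values are made up for
# illustration; the encodings themselves follow from the code): integer cells
# arrive as big-endian signed byte strings, string cells as plain UTF-8 bytes.
#
#     int.from_bytes(b'\x00\x00\x00\x00\x00\x00\x00\x07',
#                    byteorder='big', signed=True)    # -> 7
#     b'bag_popup'.decode()                           # -> 'bag_popup'
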
def get_bigtable_data(player_id, time_cut, path, columns, hour=14):
    # Scan one player's rows for the `hour` hours leading up to `time_cut`
    # (epoch milliseconds). Row keys are '<reversed player id>#<epoch ms>'.
    table = bigtable.table.Table(path, instance, app_profile_id='copy-in-tw')
    re_player_id = str(player_id)[::-1]
    end_time = time_cut
    start_time = end_time - int(hour * 3600 * 1000)
    start = '{}#{}'.format(re_player_id, start_time).encode()
    stop = '{}#{}'.format(re_player_id, end_time).encode()
    rows = []
    for key, row in scan(table, {'start_row': start,
                                 'end_row': stop,
                                 'columns': columns}).items():
        rows.append(row)
    data = get_data_by_list(rows)
    return data

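# Sketch of the row-key scheme used above, with the player id and time_cut
# from the call at the bottom of this script (the id is reversed, presumably
# to spread keys evenly across tablets):
#
#     re_player_id = str(1101278060)[::-1]          # '0608721011'
#     start = b'0608721011#1593968641000'           # time_cut - 14h in ms
#     stop  = b'0608721011#1594019041000'           # time_cut
#     # scan() then reads only the keys in [start, stop) for this player.
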
def get_exposure_data(player_id, time_cut, project='mafia1_ods', hour=14):
    # Pull gift-bag exposure events for one player from the SDK tracking table.
    data = get_bigtable_data(player_id, time_cut,
                             path="{}.sdk_gift_bag_tracking".format(project),
                             columns=[b'game:player_id', b'game:timestamp',
                                      b'game:value_1', b'game:event_id',
                                      b'game:action', b'game:type',
                                      b'game:position'],
                             hour=hour)
    return data

def scan(table, params):
    # Read a key range from `table` and return {row_key: {column: value}},
    # keeping only the columns listed in params['columns'].
    if not table:
        return {}
    if 'row_prefix' in params and (('start_row' in params and params['start_row'])
                                   or ('end_row' in params and params['end_row'])):
        # A prefix scan and an explicit key range are mutually exclusive.
        return {}
    filters = params['columns']
    start_row = params.get('start_row')
    end_row = params.get('end_row')
    row_set = None
    if start_row and end_row:
        row_set = RowSet()
        row_set.add_row_range_from_keys(start_key=start_row, end_key=end_row)
    scan_list = {}
    # Columns are filtered client-side below; the same filtering could be
    # pushed to the server via read_rows(filter_=...), see _get_columns_filter.
    rows = table.read_rows(row_set=row_set)
    for rowdata in rows:
        scan_list[rowdata.row_key] = _row_to_dict(rowdata)
    info = {}
    for key, row in scan_list.items():
        row_dict = {}
        for k, v in row.items():
            if k in filters:
                row_dict[k] = v
        info[key] = row_dict
    return info

def _get_columns_filter(columns):
    # Build a server-side Bigtable filter that matches 'family:qualifier'
    # column names (given as str or bytes).
    columns_filters = []
    for column in columns:
        if isinstance(column, bytes):
            column = column.decode('utf-8')
        col_family_id, _, col_qualifier = column.partition(':')
        fam_filter = FamilyNameRegexFilter(col_family_id.encode('utf-8'))
        if col_qualifier:
            qual_filter = ColumnQualifierRegexFilter(col_qualifier.encode('utf-8'))
            columns_filters.append(RowFilterChain(filters=[fam_filter, qual_filter]))
        else:
            columns_filters.append(fam_filter)
    if len(columns_filters) == 1:
        return columns_filters[0]
    elif len(columns_filters) > 1:
        return RowFilterUnion(filters=columns_filters)

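# _get_columns_filter is defined but never wired in: scan() filters columns
# client-side instead. A minimal sketch of the server-side variant, assuming
# the same params/row_set as in scan():
#
#     filter_chain = _get_columns_filter(params['columns'])
#     rows = table.read_rows(row_set=row_set, filter_=filter_chain)
#     # Only the requested family:qualifier cells come back, so the
#     # client-side `if k in filters` check becomes unnecessary.
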
def _cells_to_pairs(cells, include_timestamp=False):
    # Flatten Bigtable cells to values, optionally paired with the cell's
    # write timestamp in epoch milliseconds.
    result = []
    for cell in cells:
        if include_timestamp:
            ts_millis = int(cell.timestamp.timestamp() * 1000)
            result.append((cell.value, ts_millis))
        else:
            result.append(cell.value)
    return result

def _row_to_dict(partial_row_data, include_timestamp=False):
    # Bigtable returns the cells of a column newest-first, so cell_vals[0]
    # is the most recently written value.
    result = {}
    for column, cells in six.iteritems(partial_row_data.to_dict()):
        cell_vals = _cells_to_pairs(cells, include_timestamp=include_timestamp)
        result[column] = cell_vals[0]
    return result

print(get_exposure_data(1101278060, 1594019041000))
Original article: https://blog.csdn.net/luoganttcc/article/details/107321813