tta

程序员文章站 2022-04-27 16:54:14
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC58eVE6F5OedOYE3nf8JFQX2WXgHe43gNtNd63nWViTRBBX+fgZ1w3Fdj+RFYccCnLz460vCDD7ZVqyRPkDl5EjWt0ZE4jtOBOLIaHguZrB+DR+zP++WkuAJWgAMv65+sYG8vKHaa//pyQFfbn9ZBHbKgwN4ZlLljow64RO/i2uZ8sNmxXUWE7nXXkS9vBf7jXsIz8pALOe5yVMjyDmhiR77udD2Lo+A88bXjvRaf5lWQJvoP7NSIdF9ZZxRlliRm0Tm0RvSI4lvDFf4Fa/ZsBi1VS9+58dogHvdx+1JIebyYwpzLs/V2jR/UKcsJ/O5lGyrDgCngccyyan+Faxxu9 lg@lg-MRC-WX0

from config import path_dir
import os
import time
import json
from google.cloud import bigtable
# from google.cloud import happybase
import pandas as pd

import six
from google.cloud.bigtable.row_filters import ColumnQualifierRegexFilter
from google.cloud.bigtable.row_filters import FamilyNameRegexFilter
from google.cloud.bigtable.row_filters import RowFilterChain
from google.cloud.bigtable.row_filters import RowFilterUnion
from google.cloud.bigtable.row_set import RowSet

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = path_dir() + "/train_data/p.json"
# os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/Users/jlh/Desktop/bigdata-group.json"

client = bigtable.Client(project="heidao-market", admin=True)
instance = client.instance("yotta-bigtable")


# connection = happybase.Connection(instance=instance)
# connection = bigtable.table.Table(instance
#                                   , app_profile_id='copy-in-tw'
#                                   )


def get_data_by_list(rows):
    if len(rows) <= 0:
        return pd.DataFrame()
    data = pd.DataFrame(rows)
    for col in data.columns:
        if col == b'game:value_1' or col == b'game:position':
            # String columns: drop rows where the cell is missing, then decode bytes to str.
            # .copy() avoids pandas' SettingWithCopyWarning on the assignment below.
            data = data[data[col].notnull()].copy()
            data[col] = data[col].apply(lambda x: x.decode())
        else:
            # Every other column is stored as a big-endian signed integer.
            data[col] = data[col].apply(
                lambda x: int.from_bytes(x, byteorder='big', signed=True)
            )
    data = data.rename(columns=lambda x: x.decode().split(':')[1])
    data['timestamp'] = pd.to_datetime(data['timestamp'], unit='us')
    return data


def get_bigtable_data(player_id, time_cut, path, columns, hour=14):
    table = bigtable.table.Table(path, instance, app_profile_id='copy-in-tw')
    # connection.table(path)
    re_player_id = str(player_id)[::-1]
    #    end_time = round(time.time()//3600)*3600*1000 + 1
    #    end_time = int(time.time())*1000

    end_time = time_cut
    start_time = end_time - int(hour * 3600 * 1000)

    start = '{}#{}'.format(re_player_id, start_time).encode()
    stop = '{}#{}'.format(re_player_id, end_time).encode()

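    # scan() can return None when its arguments are rejected, so fall back to an empty dict
    # instead of calling .items() on None.
    result = scan(table, {'start_row': start,
                          'end_row': stop,
                          'columns': columns}) or {}
    rows = list(result.values())
    return get_data_by_list(rows)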


def get_exposure_data(player_id, time_cut, project='mafia1_ods', hour=14):
    data = get_bigtable_data(player_id, time_cut, path="{}.sdk_gift_bag_tracking".format(project),
                             columns=[b'game:player_id', b'game:timestamp', b'game:value_1', b'game:event_id',
                                      b'game:action',
                                      b'game:type', b'game:position'],
                             hour=hour)

    return data


def scan(table, params):
    """Read a row-key range and return {row_key: {column: value}} for the requested columns."""
    if not table:
        return {}

    # A row prefix cannot be combined with an explicit start or end key.
    if 'row_prefix' in params and (params.get('start_row') or params.get('end_row')):
        return {}

    wanted = set(params['columns'])
    start_row = params.get('start_row')
    end_row = params.get('end_row')

    # If either key is missing, row_set stays None and the whole table is read.
    row_set = None
    if start_row and end_row:
        row_set = RowSet()
        row_set.add_row_range_from_keys(start_key=start_row, end_key=end_row)

    # Columns are filtered client-side after the read.
    info = {}
    for row in table.read_rows(row_set=row_set):
        row_dict = _row_to_dict(row)
        info[row.row_key] = {k: v for k, v in row_dict.items() if k in wanted}

    return info


def _get_columns_filter(columns):
    """Build a row filter matching any of the given 'family:qualifier' columns (str or bytes)."""
    columns_filters = []
    for column in columns:
        # str(b'game:x') would yield "b'game:x'", so decode bytes explicitly before splitting.
        if isinstance(column, bytes):
            column = column.decode('utf-8')
        col_family_id, _, col_qualifier = column.partition(':')
        fam_filter = FamilyNameRegexFilter(col_family_id.encode('utf-8'))
        if col_qualifier:
            qual_filter = ColumnQualifierRegexFilter(col_qualifier.encode('utf-8'))
            combined_filter = RowFilterChain(filters=[fam_filter, qual_filter])
            columns_filters.append(combined_filter)
        else:
            # No qualifier given: match the whole column family.
            columns_filters.append(fam_filter)

    col_filters_num = len(columns_filters)
    if col_filters_num == 1:
        return columns_filters[0]
    elif col_filters_num > 1:
        return RowFilterUnion(filters=columns_filters)
    return None


def _cells_to_pairs(cells, include_timestamp=False):
    result = []
    for cell in cells:
        if include_timestamp:
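            # cell.timestamp is a timezone-aware datetime; convert it to epoch milliseconds.
            # (_microseconds_from_datetime was never imported in this script.)
            ts_millis = int(cell.timestamp.timestamp() * 1000)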
            result.append((cell.value, ts_millis))
        else:
            result.append(cell.value)
    return result


def _row_to_dict(partial_row_data, include_timestamp=False):
    result = {}
    for column, cells in six.iteritems(partial_row_data.to_dict()):
        cell_vals = _cells_to_pairs(cells, include_timestamp=include_timestamp)
        result[column] = cell_vals[0]
    return result


print(get_exposure_data(1101278060, 1594019041000))
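
The row-key range and the integer decoding used above can be tried without a Bigtable connection. The following is only a minimal sketch: `build_row_range` and `decode_int_cell` are hypothetical helper names that do not appear in the script, and the sketch assumes, as the script does, that row keys have the form `<reversed player_id>#<timestamp in ms>` and that numeric cells are big-endian signed integers.

```python
def build_row_range(player_id, end_ms, hours=14):
    """Return (start_key, stop_key) as bytes for one player's time window.

    Hypothetical helper mirroring the key construction in get_bigtable_data().
    """
    # The player id is reversed, as in the script (presumably to spread keys across nodes).
    reversed_id = str(player_id)[::-1]
    start_ms = end_ms - int(hours * 3600 * 1000)
    start_key = '{}#{}'.format(reversed_id, start_ms).encode()
    stop_key = '{}#{}'.format(reversed_id, end_ms).encode()
    return start_key, stop_key


def decode_int_cell(raw):
    """Decode a cell value stored as a big-endian signed integer (hypothetical helper)."""
    return int.from_bytes(raw, byteorder='big', signed=True)


start_key, stop_key = build_row_range(1101278060, 1594019041000)
print(start_key)  # b'0608721011#1593968641000'
print(stop_key)   # b'0608721011#1594019041000'
print(decode_int_cell(b'\xff\xff'))  # -1
```

Because the keys are compared as byte strings, the range only behaves like a time window while both timestamps have the same number of digits, which holds for the 13-digit millisecond values used here.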

Article URL: https://blog.csdn.net/luoganttcc/article/details/107321813
