欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

python对dcm的数据进行初步预处理

程序员文章站 2022-05-18 17:25:51
...

数据预处理

1.get_first_of_dicom_field_as_int用于检查x的类型是否等于pydicom.multival.MultiValue,然后返回x的第一个索引,否则返回x的整数部分。

def get_first_of_dicom_field_as_int(x):
    if type(x) == pydicom.multival.MultiValue:
        # print(x)
        return int(x[0])
    return int(x)

2.精确定位图象,并重新缩放为标准形式```python

def window_image(img, window_center, window_width, intercept, slope): #定位图象位置
    img = img * slope + intercept  #医学图像转世界坐标
    img_min = window_center - window_width // 2
    img_max = window_center + window_width // 2
    img[img < img_min] = img_min
    img[img > img_max] = img_max
 # 重新缩放
def normalize_minmax(img):
    mi, ma = img.min(), img.max()
    return (img - mi) / (ma - mi)

3.获取窗口中心和宽度

def get_id(img_dicom):
    return str(img_dicom.SOPInstanceUID)

def get_metadata_from_dicom(img_dicom):
    metadata = {
        "window_center": img_dicom.WindowCenter,
        "window_width": img_dicom.WindowWidth,
        "intercept": img_dicom.RescaleIntercept,
        "slope": img_dicom.RescaleSlope,
    }
    return {k: get_first_of_dicom_field_as_int(v) for k, v in metadata.items()}

4.将dcm的图像转化为正常的图像

def prepare_image(img_path):
    img_dicom = pydicom.read_file(img_path)
    img_id = get_id(img_dicom)
    metadata = get_metadata_from_dicom(img_dicom)
    img = window_image(img_dicom.pixel_array, **metadata)
    img = normalize_minmax(img) * 255
    img = PIL.Image.fromarray(img.astype(np.int8), mode="L")
    return img_id, img

5.以下是全部的代码

import joblib
import PIL
from glob import glob
import pydicom
import numpy as np
import pandas as pd
import os
import cv2
import json
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter
from PIL import Image
import math
import seaborn as sns
from collections import defaultdict
from pathlib import Path
import cv2
from tqdm import tqdm
import re
import logging as l
from glob import glob
import argparse

def get_first_of_dicom_field_as_int(x):
    if type(x) == pydicom.multival.MultiValue:
        # print(x)
        return int(x[0])
    return int(x)

def get_id(img_dicom):
    return str(img_dicom.SOPInstanceUID)

def get_metadata_from_dicom(img_dicom):
    metadata = {
        "window_center": img_dicom.WindowCenter,
        "window_width": img_dicom.WindowWidth,
        "intercept": img_dicom.RescaleIntercept,
        "slope": img_dicom.RescaleSlope,
    }
    return {k: get_first_of_dicom_field_as_int(v) for k, v in metadata.items()}

def window_image(img, window_center, window_width, intercept, slope): #定位图象位置
    img = img * slope + intercept  #医学图像转世界坐标
    img_min = window_center - window_width // 2
    img_max = window_center + window_width // 2
    img[img < img_min] = img_min
    img[img > img_max] = img_max
    return img

def resize(img, new_w, new_h):
    img = PIL.Image.fromarray(img.astype(np.int8), mode="L")
    return img.resize((new_w, new_h), resample=PIL.Image.BICUBIC)

def save_img(img_pil, subfolder, name):
    img_pil.save(subfolder+name+'.png')

def normalize_minmax(img):
    mi, ma = img.min(), img.max()
    return (img - mi) / (ma - mi)

def prepare_image(img_path):
    img_dicom = pydicom.read_file(img_path)
    img_id = get_id(img_dicom)
    metadata = get_metadata_from_dicom(img_dicom)
    img = window_image(img_dicom.pixel_array, **metadata)
    img = normalize_minmax(img) * 255
    img = PIL.Image.fromarray(img.astype(np.int8), mode="L")
    return img_id, img

def prepare_and_save(img_path, subfolder):
    try:
        img_id, img_pil = prepare_image(img_path)
        save_img(img_pil, subfolder, img_id)
    except KeyboardInterrupt:
        # Rais interrupt exception so we can stop the cell execution
        # without shutting down the kernel.
        raise
    except:
        l.error('Error processing the image: {'+img_path+'}')

def prepare_images(imgs_path, subfolder):
    for i in tqdm.tqdm(imgs_path):
        prepare_and_save(i, subfolder)

def prepare_images_njobs(img_paths, subfolder, n_jobs=-1):
    joblib.Parallel(n_jobs=n_jobs)(joblib.delayed(prepare_and_save)(i, subfolder) for i in tqdm(img_paths))

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("-dcm_path", "--dcm_path", type=str)
    parser.add_argument("-png_path", "--png_path", type=str)
    args = parser.parse_args()
    dcm_path = args.dcm_path
    png_path = args.png_path

    if not os.path.exists(png_path):
        os.makedirs(png_path)

    prepare_images_njobs(glob(dcm_path+'/*'), png_path+'/')

import os
import pickle
import random
import glob
import datetime
import pandas as pd
import numpy as np
import torch
import cv2
import pydicom
from tqdm import tqdm
from joblib import delayed, Parallel
import zipfile
from pydicom.filebase import DicomBytesIO
import sys
sys.path.insert(0, 'scripts')
from logs import get_logger, dumpobj, loadobj

# Print info about environments
logger = get_logger('Prepare Data', 'INFO') # noqa
logger.info('Cuda set up : time {}'.format(datetime.datetime.now().time()))

def get_dicom_value(x, cast=int):
    if type(x) in [pydicom.multival.MultiValue, tuple]:
        return cast(x[0])
    else:
        return cast(x)


def cast(value):
    if type(value) is pydicom.valuerep.MultiValue:
        return tuple(value)
    return value


def get_dicom_raw(dicom):
    return {attr:cast(getattr(dicom,attr)) for attr in dir(dicom) if attr[0].isupper() and attr not in ['PixelData']}


def rescale_image(image, slope, intercept):
    return image * slope + intercept


def apply_window(image, center, width):
    image = image.copy()
    min_value = center - width // 2
    max_value = center + width // 2
    image[image < min_value] = min_value
    image[image > max_value] = max_value
    return image


def get_dicom_meta(dicom):
    return {
        'PatientID': dicom.PatientID, # can be grouped (20-548)
        'StudyInstanceUID': dicom.StudyInstanceUID, # can be grouped (20-60)
        'SeriesInstanceUID': dicom.SeriesInstanceUID, # can be grouped (20-60)
        'WindowWidth': get_dicom_value(dicom.WindowWidth),
        'WindowCenter': get_dicom_value(dicom.WindowCenter),
        'RescaleIntercept': float(dicom.RescaleIntercept),
        'RescaleSlope': float(dicom.RescaleSlope), # all same (1.0)
    }


def apply_window_policy(image):

    image1 = apply_window(image, 40, 80) # brain
    image2 = apply_window(image, 80, 200) # subdural
    image3 = apply_window(image, 40, 380) # bone
    image1 = (image1 - 0) / 80
    image2 = (image2 - (-20)) / 200
    image3 = (image3 - (-150)) / 380
    image = np.array([
        image1 - image1.mean(),
        image2 - image2.mean(),
        image3 - image3.mean(),
    ]).transpose(1,2,0)

    return image

def convert_dicom_to_jpg(name):
    try:
        data =f.read(name)
        dirtype = 'train' if 'train' in name else 'test'
        imgnm = (name.split('/')[-1]).replace('.dcm', '')
        dicom = pydicom.dcmread(DicomBytesIO(data))
        image = dicom.pixel_array
        image = rescale_image(image, rescaledict['RescaleSlope'][imgnm], rescaledict['RescaleIntercept'][imgnm])
        image = apply_window_policy(image)
        image -= image.min((0,1))
        image = (255*image).astype(np.uint8)
        cv2.imwrite(os.path.join(PATHPROC, imgnm)+'.jpg', image)
    except:
        logger.info(name)
#定义生成.csv的函数       
def generate_df(base, files):
    train_di = {}

    for filename in tqdm(files):
        path = os.path.join( base ,  filename)# 拼接路径
        dcm = pydicom.dcmread(path) # 读取dcm文件
        all_keywords = dcm.dir()
        # print(all_keywords)
        ignored = ['Rows', 'Columns', 'PixelData']

        for name in all_keywords:
            if name in ignored:
                continue

            if name not in train_di:
                train_di[name] = []

            train_di[name].append(dcm[name].value)
            print(train_di[name])

    df = pd.DataFrame(train_di)#构成表格
    
    return df

DATAPATH = "../data/"
TRAIN_DIR = os.path.join(DATAPATH, 'raw/stage_2_train_images')
TEST_DIR = os.path.join(DATAPATH, 'raw/stage_2_test_images')
PATHPROC = os.path.join(DATAPATH, 'proc')

logger.info('Create test meta files') #日志和 print一样
test_files = os.listdir(TEST_DIR)
print(test_files)
test_df = generate_df(TEST_DIR, test_files)
test_df.to_csv(os.path.join(DATAPATH, 'test_metadata.csv'))

logger.info('Create train meta files')
train_files = os.listdir( TRAIN_DIR)
train_df = generate_df(TRAIN_DIR, train_files)
train_df.to_csv(os.path.join(DATAPATH, 'train_metadata.csv'))

logger.info('Load meta files')
trnmdf = pd.read_csv(os.path.join(DATAPATH, 'train_metadata.csv'))
logger.info('Train meta shape {} {}'.format(*trnmdf.shape))

tstmdf = pd.read_csv(os.path.join(DATAPATH, 'test_metadata.csv'))
logger.info('Test  meta shape {} {}'.format(*tstmdf.shape))


mdf = pd.concat([trnmdf, tstmdf], 0)
rescaledict = mdf.set_index('SOPInstanceUID')[['RescaleSlope', 'RescaleIntercept']].to_dict()

# logger.info('Create windowed images')
# with zipfile.ZipFile(os.path.join(DATAPATH, "raw/rsna-intracranial-hemorrhage-detection.zip"), "r") as f:
#     for t, name in enumerate(tqdm(f.namelist())):
#         convert_dicom_to_jpg(name)