python对dcm的数据进行初步预处理
程序员文章站
2022-05-18 17:25:51
...
数据预处理
1.get_first_of_dicom_field_as_int用于检查x的类型是否为pydicom.multival.MultiValue:如果是,则取x的第一个元素并转换为整数返回;否则直接返回int(x)。
def get_first_of_dicom_field_as_int(x):
    """Return a DICOM field value as an ``int``.

    Args:
        x: Either a value that ``int()`` accepts, or a
            ``pydicom.multival.MultiValue`` whose first item ``int()``
            accepts.

    Returns:
        ``int(x[0])`` when ``x`` is a ``MultiValue``, otherwise ``int(x)``.
    """
    # isinstance (instead of an exact type comparison) also accepts
    # subclasses of MultiValue.
    if isinstance(x, pydicom.multival.MultiValue):
        return int(x[0])
    return int(x)
2.按窗位和窗宽截取图像的灰度范围,并重新缩放为标准形式
def window_image(img, window_center, window_width, intercept, slope):
    """Rescale pixel values linearly and clamp them to an intensity window.

    Args:
        img: Pixel array (the caller passes a dataset's ``pixel_array``).
        window_center: Center of the value range to keep.
        window_width: Total width of the value range to keep.
        intercept: Offset added after scaling.
        slope: Factor the stored values are multiplied by.

    Returns:
        A new array holding ``img * slope + intercept`` with every value
        limited to ``[center - width // 2, center + width // 2]``.
    """
    # For a NumPy array this arithmetic allocates a new array, so the
    # in-place clamping below does not touch the caller's data.
    img = img * slope + intercept
    half_width = window_width // 2
    img_min = window_center - half_width
    img_max = window_center + half_width
    img[img < img_min] = img_min
    img[img > img_max] = img_max
    # Without this return the function yielded None to its caller.
    return img
# Rescale to the [0, 1] range
def normalize_minmax(img):
    """Min-max normalize an array to the range ``[0, 1]``.

    Args:
        img: Array exposing ``min()`` and ``max()`` (e.g. a NumPy array).

    Returns:
        ``(img - min) / (max - min)``. When every element is equal the
        range is zero; an all-zero floating-point array of the same shape
        is returned instead of dividing by zero.
    """
    mi, ma = img.min(), img.max()
    span = ma - mi
    if span == 0:
        # Constant image: 0/0 would fill the result with NaN. Multiplying
        # the (all-zero) difference by 0.0 yields float zeros of equal shape.
        return (img - mi) * 0.0
    return (img - mi) / span
3.获取窗口中心和宽度
def get_id(img_dicom):
    """Return the ``SOPInstanceUID`` attribute of ``img_dicom`` as a ``str``."""
    uid = img_dicom.SOPInstanceUID
    return str(uid)
def get_metadata_from_dicom(img_dicom):
    """Collect the windowing and rescale attributes of a dataset as ints.

    Reads ``WindowCenter``, ``WindowWidth``, ``RescaleIntercept`` and
    ``RescaleSlope`` from ``img_dicom`` and converts each value with
    ``get_first_of_dicom_field_as_int``.

    Returns:
        dict with the keys ``window_center``, ``window_width``,
        ``intercept`` and ``slope``.
    """
    # NOTE(review): every value goes through int(), so a fractional slope
    # or intercept would be truncated -- confirm this is acceptable.
    fields = (
        ("window_center", "WindowCenter"),
        ("window_width", "WindowWidth"),
        ("intercept", "RescaleIntercept"),
        ("slope", "RescaleSlope"),
    )
    # Fetch all attributes first, then convert them.
    raw_values = [(key, getattr(img_dicom, attribute)) for key, attribute in fields]
    converted = {}
    for key, value in raw_values:
        converted[key] = get_first_of_dicom_field_as_int(value)
    return converted
4.将dcm的图像转化为正常的图像
def prepare_image(img_path):
    """Load a DICOM file and turn it into an 8-bit grayscale PIL image.

    The pixel data is windowed with the window/rescale attributes stored in
    the file itself, min-max normalized and scaled to ``0..255``.

    Args:
        img_path: Path of the DICOM file to read.

    Returns:
        Tuple ``(img_id, img)``: the value returned by ``get_id`` for the
        dataset and the resulting ``PIL.Image`` in mode ``"L"``.
    """
    # dcmread is the reader used by the rest of this file; read_file is the
    # legacy alias of the same function.
    img_dicom = pydicom.dcmread(img_path)
    img_id = get_id(img_dicom)
    metadata = get_metadata_from_dicom(img_dicom)
    img = window_image(img_dicom.pixel_array, **metadata)
    img = normalize_minmax(img) * 255
    # Mode "L" holds unsigned 8-bit pixels; a signed int8 cast cannot
    # represent 128..255, so the data is converted to uint8.
    img = PIL.Image.fromarray(img.astype(np.uint8), mode="L")
    return img_id, img
5.以下是全部的代码
import joblib
import PIL
from glob import glob
import pydicom
import numpy as np
import pandas as pd
import os
import cv2
import json
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter
from PIL import Image
import math
import seaborn as sns
from collections import defaultdict
from pathlib import Path
import cv2
from tqdm import tqdm
import re
import logging as l
from glob import glob
import argparse
def get_first_of_dicom_field_as_int(x):
    """Return a DICOM field value as an ``int``.

    Args:
        x: Either a value that ``int()`` accepts, or a
            ``pydicom.multival.MultiValue`` whose first item ``int()``
            accepts.

    Returns:
        ``int(x[0])`` when ``x`` is a ``MultiValue``, otherwise ``int(x)``.
    """
    # isinstance (instead of an exact type comparison) also accepts
    # subclasses of MultiValue.
    if isinstance(x, pydicom.multival.MultiValue):
        return int(x[0])
    return int(x)
def get_id(img_dicom):
    """Return the ``SOPInstanceUID`` attribute of ``img_dicom`` as a ``str``."""
    sop_instance_uid = img_dicom.SOPInstanceUID
    return str(sop_instance_uid)
def get_metadata_from_dicom(img_dicom):
    """Collect the windowing and rescale attributes of a dataset as ints.

    Reads ``WindowCenter``, ``WindowWidth``, ``RescaleIntercept`` and
    ``RescaleSlope`` from ``img_dicom`` and converts each value with
    ``get_first_of_dicom_field_as_int``.

    Returns:
        dict with the keys ``window_center``, ``window_width``,
        ``intercept`` and ``slope``.
    """
    # NOTE(review): every value goes through int(), so a fractional slope
    # or intercept would be truncated -- confirm this is acceptable.
    fields = (
        ("window_center", "WindowCenter"),
        ("window_width", "WindowWidth"),
        ("intercept", "RescaleIntercept"),
        ("slope", "RescaleSlope"),
    )
    # Fetch all attributes first, then convert them.
    raw_values = [(key, getattr(img_dicom, attribute)) for key, attribute in fields]
    converted = {}
    for key, value in raw_values:
        converted[key] = get_first_of_dicom_field_as_int(value)
    return converted
def window_image(img, window_center, window_width, intercept, slope):
    """Rescale pixel values linearly and clamp them to an intensity window.

    Args:
        img: Pixel array (the caller passes a dataset's ``pixel_array``).
        window_center: Center of the value range to keep.
        window_width: Total width of the value range to keep.
        intercept: Offset added after scaling.
        slope: Factor the stored values are multiplied by.

    Returns:
        ``img * slope + intercept`` with values below
        ``center - width // 2`` raised to that bound and values above
        ``center + width // 2`` lowered to that bound.
    """
    rescaled = img * slope + intercept
    half_width = window_width // 2
    lower = window_center - half_width
    upper = window_center + half_width
    # Clamp in place on the freshly computed array.
    rescaled[rescaled < lower] = lower
    rescaled[rescaled > upper] = upper
    return rescaled
def resize(img, new_w, new_h):
    """Convert an array to an 8-bit grayscale image and resize it.

    Args:
        img: Array of pixel values; presumably already in ``0..255``
            (TODO confirm against callers).
        new_w: Target width in pixels.
        new_h: Target height in pixels.

    Returns:
        A ``PIL.Image`` in mode ``"L"`` of size ``(new_w, new_h)``,
        resampled with bicubic interpolation.
    """
    # Mode "L" holds unsigned 8-bit pixels; a signed int8 cast cannot
    # represent 128..255, so the data is converted to uint8.
    img = PIL.Image.fromarray(img.astype(np.uint8), mode="L")
    return img.resize((new_w, new_h), resample=PIL.Image.BICUBIC)
def save_img(img_pil, subfolder, name):
    """Save ``img_pil`` as ``<subfolder><name>.png``.

    ``subfolder`` is concatenated as-is, so it has to end with a path
    separator when it denotes a directory.
    """
    target = subfolder + name + '.png'
    img_pil.save(target)
def normalize_minmax(img):
    """Min-max normalize an array to the range ``[0, 1]``.

    Args:
        img: Array exposing ``min()`` and ``max()`` (e.g. a NumPy array).

    Returns:
        ``(img - min) / (max - min)``. When every element is equal the
        range is zero; an all-zero floating-point array of the same shape
        is returned instead of dividing by zero.
    """
    mi, ma = img.min(), img.max()
    span = ma - mi
    if span == 0:
        # Constant image: 0/0 would fill the result with NaN. Multiplying
        # the (all-zero) difference by 0.0 yields float zeros of equal shape.
        return (img - mi) * 0.0
    return (img - mi) / span
def prepare_image(img_path):
    """Load a DICOM file and turn it into an 8-bit grayscale PIL image.

    The pixel data is windowed with the window/rescale attributes stored in
    the file itself, min-max normalized and scaled to ``0..255``.

    Args:
        img_path: Path of the DICOM file to read.

    Returns:
        Tuple ``(img_id, img)``: the value returned by ``get_id`` for the
        dataset and the resulting ``PIL.Image`` in mode ``"L"``.
    """
    # dcmread is the reader used by the rest of this file; read_file is the
    # legacy alias of the same function.
    img_dicom = pydicom.dcmread(img_path)
    img_id = get_id(img_dicom)
    metadata = get_metadata_from_dicom(img_dicom)
    img = window_image(img_dicom.pixel_array, **metadata)
    img = normalize_minmax(img) * 255
    # Mode "L" holds unsigned 8-bit pixels; a signed int8 cast cannot
    # represent 128..255, so the data is converted to uint8.
    img = PIL.Image.fromarray(img.astype(np.uint8), mode="L")
    return img_id, img
def prepare_and_save(img_path, subfolder):
    """Convert one DICOM file and save it as PNG, logging any failure.

    Best-effort: an error while processing a single file is logged (with
    its traceback) and swallowed so that a batch run keeps going.

    Args:
        img_path: Path of the DICOM file to convert.
        subfolder: Output location prefix handed to ``save_img``.
    """
    try:
        img_id, img_pil = prepare_image(img_path)
        save_img(img_pil, subfolder, img_id)
    except Exception:
        # ``Exception`` does not cover KeyboardInterrupt or SystemExit, so
        # interrupting the run (e.g. stopping a notebook cell) still
        # propagates without an explicit re-raise clause.
        # Lazy %-formatting keeps the message text unchanged and cannot
        # itself fail when img_path is not a str.
        l.exception('Error processing the image: {%s}', img_path)
def prepare_images(imgs_path, subfolder):
    """Convert every DICOM file in ``imgs_path`` sequentially.

    Args:
        imgs_path: Iterable of DICOM file paths.
        subfolder: Output location prefix handed to ``prepare_and_save``.
    """
    # The file imports the callable itself (``from tqdm import tqdm``), so
    # ``tqdm.tqdm`` would raise AttributeError.
    for img_path in tqdm(imgs_path):
        prepare_and_save(img_path, subfolder)
def prepare_images_njobs(img_paths, subfolder, n_jobs=-1):
    """Convert every DICOM file in ``img_paths`` through ``joblib.Parallel``.

    Args:
        img_paths: Iterable of DICOM file paths.
        subfolder: Output location prefix handed to ``prepare_and_save``.
        n_jobs: Forwarded to ``joblib.Parallel``.
    """
    runner = joblib.Parallel(n_jobs=n_jobs)
    task = joblib.delayed(prepare_and_save)
    # tqdm wraps the input iterable to show progress as tasks are consumed.
    runner(task(path, subfolder) for path in tqdm(img_paths))
if __name__ == '__main__':
    # Command-line entry point: convert every file found directly under
    # --dcm_path and write the results into --png_path.
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    # Both paths are mandatory; without them the script used to fail later
    # with a TypeError on a None path instead of a usage message.
    parser.add_argument("-dcm_path", "--dcm_path", type=str, required=True,
                        help="directory containing the input DICOM files")
    parser.add_argument("-png_path", "--png_path", type=str, required=True,
                        help="directory the PNG files are written to")
    args = parser.parse_args()
    dcm_path = args.dcm_path
    png_path = args.png_path
    # exist_ok avoids the race between checking for and creating the folder.
    os.makedirs(png_path, exist_ok=True)
    prepare_images_njobs(glob(dcm_path+'/*'), png_path+'/')
import os
import pickle
import random
import glob
import datetime
import pandas as pd
import numpy as np
import torch
import cv2
import pydicom
from tqdm import tqdm
from joblib import delayed, Parallel
import zipfile
from pydicom.filebase import DicomBytesIO
import sys
sys.path.insert(0, 'scripts')
from logs import get_logger, dumpobj, loadobj
# Create the script-wide logger and write a timestamped start-up line.
logger = get_logger('Prepare Data', 'INFO')  # noqa
logger.info(
    'Cuda set up : time {}'.format(datetime.datetime.now().time())
)
def get_dicom_value(x, cast=int):
    """Convert a DICOM field value with ``cast``, using the first item of
    multi-valued fields.

    Args:
        x: A scalar, a ``tuple`` or a ``pydicom.multival.MultiValue``.
        cast: Callable applied to the selected value (``int`` by default).

    Returns:
        ``cast(x[0])`` for a tuple or ``MultiValue``, otherwise ``cast(x)``.
    """
    # isinstance (instead of an exact type comparison) also accepts
    # subclasses such as named tuples.
    if isinstance(x, tuple) or isinstance(x, pydicom.multival.MultiValue):
        return cast(x[0])
    return cast(x)
def cast(value):
    """Turn a pydicom ``MultiValue`` into a plain ``tuple``.

    Args:
        value: Any attribute value read from a dataset.

    Returns:
        ``tuple(value)`` when ``value`` is a ``MultiValue``; every other
        value is returned unchanged.
    """
    # Reference the class through its defining module (pydicom.multival),
    # as get_dicom_value does, rather than through pydicom.valuerep.
    if isinstance(value, pydicom.multival.MultiValue):
        return tuple(value)
    return value
def get_dicom_raw(dicom):
    """Return the capitalized attributes of ``dicom`` as a dict.

    Every name from ``dir(dicom)`` that starts with an upper-case letter,
    except ``PixelData``, is read with ``getattr`` and passed through
    ``cast``.
    """
    skipped = ['PixelData']
    raw = {}
    for attr in dir(dicom):
        if not attr[0].isupper():
            continue
        if attr in skipped:
            continue
        raw[attr] = cast(getattr(dicom, attr))
    return raw
def rescale_image(image, slope, intercept):
    """Apply the linear transform ``image * slope + intercept``."""
    scaled = image * slope
    return scaled + intercept
def apply_window(image, center, width):
    """Return a copy of ``image`` clamped to an intensity window.

    Args:
        image: Array to clamp; it is copied, not modified.
        center: Center of the value range to keep.
        width: Total width of the value range to keep.

    Returns:
        A copy whose values are limited to
        ``[center - width // 2, center + width // 2]``.
    """
    windowed = image.copy()
    half_width = width // 2
    lower, upper = center - half_width, center + half_width
    windowed[windowed < lower] = lower
    windowed[windowed > upper] = upper
    return windowed
def get_dicom_meta(dicom):
    """Extract the identifying, windowing and rescale fields of a dataset.

    Returns:
        dict with ``PatientID``, ``StudyInstanceUID`` and
        ``SeriesInstanceUID`` taken as-is, ``WindowWidth`` and
        ``WindowCenter`` converted with ``get_dicom_value``, and
        ``RescaleIntercept`` / ``RescaleSlope`` converted to ``float``.
    """
    meta = {}
    # The "can be grouped" / "all same" remarks are kept from the original
    # author's annotations.
    meta['PatientID'] = dicom.PatientID  # can be grouped (20-548)
    meta['StudyInstanceUID'] = dicom.StudyInstanceUID  # can be grouped (20-60)
    meta['SeriesInstanceUID'] = dicom.SeriesInstanceUID  # can be grouped (20-60)
    meta['WindowWidth'] = get_dicom_value(dicom.WindowWidth)
    meta['WindowCenter'] = get_dicom_value(dicom.WindowCenter)
    meta['RescaleIntercept'] = float(dicom.RescaleIntercept)
    meta['RescaleSlope'] = float(dicom.RescaleSlope)  # all same (1.0)
    return meta
def apply_window_policy(image):
    """Build a three-channel image from three intensity windows.

    Each channel is ``image`` clamped with ``apply_window``, shifted by the
    window's lower bound, divided by the window width and finally
    mean-centered. The channels are, in order: brain (center 40, width 80),
    subdural (80, 200) and bone (40, 380).

    Returns:
        Array of the stacked channels with the axes reordered by
        ``transpose(1, 2, 0)``, i.e. the channel axis moved last.
    """
    # (center, width, lower bound of the window)
    window_specs = (
        (40, 80, 0),       # brain
        (80, 200, -20),    # subdural
        (40, 380, -150),   # bone
    )
    channels = []
    for center, width, lower in window_specs:
        windowed = apply_window(image, center, width)
        scaled = (windowed - lower) / width
        channels.append(scaled - scaled.mean())
    return np.array(channels).transpose(1, 2, 0)
def convert_dicom_to_jpg(name):
    """Convert one archived DICOM file to a windowed three-channel JPEG.

    Relies on module-level names: ``f`` (an object whose ``read(name)``
    returns the file's bytes -- in this file it is only bound by the
    commented-out ``zipfile.ZipFile`` block, TODO confirm), ``rescaledict``,
    ``PATHPROC`` and ``logger``.

    Best-effort: when anything goes wrong the member name is logged and the
    file is skipped.

    Args:
        name: Member name inside the archive, using ``/`` separators and
            ending in ``.dcm``.
    """
    try:
        data = f.read(name)
        # Base name without the extension; used as the key into rescaledict
        # and as the output file name.
        imgnm = (name.split('/')[-1]).replace('.dcm', '')
        dicom = pydicom.dcmread(DicomBytesIO(data))
        image = dicom.pixel_array
        image = rescale_image(image, rescaledict['RescaleSlope'][imgnm], rescaledict['RescaleIntercept'][imgnm])
        image = apply_window_policy(image)
        # Shift every channel so that its minimum becomes 0 before scaling
        # to 8 bit.
        image -= image.min((0,1))
        image = (255*image).astype(np.uint8)
        cv2.imwrite(os.path.join(PATHPROC, imgnm)+'.jpg', image)
    except Exception:
        # ``Exception`` keeps KeyboardInterrupt/SystemExit propagating so a
        # long conversion loop can still be interrupted.
        logger.info(name)
# Build the metadata table that is later written to .csv
def generate_df(base, files):
    """Read the metadata of several DICOM files into one DataFrame.

    Args:
        base: Directory containing the files.
        files: Iterable of file names relative to ``base``.

    Returns:
        ``pandas.DataFrame`` with one row per file and one column per
        dataset keyword (``Rows``, ``Columns`` and ``PixelData`` are left
        out). A keyword missing from a file becomes a missing value in
        that file's row.
    """
    ignored = {'Rows', 'Columns', 'PixelData'}
    # One record per file: appending to per-keyword lists would shift rows
    # (or make DataFrame construction fail) as soon as two files do not
    # carry the same set of keywords. The per-keyword debug print of the
    # growing lists was dropped for the same reason it was slow: its
    # output grew with every file read.
    records = []
    for filename in tqdm(files):
        path = os.path.join(base, filename)  # full path of the file
        dcm = pydicom.dcmread(path)  # parse the DICOM file
        records.append({
            name: dcm[name].value
            for name in dcm.dir()
            if name not in ignored
        })
    return pd.DataFrame(records)
# Locations of the raw DICOM folders and of the processed output.
DATAPATH = "../data/"
TRAIN_DIR = os.path.join(DATAPATH, 'raw/stage_2_train_images')
TEST_DIR = os.path.join(DATAPATH, 'raw/stage_2_test_images')
PATHPROC = os.path.join(DATAPATH, 'proc')

# Dump the metadata of the test images to CSV.
logger.info('Create test meta files')  # logging, used like print
test_files = os.listdir(TEST_DIR)
print(test_files)
test_df = generate_df(TEST_DIR, test_files)
test_df.to_csv(os.path.join(DATAPATH, 'test_metadata.csv'))

# Dump the metadata of the training images to CSV.
logger.info('Create train meta files')
train_files = os.listdir(TRAIN_DIR)
train_df = generate_df(TRAIN_DIR, train_files)
train_df.to_csv(os.path.join(DATAPATH, 'train_metadata.csv'))

# Reload both tables and build the per-image rescale lookup read by
# convert_dicom_to_jpg.
logger.info('Load meta files')
trnmdf = pd.read_csv(os.path.join(DATAPATH, 'train_metadata.csv'))
logger.info('Train meta shape {} {}'.format(*trnmdf.shape))
tstmdf = pd.read_csv(os.path.join(DATAPATH, 'test_metadata.csv'))
logger.info('Test meta shape {} {}'.format(*tstmdf.shape))
# ``axis`` has to be passed by keyword: pandas 2.x no longer accepts it
# as a positional argument of concat.
mdf = pd.concat([trnmdf, tstmdf], axis=0)
rescaledict = mdf.set_index('SOPInstanceUID')[['RescaleSlope', 'RescaleIntercept']].to_dict()
# logger.info('Create windowed images')
# with zipfile.ZipFile(os.path.join(DATAPATH, "raw/rsna-intracranial-hemorrhage-detection.zip"), "r") as f:
# for t, name in enumerate(tqdm(f.namelist())):
# convert_dicom_to_jpg(name)