제가 공부하면서 이곳 저곳에서 참고한 내용을 정리한 것입니다.
잘못된 내용은 언제든지 댓글로 달아주시면 확인 후 수정하겠습니다.
반박시 여러분들의 말씀이 다 맞습니다 ^-^
0. 도입
이상탐지는 다양한 분야에서 사용 됩니다.
특히 제조 분야로 전문성을 좁혀나가기 시작하면서 다양한 분석기법을 제조 현장에 어떻게 적용할 수 있을지 항상 고민하고 있습니다.
성능이 우수하면서도 표준화가 가능한 모델을 탐색할 때 유용한 곳이 PaterWithCode 인데,, 그 곳에서 알게 된 이미지 이상탐지 기법에 대해 정리하려 합니다.
MVTec AD 데이터셋에서 SOTA 였던 PaDiM 입니다.
21년 상반기까지 SOTA였지만, 현재는 PatchCore라는 모델이 SOTA를 달성하면서 2022.07.24 기준 성능 11위인 모델입니다.
논문 원본 링크
https://arxiv.org/abs/2011.08785
1. Abstract
PaDiM은 a Patch Distribution Modeling Framework for Anomaly Detection and Localization 의 약자입니다.
제조 산업의 검사 단계에서 이상탐지를 진행할 때 픽셀 혹은 Patch of pixel 에 대해 anomaly score를 부여해 anomaly map 을 형성합니다.
anomaly map을 기반으로 anomaly localization에 대한 해석을 할 수 있기 때문에 이상여부에 대해 판단할 뿐만 아니라 어느 부분이 이상 상태인지 진단할 수 있습니다.
즉, PaDiM은
one-class learning setting 상황에서
(1) 이미지의 이상 여부를 진단하고
(2) 이상이 확인되는 위치를 이미지 상에서 찾아낸다는
기능적 특징이 있습니다.
PaDiM이 수행되는 과정은 다음과 같습니다.
(1) extraction embedding
(2) normal learning
(3) anomaly map 계산
각각의 과정을 수행하기 위해 사용되는 방식은 다음과 같습니다.
(1) extraction embedding : patch embedding 을 수행하기 위해 pretrained CNN 을 사용합니다.
(2) normal learning : 각각의 patch position 은 다변량 가우시안 분포(multivatiate Gaussian Distribution) 을 사용합니다.
(3) anomaly map 계산 : pretrain CNN 에서 서로 다른 sementic levels 사이의 correlation 을 고려합니다.
위 처럼 일련의 과정을 통해 제조 산업 분야의 검사 단계에서 anomaly detection 과 anomaly localization 을 수행할 수 있습니다.
2. Patch Distribution Modeling
[1] Embedding Extraction
Pretrained CNN 으로 이상탐지와 관련된 특징들을 뽑아내는 단계입니다.
Patch embedding vector 을 생성하려면 인공신경망을 사용해 최적해를 구해서 만드는 방식이 있습니다.
다만, 다루기가 어렵다는 문제가 있어 사전학습모델(Pretrained CNN)을 사용합니다.
ResNet 과 EfficientNet 을 pretrained CNN으로 사용하며, PaDiM에서 patch embedding process는 SPADE 에서의 방식과 유사합니다.
이미지를 학습할 때, 정상 이미지에서 각각의 patch는 pretrained CNN activation map에서 공간적으로 대응되는 activation vector 입니다.
서로 다른 레이어들에서 나온 activation vector들은 결합이 되는데, 이 과정을 통해 서로 다른 semantic level과 resolution들에서 나온 정보를 전달하는 embedding vector들을 얻을 수 있습니다.
다시 말해서, Embedding Extraction 은 이미지를 patch 로 나누고 다시 결합하면서 pretrained CNN 으로 이상탐지와 관련된 특징들을 뽑아내는 과정이라고 이해됩니다.
* 사전학습모델이란!?
사전학습모델(pretrained model)은 딥러닝 기초 용어입니다.
전이학습(transfer learning)과 사전학습(pretrained learning)을 우선 구분해야합니다.
전이학습이란 어떤 목적을 이루기 위해 학습된 모델을 다른 작업에 이용하는 것으로, 모델의 지식을 다른 문제에 이용하는 것입니다.
예를 들어, semantic segmentation 문제를 해결할 때 이미지 분류에 사용되는 ResNet 등을 사용하는 것이 있습니다.
전이학습을 하려면 학습된 모델과 새로 학습할 데이터셋이 필요합니다. 학습된 모델에 새로운 데이터셋을 학습시키는 것을 사전학습(pre-training)이라 하고, 이 때 사용된 모델을 사전학습모델(pretrained model)이라 합니다. 예를 들어 sematic segmentation 문제에서 사용하는 ResNet은 사전학습모델인 것입니다.
[2] Learning of the nomarlity
(i,j)위치에서 정상 이미지의 특성을 학습하기 위해, n개의 정상 학습 이미지로부터 (i,j)위치에서의 patch embedding vector 집합을 계산합니다.
patch embedding vector는 다변량 가우시안 분포 N 에 의해 생성되었다는 가정하에 sample mean 을 계산하고 sample covariance 를 추정합니다.
sample covariance 는 아래 식으로 추정합니다.
[3] Computation of the anomaly map
PaDiM 저자들은 테스트 이미지의 (i,j) 위치에 있는 patch 가 이상상태인지 여부를 판단하고자 anomaly score를 만들었습니다.
anomalys core 계산에 사용되는 방식은 마할라노비스거리(Mahalanobis Distance) 입니다.
마할라노비스 거리는 데이터 샘플의 무게중심을 찾아 특정 데이터 포인트가 무게중심에 가까울수록 분포에 속할 확률이 높고, 무게중심에서 벗어날수록 분포에 속할 확률이 낮다는 점을 이용해 데이터 포인트가 특정 분포에 속할 확률을 추정하는 방식입니다.
PaDiM에서 M 은 테스트 이미지의 patch embedding x 와 학습된 distribution N(sample mean, sample covariance) 사이의 거리로 해석될 수 있습니다.
계산식은 다음과 같습니다.
동일한 좌표값(i, j)에서 학습된 sample mean과 sample covariance 를 통해 테스트 이미지의 (i,j) 위치에 대해 계산하게 되면, 해당 테스트 이미지의 좌표(i,j)가 정상분포와 비교했을 때 얼만큼 차이가 있는지를 추정할 수 있게 됩니다.
anomaly map 에서 값이 높다는 것은 해당 좌표가 anomalous areas 라는 의미가 됩니다.
따라서 전체 이미지의 최종 anomaly score는 anomaly map 의 최대값이 됩니다.
(PaDiM에서는 많은 양의 거리값을 계산하고 정렬할 필요성이 없었기 때문에 KNN 기반의 방법론을 적용하지 않았습니다. 현재 SOTA 인 PatchCore에서는 PaDiM의 장점과 KNN의 방법론을 적용해 성능을 강화했습니다.)
3. 코드 구현
PaDiM의 코드는 아래 깃허브를 참고했습니다.
https://github.com/remmarp/PaDiM-TF
위 깃허브 능력자분이 tensorflow로 PaDiM을 구현해주셨네요.
PaDiM의 주요 토대였던 [embedding extraction -> learning of the nomality -> computation of the anomaly map] 흐름을 잘 기억한다면 코드 내용을 이해하는데 어렵지 않을 것으로 생각됩니다!
저는 PaDiM의 과정 중 [데이터 로드 -> embedding extraction -> learning of the normality] 까지 정리해보겠습니다.
[1] 데이터 로드
깃헙 능력자분의 data_loader.py 파일에 데이터 로드에 관한 코드가 있습니다.
데이터를 불러올 때 기본적으로 필요한 모듈은 os, cv2, numpy, tensorflow 입니다.
거기에 추가로, 깃헙 능력자분께서 MVTecADLoader 라는 클래스를 생성해 자체 모듈을 개발하셨습니다.
이미지가 저장된 base_path를 사용자 환경에 맞게 개발하면 됩니다.
제가 공부한 부분은 아래 코드에서 한글로 주석처리 해두었습니다.
############
# IMPORT #
############
# 1. Built-in modules
import os
# 2. Third-party modules
import cv2
import numpy as np
import tensorflow as tf
# 3. Own modules
###########
# CLASS #
###########
class MVTecADLoader(object):
base_path = r'D:\mvtec_ad' #사용자 환경에 맞게 경로 설정
train, test = None, None
num_train, num_test = 0, 0
category = {'bottle': ['good', 'broken_large', 'broken_small', 'contamination'],
'cable': ['good', 'bent_wire', 'cable_swap', 'combined', 'cut_inner_insulation', 'cut_outer_insulation',
'missing_cable', 'missing_wire', 'poke_insulation'],
'capsule': ['good', 'crack', 'faulty_imprint', 'poke', 'scratch', 'squeeze'],
'carpet': ['good', 'color', 'cut', 'hole', 'metal_contamination', 'thread'],
'grid': ['good', 'bent', 'broken', 'glue', 'metal_contamination', 'thread'],
'hazelnut': ['good', 'crack', 'cut', 'hole', 'print'],
'leather': ['good', 'color', 'cut', 'fold', 'glue', 'poke'],
'metal_nut': ['good', 'bent', 'color', 'flip', 'scratch'],
'pill': ['good', 'color', 'combined', 'contamination', 'crack', 'faulty_imprint', 'pill_type',
'scratch'],
'screw': ['good', 'manipulated_front', 'scratch_head', 'scratch_neck', 'thread_side', 'thread_top'],
'tile': ['good', 'crack', 'glue_strip', 'gray_stroke', 'oil', 'rough'],
'toothbrush': ['good', 'defective'],
'transistor': ['good', 'bent_lead', 'cut_lead', 'damaged_case', 'misplaced'],
'wood': ['good', 'color', 'combined', 'good', 'hole', 'liquid', 'scratch'],
'zipper': ['good', 'broken_teeth', 'combined', 'fabric_border', 'fabric_interior', 'rough',
'split_teeth', 'squeezed_teeth']}
def setup_base_path(self, path):
self.base_path = path
def load(self, category, repeat=4, max_rot=10):
# data, mask, binary anomaly label (0 for anomaly, 1 for good)
x, y, z = [], [], []
# Load train set
path = os.path.join(os.path.join(self.base_path, category), 'train/good')
files = os.listdir(path)
zero_mask = tf.zeros(shape=(224, 224), dtype=tf.int32) #제로마스크 설정
for rdx in range(repeat):
for _files in files:
full_path = os.path.join(path, _files)
img = self._read_image(full_path=full_path)
if not max_rot == 0:
img = tf.keras.preprocessing.image.random_rotation(img, max_rot) #다양한 이미지로 학습해 성능을 높이고자 학습이미지를 회전시킴
mask = zero_mask
x.append(img)
y.append(mask)
z.append(1)
x = np.asarray(x)
y = np.asarray(y)
self.num_train = len(x)
# x,y,z를 tensor 형태로 변환한 뒤 슬라이스
x = tf.data.Dataset.from_tensor_slices(tf.convert_to_tensor(x, dtype=tf.float32))
y = tf.data.Dataset.from_tensor_slices(tf.convert_to_tensor(y, dtype=tf.int32))
z = tf.data.Dataset.from_tensor_slices(tf.convert_to_tensor(z, dtype=tf.int32))
self.train = tf.data.Dataset.zip((x, y, z)) #x,y,z 를 zip 으로 합침
# data, anomaly label (e.g., good, cut, ..., etc.), binary anomaly label (0 for anomaly, 1 for good)
x, y, z = [], [], []
# Load test set
for _label in self.category[category]:
path = os.path.join(os.path.join(self.base_path, category), 'test/{}'.format(_label))
files = os.listdir(path)
for _files in files:
full_path = os.path.join(path, _files)
img = self._read_image(full_path=full_path)
if _label == 'good':
mask = zero_mask
else:
mask_path = os.path.join(os.path.join(self.base_path, category), 'ground_truth/{}'.format(_label))
_mask_path = os.path.join(mask_path, '{}_mask.png'.format(_files.split('.')[0]))
mask = cv2.resize(cv2.imread(_mask_path, flags=cv2.IMREAD_GRAYSCALE), dsize=(256, 256)) / 255
mask = mask[16:-16, 16:-16]
mask = tf.convert_to_tensor(mask, dtype=tf.int32)
x.append(img)
y.append(mask)
z.append(int(self.category[category].index(_label) == 0))
x = np.asarray(x)
y = np.asarray(y)
self.num_test = len(x)
# x,y,z를 tensor 형태로 변환한 뒤 슬라이스
x = tf.data.Dataset.from_tensor_slices(tf.convert_to_tensor(x, dtype=tf.float32))
y = tf.data.Dataset.from_tensor_slices(tf.convert_to_tensor(y, dtype=tf.int32))
z = tf.data.Dataset.from_tensor_slices(tf.convert_to_tensor(z, dtype=tf.int32))
self.test = tf.data.Dataset.zip((x, y, z)) #x,y,z를 zip 으로 합침
@staticmethod
def _read_image(full_path, flags=cv2.IMREAD_COLOR):
img = cv2.imread(full_path, flags=flags)
b, g, r = cv2.split(img)
img = cv2.merge([r, g, b])
img = cv2.resize(img, dsize=(256, 256))
img = img[16:-16, 16:-16, :]
return img
* 텐서플로우 tensorflow.data.Dataset.from_tensor_slices 사용 방법
* 텐서플로우 tensorflow.data.Dataset.zip 사용 방법
[2] embedding extraction
utils.py 파일에 embedding extraction 코드가 있습니다.
깃헙 능력자께서 utils.py 파일에 성능 평가를 위한 플롯생성 코드도 함께 만들어두셨습니다.
저는 임베딩에 필요한 부분만 따로 뽑아 스터디 하겠습니다.
############
# IMPORT #
############
# 1. Built-in modules
import os
# 2. Third-party modules
import matplotlib
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from skimage import morphology
from skimage.segmentation import mark_boundaries
# 3. Own modules
################
# Definition #
################
def embedding_concat(l1, l2): #레이어 2개를 인자로 받아 임베딩&컨캣을 실행함
bs, h1, w1, c1 = l1.shape
_, h2, w2, c2 = l2.shape
s = int(h1 / h2)
x = tf.compat.v1.extract_image_patches(l1, ksizes=[1, s, s, 1], strides=[1, s, s, 1], rates=[1, 1, 1, 1],
padding='VALID')
x = tf.reshape(x, (bs, -1, h2, w2, c1))
col_z = []
for idx in range(x.shape[1]):
col_z.append(tf.concat([x[:, idx, :, :, :], l2], axis=-1))
z = tf.stack(col_z, axis=1)
z = tf.reshape(z, (bs, h2, w2, -1))
if s == 1:
return z
z = tf.nn.depth_to_space(z, block_size=s)
return z
[3] learning of the normality
padim.py 파일은 [1]과 [2]단계의 모듈을 호출한 뒤 PaDiM 알고리즘을 수행하는 코드가 있습니다.
저는 수치적 성능평가가 필요했기 때문에 플롯을 생성하는 코드는 생략했습니다.
플롯 생성하는 부분이 필요하신 분은 깃헙 능력자분의 원본 코드를 참고해주시기 바랍니다.
############
# IMPORT #
############
# 1. Built-in modules
import os
# 2. Third-party modules
import numpy as np
import tensorflow as tf
import sklearn.metrics as metrics
from scipy.ndimage import gaussian_filter
from scipy.spatial.distance import mahalanobis
# 3. Own modules
from data_loader import MVTecADLoader #깃헙 능력자분께서 만든 data_loader
from utils import embedding_concat #깃헙 능력자분께서 만든 utils
################
# Definition #
################
def embedding_net(net_type='res'): #임베딩 네트워크는 res(ResNet)와 eff(EfficientNet) 중에 선택
input_tensor = tf.keras.layers.Input([224, 224, 3], dtype=tf.float32)
# ResNet 으로 pretrained CNN 생성
if net_type == 'res':
# resnet 50v2
x = tf.keras.applications.resnet_v2.preprocess_input(input_tensor)
model = tf.keras.applications.ResNet50V2(include_top=False, weights='imagenet', input_tensor=x, pooling=None)
layer1 = model.get_layer(name='conv3_block1_preact_relu').output
layer2 = model.get_layer(name='conv4_block1_preact_relu').output
layer3 = model.get_layer(name='conv5_block1_preact_relu').output
# EfficientNet으로 pretrained CNN 생성
elif net_type == 'eff':
# efficient net B7
x = tf.keras.applications.efficientnet.preprocess_input(input_tensor)
model = tf.keras.applications.EfficientNetB7(include_top=False, weights='imagenet', input_tensor=x,
pooling=None)
layer1 = model.get_layer(name='block5a_activation').output
layer2 = model.get_layer(name='block6a_activation').output
layer3 = model.get_layer(name='block7a_activation').output
else:
raise Exception("[NotAllowedNetType] network type is not allowed ")
model.trainable = False
# model.summary(line_length=100)
# layer1.shape[1]은 높이(height)
# layer1.shape[2]는 너비(width)
# layer1.shape[3]+layer2.shape[3]+layer3.shape[3] 은 각각 레이어에서 채널 개수를 합한 것
shape = (layer1.shape[1], layer1.shape[2], layer1.shape[3] + layer2.shape[3] + layer3.shape[3])
# 반환값 (1) 임베딩 네트워크 모델
# 반환값 (2) 레이어 shape
return tf.keras.Model(model.input, outputs=[layer1, layer2, layer3]), shape
def padim(category, batch_size, rd, net_type='eff', is_plot=False):
loader = MVTecADLoader() #데이터 불러오는 객체 생성
loader.load(category=category, repeat=1, max_rot=0) #데이터 불러오기 실행
train_set = loader.train.batch(batch_size=batch_size, drop_remainder=True).shuffle(buffer_size=loader.num_train,
reshuffle_each_iteration=True)
test_set = loader.test.batch(batch_size=1, drop_remainder=False)
net, _shape = embedding_net(net_type=net_type) #임베딩 네트워크 & 레이어 shape 생성
h, w, c = _shape # height and width of layer1, channel sum of layer 1, 2, and 3, and randomly sampled dimension
out = []
for x, _, _ in train_set:
l1, l2, l3 = net(x)
_out = tf.reshape(embedding_concat(embedding_concat(l1, l2), l3), (batch_size, h * w, c)) # (b, h x w, c)
out.append(_out.numpy())
# calculate multivariate Gaussian distribution.
out = np.concatenate(out, axis=0)
out = np.transpose(out, axes=[0, 2, 1]) # (b, c, h * w)
# RD: random dimension selecting
tmp = tf.unstack(out, axis=0)
_tmp = []
rd_indices = tf.random.shuffle(tf.range(c))[:rd]
for tensor in tmp:
_tmp.append(tf.gather(tensor, rd_indices))
out = tf.stack(_tmp, axis=0)
mu = np.mean(out, axis=0)
cov = np.zeros((rd, rd, h * w))
identity = np.identity(rd) #단위행렬 생성
# learning of the normality 단계에서 sample covariance 를 계산하는 식을
# 반복적으로 적용
for idx in range(h * w):
cov[:, :, idx] = np.cov(out[:, :, idx], rowvar=False) + 0.01 * identity
# 학습데이터에 대한 sample mean(mu) 와 sample covariance(cov) 구함
train_outputs = [mu, cov]
out, gt_list, gt_mask, batch_size, test_imgs = [], [], [], 1, []
# x - data | y - mask | z - binary label
for x, y, z in test_set:
test_imgs.append(x.numpy())
gt_list.append(z.numpy())
gt_mask.append(y.numpy())
l1, l2, l3 = net(x)
_out = tf.reshape(embedding_concat(embedding_concat(l1, l2), l3), (batch_size, h * w, c)) # (BS, h x w, c)
out.append(_out.numpy())
# calculate multivariate Gaussian distribution. skip random dimension selecting
out = np.concatenate(out, axis=0)
gt_list = np.concatenate(gt_list, axis=0)
out = np.transpose(out, axes=[0, 2, 1])
# RD
tmp = tf.unstack(out, axis=0)
_tmp = []
for tensor in tmp:
_tmp.append(tf.gather(tensor, rd_indices))
out = tf.stack(_tmp, axis=0)
b, _, _ = out.shape
dist_list = []
for idx in range(h * w):
mu = train_outputs[0][:, idx]
cov_inv = np.linalg.inv(train_outputs[1][:, :, idx])
dist = [mahalanobis(sample[:, idx], mu, cov_inv) for sample in out]
dist_list.append(dist)
dist_list = np.reshape(np.transpose(np.asarray(dist_list), axes=[1, 0]), (b, h, w))
################
# DATA Level #
################
# upsample
score_map = tf.squeeze(tf.image.resize(np.expand_dims(dist_list, -1), size=[h, w])).numpy()
for i in range(score_map.shape[0]):
score_map[i] = gaussian_filter(score_map[i], sigma=4)
# Normalization
max_score = score_map.max()
min_score = score_map.min()
scores = (score_map - min_score) / (max_score - min_score)
scores = -scores
# calculate image-level ROC AUC score
img_scores = scores.reshape(scores.shape[0], -1).max(axis=1)
gt_list = np.asarray(gt_list)
img_roc_auc = metrics.roc_auc_score(gt_list, img_scores)
# 플롯 생성 부분은 생략했습니다 (플롯생성 부분은 원본 코드 참고해주세요)
# 저는 수치적 성능평가가 필요했기 때문에, scores와 img_roc_auc를 반환하도록 수정했습니다.
return scores, img_roc_auc
#################
# PATCH Level #
#################
# upsample
score_map = tf.squeeze(tf.image.resize(np.expand_dims(dist_list, -1), size=[224, 224])).numpy()
for i in range(score_map.shape[0]):
score_map[i] = gaussian_filter(score_map[i], sigma=4)
# Normalization
max_score = score_map.max()
min_score = score_map.min()
scores = (score_map - min_score) / (max_score - min_score)
# Note that Binary mask indicates 0 for good and 1 for anomaly. It is opposite from our setting.
# scores = -scores
# calculate per-pixel level ROCAUC
gt_mask = np.asarray(gt_mask)
fp_list, tp_list, _ = metrics.roc_curve(gt_mask.flatten(), scores.flatten())
patch_auc = metrics.auc(fp_list, tp_list)
precision, recall, threshold = metrics.precision_recall_curve(gt_mask.flatten(), scores.flatten(), pos_label=1)
numerator = 2 * precision * recall
denominator = precision + recall
numerator[np.where(denominator == 0)] = 0
denominator[np.where(denominator == 0)] = 1
# get optimal threshold
f1_list = numerator / denominator
best_ths = threshold[np.argmax(f1_list).astype(int)]
print('[{}] image ROCAUC: {:.04f}\t pixel ROCAUC: {:.04f}'.format(category, img_roc_auc, patch_auc))
# 플롯 생성 부분은 생략했습니다 (플롯생성 부분은 원본 코드 참고해주세요)
# 저는 수치적 성능평가가 필요했기 때문에, img_roc_auc와 patch_auc만 반환하도록 수정했습니다.
return img_roc_auc, patch_auc