바닥부터 시작하는 인공지능

GAN을 이용한 Synthetic medical data를 이용한 class imbalance의 해소가 classification model의 정확도에 미치는 영향 알아보기

HappinessChung 2021. 10. 21. 10:59
반응형

Data증강 첫번째 시도: PGGAN

  1. PGGAN이 Cardiomegaly data에 대해서는 잘 학습 되다가 Edema data부터 제대로 된 이미지를 생성하지 못하는 사실 확인
  2. 코드의 문제인지 규명을 위해 논문의 코드를 보고 이해한 후, main.py를 작성하여 증강을 시도함
  3. memory부족으로 학습이 불가능 한 것을 확인
  4. 데이터의 문제인지 규명을 위해 다시 Cardiomegaly데이터셋을 사용해서 증강. 다시 정상적으로 데이터가 증강되는 것을 확인

 

차례대로 Cardiomegaly, Edema(F), Edema(M) dataset을 이용한 증강 결과

결론: 기존의 PGGAN코드의 경우dataset의 알수없는 문제로 인해 많은 데이터셋에서 증강이 불가능 한 사실을 확인하게 되었음. 이를 해결하고자 논문의 PGGAN 코드를 사용하는데 까지는 성공하였으나 컴퓨터의 메모리 부족으로 이를 실행시키는것을 불가능했음. 



Data증강 두번째 시도: 기존 이미지를 Downsizing 해서 DCGAN모델을 이용, 증강 시도

활용한 논문

 

 

  1. DCGAN논문 코드를 다운받음
  2. 코드를 X-ray데이터셋에 맞게 변경: 
  • 1 channel -> 3 channel
  • MNIST dataset -> Custom dataset
  • image size 1024 -> 256



순서대로 증강된 Cardiomegaly, Edema이미지들

Cardiomegaly같은 경우에는 PGGAN을 이용해서 증강한 이미지가 더 좋다는 것을 확인할 수 있고, Edema같은 경우에는 X-ray의 형태를 띄고있기 때문에 PGGAN보다 성능이 좋다는 것을 확인 할 수 있지만 이를 실제로 사용하기에는 무리가 있다고 판단된다. 

결론: 논문에서는 이미지가 잘 증강된것을 확인할 수 있었는데 어찌된 일인지 해당 논문의 코드를 이용해서 데이터를 증강한 결과 쓸만한 이미지가 생성되지 않았다. 어쩌면 데이터의 절대량이 부족해서 그럴것이라는 생각도 든다. 어쨌든 현재로써는 GAN을 이용해서 데이터를 증강 하는 것 자체에 대한 의문점이 든다. 의료 데이터로 사용 할 수 있을만큼 충분히 정밀한 데이터를 만들어내는것이 불가능한것 같다. 

 

세번째 시도: GDGAN

  1. GDGAN xommunity code를 다운받음
  2. common_config를 데이터에 맞게 수정
  3. 학습시켜봄
  4. 이미지의 사이즈를 줄여서 증강시도 -> 성공

결론: 데이터를 애초에 분류를 한 후 증강을 하면 학습할 수 있는 이미지의 갯수가 애초에 적기 때문에 좋은 퀄리티의 이미지가 생성되지 않는다. 따라서 이를 보완하기 위해 다음과 같은 절차를 선택한다.

 

네번째 시도

1. 상대적으로 적은 수의 질병이미지를 학습 한 후 이미지를 생성한다. (학습시에는 질병, 성별로 폴더를 나누어 생성) - Cardiomegaly, Consolidation, Edema, Emphysema, Fibrosis, Pneumonia

2. 이렇게 생성된 이미지를 classification한다(높은 분포를 보이는 Effusion,  Infiltration,  Atelectasis로 classified된 이미지를 제외)

3. 이를 바탕으로 csv를 생성한다

4. 이를 트레이닝 데이터에 첨부한다.

5. test 정확도가 향상되는지 확인한다.

데이터량을 늘려서 GAN을 학습시킨 모습

DCGAN(Greyscale)
PGGAN

PGGAN으로 증강한 이미지의 퀄리티가 좋은 것을 한눈에 봐도 알 수 있다. 따라서 데이터 증강을 위한 GAN 모델로 PGGAN을 선택하였다.

<PGGAN 9265장의 이미지를 생성한 후 변화된 데이터 분포>

차례대로 Cardiomegaly, Emphysema, Effusion, Hernia, Infiltration, Mass, Nodule, Atelectasis, Pneumothorax, Pleural_Thickening, Pneumonia, Fibrosis, Edema, Consolidation

PGGAN사용 전 데이터 분포
PGGAN 사용 후 데이터 분포

 

** 이러한 과정을 위해 사용한 PGGAN 코드의 주요 부분은 아래와 같다.

(1) 데이터 분리

_, test_set = train_test_split(train_df_main, test_size = 0.2, random_state = 1993)

using_data, _ = train_test_split(train_df_main_agmented, test_size = 0.4, random_state = 1993)

train_set, valid_set = train_test_split(using_data, test_size = 0.2, random_state = 1993)

test데이터는 증강되지 않은 순수 X-ray데이터만 이용하였다. 또한 시간관계상 증강된 데이터와 기존의 데이터를 모두 합한 것에서 random sampling하여 60%만 학습과, validation에 이용하였다.

(2) Generator, Discreminator

class Generator(nn.Module):
    def __init__(self, z_dim, in_channels, img_channels=1):
        super(Generator, self).__init__()

        # initial takes 1x1 -> 4x4
        self.initial = nn.Sequential(
            PixelNorm(),
            nn.ConvTranspose2d(z_dim, in_channels, 4, 1, 0),
            nn.LeakyReLU(0.2),
            WSConv2d(in_channels, in_channels, kernel_size=3, stride=1, padding=1),
            nn.LeakyReLU(0.2),
            PixelNorm(),
        )

        self.initial_rgb = WSConv2d(
            in_channels, img_channels, kernel_size=1, stride=1, padding=0
        )
        self.prog_blocks, self.rgb_layers = (
            nn.ModuleList([]),
            nn.ModuleList([self.initial_rgb]),
        )

        for i in range(
            len(factors) - 1
        ):  # -1 to prevent index error because of factors[i+1]
            conv_in_c = int(in_channels * factors[i])
            conv_out_c = int(in_channels * factors[i + 1])
            self.prog_blocks.append(ConvBlock(conv_in_c, conv_out_c))
            self.rgb_layers.append(
                WSConv2d(conv_out_c, img_channels, kernel_size=1, stride=1, padding=0)
            )

    def fade_in(self, alpha, upscaled, generated):
        # alpha should be scalar within [0, 1], and upscale.shape == generated.shape
        return torch.tanh(alpha * generated + (1 - alpha) * upscaled)

    def forward(self, x, alpha, steps):
        out = self.initial(x)

        if steps == 0:
            return self.initial_rgb(out)

        for step in range(steps):
            upscaled = F.interpolate(out, scale_factor=2, mode="nearest")
            out = self.prog_blocks[step](upscaled)

        # The number of channels in upscale will stay the same, while
        # out which has moved through prog_blocks might change. To ensure
        # we can convert both to rgb we use different rgb_layers
        # (steps-1) and steps for upscaled, out respectively
        final_upscaled = self.rgb_layers[steps - 1](upscaled)
        final_out = self.rgb_layers[steps](out)
        return self.fade_in(alpha, final_upscaled, final_out)
class Discriminator(nn.Module):
    def __init__(self, z_dim, in_channels, img_channels=1):
        super(Discriminator, self).__init__()
        self.prog_blocks, self.rgb_layers = nn.ModuleList([]), nn.ModuleList([])
        self.leaky = nn.LeakyReLU(0.2)

        # here we work back ways from factors because the discriminator
        # should be mirrored from the generator. So the first prog_block and
        # rgb layer we append will work for input size 1024x1024, then 512->256-> etc
        for i in range(len(factors) - 1, 0, -1):
            conv_in = int(in_channels * factors[i])
            conv_out = int(in_channels * factors[i - 1])
            self.prog_blocks.append(ConvBlock(conv_in, conv_out, use_pixelnorm=False))
            self.rgb_layers.append(
                WSConv2d(img_channels, conv_in, kernel_size=1, stride=1, padding=0)
            )

        # perhaps confusing name "initial_rgb" this is just the RGB layer for 4x4 input size
        # did this to "mirror" the generator initial_rgb
        self.initial_rgb = WSConv2d(
            img_channels, in_channels, kernel_size=1, stride=1, padding=0
        )
        self.rgb_layers.append(self.initial_rgb)
        self.avg_pool = nn.AvgPool2d(
            kernel_size=2, stride=2
        )  # down sampling using avg pool

        # this is the block for 4x4 input size
        self.final_block = nn.Sequential(
            # +1 to in_channels because we concatenate from MiniBatch std
            WSConv2d(in_channels + 1, in_channels, kernel_size=3, padding=1),
            nn.LeakyReLU(0.2),
            WSConv2d(in_channels, in_channels, kernel_size=4, padding=0, stride=1),
            nn.LeakyReLU(0.2),
            WSConv2d(
                in_channels, 1, kernel_size=1, padding=0, stride=1
            ),  # we use this instead of linear layer
        )

    def fade_in(self, alpha, downscaled, out):
        """Used to fade in downscaled using avg pooling and output from CNN"""
        # alpha should be scalar within [0, 1], and upscale.shape == generated.shape
        return alpha * out + (1 - alpha) * downscaled

    def minibatch_std(self, x):
        batch_statistics = (
            torch.std(x, dim=0).mean().repeat(x.shape[0], 1, x.shape[2], x.shape[3])
        )
        # we take the std for each example (across all channels, and pixels) then we repeat it
        # for a single channel and concatenate it with the image. In this way the discriminator
        # will get information about the variation in the batch/image
        return torch.cat([x, batch_statistics], dim=1)

    def forward(self, x, alpha, steps):
        # where we should start in the list of prog_blocks, maybe a bit confusing but
        # the last is for the 4x4. So example let's say steps=1, then we should start
        # at the second to last because input_size will be 8x8. If steps==0 we just
        # use the final block
        cur_step = len(self.prog_blocks) - steps

        # convert from rgb as initial step, this will depend on
        # the image size (each will have it's on rgb layer)
        out = self.leaky(self.rgb_layers[cur_step](x))

        if steps == 0:  # i.e, image is 4x4
            out = self.minibatch_std(out)
            return self.final_block(out).view(out.shape[0], -1)

        # because prog_blocks might change the channels, for down scale we use rgb_layer
        # from previous/smaller size which in our case correlates to +1 in the indexing
        downscaled = self.leaky(self.rgb_layers[cur_step + 1](self.avg_pool(x)))
        out = self.avg_pool(self.prog_blocks[cur_step](out))

        # the fade_in is done first between the downscaled and the input
        # this is opposite from the generator
        out = self.fade_in(alpha, downscaled, out)

        for step in range(cur_step + 1, len(self.prog_blocks)):
            out = self.prog_blocks[step](out)
            out = self.avg_pool(out)

        out = self.minibatch_std(out)
        return self.final_block(out).view(out.shape[0], -1)

(3) Generator, Discreminator 훈련

def train_fn(
    critic,
    gen,
    loader,
    dataset,
    step,
    alpha,
    opt_critic,
    opt_gen,
    tensorboard_step,
    writer,
    scaler_gen,
    scaler_critic,
):
    loop = tqdm(loader, leave=True)
    for batch_idx, (real, _) in enumerate(loop):
        real = real.to(config.DEVICE)
        cur_batch_size = real.shape[0]

        # Train Critic: max E[critic(real)] - E[critic(fake)] <-> min -E[critic(real)] + E[critic(fake)]
        # which is equivalent to minimizing the negative of the expression
        noise = torch.randn(cur_batch_size, config.Z_DIM, 1, 1).to(config.DEVICE)

        with torch.cuda.amp.autocast():
            fake = gen(noise, alpha, step)
            critic_real = critic(real, alpha, step)
            critic_fake = critic(fake.detach(), alpha, step)
            gp = gradient_penalty(critic, real, fake, alpha, step, device=config.DEVICE)
            loss_critic = (
                -(torch.mean(critic_real) - torch.mean(critic_fake))
                + config.LAMBDA_GP * gp
                + (0.001 * torch.mean(critic_real ** 2))
            )

        opt_critic.zero_grad()
        scaler_critic.scale(loss_critic).backward()
        scaler_critic.step(opt_critic)
        scaler_critic.update()

        # Train Generator: max E[critic(gen_fake)] <-> min -E[critic(gen_fake)]
        with torch.cuda.amp.autocast():
            gen_fake = critic(fake, alpha, step)
            loss_gen = -torch.mean(gen_fake)

        opt_gen.zero_grad()
        scaler_gen.scale(loss_gen).backward()
        scaler_gen.step(opt_gen)
        scaler_gen.update()

        # Update alpha and ensure less than 1
        alpha += cur_batch_size / (
            (config.PROGRESSIVE_EPOCHS[step] * 0.5) * len(dataset)
        )
        alpha = min(alpha, 1)

        if batch_idx % 500 == 0:
            with torch.no_grad():
                fixed_fakes = gen(config.FIXED_NOISE, alpha, step) * 0.5 + 0.5
            plot_to_tensorboard(
                writer,
                loss_critic.item(),
                loss_gen.item(),
                real.detach(),
                fixed_fakes.detach(),
                tensorboard_step,
            )
            tensorboard_step += 1

        loop.set_postfix(
            gp=gp.item(),
            loss_critic=loss_critic.item(),
        )

    return tensorboard_step, alpha

 

 

반응형