[참조 링크] https://pytorch.org/tutorials/intermediate/ddp_tutorial.html
: 근데 이렇게 코드 한줄로 일반 데이터 패러렐하는 것의 문제점은 메인 지피유를 설정햇을때 일의 부하가 한 노드로 쏠림! 지피유를 골고루 사용하지 못해서 시간이 느려짐
model = Toy().to(device) # 모델 선언
model = DataParallel(model) # 데이터 패러렐
그래서 나온 개념이 Data Parallel 로, 멀티 지피유(노드4개)를 다 써서 데이터셋을 처리하고 싶다는 취지이다.
<Distributed Data Parallel>
**** 운영체제 정리(process, thread) ****
- Program : HDD(hard disk drive)에 코드의 형태로 저장되어있음
- Process : cpu에 있는 메모리(RAM)에 소스코드를올림 → 운영체제는 프로세스 단위로 돌아간다 ============================================================== CPU(보통 1개 컴퓨터에 1개) 안에는 core가 여러개 들어있다. (ex) hexa core, octa core 등) core 안에는 thread 가 여러개 있다(ex) 2개) 보통 thread 하나 당 task 하나를 처리할 수 있다. → 그래서 1개 (octa core 2 thread**)** cpu에서 8x2=16 task 를 처리할 수 있다. Single-Process Single-Thread Single-Process Multi-Thread Multi-Process Multi-Thread ============================================================== CPU 프로세스 : 한번에 하나 일밖에 못함,, 여러개 명령체제를 사용하고그걸 멀티 스레드라고 함 그래서 Distributed Data Parallel은 멀티 프로세스 사용 -> gpu가 여러개일때 gpu하나당 프로세스 하나를 할당하게 해서 사용하기 쉽다
import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
# setup: 프로그램을 실행시키면 항상 싱글 프로세스로만 시작함 그래서 코드에 멀티 프로세스로 돌릴거다 하는 내용이 들어가야함
def setup(rank, world_size):
# 윈도우 32bit 일때/
if sys.platform == 'win32':
# Distributed package only covers collective communications with Gloo
# backend and FileStore on Windows platform. Set init_method parameter
# in init_process_group to a local file.
# Example init_method="file:///f:/libtmp/some_file"
init_method="file:///{your local file path}"
# initialize the process group
dist.init_process_group(
"gloo", # 고정
init_method=init_method,
rank=rank, # 디바이스 넘버 ex) cuda:0 같은
world_size=world_size # mp(멀티 프로세스)를 몇개 생성할 것인가(4,8 등)
)
else: # 리눅스 등의 os는 환경변수를 사용
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# initialize the process group
dist.init_process_group("gloo", rank=rank, world_size=world_size)
# 멀티 프로세스를 종료하는 역할
def cleanup():
dist.destroy_process_group()
# example model
class ToyModel(nn.Module):
def __init__(self):
super(ToyModel, self).__init__()
self.net1 = nn.Linear(10, 10)
self.relu = nn.ReLU()
self.net2 = nn.Linear(10, 5)
def forward(self, x):
return self.net2(self.relu(self.net1(x)))
# 완벽한 디디피는 아님 그냥 멀티 프로세스만 사용
def demo_basic(rank, world_size):
print(f"Running basic DDP example on rank {rank}.")
setup(rank, world_size)
# create model and move it to GPU with id rank
# 미니배치는 아님
model = ToyModel().to(rank)
ddp_model = DDP(model, device_ids=[rank])
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.zero_grad()
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(rank)
loss_fn(outputs, labels).backward()
optimizer.step()
cleanup()
# 런데모가 실제로 DDP 실행
# 디디피는 모델하나를 프로세스 여러개를 사용해서 돌리는 것
def run_demo(demo_fn, world_size):
# mp = multiprocessing/ spawn= demo_fn: 본체 데모함수 실행
mp.spawn(demo_fn,
args=(world_size,),
nprocs=world_size,
join=True) # join:프로세스 결과를 다 하나로 합치겟다 라는 의미이다.
ToyModel : loss랑 gradient backward 를 하는 머신러닝 모델이 아니면 DDP가 필요없음
[AssertionError: DistributedDataParallel is not needed when a module doesn't have any parameter that requires a gradient.]
# 위 함수에서 torch.save, torch.load로 만드는 거 랑 같은 기능
def demo_checkpoint(rank, world_size):
print(f"Running DDP checkpoint example on rank {rank}.")
setup(rank, world_size)
model = ToyModel().to(rank)
ddp_model = DDP(model, device_ids=[rank])
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
if rank == 0: # 만약 랭크가 0이면 미리 말해놓은 패스에다가 저장 -> 왜 쿠다0 만 저장하는가? 한꺼번에 세이브하는게 더 효율적이다
# All processes should see same parameters as they all start from same
# random parameters and gradients are synchronized in backward passes.
# Therefore, saving it in one process is sufficient.
torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)
# Use a barrier() to make sure that process 1 loads the model after process
# 0 saves it.
dist.barrier() # 쿠다 제로의 내용이 다 저장되기전까지 1,2,3을 멈춰서 충돌이 없도록 하는것
# configure map_location properly
map_location = {'cuda:%d' % 0: 'cuda:%d' % rank} # 쿠다0랑 현재 사용 gpu dictionary 만듦
ddp_model.load_state_dict( # 걔를 로드 서로다른 지피유에서 돌아간거도 저장할수 있도록 그리고 마지막에는 꼭 지워줍시다 나중에 리눅스에서는 env파일에 남아잇어요
torch.load(CHECKPOINT_PATH, map_location=map_location))
optimizer.zero_grad()
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(rank)
loss_fn = nn.MSELoss()
loss_fn(outputs, labels).backward()
optimizer.step()
# Not necessary to use a dist.barrier() to guard the file deletion below
# as the AllReduce ops in the backward pass of DDP already served as
# a synchronization.
if rank == 0:
os.remove(CHECKPOINT_PATH)
cleanup()
# Model Parallel : 모델 하나를 지피유 여러개에 쪼개서 올림 to(device) 가 아니라 to(dev0) to(dev1)이런식으로 나눈다 프로세스는 4개 사용
# ex)디바이스는 2개 지피유는 8장 사용 -> 4개 포로세스가 지피유 2갸씩 나눠쓰게 하는것
# 모델 패러렐을 사용하는 이유는 모델 자체가 너무 클때,, 사용한다 벗 모델이 작을때는 굳이 안해도 된다 컴퓨터에서 연산속도가 느린 거는 인풋 아웃풋 같은거 메모리에서 인풋 가져오거나 모닡터애 출력하는거가 느린것이다
class ToyMpModel(nn.Module):
def __init__(self, dev0, dev1):
super(ToyMpModel, self).__init__()
self.dev0 = dev0
self.dev1 = dev1
self.net1 = torch.nn.Linear(10, 10).to(dev0)
self.relu = torch.nn.ReLU()
self.net2 = torch.nn.Linear(10, 5).to(dev1)
def forward(self, x):
x = x.to(self.dev0)
x = self.relu(self.net1(x))
x = x.to(self.dev1)
return self.net2(x)
def demo_model_parallel(rank, world_size):
print(f"Running DDP with model parallel example on rank {rank}.")
setup(rank, world_size)
# setup mp_model and devices for this process
dev0 = rank * 2
dev1 = rank * 2 + 1
mp_model = ToyMpModel(dev0, dev1)
ddp_mp_model = DDP(mp_model)
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_mp_model.parameters(), lr=0.001)
optimizer.zero_grad()
# outputs will be on dev1
outputs = ddp_mp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(dev1)
loss_fn(outputs, labels).backward()
optimizer.step()
cleanup()
if __name__ == "__main__":
n_gpus = torch.cuda.device_count()
if n_gpus < 8:
print(f"Requires at least 8 GPUs to run, but got {n_gpus}.")
else:
run_demo(demo_basic, 8)
run_demo(demo_checkpoint, 8)
run_demo(demo_model_parallel, 4)
'Python 및 Torch 코딩 이모저모' 카테고리의 다른 글
리눅스에 파이썬 새로운 버전 설치하기! (0) | 2023.10.25 |
---|---|
HuggingFace 실습(PEFT) : 2. Train (0) | 2023.10.17 |
HuggingFace 실습(PEFT) : 1. Data (0) | 2023.10.15 |
GGML 개념 정리 (1) | 2023.10.09 |
*list 와 **dict : unpacking 방법 (0) | 2023.08.09 |