파이토치 간단한 ai 챗봇 소스 트랜스포머 모델이 아니라 성능은 안 좋음 초보자 분석용으로 추천 ~
페이지 정보
작성자 최고관리자 작성일 24-04-25 04:18 조회 682 댓글 0본문
import torch
import torch.nn as nn
from io import open
import string
import re
import time
import random
import torchtext
from torchtext import data
import torch.optim as optim
from collections import Counter
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data import DataLoader, Dataset
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# for reproducibility
torch.manual_seed(777)
if device == 'cuda':
torch.cuda.manual_seed_all(777)
#####
MAX_LENGTH = 100 #문장의 최대 길이
max_vocab_size=150000
optimizer_learning_rate = 0.03 # modified learning rate from 0.01 to 1
drop_out_rate = 0.01
# 파라미터 설정
embedding_dim = 1200
batch_size = 32
learning_rate = 0.002
num_epochs = 30
#####
import gc
gc.collect()
# 소문자, 다듬기, 그리고 문자가 아닌 문자 제거
def normalizeString(s):
s = re.sub(r"([.!?])", r" \1", s)
s = re.sub(r"[^a-zA-Zㄱ-ㅎㅏ-ㅣ가-힣.!?]+", r" ", s)
return s
import requests
def readLangs():
print("Reading lines...")
response = requests.get('https://magu.co.kr/ai/sort-talk-and-gam.htm')
lines = response.text.strip().split('\n')
# 파일을 읽고 줄로 분리
#lines = open(g_sourcetext, encoding='utf-8').\
# read().strip().split('\n')
#print(lines)
# 모든 줄을 쌍으로 분리하고 정규화
pairs = [[normalizeString(s) for s in l.split('\t')] for l in lines]
#print(pairs)
temp = 0
temp2 = len(pairs)/2
input_lang = []
output_lang = []
for i in range(int(temp2)): #8이면 4
input_lang = input_lang + pairs[temp] #0,2,4,6,
output_lang = output_lang+pairs[temp+1]
temp = temp +2
return input_lang, output_lang, pairs
######################################################################
# 데이터 준비를 위한 전체 과정:
#
# - 텍스트 파일을 읽고 줄로 분리하고, 줄을 쌍으로 분리합니다.
# - 쌍을 이룬 문장들로 단어 리스트를 생성합니다.
def prepareData():
input_lang, output_lang, pairs = readLangs()
print("Read %s sentence pairs" % len(pairs))
return input_lang, output_lang, pairs
inputs_data, outpus_data, pairs = prepareData()
print("outpus_data[5]", outpus_data[5])
#########################################
# 어휘 구성
def build_vocabulary(data, max_vocab_size):
counter = Counter()
for sentence in data:
tokens = sentence.split() # 단어 수준으로 토큰화
# 중복을 제거하고 고유한 토큰 집합을 생성
unique_tokens = list(set(tokens))
counter.update(unique_tokens)
# 가장 많이 나타나는 단어들을 선택하여 어휘를 구성
vocab = {word: idx for idx, (word, count) in enumerate(counter.most_common(max_vocab_size))}
vocab['<PAD>'] = len(vocab) # 패딩 토큰을 어휘에 추가
vocab['<UNK>'] = len(vocab)
vocab2text = {idx: word for word, idx in vocab.items()}
return vocab,vocab2text
g_vocab, g_vocab2text = build_vocabulary(inputs_data+ outpus_data, max_vocab_size)
print("len(g_vocab2text)",len(g_vocab2text))
# 토큰화 및 어휘 인덱싱
def tokenize_and_index(data, vocab):
print("data[5]",data[5])
original_word = data[5]
tokenized_data = []
for sentence in data:
tokens = sentence.split() # 단어 수준으로 토큰화
indices = [vocab.get(token, vocab.get('<UNK>', len(vocab))) for token in tokens]
tokenized_data.append(indices)
print("tokenized_data[5]",tokenized_data[5])
original_word = tokenized_data[5]
original_words =""
for i in range(len(original_word)):
# 각 요소를 추출하여 스칼라 값으로 변환 (필요하면)
original_words = original_words + " "+ g_vocab2text[original_word[i]]
print("original_word len(original_word)",len(original_word))
print("original_word",original_words)
return tokenized_data
# 패딩
def pad_sequences(data, pad_idx, max_length):
padded_data = []
for sequence in data:
padded_sequence = sequence[:max_length]
padded_sequence += [pad_idx] * (max_length - len(padded_sequence))
padded_data.append(padded_sequence)
return torch.tensor(padded_data)
# 데이터 준비 단계
def prepare_data(questions, answers, max_length=MAX_LENGTH):
# 어휘 구성
pad_idx = g_vocab['<PAD>'] # 패딩 인덱스
# 질문과 답변을 토큰화하고 어휘 인덱스에 맞게 변환
questions_indexed = tokenize_and_index(questions, g_vocab)
answers_indexed = tokenize_and_index(answers, g_vocab)
# 패딩 적용
questions_padded = pad_sequences(questions_indexed, pad_idx, max_length)
answers_padded = pad_sequences(answers_indexed, pad_idx, max_length)
return questions_padded, answers_padded
# 데이터 준비
questions_padded, answers_padded = prepare_data(inputs_data, outpus_data)
def adjust_data_lengths(questions_padded, answers_padded):
# 두 데이터셋의 행 수를 비교
num_questions = questions_padded.shape[0]
num_answers = answers_padded.shape[0]
# 데이터셋의 행 수를 짧은 것에 맞추어 조정
if num_questions < num_answers:
# answers_padded를 짧은 길이에 맞춰 자르기
answers_padded = answers_padded[:num_questions]
elif num_questions > num_answers:
# questions_padded를 짧은 길이에 맞춰 자르기
questions_padded = questions_padded[:num_answers]
return questions_padded, answers_padded
# 데이터의 행 크기 조정 행 크기 동일하게 짧은 길이에 맞게 자르기
questions_padded, answers_padded = adjust_data_lengths(questions_padded, answers_padded)
# 결과 확인
#print(inputs_data)
print("질문 패딩 후:", questions_padded.size())
print("답변 패딩 후:", answers_padded.size())
#print("어휘:", vocab)
# 데이터셋 클래스
class ChatbotDataset(Dataset):
def __init__(self, questions, answers):
self.questions = questions
self.answers = answers
def __len__(self):
return len(self.questions)
def __getitem__(self, idx):
return self.questions[idx], self.answers[idx]
# 모델 정의
class ChatbotModel(nn.Module):
def __init__(self):
super(ChatbotModel, self).__init__()
self.embedding = nn.Embedding(len(g_vocab2text), embedding_dim) # 임베딩 레이어
self.elu = nn.ELU(alpha=1.0)
self.dropout = nn.Dropout(drop_out_rate)
self.output_layer = nn.Linear(embedding_dim, len(g_vocab2text)) # 출력 레이어:
def forward(self, x):
x = self.embedding(x) # 임베딩 레이어를 통해 입력 처리
x= self.elu(x)
x = self.dropout(x)
output = self.output_layer(x) # 출력 레이어를 통해 최종 출력 계산
return output
# 모델 생성
model = ChatbotModel().to(device)
# 손실 함수 및 옵티마이저 설정
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# 데이터셋을 미리 지정한 장치로 이동시킵니다.
questions_padded = questions_padded.to(device)
answers_padded = answers_padded.to(device)
# 데이터셋과 데이터 로더 생성
dataset = ChatbotDataset(questions_padded, answers_padded)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
# 문장에서 <pad> 토큰을 삭제하는 함수
def remove_pad_from_sentence(sentence, pad_token='<PAD>'):
# str.replace() 메서드를 사용하여 <pad> 토큰을 빈 문자열로 대체하여 삭제
cleaned_sentence = sentence.replace(pad_token, "")
return cleaned_sentence.strip() # 문장 양쪽 공백을 제거하여 깔끔한 문자열 반환
import time
# 모델 훈련
for epoch in range(num_epochs):
total_loss = 0
start_time = time.time() # 시작 시간 기록
# 배치별로 데이터 처리
#for question_batch, answer_batch in dataloader:
for i, (question_batch, answer_batch) in enumerate(dataloader):
start_time = time.time() # 시작 시간 기록
# 모델의 예측 수행
output_batch = model(question_batch)
end_time = time.time() # 종료 시간 기록
elapsed_time = end_time - start_time # 총 학습 시간 계산
print(f"i {i} 배치 크기 {batch_size}: 학습 시간 {elapsed_time:.4f} 초")
loss = criterion(output_batch.view(-1, len(g_vocab2text)), answer_batch.view(-1))
total_loss += loss.item()
# 옵티마이저 초기화 및 가중치 업데이트
optimizer.zero_grad()
loss.backward()
optimizer.step()
end_time = time.time() # 종료 시간 기록
elapsed_time = end_time - start_time # 총 학습 시간 계산
print(f"배치 크기 {batch_size}: 학습 시간 {elapsed_time:.4f} 초")
# 각 에포크별 손실 출력
if epoch % 1 == 0:
print(f'Epoch {epoch + 1}, Loss: {total_loss / len(dataloader):.4f}')
sorted_probs, sorted_indices = torch.sort(output_batch[1], dim=-1, descending=True)
_, predicted_index = torch.max(output_batch[2], dim=-1)
#가장 확률 높은 단어 추출
predicted_word = ""
for i in range(len(predicted_index)):
predicted_word = predicted_word + " "+ g_vocab2text[predicted_index[i].item()]
print("predicted_word",remove_pad_from_sentence(predicted_word))
original_word = answer_batch[2]
question_word = question_batch[2]
question_words =""
for i in range(30):
question_words = question_words + " "+ g_vocab2text[question_word[i].item()]
print("question_word", remove_pad_from_sentence(question_words))
original_words =""
for i in range(30):
original_words = original_words + " "+ g_vocab2text[original_word[i].item()]
print("original_word", remove_pad_from_sentence(original_words))
########################################
#모델 저장
import re, os
save_dir = "saved_models"
os.makedirs(save_dir, exist_ok=True)
#모델 저장
model_save_path = os.path.join(save_dir, "chatbot_model.pth")
torch.save(
{
"model_state_dict": model.state_dict(),
"g_vocab": g_vocab, # 어휘 정보 저장
"g_vocab2text": g_vocab2text # 인덱스에서 단어로 변환하기 위한 사전 저장
},
model_save_path,
)
########################################
#모델 로드
model2 = ChatbotModel().to(device)
model_load_path = os.path.join(save_dir, "chatbot_model.pth")
checkpoint = torch.load(model_load_path)
model2.load_state_dict(checkpoint["model_state_dict"])
# 어휘 정보 로드
vocab = checkpoint["g_vocab"]
vocab2text_loaded = checkpoint["g_vocab2text"]
#tokenized_data= 'ㅋㅋ 마음 같아서는 대신 먹어주고 싶다 이거죠 .'
tokenized_data = '[1, 460, 461, 462, 463, 77, 464, 0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,]'
tokenized_data_list = eval(tokenized_data)
# 리스트를 파이토치 텐서로 변환
tokenized_data_tensor = torch.tensor(tokenized_data_list).to(device)
print(tokenized_data_tensor)
output_a = model2(tokenized_data_tensor).to(device)
print("output_a", output_a,output_a.size())
_, predicted_index = torch.max(output_a, dim=-1)
predicted_word =""
for i in range(50):
predicted_word = predicted_word + " "+ vocab2text_loaded[predicted_index[i].item()]
print("answer", remove_pad_from_sentence(predicted_word))
import torch.nn as nn
from io import open
import string
import re
import time
import random
import torchtext
from torchtext import data
import torch.optim as optim
from collections import Counter
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data import DataLoader, Dataset
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# for reproducibility
torch.manual_seed(777)
if device == 'cuda':
torch.cuda.manual_seed_all(777)
#####
MAX_LENGTH = 100 #문장의 최대 길이
max_vocab_size=150000
optimizer_learning_rate = 0.03 # modified learning rate from 0.01 to 1
drop_out_rate = 0.01
# 파라미터 설정
embedding_dim = 1200
batch_size = 32
learning_rate = 0.002
num_epochs = 30
#####
import gc
gc.collect()
# 소문자, 다듬기, 그리고 문자가 아닌 문자 제거
def normalizeString(s):
s = re.sub(r"([.!?])", r" \1", s)
s = re.sub(r"[^a-zA-Zㄱ-ㅎㅏ-ㅣ가-힣.!?]+", r" ", s)
return s
import requests
def readLangs():
print("Reading lines...")
response = requests.get('https://magu.co.kr/ai/sort-talk-and-gam.htm')
lines = response.text.strip().split('\n')
# 파일을 읽고 줄로 분리
#lines = open(g_sourcetext, encoding='utf-8').\
# read().strip().split('\n')
#print(lines)
# 모든 줄을 쌍으로 분리하고 정규화
pairs = [[normalizeString(s) for s in l.split('\t')] for l in lines]
#print(pairs)
temp = 0
temp2 = len(pairs)/2
input_lang = []
output_lang = []
for i in range(int(temp2)): #8이면 4
input_lang = input_lang + pairs[temp] #0,2,4,6,
output_lang = output_lang+pairs[temp+1]
temp = temp +2
return input_lang, output_lang, pairs
######################################################################
# 데이터 준비를 위한 전체 과정:
#
# - 텍스트 파일을 읽고 줄로 분리하고, 줄을 쌍으로 분리합니다.
# - 쌍을 이룬 문장들로 단어 리스트를 생성합니다.
def prepareData():
input_lang, output_lang, pairs = readLangs()
print("Read %s sentence pairs" % len(pairs))
return input_lang, output_lang, pairs
inputs_data, outpus_data, pairs = prepareData()
print("outpus_data[5]", outpus_data[5])
#########################################
# 어휘 구성
def build_vocabulary(data, max_vocab_size):
counter = Counter()
for sentence in data:
tokens = sentence.split() # 단어 수준으로 토큰화
# 중복을 제거하고 고유한 토큰 집합을 생성
unique_tokens = list(set(tokens))
counter.update(unique_tokens)
# 가장 많이 나타나는 단어들을 선택하여 어휘를 구성
vocab = {word: idx for idx, (word, count) in enumerate(counter.most_common(max_vocab_size))}
vocab['<PAD>'] = len(vocab) # 패딩 토큰을 어휘에 추가
vocab['<UNK>'] = len(vocab)
vocab2text = {idx: word for word, idx in vocab.items()}
return vocab,vocab2text
g_vocab, g_vocab2text = build_vocabulary(inputs_data+ outpus_data, max_vocab_size)
print("len(g_vocab2text)",len(g_vocab2text))
# 토큰화 및 어휘 인덱싱
def tokenize_and_index(data, vocab):
print("data[5]",data[5])
original_word = data[5]
tokenized_data = []
for sentence in data:
tokens = sentence.split() # 단어 수준으로 토큰화
indices = [vocab.get(token, vocab.get('<UNK>', len(vocab))) for token in tokens]
tokenized_data.append(indices)
print("tokenized_data[5]",tokenized_data[5])
original_word = tokenized_data[5]
original_words =""
for i in range(len(original_word)):
# 각 요소를 추출하여 스칼라 값으로 변환 (필요하면)
original_words = original_words + " "+ g_vocab2text[original_word[i]]
print("original_word len(original_word)",len(original_word))
print("original_word",original_words)
return tokenized_data
# 패딩
def pad_sequences(data, pad_idx, max_length):
padded_data = []
for sequence in data:
padded_sequence = sequence[:max_length]
padded_sequence += [pad_idx] * (max_length - len(padded_sequence))
padded_data.append(padded_sequence)
return torch.tensor(padded_data)
# 데이터 준비 단계
def prepare_data(questions, answers, max_length=MAX_LENGTH):
# 어휘 구성
pad_idx = g_vocab['<PAD>'] # 패딩 인덱스
# 질문과 답변을 토큰화하고 어휘 인덱스에 맞게 변환
questions_indexed = tokenize_and_index(questions, g_vocab)
answers_indexed = tokenize_and_index(answers, g_vocab)
# 패딩 적용
questions_padded = pad_sequences(questions_indexed, pad_idx, max_length)
answers_padded = pad_sequences(answers_indexed, pad_idx, max_length)
return questions_padded, answers_padded
# 데이터 준비
questions_padded, answers_padded = prepare_data(inputs_data, outpus_data)
def adjust_data_lengths(questions_padded, answers_padded):
# 두 데이터셋의 행 수를 비교
num_questions = questions_padded.shape[0]
num_answers = answers_padded.shape[0]
# 데이터셋의 행 수를 짧은 것에 맞추어 조정
if num_questions < num_answers:
# answers_padded를 짧은 길이에 맞춰 자르기
answers_padded = answers_padded[:num_questions]
elif num_questions > num_answers:
# questions_padded를 짧은 길이에 맞춰 자르기
questions_padded = questions_padded[:num_answers]
return questions_padded, answers_padded
# 데이터의 행 크기 조정 행 크기 동일하게 짧은 길이에 맞게 자르기
questions_padded, answers_padded = adjust_data_lengths(questions_padded, answers_padded)
# 결과 확인
#print(inputs_data)
print("질문 패딩 후:", questions_padded.size())
print("답변 패딩 후:", answers_padded.size())
#print("어휘:", vocab)
# 데이터셋 클래스
class ChatbotDataset(Dataset):
def __init__(self, questions, answers):
self.questions = questions
self.answers = answers
def __len__(self):
return len(self.questions)
def __getitem__(self, idx):
return self.questions[idx], self.answers[idx]
# 모델 정의
class ChatbotModel(nn.Module):
def __init__(self):
super(ChatbotModel, self).__init__()
self.embedding = nn.Embedding(len(g_vocab2text), embedding_dim) # 임베딩 레이어
self.elu = nn.ELU(alpha=1.0)
self.dropout = nn.Dropout(drop_out_rate)
self.output_layer = nn.Linear(embedding_dim, len(g_vocab2text)) # 출력 레이어:
def forward(self, x):
x = self.embedding(x) # 임베딩 레이어를 통해 입력 처리
x= self.elu(x)
x = self.dropout(x)
output = self.output_layer(x) # 출력 레이어를 통해 최종 출력 계산
return output
# 모델 생성
model = ChatbotModel().to(device)
# 손실 함수 및 옵티마이저 설정
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# 데이터셋을 미리 지정한 장치로 이동시킵니다.
questions_padded = questions_padded.to(device)
answers_padded = answers_padded.to(device)
# 데이터셋과 데이터 로더 생성
dataset = ChatbotDataset(questions_padded, answers_padded)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
# 문장에서 <pad> 토큰을 삭제하는 함수
def remove_pad_from_sentence(sentence, pad_token='<PAD>'):
# str.replace() 메서드를 사용하여 <pad> 토큰을 빈 문자열로 대체하여 삭제
cleaned_sentence = sentence.replace(pad_token, "")
return cleaned_sentence.strip() # 문장 양쪽 공백을 제거하여 깔끔한 문자열 반환
import time
# 모델 훈련
for epoch in range(num_epochs):
total_loss = 0
start_time = time.time() # 시작 시간 기록
# 배치별로 데이터 처리
#for question_batch, answer_batch in dataloader:
for i, (question_batch, answer_batch) in enumerate(dataloader):
start_time = time.time() # 시작 시간 기록
# 모델의 예측 수행
output_batch = model(question_batch)
end_time = time.time() # 종료 시간 기록
elapsed_time = end_time - start_time # 총 학습 시간 계산
print(f"i {i} 배치 크기 {batch_size}: 학습 시간 {elapsed_time:.4f} 초")
loss = criterion(output_batch.view(-1, len(g_vocab2text)), answer_batch.view(-1))
total_loss += loss.item()
# 옵티마이저 초기화 및 가중치 업데이트
optimizer.zero_grad()
loss.backward()
optimizer.step()
end_time = time.time() # 종료 시간 기록
elapsed_time = end_time - start_time # 총 학습 시간 계산
print(f"배치 크기 {batch_size}: 학습 시간 {elapsed_time:.4f} 초")
# 각 에포크별 손실 출력
if epoch % 1 == 0:
print(f'Epoch {epoch + 1}, Loss: {total_loss / len(dataloader):.4f}')
sorted_probs, sorted_indices = torch.sort(output_batch[1], dim=-1, descending=True)
_, predicted_index = torch.max(output_batch[2], dim=-1)
#가장 확률 높은 단어 추출
predicted_word = ""
for i in range(len(predicted_index)):
predicted_word = predicted_word + " "+ g_vocab2text[predicted_index[i].item()]
print("predicted_word",remove_pad_from_sentence(predicted_word))
original_word = answer_batch[2]
question_word = question_batch[2]
question_words =""
for i in range(30):
question_words = question_words + " "+ g_vocab2text[question_word[i].item()]
print("question_word", remove_pad_from_sentence(question_words))
original_words =""
for i in range(30):
original_words = original_words + " "+ g_vocab2text[original_word[i].item()]
print("original_word", remove_pad_from_sentence(original_words))
########################################
#모델 저장
import re, os
save_dir = "saved_models"
os.makedirs(save_dir, exist_ok=True)
#모델 저장
model_save_path = os.path.join(save_dir, "chatbot_model.pth")
torch.save(
{
"model_state_dict": model.state_dict(),
"g_vocab": g_vocab, # 어휘 정보 저장
"g_vocab2text": g_vocab2text # 인덱스에서 단어로 변환하기 위한 사전 저장
},
model_save_path,
)
########################################
#모델 로드
model2 = ChatbotModel().to(device)
model_load_path = os.path.join(save_dir, "chatbot_model.pth")
checkpoint = torch.load(model_load_path)
model2.load_state_dict(checkpoint["model_state_dict"])
# 어휘 정보 로드
vocab = checkpoint["g_vocab"]
vocab2text_loaded = checkpoint["g_vocab2text"]
#tokenized_data= 'ㅋㅋ 마음 같아서는 대신 먹어주고 싶다 이거죠 .'
tokenized_data = '[1, 460, 461, 462, 463, 77, 464, 0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,]'
tokenized_data_list = eval(tokenized_data)
# 리스트를 파이토치 텐서로 변환
tokenized_data_tensor = torch.tensor(tokenized_data_list).to(device)
print(tokenized_data_tensor)
output_a = model2(tokenized_data_tensor).to(device)
print("output_a", output_a,output_a.size())
_, predicted_index = torch.max(output_a, dim=-1)
predicted_word =""
for i in range(50):
predicted_word = predicted_word + " "+ vocab2text_loaded[predicted_index[i].item()]
print("answer", remove_pad_from_sentence(predicted_word))
첨부파일
- C-1.py (8.9K) 6회 다운로드 | DATE : 2024-04-25 04:23:01
- 이전글 파이토치 인코더 모델 없이 임베딩, 라이너, 활성화 함수만 사용해서 만들어본 텍스트 예상 (책 텍스트를 그냥 넣는) 프로그램 초보자 코딩 연습용
- 다음글 import pywintypes ImportError: No module named pywintypes fixed
댓글목록 0
등록된 댓글이 없습니다.