728x90
반응형
문제1) MNIST 데이터셋으로 분류기를 만들어 테스트 세트에서 97% 정확도를 달성해보세요.
힌트: `KNeighborsClassifier`가 이 작업에 아주 잘 맞습니다. 좋은 하이퍼파라미터 값만 찾으면 됩니다(`weights`와 `n_neighbors` 하이퍼파라미터로 그리드 탐색을 시도해보세요).
from sklearn.model_selection import GridSearchCV
knn_clf = KNeighborsClassifier()
grid_params = {
'n_neighbors' : list(range(1,20)),
'weights' : ["uniform", "distance"]
}
gs = GridSearchCV(knn_clf, grid_params, cv=3)
gs.fit(X_train[:10000], y_train[:10000])
print("Best Parameters : ", gs.best_params_)
print("Best Score : ", gs.best_score_)
print("Best Test Score : ", gs.score(X_test, y_test))
gs.best_estimator_.fit(X_train, y_train)
tuned_accuracy = gs.score(X_test, y_test)
# tuned_accuracy: 0.9714
문제2) Data Augmentation
MNIST 이미지를 (왼, 오른, 위, 아래) 어느 방향으로든 한 픽셀 이동시킬 수 있는 함수를 만들어보세요. `scipy.ndimage.interpolation` 모듈의 `shift()` 함수를 사용할 수 있습니다.
그런 다음 훈련 세트에 있는 각 이미지에 대해 네 개의 이동된 복사본(방향마다 한 개씩)을 만들어 훈련 세트에 추가하세요.
마지막으로 이 확장된 데이터셋에서 앞에서 찾은 최선의 모델을 훈련시키고 테스트 세트에서 정확도를 측정해보세요.
from scipy.ndimage import shift
def shift_image(image, dx, dy):
image = image.reshape((28, 28))
shifted_image = shift(image, [dy, dx], cval=0, mode="constant")
return shifted_image.reshape([-1])
X_train_augmented = [image for image in X_train]
y_train_augmented = [label for label in y_train]
for dx, dy in ((-1, 0), (1, 0), (0, 1), (0, -1)):
for image, label in zip(X_train, y_train):
X_train_augmented.append(shift_image(image, dx, dy))
y_train_augmented.append(label)
X_train_augmented = np.array(X_train_augmented)
y_train_augmented = np.array(y_train_augmented)
shuffle_idx = np.random.permutation(len(X_train_augmented))
X_train_augmented = X_train_augmented[shuffle_idx]
y_train_augmented = y_train_augmented[shuffle_idx]
knn_clf = KNeighborsClassifier(**grid_search.best_params_)
knn_clf.fit(X_train_augmented, y_train_augmented)
augmented_accuracy = knn_clf.score(X_test, y_test) # 97.63
문제3) 타이타닉 데이터셋 도전하기
from pathlib import Path
import pandas as pd
import tarfile
import urllib.request
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import OrdinalEncoder, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.model_selection import cross_val_score
# 1단계) 데이터 로드
def load_titanic_data():
tarball_path = Path("datasets/titanic.tgz")
if not tarball_path.is_file():
Path("datasets").mkdir(parents=True, exist_ok=True)
url = "https://github.com/ageron/data/raw/main/titanic.tgz"
urllib.request.urlretrieve(url, tarball_path)
with tarfile.open(tarball_path) as titanic_tarball:
titanic_tarball.extractall(path="datasets")
return [pd.read_csv(Path("datasets/titanic") / filename)
for filename in ("train.csv", "test.csv")]
train_data, test_data = load_titanic_data()
train_data = train_data.set_index("PassengerId")
test_data = test_data.set_index("PassengerId")
# 2단계) 데이터 전처리 파이프라인 생성
num_pipeline = Pipeline([
("imputer", SimpleImputer(strategy="median")),
("scaler", StandardScaler())
])
cat_pipeline = Pipeline([
("ordinal_encoder", OrdinalEncoder()),
("imputer", SimpleImputer(strategy="most_frequent")),
("cat_encoder", OneHotEncoder(sparse_output=False)),
])
num_attribs = ["Age", "SibSp", "Parch", "Fare"]
cat_attribs = ["Pclass", "Sex", "Embarked"]
preprocess_pipeline = ColumnTransformer([
("num", num_pipeline, num_attribs),
("cat", cat_pipeline, cat_attribs),
])
X_train = preprocess_pipeline.fit_transform(train_data)
y_train = train_data["Survived"]
X_test = preprocess_pipeline.transform(test_data)
# 3단계) 모델 생성 및 훈련
forest_clf = RandomForestClassifier(n_estimators=100, random_state=42)
forest_clf.fit(X_train, y_train)
y_pred = forest_clf.predict(X_test)
forest_scores = cross_val_score(forest_clf, X_train, y_train, cv=10)
forest_scores.mean() # 81.37
svm_clf = SVC(gamma="auto")
svm_scores = cross_val_score(svm_clf, X_train, y_train, cv=10)
svm_scores.mean() # 82.49
문제4) 스팸 분류기를 만들어보세요
import tarfile
import email
import email.policy
import numpy as np
import re
import nltk
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.model_selection import train_test_split
from html import unescape
from scipy.sparse import csr_matrix
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
# 1단계) 데이터 로드
def fetch_spam_data():
spam_root = "http://spamassassin.apache.org/old/publiccorpus/"
ham_url = spam_root + "20030228_easy_ham.tar.bz2"
spam_url = spam_root + "20030228_spam.tar.bz2"
spam_path = Path() / "datasets" / "spam"
spam_path.mkdir(parents=True, exist_ok=True)
for dir_name, tar_name, url in (("easy_ham", "ham", ham_url),
("spam", "spam", spam_url)):
if not (spam_path / dir_name).is_dir():
path = (spam_path / tar_name).with_suffix(".tar.bz2")
print("Downloading", path)
urllib.request.urlretrieve(url, path)
tar_bz2_file = tarfile.open(path)
tar_bz2_file.extractall(path=spam_path)
tar_bz2_file.close()
return [spam_path / dir_name for dir_name in ("easy_ham", "spam")]
ham_dir, spam_dir = fetch_spam_data()
ham_filenames = [f for f in sorted(ham_dir.iterdir()) if len(f.name) > 20]
spam_filenames = [f for f in sorted(spam_dir.iterdir()) if len(f.name) > 20]
ham_emails = [load_email(filepath) for filepath in ham_filenames]
spam_emails = [load_email(filepath) for filepath in spam_filenames]
# 2단계) 데이터 전처리
def get_email_structure(email):
if isinstance(email, str):
return email
payload = email.get_payload()
if isinstance(payload, list):
multipart = ", ".join([get_email_structure(sub_email)
for sub_email in payload])
return f"multipart({multipart})"
else:
return email.get_content_type()
def html_to_plain_text(html):
text = re.sub('<head.*?>.*?</head>', '', html, flags=re.M | re.S | re.I)
text = re.sub('<a\s.*?>', ' HYPERLINK ', text, flags=re.M | re.S | re.I)
text = re.sub('<.*?>', '', text, flags=re.M | re.S)
text = re.sub(r'(\s*\n)+', '\n', text, flags=re.M | re.S)
return unescape(text)
def email_to_text(email):
html = None
for part in email.walk():
ctype = part.get_content_type()
if not ctype in ("text/plain", "text/html"):
continue
try:
content = part.get_content()
except: # in case of encoding issues
content = str(part.get_payload())
if ctype == "text/plain":
return content
else:
html = content
if html:
return html_to_plain_text(html)
X = np.array(ham_emails + spam_emails, dtype=object)
y = np.array([0] * len(ham_emails) + [1] * len(spam_emails))
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
html_spam_emails = [email for email in X_train[y_train==1]
if get_email_structure(email) == "text/html"]
# 3단계) 데이터 전처리 파이프라인 생성
stemmer = nltk.PorterStemmer()
class EmailToWordCounterTransformer(BaseEstimator, TransformerMixin):
def __init__(self, strip_headers=True, lower_case=True,
remove_punctuation=True, replace_urls=True,
replace_numbers=True, stemming=True):
self.strip_headers = strip_headers
self.lower_case = lower_case
self.remove_punctuation = remove_punctuation
self.replace_urls = replace_urls
self.replace_numbers = replace_numbers
self.stemming = stemming
def fit(self, X, y=None):
return self
def transform(self, X, y=None):
X_transformed = []
for email in X:
text = email_to_text(email) or ""
if self.lower_case:
text = text.lower()
if self.replace_urls and url_extractor is not None:
urls = list(set(url_extractor.find_urls(text)))
urls.sort(key=lambda url: len(url), reverse=True)
for url in urls:
text = text.replace(url, " URL ")
if self.replace_numbers:
text = re.sub(r'\d+(?:\.\d*)?(?:[eE][+-]?\d+)?', 'NUMBER', text)
if self.remove_punctuation:
text = re.sub(r'\W+', ' ', text, flags=re.M)
word_counts = Counter(text.split())
if self.stemming and stemmer is not None:
stemmed_word_counts = Counter()
for word, count in word_counts.items():
stemmed_word = stemmer.stem(word)
stemmed_word_counts[stemmed_word] += count
word_counts = stemmed_word_counts
X_transformed.append(word_counts)
return np.array(X_transformed)
class WordCounterToVectorTransformer(BaseEstimator, TransformerMixin):
def __init__(self, vocabulary_size=1000):
self.vocabulary_size = vocabulary_size
def fit(self, X, y=None):
total_count = Counter()
for word_count in X:
for word, count in word_count.items():
total_count[word] += min(count, 10)
most_common = total_count.most_common()[:self.vocabulary_size]
self.vocabulary_ = {word: index + 1
for index, (word, count) in enumerate(most_common)}
return self
def transform(self, X, y=None):
rows = []
cols = []
data = []
for row, word_count in enumerate(X):
for word, count in word_count.items():
rows.append(row)
cols.append(self.vocabulary_.get(word, 0))
data.append(count)
return csr_matrix((data, (rows, cols)),
shape=(len(X), self.vocabulary_size + 1))
preprocess_pipeline = Pipeline([
("email_to_wordcount", EmailToWordCounterTransformer()),
("wordcount_to_vector", WordCounterToVectorTransformer()),
])
X_train_transformed = preprocess_pipeline.fit_transform(X_train)
# 4단계) 훈련
log_clf = LogisticRegression(max_iter=1000, random_state=42)
score = cross_val_score(log_clf, X_train_transformed, y_train, cv=3)
score.mean() # 98.5
728x90
반응형
'ML' 카테고리의 다른 글
[머신러닝] 3분 만에 이해되는 K-Means 클러스터링 (0) | 2024.08.27 |
---|