from io import TextIOWrapper
from zipfile import ZipFile
import math

import numpy as np

# Pre-tokenized documents: each row is a fixed-length sequence of vocabulary indices.
trnX = np.load("newsgroups_trnX.npy")
valX = np.load("newsgroups_valX.npy")
tstX = np.load("newsgroups_tstX.npy")

trnY = np.zeros(trnX.shape[0]).astype("int32")
valY = np.zeros(valX.shape[0]).astype("int32")

# The class labels live in the second column of the train/validation CSVs.
with ZipFile("ml530-2022-fall-newsgroups.zip", "r") as archive:
    with TextIOWrapper(archive.open("newsgroups_trn.csv", "r")) as file:
        header = file.readline()  # skip the header row
        for i in range(trnY.shape[0]):
            trnY[i] = np.int32(file.readline().strip("\r\n").split(",")[1])
    with TextIOWrapper(archive.open("newsgroups_val.csv", "r")) as file:
        header = file.readline()  # skip the header row
        for i in range(valY.shape[0]):
            valY[i] = np.int32(file.readline().strip("\r\n").split(",")[1])

# Map each token (the second tab-separated field) to a 1-based index;
# index 0 is reserved for padding so the Embedding layer can mask it.
tokenIndex = dict()
index = 1
for line in open("vocabulary.dat", "r", encoding = "utf-8"):
    value = line.strip("\r\n").split("\t")
    tokenIndex[value[1]] = index
    index = index + 1
vocabularySize = index

# Collect the 300-dimensional GloVe vector for every vocabulary token;
# tokens without a pretrained vector stay at zero.
pretrained = np.zeros((vocabularySize, 300), dtype = "float32")
with ZipFile("glove.6B.zip", "r") as archive:
    with archive.open("glove.6B.300d.txt", "r") as file:
        for line in file:
            token, weights = line.decode("utf-8").split(" ", maxsplit = 1)
            if token in tokenIndex:
                pretrained[tokenIndex[token]] = np.asarray(weights.split(), dtype = "float32")

from tensorflow.keras import callbacks, initializers, layers, models, optimizers, regularizers
from keras_tuner import HyperModel
from keras_tuner.tuners import Hyperband

class CustomHyperModel(HyperModel):
    def build(self, hp):
        embedding_type = hp.Choice("embedding_type", values = [ "uninitialized", "initialized", "frozen" ])
        width = hp.Choice("width", values = [ 64, 128, 256, 512 ])
        regularization_function = hp.Choice("regularization_function", values = [ "none", "l1", "l2", "l1_l2" ])
        regularization_penalty = hp.Choice("regularization_penalty", values = [ 0.0001, 0.00001 ])
        regularizer = None
        if regularization_function == "l1":
            regularizer = regularizers.l1(regularization_penalty)
        elif regularization_function == "l2":
            regularizer = regularizers.l2(regularization_penalty)
        elif regularization_function == "l1_l2":
            regularizer = regularizers.l1_l2(regularization_penalty, regularization_penalty)
        dropout = hp.Float("dropout", 0.0, 0.5, step = 0.1)

        model = models.Sequential()
        model.add(layers.Input(shape = (trnX.shape[1],)))
        if embedding_type == "uninitialized":
            # Embeddings learned from scratch; width is only used on this branch.
            model.add(layers.Embedding(vocabularySize, width, mask_zero = True, embeddings_regularizer = regularizer))
        elif embedding_type == "initialized":
            # GloVe initialization, fine-tuned along with the rest of the model.
            model.add(layers.Embedding(vocabularySize, 300, mask_zero = True, embeddings_initializer = initializers.Constant(pretrained), trainable = True))
        elif embedding_type == "frozen":
            # GloVe initialization, kept fixed during training.
            model.add(layers.Embedding(vocabularySize, 300, mask_zero = True, embeddings_initializer = initializers.Constant(pretrained), trainable = False))
        if dropout > 0.0:
            model.add(layers.Dropout(float(dropout)))
        model.add(layers.GlobalAveragePooling1D())  # we're averaging word embeddings
        model.add(layers.Dense(trnY.max() + 1, activation = "softmax"))

        batch_size = hp.Choice("batch_size", values = [ 32, 64, 128, 256, 512, 1024 ])
        optimizer_name = hp.Choice("optimizer", values = [ "adam", "rmsprop", "sgd" ])
        initial_learning_rate = hp.Choice("learning_rate", values = [ 0.01, 0.001, 0.0001 ])
        learning_rate_schedule = hp.Choice("learning_rate_schedule", values = [ "none", "exponential", "cosine" ])
        steps_per_epoch = math.ceil(trnX.shape[0] / batch_size)
        learning_rate = initial_learning_rate
        if learning_rate_schedule == "exponential":
            learning_rate = optimizers.schedules.ExponentialDecay(initial_learning_rate = initial_learning_rate, decay_steps = steps_per_epoch, decay_rate = 0.95)
        elif learning_rate_schedule == "cosine":
            learning_rate = optimizers.schedules.CosineDecayRestarts(initial_learning_rate = initial_learning_rate, first_decay_steps = steps_per_epoch)
        # Construct the optimizer named by the hyperparameter; the name is
        # compared before any optimizer object is built so that every branch
        # is reachable.
        if optimizer_name == "rmsprop":
            optimizer = optimizers.RMSprop(learning_rate = learning_rate)
        elif optimizer_name == "sgd":
            optimizer = optimizers.SGD(learning_rate = learning_rate)
        else:
            optimizer = optimizers.Adam(learning_rate = learning_rate)
        model.compile(optimizer = optimizer, loss = "sparse_categorical_crossentropy", metrics = [ "accuracy" ])
        return model

    def fit(self, hp, model, *args, **kwargs):
        # batch_size is a search hyperparameter, so it is injected at fit time.
        return model.fit(*args, batch_size = hp.get("batch_size"), **kwargs)

tuner = Hyperband(CustomHyperModel(), objective = "val_accuracy", max_epochs = 32, hyperband_iterations = 1, directory = "tuning", project_name = "bandit")
tuner.search_space_summary()
tuner.search(trnX, trnY, validation_data = (valX, valY))
tuner.results_summary()

model = tuner.get_best_models(num_models = 1)[0]
hyperparameters = tuner.get_best_hyperparameters(num_trials = 1)[0].get_config()
print(hyperparameters["values"])

# Write test-set predictions in the id,class submission format.
probabilities = model.predict(tstX)
classes = probabilities.argmax(axis = -1)
with open("predictions.csv", "w") as predictions:
    predictions.write("id,class\n")
    for i in range(tstX.shape[0]):
        predictions.write(str(i).zfill(5) + "," + str(classes[i]) + "\n")

model.summary()
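
# Optional follow-up (a minimal sketch, not part of the original pipeline):
# report the tuned model's loss and accuracy on the held-out validation split
# as a sanity check before trusting the test-set predictions. Because the model
# was compiled with metrics = [ "accuracy" ], evaluate returns [loss, accuracy].
val_loss, val_accuracy = model.evaluate(valX, valY, verbose = 0)
print("validation loss:", val_loss)
print("validation accuracy:", val_accuracy)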