import math

import numpy as np
from tensorflow.keras import callbacks, layers, models, optimizers
from transformer import TransformerEncoder  # custom layer; a possible sketch appears at the end of this file

# Load the pre-split feature arrays and integer labels.
trnX = np.load('trnX.npy')
trnY = np.load('trnY.npy')
valX = np.load('valX.npy')
valY = np.load('valY.npy')
tstX = np.load('tstX.npy')

# Position indices 0..T-1, repeated for every sample, fed to the position embedding.
trnP = np.tile(np.arange(trnX.shape[1]), (trnX.shape[0], 1))
valP = np.tile(np.arange(valX.shape[1]), (valX.shape[0], 1))
tstP = np.tile(np.arange(tstX.shape[1]), (tstX.shape[0], 1))

# Standardize all splits with statistics computed on the training set only,
# so no information leaks from the validation or test data.
mu = np.mean(trnX)
sigma = np.std(trnX)
trnX = (trnX - mu) / sigma
valX = (valX - mu) / sigma
tstX = (tstX - mu) / sigma

feature_count = 256       # embedding width shared by all transformer blocks
dropout_rate = 0.2
transformer_depth = 4     # number of stacked encoder blocks
dense_depth = 1           # number of dense layers in the classification head

# Feature embedding: a width-2 Conv1D projects each timestep into feature_count channels.
feature_input = layers.Input(shape = trnX.shape[1:])
feature_embedding = layers.Conv1D(feature_count, 2, padding = 'same', activation = 'relu')(feature_input)

# Learned position embedding, added to the feature embedding.
# Note: shape must be a tuple, hence the trailing comma.
position_input = layers.Input(shape = (trnX.shape[1],))
position_embedding = layers.Embedding(trnX.shape[1], feature_count)(position_input)

embedding = layers.Add()([ feature_embedding, position_embedding ])
x = layers.LayerNormalization()(embedding)

# Stack of transformer encoder blocks; feature_count // 64 gives 64-dim heads.
for i in range(transformer_depth):
    x = TransformerEncoder(embed_dim = feature_count, dense_dim = 4 * feature_count,
                           num_heads = feature_count // 64, dropout_rate = dropout_rate,
                           name = f'encoder{i + 1}')(x)

# Read out the representation of the first timestep as the sequence summary.
x = layers.Lambda(lambda t: t[:, 0, :])(x)

# Classification head; labels are assumed to be 0..K-1, so K = max label + 1.
for i in range(dense_depth):
    x = layers.Dense(320, activation = 'swish')(x)
    x = layers.Dropout(dropout_rate)(x)
output = layers.Dense(int(trnY.max()) + 1, activation = 'softmax')(x)

model = models.Model(inputs = [ feature_input, position_input ], outputs = output)

# Sanity check: print the weight shapes of the first encoder block.
for weights in model.get_layer(name = 'encoder1').get_weights():
    print(weights.shape)

initial_learning_rate = 0.0005
batch_size = 128

# Cosine decay with warm restarts: one restart cycle per epoch, with the peak
# learning rate shrinking by a factor of 0.95 after each restart.
# (optimizers.schedules is the current home of this schedule; the older
# tf.keras.experimental.CosineDecayRestarts alias is deprecated.)
learning_rate = optimizers.schedules.CosineDecayRestarts(
    initial_learning_rate = initial_learning_rate,
    first_decay_steps = math.ceil(trnX.shape[0] / batch_size),
    t_mul = 1.0, m_mul = 0.95)
optimizer = optimizers.Adam(learning_rate = learning_rate)

model.compile(loss = 'sparse_categorical_crossentropy', optimizer = optimizer, metrics = [ 'accuracy' ])
model.summary()

# Stop when validation accuracy plateaus and roll back to the best weights.
# (Named callback_list so it does not shadow the imported callbacks module.)
callback_list = [ callbacks.EarlyStopping(monitor = 'val_accuracy', patience = 16, restore_best_weights = True) ]
history = model.fit([ trnX, trnP ], trnY, batch_size = batch_size, epochs = 32,
                    validation_data = ([ valX, valP ], valY), callbacks = callback_list)

# Predict class probabilities for the test set and take the argmax as the label.
probabilities = model.predict([ tstX, tstP ])
classes = probabilities.argmax(axis = -1)

# Write predictions to a submission-style CSV with zero-padded row ids.
with open('predictions.csv', 'w') as predictions:
    predictions.write('id,label\n')
    for i in range(tstX.shape[0]):
        predictions.write(str(i).zfill(5) + ',' + str(classes[i]) + '\n')
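
# ---------------------------------------------------------------------------
# Reference sketch of the TransformerEncoder layer imported from transformer.
# The real transformer.py is not part of this listing, so this is a minimal,
# assumption-based implementation that matches the constructor arguments used
# above (embed_dim, dense_dim, num_heads, dropout_rate, name) using only
# standard Keras layers. The actual module may differ in details such as
# pre-norm vs. post-norm placement, masking, or initialization; in the project
# it would live in transformer.py rather than at the bottom of this script.
# ---------------------------------------------------------------------------
from tensorflow.keras import layers, models

class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, dropout_rate = 0.0, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.dropout_rate = dropout_rate
        # Multi-head self-attention with embed_dim // num_heads dims per head.
        self.attention = layers.MultiHeadAttention(num_heads = num_heads,
                                                   key_dim = embed_dim // num_heads,
                                                   dropout = dropout_rate)
        # Position-wise feed-forward block expanding to dense_dim and back.
        self.dense_proj = models.Sequential([
            layers.Dense(dense_dim, activation = 'relu'),
            layers.Dense(embed_dim),
        ])
        self.dropout = layers.Dropout(dropout_rate)
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, training = False):
        # Post-norm residual blocks: self-attention, then feed-forward.
        attention_output = self.attention(inputs, inputs, training = training)
        x = self.layernorm_1(inputs + attention_output)
        proj_output = self.dropout(self.dense_proj(x), training = training)
        return self.layernorm_2(x + proj_output)

    def get_config(self):
        # Makes the custom layer serializable alongside the rest of the model.
        config = super().get_config()
        config.update({ 'embed_dim': self.embed_dim, 'dense_dim': self.dense_dim,
                        'num_heads': self.num_heads, 'dropout_rate': self.dropout_rate })
        return config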