Classification of sequences of different lengths

Despite having gone through a few examples, I still don't understand how to classify sequences of different lengths using Keras (similar to this question). I can train a network that determines the frequency of a sinusoid from signals of different lengths using masking:

from keras import models
from keras.layers.recurrent import LSTM
from keras.layers import Dense, Masking
from keras.optimizers import RMSprop
from keras.losses import categorical_crossentropy
from keras.preprocessing.sequence import pad_sequences

import numpy as np


def gen_noise(noise_len, mag):
    """Return ``noise_len`` uniform random samples scaled by ``mag``.

    Samples are drawn with ``np.random.uniform`` on its default ``[0, 1)``
    interval and then multiplied by ``mag``, so the noise is one-sided
    rather than centred on zero.
    """
    unit_samples = np.random.uniform(size=noise_len)
    scaled_samples = unit_samples * mag
    return scaled_samples


def gen_sin(t_val, freq):
    """Evaluate an amplitude-2 sinusoid: ``2 * sin(2 * pi * t_val * freq)``.

    ``t_val`` and ``freq`` may each be a scalar or a NumPy array; the usual
    NumPy broadcasting rules decide the shape of the result.
    """
    phase = 2 * np.pi * t_val * freq
    return 2 * np.sin(phase)


def train_rnn(x_train, y_train, max_len, mask, number_of_categories):
    """Build and fit a masked, single-layer LSTM sequence classifier.

    Args:
        x_train: padded input sequences; the model is declared with a
            per-sample input shape of ``(max_len, 1)``.
        y_train: one-hot class targets, one row per sequence (the loss is
            categorical cross-entropy).
        max_len: number of time steps every padded sequence has.
        mask: value that marks padded time steps.  The ``Masking`` layer
            skips time steps whose features all equal this value, so the
            padding in ``x_train`` must use exactly this value.  NaN cannot
            serve as a mask value because NaN never compares equal to
            itself.
        number_of_categories: number of classes, i.e. the width of the
            softmax output layer.

    Returns:
        The fitted ``Sequential`` model.
    """
    epochs = 3
    batch_size = 500

    # one scalar feature per time step, fed to a single 256-unit LSTM layer
    vec_dims = 1
    hidden_units = 256
    in_shape = (max_len, vec_dims)

    model = models.Sequential()

    model.add(Masking(mask_value=mask, name="in_layer", input_shape=in_shape))
    # return_sequences=False: only the final LSTM output reaches the classifier
    model.add(LSTM(hidden_units, return_sequences=False))
    # No input_shape on the output layer: its input width is fixed by the
    # LSTM above (hidden_units), not by the number of categories.
    model.add(Dense(number_of_categories, activation='softmax', name='output'))

    model.compile(loss=categorical_crossentropy, optimizer=RMSprop())

    model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs,
              validation_split=0.05)

    return model


def gen_sig_cls_pair(freqs, t_stops, num_examples, noise_magnitude,
                     pad_value=np.nan):
    """Generate padded noisy sinusoids together with one-hot frequency labels.

    For every frequency in ``freqs`` and every duration in ``t_stops``,
    ``num_examples`` signals are sampled at ``dt = 0.01`` and perturbed with
    uniform noise of magnitude ``noise_magnitude``.  Shorter signals are
    post-padded to a common length.

    Args:
        freqs: sequence of frequencies; the index of a frequency is its class.
        t_stops: sequence of signal durations (end of the time axis).
        num_examples: number of signals per (frequency, duration) pair.
        noise_magnitude: scale passed to ``gen_noise``.
        pad_value: value used to pad short signals.  Defaults to NaN, the
            value this function has always used.  Pass the ``mask_value`` of
            the model's ``Masking`` layer instead when the output is fed to a
            network: NaN never compares equal to anything, so NaN padding is
            not masked.

    Returns:
        ``(x, y)`` where ``x`` is a float32 array of padded signals, one row
        per signal, and ``y`` is a boolean one-hot array with
        ``len(freqs)`` columns.
    """
    x = []
    y = []

    num_cat = len(freqs)

    dt = 0.01
    max_t = int(np.max(t_stops) / dt)

    for f_i, f in enumerate(freqs):
        for t_stop in t_stops:
            t_range = np.arange(0, t_stop, dt)
            t_len = t_range.size

            for _ in range(num_examples):
                # arguments in the order gen_sin declares them: (time, frequency)
                sig = gen_sin(t_range, f) + gen_noise(t_len, noise_magnitude)
                x.append(sig)

                # builtin bool / np.nan: the np.bool and np.NaN aliases are
                # not available in every NumPy release
                one_hot = np.zeros(num_cat, dtype=bool)
                one_hot[f_i] = True
                y.append(one_hot)

    pad_kwargs = dict(padding='post', maxlen=max_t, value=pad_value,
                      dtype=np.float32)
    return pad_sequences(x, **pad_kwargs), np.array(y)


if __name__ == '__main__':
    noise_mag = 0.01
    # Must lie outside the signal range (amplitude 2 plus noise, i.e. roughly
    # [-2, 2.01]) so that no genuine sample is ever mistaken for padding.
    mask_val = -10
    frequencies = (5, 7, 10)
    signal_lengths = (0.8, 0.9, 1)

    x_in, y_in = gen_sig_cls_pair(frequencies, signal_lengths, 50, noise_mag)

    # The generator pads short signals with NaN.  NaN never equals the Masking
    # layer's mask value, so it would flow into the LSTM and turn the loss and
    # every weight into NaN.  Replace the padding with the real mask value.
    x_in[np.isnan(x_in)] = mask_val

    # Take the sequence length from the data instead of hard-coding it.
    mod = train_rnn(x_in[:, :, None], y_in, x_in.shape[1], mask_val,
                    len(frequencies))

      

However, I don't understand how I am supposed to get Keras to predict on other sequences. I thought I could simply mask them, but when I try, the predictions are all NaN.

testing_dat, expected = gen_sig_cls_pair(frequencies, signal_lengths, 1, 0)

# The generator pads short signals with NaN.  NaN never compares equal to the
# Masking layer's mask value, so passing it straight to predict() propagates
# NaN through the LSTM and every output comes back as NaN.  Give the network a
# copy whose padding is the mask value (the model must have been trained on
# data padded the same way) and keep the NaN-padded array for plotting.
model_in = testing_dat.copy()
model_in[np.isnan(model_in)] = mask_val
res = mod.predict(model_in[:, :, None])

# NOTE(review): `plt` is presumably matplotlib.pyplot; its import is not part
# of this snippet.
fig, axes = plt.subplots(3)
# all test signals laid end to end
axes[0].plot(np.concatenate(testing_dat), label="input")

# predicted vs. expected class index for each test signal
axes[1].plot(np.argmax(res, axis=1), "ro", label="result", alpha=0.2)
axes[1].plot(np.argmax(expected, axis=1), "bo", label="expected", alpha=0.2)
axes[1].legend(bbox_to_anchor=(1.1, 1))

# raw softmax outputs, one line per class
axes[2].plot(res)

plt.show()

      

How do you create a network that can evaluate inputs of different lengths?

+3


source to share


1 answer


You can pad the input sequences (usually with zeros), or you can use batches of size 1 with variable-length input, as stated in fchollet's answer on the Keras GitHub:

# Train on one sequence at a time (batch size 1) so that each sequence can
# keep its own length.  The snippet as originally quoted called
# `model.train(...)`, which is not a Keras method; `train_on_batch` is the
# call that performs a single update on one batch.
for seq, label in zip(sequences, y):
    # wrap both input and target in a leading batch dimension of size 1
    model.train_on_batch(np.array([seq]), np.array([label]))

      

Alternatively, if your problem type allows it, you can extract subsequences of the original time series whose length is less than that of the shortest sequence. This third option also lets you add redundancy to the dataset if your sample size is low, which reduces the chances of overfitting.



EDIT:

Seanny123 (OP) pointed out that fchollet's original lines call `model.train`, which is invalid code. He solved the problem using batches of size 1 and the following code:

from keras.models import Sequential
from keras.layers import LSTM, Dense
import numpy as np


def gen_sig(num_samples, seq_len):
    """Generate a toy "remember the first step" dataset.

    Half of the samples (``num_samples // 2``, chosen at random without
    replacement) get a 1 in their first time step; every other entry is 0.
    The label of a sample equals the value of its first time step.

    Args:
        num_samples: number of sequences to generate.
        seq_len: number of time steps per sequence.

    Returns:
        ``(x_val, y_val)``: boolean arrays of shape
        ``(num_samples, seq_len)`` and ``(num_samples,)``.
    """
    one_indices = np.random.choice(a=num_samples, size=num_samples // 2, replace=False)

    # builtin `bool` instead of the `np.bool` alias, which was deprecated in
    # NumPy 1.20 and is missing from NumPy 1.24-1.26
    x_val = np.zeros((num_samples, seq_len), dtype=bool)
    x_val[one_indices, 0] = True

    y_val = np.zeros(num_samples, dtype=bool)
    y_val[one_indices] = True

    return x_val, y_val


N_train = 100
N_test = 10
recall_len = 20

X_train, y_train = gen_sig(N_train, recall_len)

# The hold-out set uses N_test sequences (not N_train).
X_test, y_test = gen_sig(N_test, recall_len)

print('Build STATEFUL model...')
model = Sequential()
# batch_input_shape=(1, 1, 1): one sequence, one time step, one feature per
# call.  Because the layer is stateful, its state carries over between calls
# until reset_states(), so a sequence of any length can be fed step by step.
model.add(LSTM(10, batch_input_shape=(1, 1, 1), return_sequences=False, stateful=True))
model.add(Dense(1, activation='sigmoid'))
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

print('Train...')
for epoch in range(15):
    mean_tr_acc = []
    mean_tr_loss = []

    for seq_idx in range(X_train.shape[0]):
        # sanity checks: the label equals the first step, and the first step
        # is the only place a 1 can occur
        start_val = X_train[seq_idx, 0]
        assert y_train[seq_idx] == start_val
        assert tuple(np.nonzero(X_train[seq_idx, :]))[0].shape[0] == start_val

        # builtin `bool`: the `np.bool` alias is not available in every
        # NumPy release
        y_in = np.array([y_train[seq_idx]], dtype=bool)

        # feed a random-length prefix (5..recall_len steps) one step at a time
        for j in range(np.random.choice(a=np.arange(5, recall_len+1))):
            x_in = np.array([[[X_train[seq_idx][j]]]])
            tr_loss, tr_acc = model.train_on_batch(x_in, y_in)

            mean_tr_acc.append(tr_acc)
            mean_tr_loss.append(tr_loss)

        # forget this sequence before starting the next one
        model.reset_states()

    print('accuracy training = {}'.format(np.mean(mean_tr_acc)))
    print('loss training = {}'.format(np.mean(mean_tr_loss)))
    print('___________________________________')

    mean_te_acc = []
    mean_te_loss = []
    for seq_idx in range(X_test.shape[0]):
        start_val = X_test[seq_idx, 0]
        assert y_test[seq_idx] == start_val
        assert tuple(np.nonzero(X_test[seq_idx, :]))[0].shape[0] == start_val

        y_in = np.array([y_test[seq_idx]], dtype=bool)

        for j in range(np.random.choice(a=np.arange(5, recall_len+1))):
            te_loss, te_acc = model.test_on_batch(np.array([[[X_test[seq_idx][j]]]], dtype=bool), y_in)
            mean_te_acc.append(te_acc)
            mean_te_loss.append(te_loss)
        model.reset_states()

    print('accuracy testing = {}'.format(np.mean(mean_te_acc)))
    print('loss testing = {}'.format(np.mean(mean_te_loss)))
    # separator belongs to the per-epoch report, so it stays inside the loop
    print('___________________________________')

      

+2


source







All Articles