import numpy
import matplotlib.pyplot as plt
import pandas
import math
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import LSTM
from keras.layers import Reshape
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
def make_sine_with_noise(_start, _stop, _step, _phase_shift, gain):
    """Generate a sine wave with additive uniform noise.

    Args:
        _start: First sample position (inclusive), passed to ``numpy.arange``.
        _stop: End of the sampling range (exclusive), passed to ``numpy.arange``.
        _step: Spacing between consecutive sample positions.
        _phase_shift: Offset added to every sample position before the sine
            is evaluated.
        gain: Multiplier for the wave; the noiseless amplitude is ``gain * 0.5``.

    Returns:
        A tuple ``(x, y)`` where ``x`` holds the sample positions and ``y``
        holds ``gain * 0.5 * sin(x + _phase_shift)`` plus noise drawn
        uniformly from ``[-0.1, 0.1)`` using NumPy's global random state.
    """
    positions = numpy.arange(_start, _stop, step=_step)
    jitter = numpy.random.uniform(-0.1, 0.1, size=len(positions))
    amplitude = gain * 0.5
    clean_wave = amplitude * numpy.sin(positions + _phase_shift)
    return positions, jitter + clean_wave
def create_dataset(dataset, look_back=1, look_ahead=1):
    """Slice a 2-D series into supervised (history, future) window pairs.

    For every valid start row ``i`` the input window is
    ``dataset[i : i + look_back]`` and the target window is the
    ``look_ahead`` rows that immediately follow it.

    Args:
        dataset: 2-D array indexed as ``[row, column]``; rows are consecutive
            time steps.
        look_back: Number of rows in each input window.
        look_ahead: Number of rows in each target window.

    Returns:
        A tuple ``(dataX, dataY)`` of NumPy arrays. When at least one window
        fits, ``dataX`` has shape ``(n, look_back, columns)`` and ``dataY``
        has shape ``(n, look_ahead, columns)`` with
        ``n = len(dataset) - look_back - look_ahead + 1``. When the series is
        too short for a single window both arrays are empty.
    """
    # The last usable start index is len - look_back - look_ahead, so the
    # number of windows is that value plus one. Clamp at zero for short input.
    window_count = max(len(dataset) - look_back - look_ahead + 1, 0)

    dataX, dataY = [], []
    for start in range(window_count):
        split = start + look_back
        dataX.append(dataset[start:split, :])
        dataY.append(dataset[split:split + look_ahead, :])
    return numpy.array(dataX), numpy.array(dataY)
# Seed NumPy's global RNG, which make_sine_with_noise draws its noise from.
numpy.random.seed(7)

# Build three noisy sine waves over the same sampling grid; only the y values
# are needed, the sample positions are discarded.
_, y1 = make_sine_with_noise(0, 200, 1/24, 0, 1)
_, y2 = make_sine_with_noise(0, 200, 1/24, math.pi/4, 3)
_, y3 = make_sine_with_noise(0, 200, 1/24, math.pi/2, 20)
# NOTE(review): the third column is keyed 'x3' although it holds y3. The key
# is left as-is because only the values are used below.
dataframe = pandas.DataFrame({'y1': y1, 'y2': y2, 'x3': y3})
dataset = dataframe.values.astype('float32')

# Chronological split: the first 67% of rows train the model, the rest test it.
train_size = int(len(dataset) * 0.67)

# Fit the scaler on the training rows only so the test rows do not influence
# the scaling; the test rows are transformed with the training min/max.
scaler = MinMaxScaler(feature_range=(0, 1))
train = scaler.fit_transform(dataset[:train_size, :])
test = scaler.transform(dataset[train_size:, :])

# Each sample maps `look_back` past rows to the `look_ahead` rows after them.
look_back = 10
look_ahead = 5
trainX, trainY = create_dataset(train, look_back, look_ahead)
testX, testY = create_dataset(test, look_back, look_ahead)
print(trainX.shape)
print(trainY.shape)

# trainX is (samples, look_back, features) and trainY is
# (samples, look_ahead, features), so no reshaping is needed before training.
horizon, n_features = trainY.shape[1:]

model = Sequential()
model.add(LSTM(look_ahead, input_shape=trainX.shape[1:], return_sequences=True))
# Only the first layer needs an explicit input shape; later layers take theirs
# from the layer before them.
model.add(LSTM(look_ahead))
# Emit one value per (future step, feature) and fold it back into that layout.
model.add(Dense(horizon * n_features))
model.add(Reshape((horizon, n_features)))
model.compile(loss='mean_squared_error', optimizer='adam')
model.fit(trainX, trainY, epochs=1, batch_size=1, verbose=1)

trainPredict = model.predict(trainX)
testPredict = model.predict(testX)
model.save('my_sin_prediction_model.h5')

# Consecutive samples predict overlapping horizons. Taking every
# `look_ahead`-th sample gives back-to-back horizons that can be laid end to
# end as one continuous series, then mapped back to the original units.
trainPredictPlottable = scaler.inverse_transform(
    trainPredict[::look_ahead].reshape(-1, trainPredict.shape[-1]))
testPredictPlottable = scaler.inverse_transform(
    testPredict[::look_ahead].reshape(-1, testPredict.shape[-1]))

# The first training prediction targets row `look_back` of the full series.
trainPredictPlot = numpy.full_like(dataset, numpy.nan)
trainPredictPlot[look_back:look_back + len(trainPredictPlottable), :] = trainPredictPlottable

# The first test prediction targets row `look_back` of the test split, i.e.
# row `train_size + look_back` of the full series.
test_start = train_size + look_back
testPredictPlot = numpy.full_like(dataset, numpy.nan)
testPredictPlot[test_start:test_start + len(testPredictPlottable), :] = testPredictPlottable

# Ground truth in black, with the train and test forecasts overlaid.
plt.plot(dataset, color='k')
plt.plot(trainPredictPlot)
plt.plot(testPredictPlot)
plt.show()