import tensorflow as tf
import numpy as np
from tqdm import tqdm_notebook as tqdm
import numpy as np
class MatrixFactorization():
    """Biased matrix factorization of a rating matrix, trained with SGD.

    A rating ``R[i, j]`` is modelled as
    ``b + b_P[i] + b_Q[j] + P[i] . Q[j]`` where ``b`` is the mean of the
    observed ratings, ``b_P`` / ``b_Q`` are per-user / per-item biases and
    ``P`` / ``Q`` hold ``k`` latent factors per user / item.

    Entries of ``R`` equal to zero are treated as "not rated": they are
    skipped during training and when computing the cost.
    """

    def __init__(self, R, k, learning_rate, reg_param, epochs, verbose=False):
        """Store the rating matrix and hyper-parameters (no training yet).

        Args:
            R: 2-D NumPy rating matrix (users x items); zeros mark missing
                ratings.
            k: Number of latent factors per user and per item.
            learning_rate: Step size of each SGD update.
            reg_param: L2 regularisation strength applied to biases and
                latent factors.
            epochs: Number of full passes over the observed ratings.
            verbose: If truthy, print the training cost every 10 epochs.
        """
        self._R = R
        self._num_users, self._num_items = R.shape
        self._k = k
        self._learning_rate = learning_rate
        self._reg_param = reg_param
        self._epochs = epochs
        self._verbose = verbose

    def fit(self):
        """Initialise the parameters and run SGD for ``epochs`` passes.

        After each epoch the ``(epoch, cost)`` pair is appended to
        ``self._training_process``.
        """
        # Latent features start from a standard normal distribution.
        # (P is drawn before Q so seeded runs stay reproducible.)
        self._P = np.random.normal(size=(self._num_users, self._k))
        self._Q = np.random.normal(size=(self._num_items, self._k))

        # Biases start at zero; the global bias is the mean observed rating.
        self._b_P = np.zeros(self._num_users)
        self._b_Q = np.zeros(self._num_items)

        # The observed (nonzero) entries never change during training, so
        # look them up once instead of once per epoch.
        rows, cols = self._R.nonzero()
        observed = self._R[rows, cols]
        # With no ratings at all there is nothing to average; use 0.0 rather
        # than the NaN (plus warning) an empty mean would produce.
        self._b = observed.mean() if observed.size else 0.0

        self._training_process = []
        for epoch in range(self._epochs):
            # Train only on the indices where a rating exists.
            for i, j, rating in zip(rows, cols, observed):
                self.gradient_descent(i, j, rating)
            cost = self.cost()
            self._training_process.append((epoch, cost))

            # Report progress every 10th epoch.
            if self._verbose and (epoch + 1) % 10 == 0:
                print("Iteration: %d ; cost = %.4f" % (epoch + 1, cost))

    def cost(self):
        """Return the RMSE between observed ratings and their predictions.

        Only nonzero entries of ``R`` contribute. Returns ``0.0`` when ``R``
        contains no observed rating (there is no error to measure).
        """
        # rows, cols: positions where R holds a nonzero (observed) value.
        rows, cols = self._R.nonzero()
        if rows.size == 0:
            return 0.0

        # Same formula as get_prediction, evaluated for all observed cells
        # at once.
        predicted = (
            self._b
            + self._b_P[rows]
            + self._b_Q[cols]
            + np.sum(self._P[rows] * self._Q[cols], axis=1)
        )
        residuals = self._R[rows, cols] - predicted
        return np.sqrt(np.mean(residuals ** 2))

    def gradient(self, error, i, j):
        """Return the regularised update directions for ``P[i]`` and ``Q[j]``.

        Args:
            error: ``rating - prediction`` for the cell ``(i, j)``.
            i: User (row) index.
            j: Item (column) index.

        Returns:
            Tuple ``(dp, dq)`` of length-``k`` arrays to be scaled by the
            learning rate and added to ``P[i]`` and ``Q[j]``.
        """
        dp = (error * self._Q[j, :]) - (self._reg_param * self._P[i, :])
        dq = (error * self._P[i, :]) - (self._reg_param * self._Q[j, :])
        return dp, dq

    def gradient_descent(self, i, j, rating):
        """Apply one SGD step for the observed rating at ``(i, j)``.

        Updates, in place, the user bias, the item bias and both latent
        factor rows involved in this rating.
        """
        error = rating - self.get_prediction(i, j)

        # Bias updates.
        self._b_P[i] += self._learning_rate * (error - self._reg_param * self._b_P[i])
        self._b_Q[j] += self._learning_rate * (error - self._reg_param * self._b_Q[j])

        # Latent-feature updates; both directions are computed before either
        # row is modified so dq is based on the pre-update P[i].
        dp, dq = self.gradient(error, i, j)
        self._P[i, :] += self._learning_rate * dp
        self._Q[j, :] += self._learning_rate * dq

    def get_prediction(self, i, j):
        """Return the predicted rating of user ``i`` for item ``j``."""
        return self._b + self._b_P[i] + self._b_Q[j] + self._P[i, :].dot(self._Q[j, :])

    def get_complete_matrix(self):
        """Return the full (users x items) matrix of predicted ratings."""
        # User biases broadcast down the columns, item biases across the rows.
        return (
            self._b
            + self._b_P[:, np.newaxis]
            + self._b_Q[np.newaxis, :]
            + self._P.dot(self._Q.T)
        )
if __name__ == "__main__":
    # Demo rating matrix: 7 users (rows) x 5 items (columns).
    R = np.array(
        [
            [1, 0, 0, 1, 3],
            [2, 0, 3, 1, 1],
            [1, 2, 0, 5, 0],
            [1, 0, 0, 4, 4],
            [2, 1, 5, 4, 0],
            [5, 1, 5, 4, 0],
            [0, 0, 0, 1, 0],
        ]
    )

    # Hyper-parameters for the demo run, gathered in one place.
    hyperparams = dict(
        k=3,
        learning_rate=0.01,
        reg_param=0.01,
        epochs=100,
        verbose=True,
    )

    factorizer = MatrixFactorization(R, **hyperparams)
    factorizer.fit()
Iteration: 10 ; cost = 1.0107
Iteration: 20 ; cost = 0.7752
Iteration: 30 ; cost = 0.6467
Iteration: 40 ; cost = 0.5577
Iteration: 50 ; cost = 0.4867
Iteration: 60 ; cost = 0.4284
Iteration: 70 ; cost = 0.3819
Iteration: 80 ; cost = 0.3464
Iteration: 90 ; cost = 0.3198
Iteration: 100 ; cost = 0.2999
CPU times: total: 31.2 ms
Wall time: 30 ms
# Notebook cell: build the full matrix of predicted ratings from the fitted model.
factorizer.get_complete_matrix()
array([[ 0.93113407, 5.85191286, 0.61074567, 0.98124048, 3.00616421],
[ 2.05027439, -0.27317341, 2.92785343, 1.03304381, 1.07405177],
[ 1.01752706, 1.87731366, 3.12351349, 4.90951206, 3.74130174],
[ 1.23477111, 0.69967989, 3.38122734, 4.18111723, 3.75932449],
[ 2.53641834, 0.79648006, 4.41039268, 4.15528126, 3.81071349],
[ 4.2588167 , 1.29611224, 5.61053335, 3.72141069, 4.3998465 ],
[ 4.7615015 , 2.27992661, 4.78200748, 1.03600803, 3.65113945]])