Deep learning models are known to be computationally expensive and can take several days to train, depending on the size of the dataset. To reduce training time, two approaches are common these days: GPUs and distributed computing with MapReduce. In this post I will show how to combine both of these processing paradigms. Once the learning algorithm is implemented using MapReduce, the model can also run on the Elastic MapReduce (EMR) platform provided by Amazon Web Services (AWS). The code is available on GitHub.

I will use Mrjob’s MapReduce implementation in Python to implement a simple neural network.

Each mapper (an individual machine) is equipped with a GPU and uses Theano/TensorFlow for computation on the GPU.

The reducer uses a genetic algorithm to speed up the convergence rate, with the help of two selection schemes: truncation selection and roulette-wheel selection.

Currently this model is in its development stage for deep learning algorithms. I have implemented it for a simple neural network, but the same idea can be applied to other deep models.

In this post I am omitting the steps to run the model on the AWS cloud platform. I am also assuming that you are familiar with Python, neural networks, MapReduce, and Theano/TensorFlow; I will cover these topics in detail in future blog posts.
THE IDEA
The idea of using the MapReduce programming paradigm for machine learning algorithms was introduced in MapReduce for Machine Learning on Multicore. The paper describes how a distributed architecture can speed up algorithms such as SVM, PCA, k-means, logistic regression, linear regression, EM, and backpropagation, and it includes a time-complexity analysis of the speedup. The basic idea of the paper can be explained in the following steps:

1. Pass each training sample to an individual mapper.

2. Each mapper computes the gradients by performing backpropagation and emits a key-value pair: key = the mapper's ID, value = the gradient.

3. The reducer updates the parameters.

One pass through steps 1-3 is one epoch; iterate over the training data for multiple epochs.
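This map-reduce training loop can be sketched end to end in plain Python, without any MapReduce framework. The names mapper and reducer below are illustrative, and the model is a toy linear regression with squared loss rather than a neural network:

```python
import numpy as np

def mapper(w, x, y):
    """One 'mapper': gradient of a squared loss for a single training sample."""
    return (w @ x - y) * x

def reducer(w, grads, lr=0.1):
    """The 'reducer': average the emitted gradients and update the parameters."""
    return w - lr * np.mean(grads, axis=0)

# One epoch = map every sample to its gradient, then reduce to new weights.
rng = np.random.default_rng(0)
X = rng.normal(size=(8, 3))
true_w = np.array([1.0, -2.0, 0.5])
Y = X @ true_w
w = np.zeros(3)
for epoch in range(200):
    grads = [mapper(w, x, y) for x, y in zip(X, Y)]  # map phase
    w = reducer(w, grads)                            # reduce phase
```

Here every sample gets its own mapper call, exactly as in the paper's online scheme; the batch variant described under IMPROVEMENTS simply hands each mapper a slice of the data instead.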
IMPROVEMENTS
Some improvements can be made to the idea given in the paper above. One enhancement is to use batch gradient descent instead of online mode, i.e., passing a batch of training samples to each mapper instead of an individual sample. The problem with online mode is that it requires as many mappers as there are samples, which is a clear bottleneck. A further improvement is to modify the reducer to perform a hybrid update of the parameters; this idea is introduced in The Improved BP Algorithm Based on MapReduce and Genetic Algorithm.
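The batching itself is trivial; a minimal sketch (the helper name make_batches and the example batch size are mine):

```python
def make_batches(samples, k):
    """Split the training set into mini-batches of size k (the last may be smaller)."""
    return [samples[i:i + k] for i in range(0, len(samples), k)]

# Each batch would be handed to one mapper, instead of one sample per mapper.
batches = make_batches(list(range(10)), 4)
# batches == [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```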
Finally, I am going to use the idea described in these two papers and combine the MapReduce paradigm with GPU computing using Theano/Tensorflow.
The working of the model, as shown in the figure above, can be explained in the following steps:
1. The training data is divided into mini-batches of size K, which are passed to the mappers along with the initial weights.
2. Each mapper performs forward propagation and backpropagation using Theano on the GPU.
3. Once the new weights are passed to the reducer, the weights are updated using the modified GA.
4. Parent selection is done by truncation selection based on fitness, where fitness is the inverse of each mapper's error (the cross-entropy cost).
5. During crossover, several sets of parameters are generated, corresponding to the different ways crossover can be performed.
6. A roulette wheel then selects the new parameters, which are passed to the next training phase.
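To make the two selection schemes concrete, here is a minimal sketch with fitness = 1/cost. The function names and the 50% truncation fraction are my choices, not from the paper:

```python
import random

def truncation_select(costs, frac=0.5):
    """Keep the best fraction of candidates (lowest cost = highest fitness)."""
    ranked = sorted(range(len(costs)), key=lambda i: costs[i])
    return ranked[:max(1, int(len(costs) * frac))]

def roulette_select(costs, rng):
    """Pick one candidate index with probability proportional to fitness = 1/cost."""
    fitness = [1.0 / c for c in costs]
    spin = rng.uniform(0, sum(fitness))
    acc = 0.0
    for i, f in enumerate(fitness):
        acc += f
        if acc >= spin:
            return i
    return len(costs) - 1

costs = [0.9, 0.1, 0.5, 0.3]              # e.g. cross-entropy of each mapper's weights
parents = truncation_select(costs)         # indices of the fittest half
child = roulette_select(costs, random.Random(0))
```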
DESIGNING THE MAPPER CONSTRUCTOR
Initialize the parameters globally or in the main class (these must be copied to all the mappers). This includes:

1. Preparing the dataset for training the model.
2. Initializing the weights in GPU memory.
3. The architecture of the neural network: number of neurons, number of layers, learning rate, regularization parameter, etc.
if __name__ == '__main__':
    inp_size = 400     # number of dimensions in the input
    hidden_size = 25   # number of neurons in the hidden layer
    out_size = 10      # number of neurons in the output layer
    reg = 0.01         # regularization parameter
    alpha = 0.01       # learning rate
    m = 50             # number of samples
    dic = [10]
    final_cost = []
    parent = {}
    w1 = theano.shared(rn.randn(inp_size, hidden_size).astype('float32'), name='w1')
    w2 = theano.shared(rn.randn(hidden_size, out_size).astype('float32'), name='w2')
    b1 = theano.shared(np.zeros(hidden_size).astype('float32'), name='b1')
    b2 = theano.shared(np.ones(out_size).astype('float32'), name='b2')
Once all the weights are initialized in the main class, the next step is to call Mrjob's run module.
MRHadoopFormatJob.run()
Mrjob provides many programming constructs, and data processing with it is considerably easier than with Mincemeat, Octopy, etc. The next step is to build a constructor for the mapper, which is called when each mapper is initialized. This gives us the flexibility to pass specific data values and other parameters to each mapper at creation time. According to the figure above, we need to iterate through the mapper-reducer pair for every epoch. A few observations make the basic idea easier to understand:
 The first time a mapper is called it will process the data provided by the main class.
 After the first epoch, the mapper will receive the data from the reducer and not from the main class.
For this reason I have created two mapper functions, mapper1 and mapper2. mapper1 is called only during the first epoch, while mapper2 is used for later epochs. The constructor design is the same for both mappers. The training data is initialized as
def init_mapper(self):
    self.count = 1
    self.gradients = []
    self.final_cost = []
    self.x = theano.shared(np.matrix(np.zeros((inp_size, 1))).astype('float32'), 'x')
    self.y = theano.shared(np.matrix(np.zeros((out_size, 1))).astype('float32'), 'y')
Here, the data is stored in the matrices x and y, which are kept as GPU variables. Forward propagation is computed as a = tanh(x w1 + b1) and h = softmax(a w2 + b2), where w1 is the weight matrix connecting the input and hidden layers, a is the activation of the hidden layer, and h is the output of the network.
a = T.tanh(self.x.dot(w1) + b1)
h = T.nnet.softmax(a.dot(w2) + b2)
The loss function that I have used is cross-entropy, computed as

J(w) = -(1/m) * sum_{i=1..m} sum_{k=1..K} y_k(i) * log(h_k(i))

where m is the total number of training samples and K is the number of neurons in the output layer. An L2-regularization term can be added to prevent overfitting of the parameters and is given by

R(w) = (lambda / (2m)) * sum_{l=1..L-1} ||w_l||^2

where lambda is the regularization parameter and L is the number of layers in the neural network. The loss function along with L2 regularization, J(w) + R(w), is computed as
loss_reg = 1. / m * reg / 2 * ((w1 ** 2).sum() + (w2 ** 2).sum())
cost = T.nnet.categorical_crossentropy(h, self.y).mean() * (2. / m) + loss_reg
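As a sanity check, the regularized loss can be reproduced in plain NumPy directly from the formulas above. This is a sketch: the function name is mine, and it follows the formulas rather than the exact scaling of the Theano snippet:

```python
import numpy as np

def cross_entropy_l2(h, y, weights, reg, m):
    """Cross-entropy averaged over m samples plus an L2 penalty on the weights."""
    data_loss = -np.sum(y * np.log(h)) / m
    l2 = reg / (2.0 * m) * sum(np.sum(w ** 2) for w in weights)
    return data_loss + l2

# Two samples, three classes, one-hot targets.
h = np.array([[0.7, 0.2, 0.1],
              [0.1, 0.8, 0.1]])
y = np.array([[1.0, 0.0, 0.0],
              [0.0, 1.0, 0.0]])
loss = cross_entropy_l2(h, y, weights=[np.ones((2, 2))], reg=0.01, m=2)
```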
We have completed the forward propagation, and now it is time to compute the gradients using backpropagation. Mathematically, backpropagation applies the chain rule layer by layer:

delta_out = h - y
dJ/dw2 = a^T . delta_out,  dJ/db2 = delta_out
delta_hidden = (delta_out . w2^T) * (1 - a^2)   (elementwise product with the tanh derivative)
dJ/dw1 = x^T . delta_hidden,  dJ/db1 = delta_hidden

Fortunately, we don't have to write the complete backpropagation code by hand. Since we are using Theano, we build a Theano graph and use its automatic differentiation utility to compute the gradients:
pred = T.argmax(h, axis=1)
gw1 = T.grad(cost, w1)
gw2 = T.grad(cost, w2)
gb1 = T.grad(cost, b1)
gb2 = T.grad(cost, b2)
#self.forward_prop = theano.function([], h)
self.compute_cost = theano.function([], outputs=[cost, gw1, gw2, gb1, gb2])
#self.predict = theano.function([], pred)
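A standard way to trust autodiff output is a finite-difference check. This NumPy sketch, independent of Theano and of the post's code, compares a hand-derived gradient with a central-difference estimate on a toy function:

```python
import numpy as np

def numerical_grad(f, w, eps=1e-6):
    """Central-difference approximation of df/dw, one element at a time."""
    g = np.zeros_like(w)
    for i in range(w.size):
        step = np.zeros_like(w)
        step.flat[i] = eps
        g.flat[i] = (f(w + step) - f(w - step)) / (2 * eps)
    return g

# Toy loss: f(w) = sum(w^2); the analytic gradient is 2w.
w = np.array([1.0, -2.0, 3.0])
num = numerical_grad(lambda v: np.sum(v ** 2), w)
ana = 2 * w
```

The same check can be run against T.grad by wrapping the compiled Theano cost function as f.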
The mapper constructor thus initializes the weights stored in GPU memory and builds the Theano graph that the mappers will use to compute the gradients.
DESIGNING THE MAPPERS
Once the mapper constructor has run, control passes to the mappers. mapper1 is designed as
def mapper1(self, key, value):
    value = map(float, value.split(','))
    x_train = np.array(value[:-1])
    y_train = np.zeros(out_size)
    y_train[int(value[-1])] = 1
    y_train = np.matrix(y_train)
    x_train = np.matrix(x_train)
    self.x.set_value(x_train.astype('float32'))
    self.y.set_value(y_train.astype('float32'))
    grads = self.compute_cost()
    b = jobconf_from_env('mapreduce.task.partition')
    if self.count % 50 == 0:
        print 'cost is ', float(grads[0]), ' mapper', b, ' iteration :: ', self.count
    if len(self.gradients) == 0:
        self.gradients = grads
    else:
        for i in range(0, 5):
            self.gradients[i] += grads[i]
    self.count += 1
    yield b, float(grads[0])
The data is stored as x_train and y_train. To compute the gradients, the Theano graph defined in the mapper constructor is accessed through the self keyword; calling compute_cost() initiates the gradient computation. The mapper emits the ID of the local mapper as the key and the cost as the value, while the gradients themselves are accumulated in self.gradients.
The code for mapper2 is identical to mapper1; the only difference is how it is called.
I have also included a powerful feature of Mrjob: the final mapper. It is analogous to the mapper constructor, except that it is called after all the mappers on a particular machine have completed their processing. The final mapper updates the weights locally, performing some of the functions of a combiner.
def final_mapper(self):
    final_cost.append(self.gradients[0] / self.count)
    temp_w1 = w1.get_value()
    temp_w2 = w2.get_value()
    temp_b1 = b1.get_value()
    temp_b2 = b2.get_value()
    self.gradients[1] = temp_w1 - (self.alpha * self.gradients[1])
    self.gradients[2] = temp_w2 - (self.alpha * self.gradients[2])
    self.gradients[3] = temp_b1 - (self.alpha * np.asarray(self.gradients[3]))
    self.gradients[4] = temp_b2 - (self.alpha * np.asarray(self.gradients[4]))
    b = jobconf_from_env('mapreduce.task.partition')
    parent[b] = self.gradients
    self.gradients = []
DESIGNING THE REDUCER
The task of the reducer is to update the weights with the help of gradients computed by the mappers. The reducer is designed as
def reducer(self, key, value):
    cost = sum(value)
    temp1 = parent['0']
    temp2 = parent['1']
    all_weights = []
    for i in range(1, 3):
        mid = temp1[i].shape[0] / 2
        temp3 = np.matrix(np.zeros((temp1[i].shape)))
        temp3[0:mid, :] = np.matrix(temp1[i])[0:mid, :]
        temp3[mid:, :] = np.matrix(temp2[i])[mid:, :]
        all_weights.append(temp3)
    w1.set_value(all_weights[0].astype('float32'))
    w2.set_value(all_weights[1].astype('float32'))
    b1.set_value((np.asarray(temp1[3])).astype('float32'))
    b2.set_value((np.asarray(temp2[4])).astype('float32'))
    for i in data:
        yield i, 1
The reducer performs the GA update with the help of truncation and roulette selection. For now, I have omitted the random selection of crossover points for either technique. Since the weights are initialized globally, the changes are visible to all the mappers in the next epoch. The reducer emits the entire dataset back to the mappers for another iteration (batch mode is also omitted; for now I am staying with online mode).
Finally, we need to call the mapper-reducer pair for every iteration. This is achieved with Mrjob's steps utility:
def steps(self):
    return [MRStep(mapper_init=self.init_mapper,
                   mapper=self.mapper1,
                   reducer=self.reducer,
                   mapper_final=self.final_mapper)]
The steps function controls the flow of mappers and reducers. Its parameters are
 mapper_init
 mapper
 reducer
 mapper_final
 reducer_final
For multiple epochs, MRStep is constructed inside a loop so that the model is trained over the training data many times.
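The shape of that loop can be sketched with a stand-in for mrjob's MRStep (the stand-in dict and function names below are mine, not mrjob's API):

```python
# Stand-in for mrjob's MRStep, just to show the epoch-loop structure.
def make_step(mapper, reducer):
    return {"mapper": mapper, "reducer": reducer}

def build_steps(n_epochs, first_mapper, later_mapper, reducer):
    """One map-reduce step per epoch; epoch 0 reads the raw data,
    later epochs read the previous reducer's output."""
    return [make_step(first_mapper if i == 0 else later_mapper, reducer)
            for i in range(n_epochs)]

steps = build_steps(3, "mapper1", "mapper2", "reducer")
```

With real mrjob, each list element would be an MRStep; epoch 0 uses mapper1 and later epochs use mapper2, as described above.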
WHAT'S NEXT?
The code is available on GitHub, and you can play around with it. In this post I have explained how to use MapReduce and Theano to speed up a simple neural network. The same idea can be applied to more complex networks such as CNNs, RNNs, LSTMs, GRUs, etc. In future posts I will try to generalize this idea to those deep models.