Deep Cloud

Deep learning models are known to be computationally expensive and can take several days to train, depending upon the size of the data-set. To reduce the processing time, GPUs and distributed computing using MapReduce are commonly used these days. In this post I will show how to combine both of these processing paradigms. Once the learning algorithm is implemented using MapReduce, it is possible to run the model on the Elastic MapReduce (EMR) platform provided by Amazon Web Services (AWS). The code is available on Github.

  1. I will use Mrjob’s MapReduce implementation in Python to build a simple neural network.
  2. Each mapper, i.e. each individual machine, is equipped with a GPU and uses Theano/Tensorflow for GPU multi-threading.
  3. The reducer uses a genetic algorithm to speed up the convergence rate with the help of two selection schemes – one truncation selection and the other roulette wheel selection.
  4. Currently this model is in its development stage for deep learning algorithms. I have implemented this type of model on neural networks, but the same idea can be applied to other deep neural models.
  5. In this post I am omitting the steps to run the model on the cloud platform provided by AWS.

In this post I am assuming that you are familiar with Python, neural networks, MapReduce, and Theano/Tensorflow. I will cover the details of neural networks, MapReduce, and Theano/Tensorflow in future blog posts.

THE IDEA

The idea of using the map-reduce programming paradigm for machine learning algorithms was introduced in Map-Reduce for Machine Learning on Multicore. This paper also describes the use of a distributed architecture to speed up algorithms such as SVM, PCA, k-means, logistic regression, linear regression, EM, and back-propagation. A time-complexity analysis in terms of speed-up is also given in the paper. The basic idea of the paper can be explained in the following steps:

  • Pass each training sample to an individual mapper.
  • Each mapper computes the gradients by performing back-propagation and emits a key-value pair with key = id of the mapper and value = gradient value.
  • The task of the reducer is to update the parameters.
  • One pass through the first three steps is one epoch; iterate the training data through these steps for multiple epochs (a minimal sketch of this loop is given below).
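
To make the data flow concrete, here is a minimal, framework-free sketch of one epoch of this scheme (this is not the code from the paper; run_epoch and compute_gradient are hypothetical names, and compute_gradient stands in for single-sample back-propagation):

import numpy as np

def run_epoch(samples, labels, weights, compute_gradient, lr=0.01):
     # Map phase: each training sample goes to its own "mapper", which
     # returns a (mapper_id, gradient) pair via back-propagation.
     mapped = [(i, compute_gradient(weights, x, y))
               for i, (x, y) in enumerate(zip(samples, labels))]
     # Reduce phase: sum the per-sample gradients and take one
     # gradient-descent step on the shared parameters.
     grad_sum = sum(g for _, g in mapped)
     return weights - lr * grad_sum / len(samples)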

IMPROVEMENTS

Some improvements can be made to the idea given in the above paper. One enhancement is to use batch gradient descent instead of the online mode, i.e. passing a batch of training samples to each mapper instead of passing an individual sample to a mapper. The problem with the online mode is that it requires as many mappers as there are samples, which is definitely a bottleneck. A further improvement can be made by modifying the reducer to perform a hybrid update of the parameters. This type of improvement is introduced in The Improved BP Algorithm Based on MapReduce and Genetic Algorithm.
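
As a small illustration of the batch variant, the training set can be split into mini-batches of size K up front, with one batch (rather than one sample) handed to each mapper; make_batches below is just a hypothetical helper, not code from either paper:

def make_batches(samples, K):
     # split the training samples into consecutive mini-batches of size K;
     # each mini-batch becomes the input of one mapper
     return [samples[i:i + K] for i in range(0, len(samples), K)]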

Finally, I am going to use the idea described in these two papers and combine the MapReduce paradigm with GPU computing using Theano/Tensorflow.

[Figure: working of the MapReduce + GPU + genetic-algorithm model]

The working of the model shown above can be explained in the following steps:

  1. The training samples are divided into mini-batches, each of size K, and passed to the mappers along with the initial weights.
  2. Each mapper performs forward propagation using Theano on the GPU.
  3. Back-propagation is then carried out by each mapper to compute the gradients.
  4. Once the new weights are passed to the reducer, the weights are updated using the modified GA (a rough sketch of this step follows the list).
    • The parent selection is done using truncation selection based on the fitness, which is the inverse of the error or cost of each mapper (as computed by the cross-entropy).
    • During the crossover, several sets of parameters are generated, corresponding to the various ways in which the crossover can be performed.
    • A roulette wheel selects the new parameters, which are then passed to the next training phase.
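
To make the GA step more concrete, here is a rough sketch of the update (not the exact reducer code shown later); candidates is a hypothetical dict {mapper_id: (cost, weight_matrix)} collected from the mappers, and fitness is taken as the inverse of the cross-entropy cost:

import numpy as np

def ga_update(candidates):
     # Truncation selection: keep the two candidates with the lowest cost
     # (i.e. the highest fitness, where fitness = 1 / cost).
     (c1, p1), (c2, p2) = sorted(candidates.values(), key=lambda c: c[0])[:2]
     # Crossover: each child takes the top half of one parent's weight
     # matrix and the bottom half of the other's.
     mid = p1.shape[0] // 2
     children = [np.vstack([p1[:mid], p2[mid:]]),
                 np.vstack([p2[:mid], p1[mid:]])]
     # Roulette wheel selection: pick one child with probability
     # proportional to its leading parent's fitness.
     fitness = np.array([1.0 / c1, 1.0 / c2])
     probs = fitness / fitness.sum()
     return children[np.random.choice(len(children), p=probs)]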

DESIGNING THE MAPPER CONSTRUCTOR

Initialize the parameters globally or in the main class (these must be copied to all the mappers). The steps involved are:

  • Prepare the data-set for training the model.
  • Initialize the weights in the GPU memory.
  • Define the architecture of the neural network – number of neurons, layers, learning rate, regularization parameter, etc.
# imports assumed at the top of the script
import numpy as np
import numpy.random as rn
import theano
import theano.tensor as T

if __name__ == '__main__':
     inp_size = 400      # number of dimensions in the input
     hidden_size = 25    # number of neurons in the hidden layer
     out_size = 10       # number of neurons in the output layer
     reg = 0.01          # regularization parameter
     alpha = 0.01        # learning rate
     m = 50              # number of samples
     dic = [10]
     final_cost = []
     parent = {}
     # weights live in GPU memory as Theano shared variables
     w1 = theano.shared(rn.randn(inp_size,hidden_size).astype('float32'),name='w1')
     w2 = theano.shared(rn.randn(hidden_size,out_size).astype('float32'),name='w2')
     b1 = theano.shared(np.zeros(hidden_size).astype('float32'),name='b1')
     b2 = theano.shared(np.ones(out_size).astype('float32'),name='b2')

Once all the weights are initialized in the main class, the next step is to call Mrjob’s run method.

MRHadoopFormatJob.run()
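
For orientation, here is a minimal skeleton of how such a job class could be laid out; the class name simply mirrors the call above, and the structure is an assumption rather than the exact code on Github (the method bodies are filled in throughout the rest of the post):

from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob.compat import jobconf_from_env

class MRHadoopFormatJob(MRJob):

     def init_mapper(self):             # mapper constructor
          pass

     def mapper1(self, key, value):     # mapper for the first epoch
          yield key, value

     def final_mapper(self):            # local weight update (combiner-like)
          pass

     def reducer(self, key, values):    # GA-based global weight update
          yield key, sum(values)

     def steps(self):
          return [MRStep(mapper_init=self.init_mapper, mapper=self.mapper1,
                         mapper_final=self.final_mapper, reducer=self.reducer)]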

Mrjob provides a lot of programming constructs, and data processing is relatively easier than with Mincemeat, Octopy, etc. The next step is to build a constructor for the mapper, which will be called during the initialization of each mapper. This way we have the flexibility to pass specific data values and other parameters to each mapper at the time of its creation. According to the figure above, we need to iterate through the mapper-reducer pair for every epoch. Here are a few observations that will make it easier to understand the basic idea:

  • The first time a mapper is called it will process the data provided by the main class.
  • After the first epoch, the mapper will receive the data from the reducer and not from the main class.

For this reason I have created two mapper functions, mapper1 and mapper2. mapper1 will be called only during the first epoch, while mapper2 will be used for the later epochs. The design of the constructor is the same for both mappers. The training data is initialized as

def init_mapper(self):
     self.count = 1            # number of samples processed by this mapper
     self.gradients = []       # accumulated [cost, gw1, gw2, gb1, gb2]
     self.final_cost = []
     # placeholders for one training sample, stored as GPU shared variables
     self.x = theano.shared(np.matrix(np.zeros((inp_size,1))).astype('float32'),'x')
     self.y = theano.shared(np.matrix(np.zeros((out_size,1))).astype('float32'),'y')

Here, the data is stored in the matrices x and y, which are saved as GPU variables. The forward propagation is computed as

a^{(2)} = \tanh\big(x\,\theta^{(1)} + b^{(1)}\big), \qquad h = \mathrm{softmax}\big(a^{(2)}\,\theta^{(2)} + b^{(2)}\big)

\theta^{(1)} is the weight matrix connecting the input and hidden layers, a^{(2)} is the activation of the hidden layer, and h is the output of the softmax layer.

a = T.tanh(self.x.dot(w1)+b1)        # hidden-layer activation
h = T.nnet.softmax(a.dot(w2)+b2)     # output probabilities

The loss function that I have used is cross-entropy and is computed as

J(\theta) = -\frac{1}{m}\sum_{i=1}^{m}\sum_{k=1}^{K} y_k^{(i)} \log\big(h_k^{(i)}\big)

where m is the total number of training samples and K is the number of neurons in the output layer. An L2-regularization term can also be added to prevent over-fitting of the parameters; it is given by

\frac{\lambda}{2m}\sum_{l=1}^{L-1}\sum_{i}\sum_{j}\big(\theta_{ij}^{(l)}\big)^2

\lambda is the regularization parameter and L is the number of layers in the neural network. The loss function along with L2-regularization is given as

loss_reg = 1./m * reg/2 * ((w1**2).sum() + (w2**2).sum())    # L2 penalty on the weights
cost = T.nnet.categorical_crossentropy(h,self.y).mean()*(2./m) + loss_reg

We have completed the forward propagation; now it is time to compute the gradients using back-propagation. Mathematically, back-propagation is given by

\delta^{(3)} = h - y, \qquad \delta^{(2)} = \delta^{(3)}\,\theta^{(2)\top} \odot \big(1 - (a^{(2)})^2\big), \qquad \frac{\partial J}{\partial \theta^{(l)}} = \frac{1}{m}\,(a^{(l)})^{\top}\delta^{(l+1)} + \frac{\lambda}{m}\,\theta^{(l)}

Fortunately, we don’t have to write the complete code for back-propagation. Since we are using Theano, we build a Theano graph and use its automatic differentiation utility to compute the gradients. The gradients are computed as

pred = T.argmax(h,axis=1)        # predicted class label
# gradients of the cost with respect to each parameter
gw1 = T.grad(cost,w1)
gw2 = T.grad(cost,w2)
gb1 = T.grad(cost,b1)
gb2 = T.grad(cost,b2)
#self.forward_prop = theano.function([],h)
self.compute_cost = theano.function([],outputs=[cost,gw1,gw2,gb1,gb2])
#self.predict = theano.function([],pred)

The mapper constructor thus initializes the per-mapper variables that are stored in GPU memory. At this point a Theano graph has been built, which the mapper will use to compute the gradients.

DESIGNING THE MAPPERS

Once the mapper constructor has run, control is passed to the mappers. mapper1 is designed as

def mapper1(self, key, value):
     # each input line is a comma-separated training sample; the last field is the label
     value = map(float,value.split(','))
     x_train = np.array(value[:-1])
     y_train = np.zeros(out_size)
     y_train[int(value[-1])] = 1          # one-hot encoding of the label
     y_train = np.matrix(y_train)
     x_train = np.matrix(x_train)
     self.x.set_value(x_train.astype('float32'))
     self.y.set_value(y_train.astype('float32'))
     grads = self.compute_cost()          # [cost, gw1, gw2, gb1, gb2]
     b = jobconf_from_env('mapreduce.task.partition')   # id of this mapper
     if self.count % 50 == 0:
          print 'cost is ',float(grads[0]),' mapper',b,' iteration :: ', self.count
     # accumulate the gradients over all samples seen by this mapper
     if len(self.gradients) == 0:
          self.gradients = grads
     else:
          for i in range(0,5):
               self.gradients[i] += grads[i]
     self.count += 1
     yield b, float(grads[0])

The data is stored as x_train and y_train. To compute the gradients, the Theano graph built in the mapper constructor is accessed through self: calling compute_cost() initiates the gradient computation. The mapper emits the id of the local mapper as the key and the cost as the value, while the accumulated gradients are handled in the final mapper described below.

The code for mapper2 is almost identical to mapper1; the only difference is the calling procedure, since mapper2 receives its input from the reducer rather than from the raw input file.
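
As a rough sketch (not the exact code on Github), mapper2 could simply unwrap the sample re-emitted by the reducer and reuse mapper1's body, assuming the reducer yields each raw training line as the key (as it does in the reducer shown later):

def mapper2(self, key, value):
     # after the first epoch the reducer yields (training_line, 1),
     # so the training sample arrives here as the key
     for k, v in self.mapper1(None, key):
          yield k, v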

I have also used a powerful feature of Mrjob: the final mapper. The final mapper is analogous to the mapper constructor; the only difference is that it is called after all of the mapper calls on a particular machine have completed their processing. The final mapper is used to update the weights locally, and it performs some of the functions of a combiner.

def final_mapper(self):
     # average cost over the samples processed by this mapper
     final_cost.append(self.gradients[0] / self.count)
     temp_w1 = w1.get_value()
     temp_w2 = w2.get_value()
     temp_b1 = b1.get_value()
     temp_b2 = b2.get_value()
     # one local gradient-descent step: new_weight = old_weight - alpha * gradient
     self.gradients[1] = temp_w1 - (alpha*self.gradients[1])
     self.gradients[2] = temp_w2 - (alpha*self.gradients[2])
     self.gradients[3] = temp_b1 - (alpha*np.asarray(self.gradients[3]))
     self.gradients[4] = temp_b2 - (alpha*np.asarray(self.gradients[4]))
     # store this mapper's locally updated weights, keyed by the mapper id
     b = jobconf_from_env('mapreduce.task.partition')
     parent[b] = self.gradients
     self.gradients = []

DESIGNING THE REDUCER

The task of the reducer is to update the weights with the help of gradients computed by the mappers. The reducer is designed as

def reducer(self,key,value):
     cost = sum(value)
     # the two parents are the locally updated weights of mappers '0' and '1'
     temp1 = parent['0']
     temp2 = parent['1']
     all_weights = []
     # truncation-style crossover: each child weight matrix takes the top
     # half of one parent and the bottom half of the other
     for i in range(1,3):
          mid = temp1[i].shape[0] // 2
          temp3 = np.matrix(np.zeros((temp1[i].shape)))
          temp3[0:mid,:] = np.matrix(temp1[i])[0:mid,:]
          temp3[mid:,:] = np.matrix(temp2[i])[mid:,:]
          all_weights.append(temp3)
     # update the global (GPU) weights with the recombined parameters
     w1.set_value(all_weights[0].astype('float32'))
     w2.set_value(all_weights[1].astype('float32'))
     b1.set_value((np.asarray(temp1[3])).astype('float32'))
     b2.set_value((np.asarray(temp2[4])).astype('float32'))

     # re-emit the training data (the global list `data`) for the next epoch
     for i in data:
          yield i,1

The reducer performs the recombination with the help of truncation and roulette selection; for now, I have omitted the random selection of crossover points for either of the techniques. Since the weights are initialized globally, the changes are visible to all of the mappers in the next epoch. The reducer emits the entire data set so that the mappers can run another iteration (batching is also omitted here; for now I am staying with the online mode).

Finally, we need to call the mapper-reducer pair for every iteration. This is achieved with the steps() utility of Mrjob and is called as

def steps(self):
     return [ MRStep(mapper_init=self.init_mapper,mapper=self.mapper1,
              reducer=self.reducer,mapper_final=self.final_mapper)]

The steps function controls the flow of mappers and reducers. The parameters to MRStep are

  • mapper_init
  • mapper
  • reducer
  • mapper_final
  • reducer_final

For multiple epochs, MRStep is called inside a loop so that the model is trained over the training data many times (a sketch is given below).
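
As a minimal sketch of what this could look like (num_epochs is a hypothetical constant, and the exact wiring may differ from the code on Github), the first step uses mapper1 while the remaining steps reuse mapper2:

def steps(self):
     # first epoch: read raw training data with mapper1
     first = MRStep(mapper_init=self.init_mapper, mapper=self.mapper1,
                    mapper_final=self.final_mapper, reducer=self.reducer)
     # later epochs: read the data re-emitted by the reducer with mapper2
     rest = [MRStep(mapper_init=self.init_mapper, mapper=self.mapper2,
                    mapper_final=self.final_mapper, reducer=self.reducer)
             for _ in range(num_epochs - 1)]
     return [first] + rest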

WHAT’S NEXT?

The code is available on Github, and you can play around with it. In this post I have explained the idea of using MapReduce and Theano to speed up a simple neural network. The same idea can be applied to more complex neural networks such as CNNs, RNNs, LSTMs, GRUs, etc. In future posts I will try to generalize the given idea to these deep models.
