@author: wepon
@blog:
This article introduces the convolutional neural network (CNN), with a particular focus on a detailed walkthrough of its implementation in Python + Theano. The code comes from: . The fully annotated code and the original code: .
1. A brief introduction to how CNNs (convolutional neural networks) work
Explaining convolutional neural networks properly would take a long essay, and many blog posts online already do it well, so this article will not repeat them. If you already know CNNs, read on; the main goal here is a detailed walkthrough of a CNN implementation. If you have not studied CNNs yet, I recommend Zhou Xiaoyi's post: , as well as the relevant tutorials on UFLDL: .
The defining features of a CNN are sparse connectivity (local receptive fields) and weight sharing, illustrated in the two figures below (left: sparse connectivity; right: weight sharing). Together they cut down the number of parameters to be trained and reduce the computational complexity.
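To get a feel for the savings, here is a back-of-the-envelope comparison of my own (not from the figures): map a 28x28 image to a 24x24 feature map, once with a fully connected layer and once with a single shared 5x5 kernel.

```python
n_in = 28 * 28    # 784 input pixels
n_out = 24 * 24   # 576 output units in one feature map

dense_params = n_in * n_out   # fully connected: 451584 weights
conv_params = 5 * 5 + 1       # one shared 5x5 kernel plus a bias: 26 parameters
print dense_params, conv_params
```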
As for the structure of a CNN, the classic LeNet5 makes a good illustration:
This figure is everywhere; no discussion of CNNs goes without LeNet5. It comes from this paper: . The paper is long; the LeNet5 architecture is described starting around page 7, and that part is well worth reading.
Briefly, reading the LeNet5 figure from left to right: first comes the input layer, i.e. the input image. From the input layer to C1 is a convolutional layer (a convolution operation), and from C1 to S2 is a subsampling layer (a pooling operation). The figure below shows how convolution and subsampling work in detail:
Then S2 to C3 is another convolution, and C3 to S4 another subsampling; notice that convolution and subsampling come in pairs, with subsampling typically following convolution. S4 to C5 is fully connected, which makes it equivalent to an MLP hidden layer (if you are unfamiliar with MLPs, see 《》). C5 to F6 is again fully connected, also equivalent to an MLP hidden layer. Finally, from F6 to the output is simply a classifier; this layer is called the classification layer.
OK, so that is the basic structure of a CNN: it is assembled from the basic "building blocks" of input, convolutional layers, subsampling layers, fully connected layers, a classification layer, and output. For a concrete application you decide how many convolutional and subsampling layers to use and which classifier to adopt. Once the structure is fixed, how do we solve for the connection weights between layers? The usual approach is training with forward propagation (FP) plus backpropagation (BP). See the links above for details.
2. A detailed walkthrough of the CNN code (based on Python + Theano)
The code comes from the deep learning tutorial: . It implements a simplified LeNet5, specifically:
- the location-specific gain and bias parameters are not implemented
- max-pooling is used instead of average pooling
- the classifier is softmax, whereas LeNet5 used an RBF layer
- the second layer of LeNet5 is not fully connected, while this program uses full connectivity
In addition, the code merges the convolutional layer and the subsampling layer into a single class, "LeNetConvPoolLayer" (convolution + pooling layer), which is natural since the two always appear in pairs. One thing to note, though: the code feeds the convolution output straight into the pooling layer, without first adding a bias b and mapping through a sigmoid; that is, the bx and the sigmoid after fx in the figure below are dropped, and Cx is obtained directly from fx.
Finally, the first convolutional layer in the code uses 20 kernels and the second uses 50, rather than the 6 and 16 shown in the LeNet5 figure above.
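To make the dimensions concrete, here is the tensor-shape trace that the code below implements, for a 28x28 MNIST input (batch dimension omitted):

```python
# Shape trace for the simplified LeNet5 in this post:
#   input:            1 x 28 x 28
#   conv 5x5 (x20):  20 x 24 x 24    # 28 - 5 + 1 = 24
#   maxpool 2x2:     20 x 12 x 12
#   conv 5x5 (x50):  50 x  8 x  8    # 12 - 5 + 1 = 8
#   maxpool 2x2:     50 x  4 x  4
#   flatten:         50 * 4 * 4 = 800
#   hidden layer:    800 -> 500
#   softmax:         500 -> 10 classes
```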
With all that in mind, let's look at the code:
(1) Import the required modules
```python
import cPickle
import gzip
import os
import sys
import time

import numpy

import theano
import theano.tensor as T
from theano.tensor.signal import downsample
from theano.tensor.nnet import conv
```
(2) Define the basic building blocks of the CNN
The basic building blocks of a CNN are the convolution + pooling layer, the hidden layer, and the classifier, as follows:
- Define LeNetConvPoolLayer (the convolution + pooling layer)
See the comments in the code:

```python
class LeNetConvPoolLayer(object):
    def __init__(self, rng, input, filter_shape, image_shape, poolsize=(2, 2)):
        # filter_shape: (number of kernels, number of input feature maps,
        #                filter height, filter width)
        # image_shape:  (batch size, number of input feature maps,
        #                image height, image width)
        # The number of input feature maps must agree between the two.
        assert image_shape[1] == filter_shape[1]
        self.input = input

        # Each hidden unit receives
        # fan_in = (num input feature maps) * (filter area) inputs.
        fan_in = numpy.prod(filter_shape[1:])

        # Each lower-layer unit feeds
        # fan_out = (num kernels) * (filter area) / (pooling area) units.
        fan_out = (filter_shape[0] * numpy.prod(filter_shape[2:]) /
                   numpy.prod(poolsize))

        # Initialize W with values drawn uniformly from [-W_bound, W_bound].
        W_bound = numpy.sqrt(6. / (fan_in + fan_out))
        self.W = theano.shared(
            numpy.asarray(
                rng.uniform(low=-W_bound, high=W_bound, size=filter_shape),
                dtype=theano.config.floatX
            ),
            borrow=True
        )

        # One bias per output feature map, initialized to zero.
        b_values = numpy.zeros((filter_shape[0],), dtype=theano.config.floatX)
        self.b = theano.shared(value=b_values, borrow=True)

        # Convolve the input feature maps with the filters.
        conv_out = conv.conv2d(
            input=input,
            filters=self.W,
            filter_shape=filter_shape,
            image_shape=image_shape
        )

        # Downsample each feature map with max-pooling.
        pooled_out = downsample.max_pool_2d(
            input=conv_out,
            ds=poolsize,
            ignore_border=True
        )

        # Add the bias (broadcast across the batch and spatial dimensions)
        # and apply tanh.
        self.output = T.tanh(pooled_out + self.b.dimshuffle('x', 0, 'x', 'x'))

        # Parameters of this layer.
        self.params = [self.W, self.b]
```
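One detail worth pausing on: b is a vector with one entry per kernel, while pooled_out is a 4D tensor, so dimshuffle('x', 0, 'x', 'x') turns b into shape (1, nkerns, 1, 1) and lets it broadcast over the batch and spatial axes. A minimal numpy analogue of the same idiom (my own illustration, not part of the tutorial):

```python
import numpy

# numpy analogue of: T.tanh(pooled_out + b.dimshuffle('x', 0, 'x', 'x'))
pooled = numpy.zeros((500, 20, 12, 12))             # (batch, feature maps, h, w)
b = numpy.arange(20, dtype='float64')               # one bias per feature map
out = numpy.tanh(pooled + b.reshape(1, 20, 1, 1))   # broadcast add, then tanh
print out.shape                                     # (500, 20, 12, 12)
```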
- Define HiddenLayer (the hidden layer). This is identical to the HiddenLayer in the previous article 《 》, so it is taken over directly:

```python
class HiddenLayer(object):
    def __init__(self, rng, input, n_in, n_out, W=None, b=None,
                 activation=T.tanh):
        self.input = input

        # If W is not given, initialize it with values drawn uniformly from
        # [-sqrt(6/(n_in+n_out)), sqrt(6/(n_in+n_out))]; for sigmoid
        # activations the interval is scaled by 4.
        if W is None:
            W_values = numpy.asarray(
                rng.uniform(
                    low=-numpy.sqrt(6. / (n_in + n_out)),
                    high=numpy.sqrt(6. / (n_in + n_out)),
                    size=(n_in, n_out)
                ),
                dtype=theano.config.floatX
            )
            if activation == theano.tensor.nnet.sigmoid:
                W_values *= 4
            W = theano.shared(value=W_values, name='W', borrow=True)

        # If b is not given, initialize it to zeros.
        if b is None:
            b_values = numpy.zeros((n_out,), dtype=theano.config.floatX)
            b = theano.shared(value=b_values, name='b', borrow=True)

        self.W = W
        self.b = b

        # Linear output, passed through the activation function if one is given.
        lin_output = T.dot(input, self.W) + self.b
        self.output = (
            lin_output if activation is None
            else activation(lin_output)
        )

        # Parameters of this layer.
        self.params = [self.W, self.b]
```
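For reference, the interval above is the tanh variant of the Glorot/Bengio initialization; plugging in the sizes this network will use for its hidden layer (n_in = 800, n_out = 500, a plug-in of my own) gives:

$$W \sim U\!\left[-\sqrt{\tfrac{6}{n_{in}+n_{out}}},\ \sqrt{\tfrac{6}{n_{in}+n_{out}}}\right], \qquad \sqrt{\tfrac{6}{800+500}} \approx 0.068$$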
- Define the classifier, which uses softmax. This is the same as the LogisticRegression in 《》, taken over directly, together with the negative_log_likelihood and errors methods that the training code below relies on:

```python
class LogisticRegression(object):
    def __init__(self, input, n_in, n_out):
        # W is initialized to a zero matrix of shape (n_in, n_out).
        self.W = theano.shared(
            value=numpy.zeros(
                (n_in, n_out),
                dtype=theano.config.floatX
            ),
            name='W',
            borrow=True
        )
        # b is initialized to a zero vector of length n_out.
        self.b = theano.shared(
            value=numpy.zeros(
                (n_out,),
                dtype=theano.config.floatX
            ),
            name='b',
            borrow=True
        )

        # Class-membership probabilities: softmax over the linear scores.
        self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b)

        # Predicted class: the one with the highest probability.
        self.y_pred = T.argmax(self.p_y_given_x, axis=1)

        # Parameters of this layer.
        self.params = [self.W, self.b]

    def negative_log_likelihood(self, y):
        # Mean negative log-probability of the correct labels.
        return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]), y])

    def errors(self, y):
        # Fraction of minibatch examples whose prediction differs from y.
        return T.mean(T.neq(self.y_pred, y))
```
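The cost minimized during training is exactly what negative_log_likelihood computes: the mean negative log-likelihood of the labels over a minibatch of N examples,

$$\ell(W, b) = -\frac{1}{N} \sum_{i=1}^{N} \log P\left(Y = y^{(i)} \mid x^{(i)}; W, b\right)$$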
At this point all the basic building blocks of the CNN are in place. Next we assemble them into the (simplified, as noted above) LeNet5, namely: LeNet5 = input + LeNetConvPoolLayer_1 + LeNetConvPoolLayer_2 + HiddenLayer + LogisticRegression + output. We then apply it to the MNIST dataset and train the model with BP to obtain the optimal parameters.
(3) Load the MNIST dataset ()
```python
def load_data(dataset):
    # If only a filename was given, look for it in the ../data directory.
    data_dir, data_file = os.path.split(dataset)
    if data_dir == "" and not os.path.isfile(dataset):
        new_path = os.path.join(
            os.path.split(__file__)[0],
            "..",
            "data",
            dataset
        )
        if os.path.isfile(new_path) or data_file == 'mnist.pkl.gz':
            dataset = new_path

    # Download mnist.pkl.gz if it is not found locally.
    if (not os.path.isfile(dataset)) and data_file == 'mnist.pkl.gz':
        import urllib
        origin = (
            'http://www.iro.umontreal.ca/~lisa/deep/data/mnist/mnist.pkl.gz'
        )
        print 'Downloading data from %s' % origin
        urllib.urlretrieve(origin, dataset)

    print '... loading data'

    # Unpickle the train/validation/test splits.
    f = gzip.open(dataset, 'rb')
    train_set, valid_set, test_set = cPickle.load(f)
    f.close()

    # Wrap the data in shared variables so Theano can copy them to GPU
    # memory; labels are stored as floatX and cast back to int32 on use.
    def shared_dataset(data_xy, borrow=True):
        data_x, data_y = data_xy
        shared_x = theano.shared(numpy.asarray(data_x,
                                               dtype=theano.config.floatX),
                                 borrow=borrow)
        shared_y = theano.shared(numpy.asarray(data_y,
                                               dtype=theano.config.floatX),
                                 borrow=borrow)
        return shared_x, T.cast(shared_y, 'int32')

    test_set_x, test_set_y = shared_dataset(test_set)
    valid_set_x, valid_set_y = shared_dataset(valid_set)
    train_set_x, train_set_y = shared_dataset(train_set)

    rval = [(train_set_x, train_set_y), (valid_set_x, valid_set_y),
            (test_set_x, test_set_y)]
    return rval
```
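For reference, mnist.pkl.gz holds three (x, y) pairs, with each image flattened into a 784-dimensional row vector. A quick sanity check of the split sizes (assuming the standard file from the URL above):

```python
# Expected shapes for the standard mnist.pkl.gz splits:
datasets = load_data('mnist.pkl.gz')
train_set_x, train_set_y = datasets[0]
print train_set_x.get_value(borrow=True).shape   # (50000, 784)
# validation and test sets are (10000, 784) each
```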
(4) Build LeNet5 and test it
```python
def evaluate_lenet5(learning_rate=0.1, n_epochs=200,
                    dataset='mnist.pkl.gz',
                    nkerns=[20, 50], batch_size=500):
    # learning_rate: step size for gradient descent
    # n_epochs:      maximum number of passes over the training set
    # nkerns:        number of kernels in the first and second conv layers

    rng = numpy.random.RandomState(23455)

    # Load the data.
    datasets = load_data(dataset)
    train_set_x, train_set_y = datasets[0]
    valid_set_x, valid_set_y = datasets[1]
    test_set_x, test_set_y = datasets[2]

    # Compute the number of minibatches for training, validation and testing.
    n_train_batches = train_set_x.get_value(borrow=True).shape[0]
    n_valid_batches = valid_set_x.get_value(borrow=True).shape[0]
    n_test_batches = test_set_x.get_value(borrow=True).shape[0]
    n_train_batches /= batch_size
    n_valid_batches /= batch_size
    n_test_batches /= batch_size

    # Symbolic variables: minibatch index, images x, labels y.
    index = T.lscalar()
    x = T.matrix('x')
    y = T.ivector('y')

    ######################
    # BUILD ACTUAL MODEL #
    ######################
    print '... building the model'

    # Reshape the flattened images into a 4D tensor (batch, 1, 28, 28).
    layer0_input = x.reshape((batch_size, 1, 28, 28))

    # First conv+pool layer:
    # conv: (28-5+1, 28-5+1) = (24, 24); pool: (24/2, 24/2) = (12, 12)
    # output: (batch_size, nkerns[0], 12, 12)
    layer0 = LeNetConvPoolLayer(
        rng,
        input=layer0_input,
        image_shape=(batch_size, 1, 28, 28),
        filter_shape=(nkerns[0], 1, 5, 5),
        poolsize=(2, 2)
    )

    # Second conv+pool layer:
    # conv: (12-5+1, 12-5+1) = (8, 8); pool: (8/2, 8/2) = (4, 4)
    # output: (batch_size, nkerns[1], 4, 4)
    layer1 = LeNetConvPoolLayer(
        rng,
        input=layer0.output,
        image_shape=(batch_size, nkerns[0], 12, 12),
        filter_shape=(nkerns[1], nkerns[0], 5, 5),
        poolsize=(2, 2)
    )

    # Fully connected hidden layer: flatten to (batch_size, nkerns[1]*4*4).
    layer2_input = layer1.output.flatten(2)
    layer2 = HiddenLayer(
        rng,
        input=layer2_input,
        n_in=nkerns[1] * 4 * 4,
        n_out=500,
        activation=T.tanh
    )

    # Softmax classifier on top.
    layer3 = LogisticRegression(input=layer2.output, n_in=500, n_out=10)

    # Cost to minimize: negative log-likelihood of the labels.
    cost = layer3.negative_log_likelihood(y)

    # Functions computing the error rate on a test/validation minibatch.
    test_model = theano.function(
        [index],
        layer3.errors(y),
        givens={
            x: test_set_x[index * batch_size: (index + 1) * batch_size],
            y: test_set_y[index * batch_size: (index + 1) * batch_size]
        }
    )

    validate_model = theano.function(
        [index],
        layer3.errors(y),
        givens={
            x: valid_set_x[index * batch_size: (index + 1) * batch_size],
            y: valid_set_y[index * batch_size: (index + 1) * batch_size]
        }
    )

    # All trainable parameters of the model.
    params = layer3.params + layer2.params + layer1.params + layer0.params

    # Gradients of the cost with respect to every parameter.
    grads = T.grad(cost, params)

    # SGD updates: param <- param - learning_rate * gradient.
    updates = [
        (param_i, param_i - learning_rate * grad_i)
        for param_i, grad_i in zip(params, grads)
    ]

    # Training function: one SGD step on minibatch `index`.
    train_model = theano.function(
        [index],
        cost,
        updates=updates,
        givens={
            x: train_set_x[index * batch_size: (index + 1) * batch_size],
            y: train_set_y[index * batch_size: (index + 1) * batch_size]
        }
    )

    ###############
    # TRAIN MODEL #
    ###############
    print '... training'
    patience = 10000                 # look at this many minibatches regardless
    patience_increase = 2            # extend patience when a new best is found
    improvement_threshold = 0.995    # a relative improvement this large counts

    # Validate at least once per epoch.
    validation_frequency = min(n_train_batches, patience / 2)

    best_validation_loss = numpy.inf
    best_iter = 0
    test_score = 0.
    start_time = time.clock()

    epoch = 0
    done_looping = False

    # Main training loop with early stopping.
    while (epoch < n_epochs) and (not done_looping):
        epoch = epoch + 1
        for minibatch_index in xrange(n_train_batches):

            iter = (epoch - 1) * n_train_batches + minibatch_index

            if iter % 100 == 0:
                print 'training @ iter = ', iter
            cost_ij = train_model(minibatch_index)

            if (iter + 1) % validation_frequency == 0:

                # Compute the zero-one loss on the validation set.
                validation_losses = [validate_model(i) for i
                                     in xrange(n_valid_batches)]
                this_validation_loss = numpy.mean(validation_losses)
                print('epoch %i, minibatch %i/%i, validation error %f %%' %
                      (epoch, minibatch_index + 1, n_train_batches,
                       this_validation_loss * 100.))

                if this_validation_loss < best_validation_loss:

                    # Increase patience if the improvement is large enough.
                    if this_validation_loss < best_validation_loss * \
                       improvement_threshold:
                        patience = max(patience, iter * patience_increase)

                    best_validation_loss = this_validation_loss
                    best_iter = iter

                    # Evaluate the best model so far on the test set.
                    test_losses = [
                        test_model(i)
                        for i in xrange(n_test_batches)
                    ]
                    test_score = numpy.mean(test_losses)
                    print((' epoch %i, minibatch %i/%i, test error of '
                           'best model %f %%') %
                          (epoch, minibatch_index + 1, n_train_batches,
                           test_score * 100.))

            if patience <= iter:
                done_looping = True
                break

    end_time = time.clock()
    print('Optimization complete.')
    print('Best validation score of %f %% obtained at iteration %i, '
          'with test performance %f %%' %
          (best_validation_loss * 100., best_iter + 1, test_score * 100.))
    print >> sys.stderr, ('The code for file ' +
                          os.path.split(__file__)[1] +
                          ' ran for %.2fm' % ((end_time - start_time) / 60.))
```
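Finally, a minimal way to kick off training with the defaults used throughout this post:

```python
if __name__ == '__main__':
    # Train the simplified LeNet5 on MNIST with the defaults:
    # learning_rate=0.1, n_epochs=200, nkerns=[20, 50], batch_size=500.
    evaluate_lenet5()
```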
End of article. The fully annotated code and the original code: .