Multi-layer FCN
Implement a full class for a multi-layer fully connected network according to these specs (many functions will be shared from the adjacent page).
class FullyConnectedNet(object):
    """Class for a multi-layer fully connected neural network.
    Network contains an arbitrary number of hidden layers, ReLU nonlinearities,
    and a softmax loss function. This will also implement dropout and batch/layer
    normalization as options. For a network with L layers, the architecture will be
    {affine - [batch/layer norm] - relu - [dropout]} x (L - 1) - affine - softmax
    where batch/layer normalization and dropout are optional and the {...} block is
    repeated L - 1 times.
    Learnable parameters are stored in the self.params dictionary and will be learned
    using the Solver class.
    """

    def __init__(
        self,
        hidden_dims,
        input_dim=3 * 32 * 32,
        num_classes=10,
        dropout_keep_ratio=1,
        normalization=None,
        reg=0.0,
        weight_scale=1e-2,
        dtype=np.float32,
        seed=None,
    ):
        """Initialize a new FullyConnectedNet.
        Inputs:
        - hidden_dims: A list of integers giving the size of each hidden layer.
        - input_dim: An integer giving the size of the input.
        - num_classes: An integer giving the number of classes to classify.
        - dropout_keep_ratio: Scalar between 0 and 1 giving dropout strength.
          If dropout_keep_ratio=1 then the network should not use dropout at all.
        - normalization: What type of normalization the network should use. Valid values
          are "batchnorm", "layernorm", or None for no normalization (the default).
        - reg: Scalar giving L2 regularization strength.
        - weight_scale: Scalar giving the standard deviation for random
          initialization of the weights.
        - dtype: A numpy datatype object; all computations will be performed using
          this datatype. float32 is faster but less accurate, so you should use
          float64 for numeric gradient checking.
        - seed: If not None, then pass this random seed to the dropout layers.
          This will make the dropout layers deterministic so we can gradient check the model.
        """
        self.normalization = normalization
        self.use_dropout = dropout_keep_ratio != 1
        self.reg = reg
        self.num_layers = 1 + len(hidden_dims)
        self.dtype = dtype
        self.params = {}
        # YOUR CODE HERE
    def loss(self, X, y=None):
        """Compute loss and gradient for the fully connected net.
        Inputs:
        - X: Array of input data of shape (N, d_1, ..., d_k)
        - y: Array of labels, of shape (N,). y[i] gives the label for X[i].
        Returns:
        If y is None, then run a test-time forward pass of the model and return:
        - scores: Array of shape (N, C) giving classification scores, where
          scores[i, c] is the classification score for X[i] and class c.
        If y is not None, then run a training-time forward and backward pass and
        return a tuple of:
        - loss: Scalar value giving the loss
        - grads: Dictionary with the same keys as self.params, mapping parameter
          names to gradients of the loss with respect to those parameters.
        """
        X = X.astype(self.dtype)
        mode = "test" if y is None else "train"
        # Set train/test mode for batchnorm params and dropout param since they
        # behave differently during training and testing.
        if self.use_dropout:
            self.dropout_param["mode"] = mode
        if self.normalization == "batchnorm":
            for bn_param in self.bn_params:
                bn_param["mode"] = mode
        scores = None
        # YOUR CODE HERE
        return loss, grads
Answer
Initialization:
Loop through the layer dimensions and outputs
Initialize weights with np.random.randn (scaled by weight_scale)
Set biases to zeros
If using batchnorm, set γ to ones and β to zeros
Set parameters for batchnorm, dropout, layernorm
Loss/Grad:
For each layer
Run the forward pass
Get the final scores
Get the loss and output gradient from the scores (remember to regularize)
For each layer (in reversed order)
Update that layer's gradients with the backward pass
class FullyConnectedNet(object):
    """Class for a multi-layer fully connected neural network.
    Network contains an arbitrary number of hidden layers, ReLU nonlinearities,
    and a softmax loss function. This will also implement dropout and batch/layer
    normalization as options. For a network with L layers, the architecture will be
    {affine - [batch/layer norm] - relu - [dropout]} x (L - 1) - affine - softmax
    where batch/layer normalization and dropout are optional and the {...} block is
    repeated L - 1 times.
    Learnable parameters are stored in the self.params dictionary and will be learned
    using the Solver class.
    """

    def __init__(
        self,
        hidden_dims,
        input_dim=3 * 32 * 32,
        num_classes=10,
        dropout_keep_ratio=1,
        normalization=None,
        reg=0.0,
        weight_scale=1e-2,
        dtype=np.float32,
        seed=None,
    ):
        self.normalization = normalization
        self.use_dropout = dropout_keep_ratio != 1
        self.reg = reg
        self.num_layers = 1 + len(hidden_dims)
        self.dtype = dtype
        self.params = {}

        for l, (i, j) in enumerate(zip([input_dim, *hidden_dims], [*hidden_dims, num_classes])):
            self.params[f'W{l + 1}'] = np.random.randn(i, j) * weight_scale
            self.params[f'b{l + 1}'] = np.zeros(j)
            if self.normalization and l < self.num_layers - 1:
                self.params[f'gamma{l + 1}'] = np.ones(j)
                self.params[f'beta{l + 1}'] = np.zeros(j)

        # When using dropout we need to pass a dropout_param dictionary to each
        # dropout layer so that the layer knows the dropout probability and the mode
        # (train / test). You can pass the same dropout_param to each dropout layer.
        self.dropout_param = {}
        if self.use_dropout:
            self.dropout_param = {"mode": "train", "p": dropout_keep_ratio}
            if seed is not None:
                self.dropout_param["seed"] = seed

        # With batch normalization we need to keep track of running means and
        # variances, so we need to pass a special bn_param object to each batch
        # normalization layer. You should pass self.bn_params[0] to the forward pass
        # of the first batch normalization layer, self.bn_params[1] to the forward
        # pass of the second batch normalization layer, etc.
        self.bn_params = []
        if self.normalization == "batchnorm":
            self.bn_params = [{"mode": "train"} for i in range(self.num_layers - 1)]
        if self.normalization == "layernorm":
            self.bn_params = [{} for i in range(self.num_layers - 1)]

        # Cast all parameters to the correct datatype.
        for k, v in self.params.items():
            self.params[k] = v.astype(dtype)

    def loss(self, X, y=None):
        X = X.astype(self.dtype)
        mode = "test" if y is None else "train"
        # Set train/test mode for batchnorm params and dropout param since they
        # behave differently during training and testing.
        if self.use_dropout:
            self.dropout_param["mode"] = mode
        if self.normalization == "batchnorm":
            for bn_param in self.bn_params:
                bn_param["mode"] = mode

        scores = None
        cache = {}
        for l in range(self.num_layers):
            keys = [f'W{l + 1}', f'b{l + 1}', f'gamma{l + 1}', f'beta{l + 1}']  # list of params
            w, b, gamma, beta = (self.params.get(k, None) for k in keys)        # get param vals
            bn = self.bn_params[l] if gamma is not None else None               # bn params if they exist
            do = self.dropout_param if self.use_dropout else None               # dropout params if they exist
            X, cache[l] = generic_forward(X, w, b, gamma, beta, bn, do, l == self.num_layers - 1)  # generic forward pass
        scores = X

        # If test mode return early.
        if mode == "test":
            return scores

        loss, grads = 0.0, {}
        loss, dout = softmax_loss(scores, y)
        loss += 0.5 * self.reg * np.sum([np.sum(W ** 2) for k, W in self.params.items() if 'W' in k])
        for l in reversed(range(self.num_layers)):
            dout, dW, db, dgamma, dbeta = generic_backward(dout, cache[l])
            grads[f'W{l + 1}'] = dW + self.reg * self.params[f'W{l + 1}']
            grads[f'b{l + 1}'] = db
            if dgamma is not None and l < self.num_layers - 1:
                grads[f'gamma{l + 1}'] = dgamma
                grads[f'beta{l + 1}'] = dbeta
        return loss, grads
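The loss() code above calls generic_forward, generic_backward, and softmax_loss, which live on the adjacent page and are not shown here. Below is a minimal sketch of what those helpers could look like so the class can be exercised end to end; the signatures and the affine/ReLU primitives are assumptions, and for normalization="layernorm" you would dispatch to layernorm_forward/layernorm_backward (defined later in these notes) instead of the batchnorm functions.

import numpy as np

def affine_forward(x, w, b):
    out = x.reshape(x.shape[0], -1) @ w + b
    return out, (x, w)

def affine_backward(dout, cache):
    x, w = cache
    dx = (dout @ w.T).reshape(x.shape)
    dw = x.reshape(x.shape[0], -1).T @ dout
    db = dout.sum(axis=0)
    return dx, dw, db

def relu_forward(x):
    return np.maximum(0, x), x

def relu_backward(dout, x):
    return dout * (x > 0)

def softmax_loss(scores, y):
    shifted = scores - scores.max(axis=1, keepdims=True)
    log_probs = shifted - np.log(np.exp(shifted).sum(axis=1, keepdims=True))
    N = scores.shape[0]
    loss = -log_probs[np.arange(N), y].mean()
    dscores = np.exp(log_probs)
    dscores[np.arange(N), y] -= 1
    dscores /= N
    return loss, dscores

def generic_forward(x, w, b, gamma, beta, bn_param, dropout_param, last):
    """affine -> [norm] -> relu -> [dropout] for hidden layers, plain affine for the last layer."""
    norm_cache = relu_cache = do_cache = None
    out, fc_cache = affine_forward(x, w, b)
    if not last:
        if gamma is not None:
            # batchnorm shown here; a layernorm network would call layernorm_forward instead
            out, norm_cache = batchnorm_forward(out, gamma, beta, bn_param)
        out, relu_cache = relu_forward(out)
        if dropout_param is not None:
            out, do_cache = dropout_forward(out, dropout_param)
    return out, (fc_cache, norm_cache, relu_cache, do_cache, last)

def generic_backward(dout, cache):
    fc_cache, norm_cache, relu_cache, do_cache, last = cache
    dgamma = dbeta = None
    if not last:
        if do_cache is not None:
            dout = dropout_backward(dout, do_cache)
        dout = relu_backward(dout, relu_cache)
        if norm_cache is not None:
            # batchnorm shown here; a layernorm network would call layernorm_backward instead
            dout, dgamma, dbeta = batchnorm_backward_alt(dout, norm_cache)
    dx, dw, db = affine_backward(dout, fc_cache)
    return dx, dw, db, dgamma, dbeta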
Implement SGD with momentum according to these specs
def sgd_momentum(w, dw, config):
    """
    Performs stochastic gradient descent with momentum.
    config format:
    - learning_rate: Scalar learning rate.
    - momentum: Scalar between 0 and 1 giving the momentum value.
      Setting momentum = 0 reduces to sgd.
    - velocity: A numpy array of the same shape as w and dw used to store a
      moving average of the gradients.
    """
For the following algorithm, you can use these pre-defined variables from a config dictionary:
learning_rate
momentum
velocity
Answer
Get velocity
Update velocity and weights
Store the updated velocity back in config
def sgd_momentum(w, dw, config):
    v = config.get("velocity", np.zeros_like(w))
    next_w = None
    v = config['momentum'] * v - config['learning_rate'] * dw  # update velocity
    next_w = w + v                                             # update position
    config['velocity'] = v                                     # store velocity for the next step
    return next_w, config
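A quick illustrative usage (the numbers are arbitrary, not from the original notes), showing one momentum step and the velocity that gets stored back in config:

w = np.linspace(-0.4, 0.6, num=6)
dw = np.linspace(-0.6, 0.4, num=6)
config = {"learning_rate": 1e-3, "momentum": 0.9}
next_w, config = sgd_momentum(w, dw, config)
print(next_w)              # w moved a small step against the gradient
print(config["velocity"])  # equals -learning_rate * dw on the first step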
Implement RMSProp update rule according to these specs
def rmsprop(w, dw, config):
    """
    Uses the RMSProp update rule, which uses a moving average of squared
    gradient values to set adaptive per-parameter learning rates.
    config format:
    - learning_rate: Scalar learning rate.
    - decay_rate: Scalar between 0 and 1 giving the decay rate for the squared
      gradient cache.
    - epsilon: Small scalar used for smoothing to avoid dividing by zero.
    - cache: Moving average of second moments of gradients.
    """
For the following algorithm, you can use these pre-defined variables from a config dictionary:
learning_rate
decay_rate
epsilon
cache
Answer
Load the four values
Update cache
Update weights
Store the updated cache back in config
def rmsprop(w, dw, config):
    next_w = None
    keys = ['learning_rate', 'decay_rate', 'epsilon', 'cache']  # keys in this order
    lr, dr, eps, cache = (config.get(key) for key in keys)      # vals in this order
    cache = dr * cache + (1 - dr) * dw ** 2                     # update cache
    next_w = w - lr * dw / (np.sqrt(cache) + eps)               # update w
    config['cache'] = cache                                     # store cache for the next step
    return next_w, config
Implement the Adam update rule according to these specs
def adam(w, dw, config=None):
    """
    Uses the Adam update rule, which incorporates moving averages of both the
    gradient and its square and a bias correction term.
    config format:
    - learning_rate: Scalar learning rate.
    - beta1: Decay rate for moving average of first moment of gradient.
    - beta2: Decay rate for moving average of second moment of gradient.
    - epsilon: Small scalar used for smoothing to avoid dividing by zero.
    - m: Moving average of gradient.
    - v: Moving average of squared gradient.
    - t: Iteration number.
    """
For the following algorithm, you can use these pre-defined variables from a config dictionary:
learning_rate
beta1
beta2
epsilon
m
v
t
Answer
Extract configuration values (learning_rate, beta1, beta2, epsilon, m, v, t) from config.
Increment timestep t in config.
Update m in config with a decayed average of past gradients (momentum).
Apply bias correction to m.
Update v in config with a decayed average of past squared gradients (RMSProp).
Apply bias correction to v.
Calculate the new weights next_w using the Adam update rule.
def adam(w, dw, config=None):
    next_w = None
    keys = ['learning_rate', 'beta1', 'beta2', 'epsilon', 'm', 'v', 't']  # keys in this order
    lr, beta1, beta2, eps, m, v, t = (config.get(k) for k in keys)        # vals in this order
    config['t'] = t = t + 1                                               # iteration counter
    config['m'] = m = beta1 * m + (1 - beta1) * dw                        # gradient smoothing (momentum)
    mt = m / (1 - beta1 ** t)                                             # bias correction
    config['v'] = v = beta2 * v + (1 - beta2) * (dw ** 2)                 # gradient smoothing (RMSProp)
    vt = v / (1 - beta2 ** t)                                             # bias correction
    next_w = w - lr * mt / (np.sqrt(vt) + eps)                            # weight update
    return next_w, config
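An illustrative first step (hypothetical numbers): because of the bias-correction terms, the very first Adam update moves each weight by roughly learning_rate in the direction opposite the gradient, regardless of the gradient's magnitude:

w = np.zeros(3)
dw = np.array([0.5, -2.0, 0.1])
config = {"learning_rate": 1e-3, "beta1": 0.9, "beta2": 0.999,
          "epsilon": 1e-8, "m": np.zeros_like(w), "v": np.zeros_like(w), "t": 0}
next_w, config = adam(w, dw, config)
print(next_w)  # roughly [-0.001, 0.001, -0.001]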
Batchnorm
Implement forward pass for batchnorm according to these specs
def batchnorm_forward(x, gamma, beta, bn_param):
    """Forward pass for batch normalization.
    During training the sample mean and (uncorrected) sample variance are
    computed from minibatch statistics and used to normalize the incoming data.
    During training we also keep an exponentially decaying running mean of the
    mean and variance of each feature, and these averages are used to normalize
    data at test-time.
    At each timestep we update the running averages for mean and variance using
    an exponential decay based on the momentum parameter:
    running_mean = momentum * running_mean + (1 - momentum) * sample_mean
    running_var = momentum * running_var + (1 - momentum) * sample_var
    Note that the batch normalization paper suggests a different test-time
    behavior: they compute sample mean and variance for each feature using a
    large number of training images rather than using a running average. For
    this implementation we have chosen to use running averages instead since
    they do not require an additional estimation step; the torch7
    implementation of batch normalization also uses running averages.
    Input:
    - x: Data of shape (N, D)
    - gamma: Scale parameter of shape (D,)
    - beta: Shift parameter of shape (D,)
    - bn_param: Dictionary with the following keys:
      - mode: 'train' or 'test'; required
      - eps: Constant for numeric stability
      - momentum: Constant for running mean / variance.
      - running_mean: Array of shape (D,) giving running mean of features
      - running_var: Array of shape (D,) giving running variance of features
    Returns a tuple of:
    - out: of shape (N, D)
    - cache: A tuple of values needed in the backward pass
    """
Answer
1. Extract mode, eps, and momentum from bn_param.
2. Get the shape N, D from input x.
3. Initialize running_mean and running_var with zeros if not in bn_param.
If mode is "train":
4. Compute mean mu and variance var of x along axis 0.
5. Calculate standard deviation std of x.
6. Normalize x to x_hat.
7. Scale and shift x_hat to out using gamma and beta.
8. Store necessary values in cache for backpropagation.
9. Update running_mean and running_var using momentum.
If mode is "test":
10. Normalize x using running_mean and running_var.
11. Scale and shift as in training mode.
12. Raise an error if mode is invalid.
13. Update bn_param with the new running_mean and running_var.
def batchnorm_forward(x, gamma, beta, bn_param):
    mode = bn_param["mode"]
    eps = bn_param.get("eps", 1e-5)
    momentum = bn_param.get("momentum", 0.9)
    N, D = x.shape
    running_mean = bn_param.get("running_mean", np.zeros(D, dtype=x.dtype))
    running_var = bn_param.get("running_var", np.zeros(D, dtype=x.dtype))
    out, cache = None, None
    if mode == "train":
        mu = x.mean(axis=0)                    # batch mean for each feature
        var = x.var(axis=0)                    # batch variance for each feature
        std = np.sqrt(var + eps)               # batch standard deviation for each feature
        x_hat = (x - mu) / std                 # standardized x
        out = gamma * x_hat + beta             # scaled and shifted x_hat
        shape = bn_param.get('shape', (N, D))  # reshape used in backprop
        axis = bn_param.get('axis', 0)         # axis to sum over, used in backprop
        cache = x, mu, var, std, gamma, x_hat, shape, axis  # save for backprop
        if axis == 0:  # only batchnorm (axis 0) tracks running statistics
            running_mean = momentum * running_mean + (1 - momentum) * mu  # update overall mean
            running_var = momentum * running_var + (1 - momentum) * var   # update overall variance
    elif mode == "test":
        x_hat = (x - running_mean) / np.sqrt(running_var + eps)
        out = gamma * x_hat + beta
    else:
        raise ValueError('Invalid forward batchnorm mode "%s"' % mode)
    # Store the updated running means back into bn_param
    bn_param["running_mean"] = running_mean
    bn_param["running_var"] = running_var
    return out, cache
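A quick sanity check (not part of the original notes): in train mode with gamma=1 and beta=0, every feature of the output should come out with mean close to 0 and standard deviation close to 1, even if the input is badly scaled and shifted:

np.random.seed(0)
x = np.random.randn(200, 5) * 3.0 + 7.0
gamma, beta = np.ones(5), np.zeros(5)
out, _ = batchnorm_forward(x, gamma, beta, {"mode": "train"})
print(out.mean(axis=0))  # close to 0 for each feature
print(out.std(axis=0))   # close to 1 for each feature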
Implement backward pass for batchnorm according to these specs
def batchnorm_backward(dout, cache):
    """Backward pass for batch normalization.
    For this implementation, you should write out a computation graph for
    batch normalization on paper and propagate gradients backward through
    intermediate nodes.
    Inputs:
    - dout: Upstream derivatives, of shape (N, D)
    - cache: Variable of intermediates from batchnorm_forward.
    Returns a tuple of:
    - dx: Gradient with respect to inputs x, of shape (N, D)
    - dgamma: Gradient with respect to scale parameter gamma, of shape (D,)
    - dbeta: Gradient with respect to shift parameter beta, of shape (D,)
    """
Answer
def batchnorm_backward(dout, cache):
    dx, dgamma, dbeta = None, None, None
    # https://kratzert.github.io/2016/02/12/understanding-the-gradient-flow-through-the-batch-normalization-layer.html
    x, mu, var, std, gamma, x_hat, shape, axis = cache           # expand cache
    dbeta = dout.reshape(shape, order='F').sum(axis)             # derivative w.r.t. beta
    dgamma = (dout * x_hat).reshape(shape, order='F').sum(axis)  # derivative w.r.t. gamma
    dx_hat = dout * gamma                                        # derivative w.r.t. x_hat
    dstd = -np.sum(dx_hat * (x - mu), axis=0) / (std ** 2)       # derivative w.r.t. std
    dvar = 0.5 * dstd / std                                      # derivative w.r.t. var
    dx1 = dx_hat / std + 2 * (x - mu) * dvar / len(dout)         # partial derivative w.r.t. x
    dmu = -np.sum(dx1, axis=0)                                   # derivative w.r.t. mu
    dx2 = dmu / len(dout)                                        # partial derivative w.r.t. x
    dx = dx1 + dx2                                               # full derivative w.r.t. x
    return dx, dgamma, dbeta
Implement an alternative backward pass for batchnorm according to these specs
def batchnorm_backward_alt(dout, cache):
    """Alternative backward pass for batch normalization.
    For this implementation you should work out the derivatives for the batch
    normalization backward pass on paper and simplify as much as possible. You
    should be able to derive a simple expression for the backward pass.
    See the jupyter notebook for more hints.
    Note: This implementation should expect to receive the same cache variable
    as batchnorm_backward, but might not use all of the values in the cache.
    Inputs / outputs: Same as batchnorm_backward
    """
Answer
def batchnorm_backward_alt(dout, cache):
    dx, dgamma, dbeta = None, None, None
    _, _, _, std, gamma, x_hat, shape, axis = cache              # expand cache
    S = lambda x: x.sum(axis=0)                                  # helper function
    dbeta = dout.reshape(shape, order='F').sum(axis)             # derivative w.r.t. beta
    dgamma = (dout * x_hat).reshape(shape, order='F').sum(axis)  # derivative w.r.t. gamma
    dx = dout * gamma / (len(dout) * std)                        # temporarily initialize scale value
    dx = len(dout) * dx - S(dx * x_hat) * x_hat - S(dx)          # derivative w.r.t. unnormalized x
    return dx, dgamma, dbeta
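A hedged spot check mirroring what the assignment notebook does: on random data, the simplified expression should produce the same gradients as the graph-based backward pass up to floating-point noise:

np.random.seed(1)
N, D = 100, 10
x = np.random.randn(N, D)
gamma, beta = np.random.randn(D), np.random.randn(D)
dout = np.random.randn(N, D)
_, cache = batchnorm_forward(x, gamma, beta, {"mode": "train"})
dx1, dgamma1, dbeta1 = batchnorm_backward(dout, cache)
dx2, dgamma2, dbeta2 = batchnorm_backward_alt(dout, cache)
print(np.max(np.abs(dx1 - dx2)))          # expected to be around 1e-15
print(np.max(np.abs(dgamma1 - dgamma2)))  # expected to be 0
print(np.max(np.abs(dbeta1 - dbeta2)))    # expected to be 0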
Implement a forward pass for layernorm according to these specs
def layernorm_forward(x, gamma, beta, ln_param):
    """Forward pass for layer normalization.
    During both training and test-time, the incoming data is normalized per data-point,
    before being scaled by gamma and beta parameters identical to that of batch normalization.
    Note that in contrast to batch normalization, the behavior during train and test-time for
    layer normalization are identical, and we do not need to keep track of running averages
    of any sort.
    Input:
    - x: Data of shape (N, D)
    - gamma: Scale parameter of shape (D,)
    - beta: Shift parameter of shape (D,)
    - ln_param: Dictionary with the following keys:
      - eps: Constant for numeric stability
    Returns a tuple of:
    - out: of shape (N, D)
    - cache: A tuple of values needed in the backward pass
    """
Answer
def layernorm_forward(x, gamma, beta, ln_param):
    out, cache = None, None
    eps = ln_param.get("eps", 1e-5)
    bn_param = {"mode": "train", "eps": eps, "axis": 1, **ln_param}  # same as batchnorm in train mode, plus the axis to sum over for grads
    [gamma, beta] = np.atleast_2d(gamma, beta)                       # ensure 2D to perform transpose
    out, cache = batchnorm_forward(x.T, gamma.T, beta.T, bn_param)   # same as batchnorm
    out = out.T                                                      # transpose back
    return out, cache
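A quick sanity check (not from the original notes): layer normalization should give each data point, i.e. each row, approximately zero mean and unit standard deviation:

np.random.seed(2)
x = np.random.randn(4, 6) * 5.0 + 2.0
gamma, beta = np.ones(6), np.zeros(6)
out, _ = layernorm_forward(x, gamma, beta, {})
print(out.mean(axis=1))  # close to 0 for each row
print(out.std(axis=1))   # close to 1 for each row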
Implement a backward pass for layernorm according to these specs
def layernorm_backward(dout, cache):
    """Backward pass for layer normalization.
    For this implementation, you can heavily rely on the work you've done already
    for batch normalization.
    Inputs:
    - dout: Upstream derivatives, of shape (N, D)
    - cache: Variable of intermediates from layernorm_forward.
    Returns a tuple of:
    - dx: Gradient with respect to inputs x, of shape (N, D)
    - dgamma: Gradient with respect to scale parameter gamma, of shape (D,)
    - dbeta: Gradient with respect to shift parameter beta, of shape (D,)
    """
Answer
def layernorm_backward(dout, cache):
    dx, dgamma, dbeta = None, None, None
    dx, dgamma, dbeta = batchnorm_backward_alt(dout.T, cache)  # same as batchnorm backprop
    dx = dx.T                                                  # transpose dx back
    return dx, dgamma, dbeta
Dropout
Implement dropout forward pass according to these specs
def dropout_forward(x, dropout_param):
    """Forward pass for inverted dropout.
    Note that this is different from the vanilla version of dropout.
    Here, p is the probability of keeping a neuron output, as opposed to
    the probability of dropping a neuron output.
    See http://cs231n.github.io/neural-networks-2/#reg for more details.
    Inputs:
    - x: Input data, of any shape
    - dropout_param: A dictionary with the following keys:
      - p: Dropout parameter. We keep each neuron output with probability p.
      - mode: 'test' or 'train'. If the mode is train, then perform dropout;
        if the mode is test, then just return the input.
      - seed: Seed for the random number generator. Passing seed makes this
        function deterministic, which is needed for gradient checking but not
        in real networks.
    Outputs:
    - out: Array of the same shape as x.
    - cache: tuple (dropout_param, mask). In training mode, mask is the dropout
      mask that was used to multiply the input; in test mode, mask is None.
    """
Answer
def dropout_forward(x, dropout_param):
    p, mode = dropout_param["p"], dropout_param["mode"]
    if "seed" in dropout_param:
        np.random.seed(dropout_param["seed"])
    mask = None
    out = None
    if mode == "train":
        mask = (np.random.rand(*x.shape) < p) / p
        out = x * mask
    elif mode == "test":
        out = x
    cache = (dropout_param, mask)
    out = out.astype(x.dtype, copy=False)
    return out, cache
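A quick hedged check of the "inverted" scaling: because kept activations are divided by p, the expected value of the output matches the input in both train and test modes, so nothing special has to happen at test time:

np.random.seed(3)
x = np.random.randn(500, 500) + 10.0
out_train, _ = dropout_forward(x, {"mode": "train", "p": 0.25})
out_test, _ = dropout_forward(x, {"mode": "test", "p": 0.25})
print(x.mean(), out_train.mean(), out_test.mean())  # all approximately 10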
Implement dropout backward pass according to these specs
def dropout_backward(dout, cache):
    """Backward pass for inverted dropout.
    Inputs:
    - dout: Upstream derivatives, of any shape
    - cache: (dropout_param, mask) from dropout_forward.
    """
Answer
def dropout_backward(dout, cache):
    dropout_param, mask = cache
    mode = dropout_param["mode"]
    dx = None
    if mode == "train":
        dx = dout * mask
    elif mode == "test":
        dx = dout
    return dx
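A small hedged check: gradient flows only through the units that were kept by the forward mask, so dx is zero exactly where the forward output was zeroed out:

np.random.seed(7)
x = np.random.randn(3, 4)
out, cache = dropout_forward(x, {"mode": "train", "p": 0.5, "seed": 0})
dx = dropout_backward(np.ones_like(x), cache)
print((dx == 0) == (out == 0))  # elementwise True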
CNN
Implement convolutional forward pass according to these specs
def conv_forward(x, w, b, conv_param):
    """An implementation of the forward pass for a convolutional layer.
    The input consists of N data points, each with C channels, height H and
    width W. We convolve each input with F different filters, where each filter
    spans all C channels and has height HH and width WW.
    Input:
    - x: Input data of shape (N, C, H, W)
    - w: Filter weights of shape (F, C, HH, WW)
    - b: Biases, of shape (F,)
    - conv_param: A dictionary with the following keys:
      - 'stride': The number of pixels between adjacent receptive fields in the
        horizontal and vertical directions.
      - 'pad': The number of pixels that will be used to zero-pad the input.
    During padding, 'pad' zeros should be placed symmetrically (i.e. equally on both sides)
    along the height and width axes of the input. Be careful not to modify the original
    input x directly.
    Returns a tuple of:
    - out: Output data, of shape (N, F, H', W') where H' and W' are given by
      H' = 1 + (H + 2 * pad - HH) / stride
      W' = 1 + (W + 2 * pad - WW) / stride
    - cache: (x, w, b, conv_param)
    """
Answer
def conv_forward(x, w, b, conv_param):
    out = None
    P1 = P2 = P3 = P4 = conv_param['pad']  # padding: up = right = down = left
    S1 = S2 = conv_param['stride']         # stride: up = down
    N, C, HI, WI = x.shape                 # input dims
    F, _, HF, WF = w.shape                 # filter dims
    HO = 1 + (HI + P1 + P3 - HF) // S1     # output height
    WO = 1 + (WI + P2 + P4 - WF) // S2     # output width
    # Helper function (warning: numpy version 1.20 or above is required for usage)
    to_fields = lambda x: np.lib.stride_tricks.sliding_window_view(x, (WF, HF, C, N))
    w_row = w.reshape(F, -1)                                                   # weights as rows
    x_pad = np.pad(x, ((0, 0), (0, 0), (P1, P3), (P2, P4)), 'constant')        # padded inputs
    x_col = to_fields(x_pad.T).T[..., ::S1, ::S2].reshape(N, C * HF * WF, -1)  # inputs as cols
    out = (w_row @ x_col).reshape(N, F, HO, WO) + np.expand_dims(b, axis=(2, 1))
    x = x_pad  # we will use the padded version during backpropagation as well
    cache = (x, w, b, conv_param)
    return out, cache
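A quick shape check (illustrative values, not from the original notes): with 3x3 filters, stride 1, and pad 1, the spatial size is preserved:

np.random.seed(4)
x = np.random.randn(2, 3, 8, 8)
w = np.random.randn(4, 3, 3, 3)
b = np.random.randn(4)
out, _ = conv_forward(x, w, b, {"stride": 1, "pad": 1})
print(out.shape)  # (2, 4, 8, 8)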
Faster Answer
def conv_forward_strides(x, w, b, conv_param):
    N, C, H, W = x.shape
    F, _, HH, WW = w.shape
    stride, pad = conv_param["stride"], conv_param["pad"]

    # Check dimensions
    # assert (W + 2 * pad - WW) % stride == 0, 'width does not work'
    # assert (H + 2 * pad - HH) % stride == 0, 'height does not work'

    # Pad the input
    p = pad
    x_padded = np.pad(x, ((0, 0), (0, 0), (p, p), (p, p)), mode="constant")

    # Figure out output dimensions
    H += 2 * pad
    W += 2 * pad
    out_h = (H - HH) // stride + 1
    out_w = (W - WW) // stride + 1

    # Perform an im2col operation by picking clever strides
    shape = (C, HH, WW, N, out_h, out_w)
    strides = (H * W, W, 1, C * H * W, stride * W, stride)
    strides = x.itemsize * np.array(strides)
    x_stride = np.lib.stride_tricks.as_strided(x_padded, shape=shape, strides=strides)
    x_cols = np.ascontiguousarray(x_stride)
    x_cols.shape = (C * HH * WW, N * out_h * out_w)

    # Now all our convolutions are a big matrix multiply
    res = w.reshape(F, -1).dot(x_cols) + b.reshape(-1, 1)

    # Reshape the output
    res.shape = (F, N, out_h, out_w)
    out = res.transpose(1, 0, 2, 3)

    # Be nice and return a contiguous array
    # The old version of conv_forward_fast doesn't do this, so for a fair
    # comparison we won't either
    out = np.ascontiguousarray(out)

    cache = (x, w, b, conv_param, x_cols)
    return out, cache
Implement convolutional backward pass according to these specs
def conv_backward(dout, cache):
    """An implementation of the backward pass for a convolutional layer.
    Inputs:
    - dout: Upstream derivatives.
    - cache: A tuple of (x, w, b, conv_param) as in conv_forward
    Returns a tuple of:
    - dx: Gradient with respect to x
    - dw: Gradient with respect to w
    - db: Gradient with respect to b
    """
Answer
def conv_backward(dout, cache):
    dx, dw, db = None, None, None
    # Helper function (warning: numpy 1.20+ is required)
    to_fields = np.lib.stride_tricks.sliding_window_view
    x_pad, w, b, conv_param = cache                                          # extract parameters from cache
    S1 = S2 = conv_param['stride']                                           # stride: up = down
    P1 = P2 = P3 = P4 = conv_param['pad']                                    # padding: up = right = down = left
    F, C, HF, WF = w.shape                                                   # filter dims
    N, _, HO, WO = dout.shape                                                # output dims
    dout = np.insert(dout, [*range(1, HO)] * (S1 - 1), 0, axis=2)            # "missing" rows
    dout = np.insert(dout, [*range(1, WO)] * (S2 - 1), 0, axis=3)            # "missing" columns
    dout_pad = np.pad(dout, ((0,), (0,), (HF - 1,), (WF - 1,)), 'constant')  # for full convolution
    x_fields = to_fields(x_pad, (N, C, dout.shape[2], dout.shape[3]))        # input local regions w.r.t. dout
    dout_fields = to_fields(dout_pad, (N, F, HF, WF))                        # dout local regions w.r.t. filter
    w_rot = np.rot90(w, 2, axes=(2, 3))                                      # rotated kernel (for convolution)
    db = np.einsum('ijkl->j', dout)                                          # sum over N, H', W'
    dw = np.einsum('ijkl,mnopiqkl->jqop', dout, x_fields)                    # correlate
    dx = np.einsum('ijkl,mnopqikl->qjop', w_rot, dout_fields)[..., P1:-P3, P2:-P4]  # convolve
    return dx, dw, db
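A hedged finite-difference spot check (not from the original notes): treat sum(out * dout) as a scalar loss and compare the analytic dw and db against numeric gradients; if the implementation above is correct, the printed differences should be tiny (on the order of 1e-8 or smaller):

def numeric_grad(f, a, h=1e-6):
    grad = np.zeros_like(a)
    it = np.nditer(a, flags=["multi_index"])
    while not it.finished:
        idx = it.multi_index
        old = a[idx]
        a[idx] = old + h
        fp = f()
        a[idx] = old - h
        fm = f()
        a[idx] = old
        grad[idx] = (fp - fm) / (2 * h)
        it.iternext()
    return grad

np.random.seed(6)
x = np.random.randn(2, 3, 6, 6)
w = np.random.randn(2, 3, 3, 3)
b = np.random.randn(2)
dout = np.random.randn(2, 2, 6, 6)
conv_param = {"stride": 1, "pad": 1}
out, cache = conv_forward(x, w, b, conv_param)
dx, dw, db = conv_backward(dout, cache)
f = lambda: np.sum(conv_forward(x, w, b, conv_param)[0] * dout)
print(np.max(np.abs(dw - numeric_grad(f, w))))  # max absolute difference for dw
print(np.max(np.abs(db - numeric_grad(f, b))))  # max absolute difference for db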
Faster Answer
def conv_backward_strides(dout, cache):
    x, w, b, conv_param, x_cols = cache
    stride, pad = conv_param["stride"], conv_param["pad"]

    N, C, H, W = x.shape
    F, _, HH, WW = w.shape
    _, _, out_h, out_w = dout.shape

    db = np.sum(dout, axis=(0, 2, 3))

    dout_reshaped = dout.transpose(1, 0, 2, 3).reshape(F, -1)
    dw = dout_reshaped.dot(x_cols.T).reshape(w.shape)

    dx_cols = w.reshape(F, -1).T.dot(dout_reshaped)
    dx_cols.shape = (C, HH, WW, N, out_h, out_w)
    dx = col2im_6d_cython(dx_cols, N, C, H, W, HH, WW, pad, stride)

    return dx, dw, db
Implement max pooling forward pass according to these specs
def max_pool_forward(x, pool_param):
    """An implementation of the forward pass for a max-pooling layer.
    Inputs:
    - x: Input data, of shape (N, C, H, W)
    - pool_param: dictionary with the following keys:
      - 'pool_height': The height of each pooling region
      - 'pool_width': The width of each pooling region
      - 'stride': The distance between adjacent pooling regions
    No padding is necessary here, e.g. you can assume:
    - (H - pool_height) % stride == 0
    - (W - pool_width) % stride == 0
    Returns a tuple of:
    - out: Output data, of shape (N, C, H', W') where H' and W' are given by
      H' = 1 + (H - pool_height) / stride
      W' = 1 + (W - pool_width) / stride
    - cache: (x, pool_param)
    """
Answer
def max_pool_forward(x, pool_param):
    out = None
    S1 = S2 = pool_param['stride']  # stride: up = down
    HP = pool_param['pool_height']  # pool height
    WP = pool_param['pool_width']   # pool width
    N, C, HI, WI = x.shape          # input dims
    HO = 1 + (HI - HP) // S1        # output height
    WO = 1 + (WI - WP) // S2        # output width
    # Helper function (warning: numpy version 1.20 or above is required for usage)
    to_fields = lambda x: np.lib.stride_tricks.sliding_window_view(x, (WP, HP, C, N))
    x_fields = to_fields(x.T).T[..., ::S1, ::S2].reshape(N, C, HP * WP, -1)  # input local regions
    out = x_fields.max(axis=2).reshape(N, C, HO, WO)                         # pooled output
    cache = (x, pool_param)
    return out, cache
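A quick shape check (illustrative values): the standard 2x2 pool with stride 2 halves both spatial dimensions:

np.random.seed(5)
x = np.random.randn(2, 3, 8, 8)
out, _ = max_pool_forward(x, {"pool_height": 2, "pool_width": 2, "stride": 2})
print(out.shape)  # (2, 3, 4, 4)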
Faster Answer
def max_pool_forward_fast(x, pool_param):
    """
    A fast implementation of the forward pass for a max pooling layer.
    This chooses between the reshape method and the im2col method. If the pooling
    regions are square and tile the input image, then we can use the reshape
    method which is very fast. Otherwise we fall back on the im2col method, which
    is not much faster than the naive method.
    """
    N, C, H, W = x.shape
    pool_height, pool_width = pool_param["pool_height"], pool_param["pool_width"]
    stride = pool_param["stride"]

    same_size = pool_height == pool_width == stride
    tiles = H % pool_height == 0 and W % pool_width == 0
    if same_size and tiles:
        out, reshape_cache = max_pool_forward_reshape(x, pool_param)
        cache = ("reshape", reshape_cache)
    else:
        out, im2col_cache = max_pool_forward_im2col(x, pool_param)
        cache = ("im2col", im2col_cache)
    return out, cache
Implement max pooling backward pass according to these specs
def max_pool_backward(dout, cache):
    """An implementation of the backward pass for a max-pooling layer.
    Inputs:
    - dout: Upstream derivatives
    - cache: A tuple of (x, pool_param) as in the forward pass.
    Returns:
    - dx: Gradient with respect to x
    """
Answer
def max_pool_backward(dout, cache):
    dx = None
    x, pool_param = cache           # expand cache
    N, C, HO, WO = dout.shape       # get shape values
    dx = np.zeros_like(x)           # init derivative
    S1 = S2 = pool_param['stride']  # stride: up = down
    HP = pool_param['pool_height']  # pool height
    WP = pool_param['pool_width']   # pool width
    for i in range(HO):
        for j in range(WO):
            [ns, cs], h, w = np.indices((N, C)), i * S1, j * S2    # compact indexing
            f = x[:, :, h:(h + HP), w:(w + WP)].reshape(N, C, -1)  # input local fields
            k, l = np.unravel_index(np.argmax(f, 2), (HP, WP))     # offsets for max vals
            dx[ns, cs, h + k, w + l] += dout[ns, cs, i, j]         # select areas to update
    return dx
Faster Answer
def max_pool_backward_fast(dout, cache):
    """
    A fast implementation of the backward pass for a max pooling layer.
    This switches between the reshape method and the im2col method depending on
    which method was used to generate the cache.
    """
    method, real_cache = cache
    if method == "reshape":
        return max_pool_backward_reshape(dout, real_cache)
    elif method == "im2col":
        return max_pool_backward_im2col(dout, real_cache)
    else:
        raise ValueError('Unrecognized method "%s"' % method)
Utilities for the faster functions
def max_pool_forward_reshape(x, pool_param):
    """
    A fast implementation of the forward pass for the max pooling layer that uses
    some clever reshaping.
    This can only be used for square pooling regions that tile the input.
    """
    N, C, H, W = x.shape
    pool_height, pool_width = pool_param["pool_height"], pool_param["pool_width"]
    stride = pool_param["stride"]
    assert pool_height == pool_width == stride, "Invalid pool params"
    assert H % pool_height == 0
    assert W % pool_height == 0
    x_reshaped = x.reshape(
        N, C, H // pool_height, pool_height, W // pool_width, pool_width
    )
    out = x_reshaped.max(axis=3).max(axis=4)
    cache = (x, x_reshaped, out)
    return out, cache
def max_pool_backward_reshape(dout, cache):
    """
    A fast implementation of the backward pass for the max pooling layer that
    uses some clever broadcasting and reshaping.
    This can only be used if the forward pass was computed using
    max_pool_forward_reshape.
    NOTE: If there are multiple argmaxes, this method will assign gradient to
    ALL argmax elements of the input rather than picking one. In this case the
    gradient will actually be incorrect. However this is unlikely to occur in
    practice, so it shouldn't matter much. One possible solution is to split the
    upstream gradient equally among all argmax elements; this should result in a
    valid subgradient. You can make this happen by uncommenting the line below;
    however this results in a significant performance penalty (about 40% slower)
    and is unlikely to matter in practice so we don't do it.
    """
    x, x_reshaped, out = cache

    dx_reshaped = np.zeros_like(x_reshaped)
    out_newaxis = out[:, :, :, np.newaxis, :, np.newaxis]
    mask = x_reshaped == out_newaxis
    dout_newaxis = dout[:, :, :, np.newaxis, :, np.newaxis]
    dout_broadcast, _ = np.broadcast_arrays(dout_newaxis, dx_reshaped)
    dx_reshaped[mask] = dout_broadcast[mask]
    # dx_reshaped /= np.sum(mask, axis=(3, 5), keepdims=True)
    dx = dx_reshaped.reshape(x.shape)

    return dx
def max_pool_forward_im2col(x, pool_param):
    """
    An implementation of the forward pass for max pooling based on im2col.
    This isn't much faster than the naive version, so it should be avoided if
    possible.
    """
    N, C, H, W = x.shape
    pool_height, pool_width = pool_param["pool_height"], pool_param["pool_width"]
    stride = pool_param["stride"]

    assert (H - pool_height) % stride == 0, "Invalid height"
    assert (W - pool_width) % stride == 0, "Invalid width"
    out_height = (H - pool_height) // stride + 1
    out_width = (W - pool_width) // stride + 1

    x_split = x.reshape(N * C, 1, H, W)
    x_cols = im2col(x_split, pool_height, pool_width, padding=0, stride=stride)
    x_cols_argmax = np.argmax(x_cols, axis=0)
    x_cols_max = x_cols[x_cols_argmax, np.arange(x_cols.shape[1])]
    out = x_cols_max.reshape(out_height, out_width, N, C).transpose(2, 3, 0, 1)

    cache = (x, x_cols, x_cols_argmax, pool_param)
    return out, cache
def max_pool_backward_im2col(dout, cache):
    """
    An implementation of the backward pass for max pooling based on im2col.
    This isn't much faster than the naive version, so it should be avoided if
    possible.
    """
    x, x_cols, x_cols_argmax, pool_param = cache
    N, C, H, W = x.shape
    pool_height, pool_width = pool_param["pool_height"], pool_param["pool_width"]
    stride = pool_param["stride"]

    dout_reshaped = dout.transpose(2, 3, 0, 1).flatten()
    dx_cols = np.zeros_like(x_cols)
    dx_cols[x_cols_argmax, np.arange(dx_cols.shape[1])] = dout_reshaped
    dx = col2im_indices(
        dx_cols, (N * C, 1, H, W), pool_height, pool_width, padding=0, stride=stride
    )
    dx = dx.reshape(x.shape)

    return dx
def conv_backward_im2col(dout, cache):
    """
    A fast implementation of the backward pass for a convolutional layer
    based on im2col and col2im.
    """
    x, w, b, conv_param, x_cols = cache
    stride, pad = conv_param["stride"], conv_param["pad"]

    db = np.sum(dout, axis=(0, 2, 3))

    num_filters, _, filter_height, filter_width = w.shape
    dout_reshaped = dout.transpose(1, 2, 3, 0).reshape(num_filters, -1)
    dw = dout_reshaped.dot(x_cols.T).reshape(w.shape)

    dx_cols = w.reshape(num_filters, -1).T.dot(dout_reshaped)
    # dx = col2im_indices(dx_cols, x.shape, filter_height, filter_width, pad, stride)
    dx = col2im_cython(
        dx_cols,
        x.shape[0],
        x.shape[1],
        x.shape[2],
        x.shape[3],
        filter_height,
        filter_width,
        pad,
        stride,
    )
    return dx, dw, db
def conv_forward_im2col(x, w, b, conv_param):
    """
    A fast implementation of the forward pass for a convolutional layer
    based on im2col and col2im.
    """
    N, C, H, W = x.shape
    num_filters, _, filter_height, filter_width = w.shape
    stride, pad = conv_param["stride"], conv_param["pad"]

    # Check dimensions
    assert (W + 2 * pad - filter_width) % stride == 0, "width does not work"
    assert (H + 2 * pad - filter_height) % stride == 0, "height does not work"

    # Create output
    out_height = (H + 2 * pad - filter_height) // stride + 1
    out_width = (W + 2 * pad - filter_width) // stride + 1
    out = np.zeros((N, num_filters, out_height, out_width), dtype=x.dtype)

    # x_cols = im2col_indices(x, w.shape[2], w.shape[3], pad, stride)
    x_cols = im2col_cython(x, w.shape[2], w.shape[3], pad, stride)
    res = w.reshape((w.shape[0], -1)).dot(x_cols) + b.reshape(-1, 1)

    out = res.reshape(w.shape[0], out.shape[2], out.shape[3], x.shape[0])
    out = out.transpose(3, 0, 1, 2)
    cache = (x, w, b, conv_param, x_cols)
    return out, cache