Source code for mlhub.lenet.models

# Model functions
"""
    Models
    ^^^^^^^
    
    .. autoclass:: mlhub.lenet.models.LeNet5
        :members:
        :exclude-members: forward
        :special-members:
    .. autoclass:: mlhub.lenet.models.CustomConvLayer
        :members:
        :exclude-members: forward
        :special-members:
    .. autoclass:: mlhub.lenet.models.RBFUnits
        :members:
        :exclude-members: forward
        :special-members:
    .. autoclass:: mlhub.lenet.models.SubSamplingLayer
        :members:
        :exclude-members: forward
        :special-members:
    .. autoclass:: mlhub.lenet.models.SigmoidSquashingActivation
        :members:
        :exclude-members: forward
        :special-members:
"""

# %%
import sys
import time
import torch
import numpy as np
import einops as ein
import torch.nn as nn
from torchinfo import summary
from torch.nn import functional as F
from typing import Optional, Union, List


# %%
class SigmoidSquashingActivation(nn.Module):
    r"""
        Scaled hyperbolic tangent ("sigmoid squashing") activation used
        throughout LeNet-5. See equation 6 of
        :ref:`the paper <lecun1998gradient>`.

        The module is stateless (no trainable parameters) and applies,
        element-wise,

        .. math::

            f(x) = A \; \mathrm{tanh}(S\,x)
    """

    def __init__(self, A=1.7159, S=2 / 3) -> None:
        """
            :param A:   Output amplitude, the :math:`A` of the equation
            :param S:   Input slope, the :math:`S` of the equation
        """
        super().__init__()
        # Plain Python floats: constants of the activation, not parameters
        self.S = S
        self.A = A

    def forward(self, x):
        # Scale the input, squash it, then scale the output
        squashed = torch.tanh(x * self.S)
        return squashed * self.A
# %%
class SubSamplingLayer(nn.Module):
    """
        Sub-sampling layer for LeNet-5. Labeled as Sx in section 2B of
        :ref:`the paper <lecun1998gradient>`.

        .. note::
            **TL;DR**: For every channel *independently*, it averages
            the values in each non-overlapping
            ``kernel_size x kernel_size`` window, multiplies the result
            by that channel's trainable weight and adds that channel's
            trainable bias.

        Trainable state:

        - ``weights``: shape ``[1, in_channels, 1, 1]`` (one coefficient
          per channel)
        - ``bias``: shape ``[in_channels]``
    """

    def __init__(self, in_channels: int, kernel_size: int = 2) \
            -> None:
        """
            :param in_channels:     Number of channels in the input
                                    image. The output has the same
                                    number of channels.
            :param kernel_size:     The size of the (square) window to
                                    use for sub-sampling. It is also
                                    used as the stride, so windows do
                                    not overlap.
        """
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = in_channels  # Sub-sampling keeps channels
        self.kernel_size = kernel_size
        # One learnable coefficient and one learnable bias per channel
        self.weights = nn.Parameter(
            torch.randn([1, in_channels, 1, 1]))
        self.bias = nn.Parameter(torch.randn([self.out_channels]))

    def _create_weight(self):
        """
            Build the depthwise kernel of shape
            ``[channels, 1, kernel_size, kernel_size]``: a constant
            averaging window scaled by each channel's coefficient.
        """
        ks = self.kernel_size
        per_channel = self.weights.reshape(self.out_channels, 1, 1, 1)
        # `new_full` inherits dtype and device from the parameter, so
        # the kernel follows `.to()`, `.double()`, `.cuda()`, ...
        # 1 / ks^2 is the 0.25 averaging factor when ks == 2.
        window = per_channel.new_full((1, 1, ks, ks), 1.0 / (ks * ks))
        return window * per_channel

    def forward(self, x):
        # groups == channels makes this a depthwise convolution: output
        # channel c only ever sees input channel c. A dense kernel
        # would add all input channels into every output channel.
        return F.conv2d(x, self._create_weight(), self.bias,
                        stride=self.kernel_size,
                        groups=self.in_channels)
# %%
class CustomConvLayer(nn.Module):
    """
        Custom convolution layer for LeNet5. Details in Table 1 and
        related text in :ref:`the paper <lecun1998gradient>`.

        Every output channel is connected to only a subset of the
        input channels. One trainable kernel stack is registered per
        output channel (as ``w_ol_<i>``) holding weights only for the
        connected inputs; the dense convolution weight is re-assembled
        from them on every forward pass, with zeros for the missing
        connections.
    """

    def __init__(self, in_channels: int = 6, out_channels: int = 16,
            kernel_size: int = 5, bias: bool = True,
            connected_map: Union[np.ndarray, str] = "default") \
            -> None:
        """
            :param in_channels:     Number of channels in the input
                                    image.
            :param out_channels:    Number of channels in the output
                                    image.
            :param kernel_size:     The size of the kernel to use for
                                    convolution. Should be an ``int``
                                    (only square kernels supported).
            :param bias:            If True, use bias. Else bias is 0.
            :param connected_map:   The connectivity map to use for
                                    convolution. If ``"default"``, the
                                    connectivity map is taken from the
                                    table 1 of
                                    :ref:`the paper <lecun1998gradient>`.
                                    If a numpy array is given, it
                                    should be of shape
                                    ``[out_channels, in_channels]``
                                    where ``[i, j]`` is ``True`` if
                                    ``j``-th input channel is connected
                                    to the ``i``-th output channel. The
                                    array datatype is ``bool``.
        """
        super().__init__()
        # Check the type before comparing: `ndarray == "default"` is an
        # element-wise comparison on recent NumPy, and taking the truth
        # value of the resulting array raises ValueError.
        if isinstance(connected_map, str) and connected_map == "default":
            assert in_channels == 6 and out_channels == 16, \
                "Default map requires in and out channels to be 6 "\
                "and 16, respectively "\
                f"({in_channels = }, {out_channels = })"
            connected_map = np.array([  # [in_ch, ...]
                # out-ch
                [True , True , True , False, False, False],  # 0
                [False, True , True , True , False, False],  # 1
                [False, False, True , True , True , False],  # 2
                [False, False, False, True , True , True ],  # 3
                [True , False, False, False, True , True ],  # 4
                [True , True , False, False, False, True ],  # 5
                [True , True , True , True , False, False],  # 6
                [False, True , True , True , True , False],  # 7
                [False, False, True , True , True , True ],  # 8
                [True , False, False, True , True , True ],  # 9
                [True , True , False, False, True , True ],  # 10
                [True , True , True , False, False, True ],  # 11
                [True , True , False, True , True , False],  # 12
                [False, True , True , False, True , True ],  # 13
                [True , False, True , True , False, True ],  # 14
                [True , True , True , True , True , True ],  # 15
            ])
        assert isinstance(connected_map, np.ndarray) \
            and connected_map.dtype == bool \
            and connected_map.shape == (out_channels, in_channels)
        self.ks = kernel_size
        self.connected_map = connected_map
        self.out_channels = out_channels
        self.in_channels = in_channels
        # Trainable parameters: one [1, n_connected, ks, ks] tensor per
        # output channel. They are registered by name so that they show
        # up in `parameters()` / `state_dict()`; the list is kept as a
        # convenience handle on the same objects.
        self.tr_params: List[nn.Parameter] = []
        ks = self.ks
        for i, ch_conn in enumerate(self.connected_map):
            n_connected = int(ch_conn.sum())
            tr_param = nn.Parameter(
                torch.randn([1, n_connected, ks, ks]))
            self.register_parameter(f"w_ol_{i}", tr_param)
            self.tr_params.append(tr_param)
        # Bias
        if bias:
            self.bias = nn.Parameter(torch.randn([out_channels]))
        else:
            self.bias = None

    def _create_weight(self):
        """
            Create the dense ``[out_channels, in_channels, ks, ks]``
            weight tensor from the trainable parameters. Entries for
            unconnected (output, input) channel pairs are zero.
        """
        ks = self.ks
        out_c, in_c = self.out_channels, self.in_channels
        # Look the parameters up by their registered names and inherit
        # dtype/device from them, so the weight matches the module
        # after `.double()`, `.half()`, `.to(device)`, ...
        reference = getattr(self, "w_ol_0")
        weight = reference.new_zeros((out_c, in_c, ks, ks))
        for i, ch_conn in enumerate(self.connected_map):
            mask = torch.as_tensor(ch_conn, device=weight.device)
            # [0] drops the leading singleton dim: [n_connected, ks, ks]
            weight[i, mask] = getattr(self, f"w_ol_{i}")[0]
        return weight

    def forward(self, x):
        weight = self._create_weight()
        return F.conv2d(x, weight, self.bias)
# %%
class RBFUnits(nn.Module):
    """
        Radial Basis Function units for the final classification head
        of LeNet. It is described in equation 7 and related text in
        :ref:`the paper <lecun1998gradient>`.

        Each output unit holds one template vector and returns the
        squared Euclidean distance between the input and that template
        (smaller means a closer match).
    """

    # Default digit templates, one glyph per class, each drawn on a
    # 12 (rows) x 7 (columns) grid; "1" is an "on" pixel.
    # NOTE(review): glyphs 0, 2, 4 and 5 start with a "0111110" row
    # followed by a blank row, which looks unusual -- values are kept
    # exactly as they were; confirm against Figure 3 of the paper.
    _DEFAULT_GLYPHS = (
        (   # 0
            "0111110", "0000000", "0011100", "0110110",
            "1100011", "1100011", "1100011", "1100011",
            "0110110", "0011100", "0000000", "0000000",
        ),
        (   # 1
            "0001100", "0011100", "0111100", "0001100",
            "0001100", "0001100", "0001100", "0001100",
            "0001100", "0111111", "0000000", "0000000",
        ),
        (   # 2
            "0111110", "0000000", "0111110", "1100011",
            "1000011", "0000110", "0011100", "0110000",
            "1100000", "1111111", "0000000", "0000000",
        ),
        (   # 3
            "1111111", "0000011", "0000110", "0001100",
            "0011110", "0000011", "0000011", "0000011",
            "1000011", "0111110", "0000000", "0000000",
        ),
        (   # 4
            "0111110", "0000000", "0000000", "0110011",
            "0110011", "1110011", "1100011", "1100011",
            "1100111", "0111111", "0000011", "0000011",
        ),
        (   # 5
            "0111110", "0000000", "1111111", "1100000",
            "1100000", "0111100", "0011110", "0000011",
            "1100011", "0111110", "0000000", "0000000",
        ),
        (   # 6
            "0011110", "0110000", "1100000", "1100000",
            "1111110", "1110011", "1100011", "1100011",
            "1110011", "0111110", "0000000", "0000000",
        ),
        (   # 7
            "1111111", "0000011", "0000011", "0000110",
            "0001100", "0001100", "0011000", "0011000",
            "0011000", "0011000", "0000000", "0000000",
        ),
        (   # 8
            "0111110", "1100011", "1100011", "1100011",
            "0111110", "1100011", "1100011", "1100011",
            "1100011", "0111110", "0000000", "0000000",
        ),
        (   # 9
            "0111110", "1100111", "1100011", "1100011",
            "1100111", "0111111", "0000011", "0000011",
            "0000110", "0111100", "0000000", "0000000",
        ),
    )

    def __init__(self, in_features: int = 84, out_features: int = 10,
            param_vect: Union[np.ndarray, str] = "default",
            requires_grad: bool = False) \
            -> None:
        """
            :param in_features:     Number of input features.
            :param out_features:    Number of output features.
            :param param_vect:      Parameter vector for the RBF units.
                                    Should be a ``float32`` numpy array
                                    of shape
                                    ``[out_features, in_features]``.
                                    If ``default``, then the default
                                    digit templates from Figure 3 of
                                    :ref:`the paper <lecun1998gradient>`
                                    is used (this needs
                                    ``in_features = 84`` and
                                    ``out_features = 10``).
            :param requires_grad:   If True, the gradient for the
                                    template weights is enabled, else
                                    no gradient is enabled (no backprop
                                    over the template weights)
        """
        super().__init__()
        self.in_d = in_features
        self.out_d = out_features
        # Check the type before comparing: `ndarray == "default"` is an
        # element-wise comparison on recent NumPy, and taking the truth
        # value of the resulting array raises ValueError.
        if isinstance(param_vect, str) and param_vect == "default":
            bitmaps = np.array(
                [[[int(px) for px in row] for row in glyph]
                    for glyph in self._DEFAULT_GLYPHS],
                dtype=np.float32)               # [10, 12, 7] of 0/1
            param_vect = bitmaps * 2 - 1        # -1 and +1 range
            # Row-major flattening of each glyph: [10, 12*7 = 84]
            param_vect = param_vect.reshape(param_vect.shape[0], -1)
        assert isinstance(param_vect, np.ndarray) \
            and param_vect.shape == (self.out_d, self.in_d) \
            and param_vect.dtype == np.float32
        self.param_vect = nn.Parameter(torch.from_numpy(param_vect),
            requires_grad=requires_grad)

    def forward(self, x):
        """
            Squared distance of every input vector to every template.

            :param x:   Tensor of shape ``[..., in_features]``; the
                        leading (batch) dimensions are optional.
            :return:    Tensor of shape ``[..., out_features]``.
        """
        # [..., 1, in] - [out, in] broadcasts to [..., out, in]
        diff = x.unsqueeze(-2) - self.param_vect
        return torch.sum(diff ** 2, dim=-1)
# %%
class LeNet5(nn.Module):
    """
        LeNet-5 network presented in section 2 of
        :ref:`the paper <lecun1998gradient>`.

        Contains the following Modules as members

        - :py:class:`SubSamplingLayer`
        - :py:class:`CustomConvLayer`
        - :py:class:`RBFUnits`
        - :py:class:`SigmoidSquashingActivation`

        The layer sizes are fixed for single-channel ``32 x 32``
        inputs: the C5 convolution must reduce the feature map to
        ``1 x 1`` so that its 120 channels can feed the F6 linear
        layer.
    """

    def __init__(self) -> None:
        super().__init__()
        # C1: convolution layer
        self.c1 = nn.Conv2d(1, 6, 5)
        self.c1a = SigmoidSquashingActivation()
        # S2: sub-sampling layer
        self.s2 = SubSamplingLayer(6)
        self.s2a = SigmoidSquashingActivation()
        # C3: custom convolution layer
        self.c3 = CustomConvLayer(6, 16, 5)
        self.c3a = SigmoidSquashingActivation()
        # S4: sub-sampling layer
        self.s4 = SubSamplingLayer(16)
        self.s4a = SigmoidSquashingActivation()
        # C5: convolution layer
        self.c5 = nn.Conv2d(16, 120, 5)
        self.c5a = SigmoidSquashingActivation()
        # F6: fully connected layer
        self.f6 = nn.Linear(120, 84)
        self.f6a = SigmoidSquashingActivation()
        # Out: output layer
        self.rbf_out = RBFUnits(84, 10)

    def forward(self, x):
        """
            :param x:   Image tensor of shape ``([b], 1, 32, 32)``; the
                        batch dimension is optional.
            :return:    RBF outputs of shape ``([b], 10)``.
        """
        x = self.c1a(self.c1(x))
        x = self.s2a(self.s2(x))
        x = self.c3a(self.c3(x))
        x = self.s4a(self.s4(x))
        x = self.c5a(self.c5(x))
        # ([b], c=120, 1, 1) -> ([b], c). Only the trailing three dims
        # are merged: a bare `squeeze()` would also drop a batch
        # dimension of size 1 and change the output rank.
        x = x.flatten(start_dim=-3)
        x = self.f6a(self.f6(x))
        x = self.rbf_out(x)
        return x
# %%
# Smoke test: build the network, print its summary and time a single
# forward pass. Skipped when the file is executed by an IPython kernel.
if __name__ == "__main__" and "ipykernel" not in sys.argv[0]:
    print("Testing the LeNet-5 network (forward pass and FLOPS)")
    use_gpu = torch.cuda.is_available()
    model = LeNet5()
    sample_in = torch.randn(16, 1, 32, 32)
    if use_gpu:
        print("GPU found")
        model.cuda()
        sample_in = sample_in.cuda()
    else:
        print("GPU not found, running on CPU")
    print(f"Model summary: {model}")
    summary(model, sample_in.shape)
    # Only the forward pass is timed, so no autograd graph is needed
    with torch.no_grad():
        # CUDA kernels are launched asynchronously: without a
        # synchronize on both sides, only the launch overhead would be
        # measured instead of the actual computation.
        if use_gpu:
            torch.cuda.synchronize()
        # perf_counter is monotonic and has the highest available
        # resolution, unlike the wall clock returned by time.time()
        start_time = time.perf_counter()
        sample_out = model(sample_in)
        if use_gpu:
            torch.cuda.synchronize()
        end_time = time.perf_counter()
    print(f"Model input shape: {sample_in.shape}")
    print(f"Model output shape: {sample_out.shape}")
    # Per sample statistic
    dur = (end_time - start_time)/sample_in.shape[0]
    freq_hz = 1/dur
    print(f"Time: {dur:.3f} secs ({freq_hz:.3f} Hz)")

# %%