Module degann.search_algorithms.pattern_search
import random
from typing import Optional, Tuple
import numpy as np
import tensorflow as tf
from degann import MemoryCleaner, get_all_optimizers, get_all_metric_functions
from degann.networks import activations, imodel, losses
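# Candidate hidden-layer shapes tried by default: each inner list gives the
# number of neurons in each hidden layer of one candidate network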
_default_shapes = [
[10, 10, 10, 10, 10, 10],
[80, 80, 80],
[32, 16, 8, 4],
[4, 8, 16, 32],
[10, 10],
[30],
]
def _create_random_network(
    min_layers: int = 1,
    max_layers: int = 5,
    min_neurons: int = 3,
    max_neurons: int = 60,
) -> list[list]:
"""
    Generate parameters for a random fully connected network within the passed bounds.
Parameters
----------
min_layers: int
Minimal count of layers in neural net
max_layers: int
Maximal count of layers in neural net
min_neurons: int
Minimal count of neurons per layer
max_neurons: int
Maximal count of neurons per layer
Returns
-------
    nets_param: list
        Shape, activation functions, and activation decorator parameters
        of the randomly generated network
"""
layers = random.randint(min_layers, max_layers)
shape = [random.randint(min_neurons, max_neurons) for _ in range(layers)]
act = []
decorator_param = []
all_act_names = list(activations.get_all_activations().keys())
for _ in shape:
act.append(random.choice(all_act_names))
        # TODO: activation functions can take additional arguments,
        # but no random arguments are generated yet (hence None in decorator_param)
decorator_param.append(None)
act.append("linear")
decorator_param.append(None)
nets_param = [shape, act, decorator_param]
return nets_param
def _normalize_array(x: np.ndarray) -> Tuple[np.ndarray, float]:
"""
    Scale an array by dividing it by the absolute value of its maximum element.
Parameters
----------
x: np.ndarray
Array for scaling
Returns
-------
    result: Tuple[np.ndarray, float]
        Scaled array and the scale coefficient
"""
m = abs(np.amax(x))
if m != 0:
x = x / m
return x, m
def train(
x_data: np.ndarray, y_data: np.ndarray, **kwargs
) -> tuple[imodel.IModel, dict[str, list]]:
"""
    Train a set of candidate networks and return the one with the lowest final training loss.
    x_data and y_data are 2-D numpy arrays (assuming inputs and outputs are flat vectors).
Parameters
----------
x_data: np.ndarray
Array of inputs --- [input1, input2, ...]
y_data: np.ndarray
        Array of outputs --- [output1, output2, ...]
Returns
-------
net: imodel.IModel
Best neural network for this dataset
history: Dict[str, list]
History of training for this network
"""
# default config
args = {
"debug": False,
"eps": 1e-2,
"epochs": 100,
"normalize": False,
"name_salt": "",
"loss_function": "MeanSquaredError",
"optimizer": "SGD",
"metrics": [
"MeanSquaredError",
"MeanAbsoluteError",
"CosineSimilarity",
],
"validation_metrics": [
"MeanSquaredError",
"MeanAbsoluteError",
"CosineSimilarity",
],
"use_rand_net": True,
"net_type": "DenseNet",
"nets_param": [
[
shape, # shape
["sigmoid"] * len(shape) + ["linear"], # activation functions
[None] * (len(shape) + 1), # decorator parameters for activation
]
for shape in _default_shapes
],
}
    args.update(kwargs)
if args["debug"]:
print("Start train func")
    # determine the number of inputs and outputs of the neural network
    input_len = len(x_data[0]) if isinstance(x_data[0], np.ndarray) else 1
    output_len = len(y_data[0]) if isinstance(y_data[0], np.ndarray) else 1
    # prepare data (normalize)
    norm_coeff = 1
    if args["normalize"]:
        x_data, norm_coeff = _normalize_array(x_data)
        # y_data, norm_coeff = _normalize_array(y_data)
    if args["debug"]:
        print(f"Normalization coefficient is {norm_coeff}")
x_data = tf.convert_to_tensor(x_data, dtype=tf.float32)
y_data = tf.convert_to_tensor(y_data, dtype=tf.float32)
# prepare neural networks
if args["debug"]:
print("Prepare neural networks and data")
nets = []
for parameters in args["nets_param"]:
shape: list[int] = parameters[0]
act = parameters[1]
decorator_param = parameters[2]
str_shape = "_".join(map(str, shape))
curr_net = imodel.IModel(
input_size=input_len,
block_size=shape,
output_size=output_len,
activation_func=act,
decorator_params=decorator_param,
net_type=args["net_type"],
name=f"net{args['name_salt']}_{str_shape}",
is_debug=args["debug"],
)
nets.append(curr_net)
if args["use_rand_net"]:
rand_net_params = _create_random_network(input_len, output_len)
str_shape = "_".join(map(str, rand_net_params[0]))
rand_net = imodel.IModel(
input_size=input_len,
block_size=rand_net_params[0],
output_size=output_len,
activation_func=rand_net_params[1],
decorator_params=rand_net_params[2],
net_type=args["net_type"],
name=f"net{args['name_salt']}_{str_shape}",
)
nets.append(rand_net)
# compile
for nn in nets:
nn.compile(
rate=args["eps"],
optimizer=args["optimizer"],
loss_func=args["loss_function"],
metrics=args["metrics"],
# run_eagerly=True,
)
if args["debug"]:
print("Success prepared")
# train
history = []
    for nn in nets:
verb = 0
if args["debug"]:
print(nn)
verb = 1
temp_his = nn.train(
x_data,
y_data,
epochs=args["epochs"],
validation_split=args["validation_split"],
callbacks=[MemoryCleaner()],
verbose=verb,
)
temp_last_res = dict()
for key in temp_his.history:
temp_last_res[key] = temp_his.history[key].copy()
history.append(temp_last_res)
    # pick the network whose final training loss is lowest
    result_net = nets[0]
    result_history = history[0]
    min_err = history[0]["loss"][-1]
    for i in range(1, len(nets)):
        if history[i]["loss"][-1] < min_err:
            min_err = history[i]["loss"][-1]
            result_net = nets[i]
            result_history = history[i]
    if args["debug"]:
        print(f"Minimal final loss is {min_err} {args['name_salt']}")
return result_net, result_history
def pattern_search(
    x_data: np.ndarray,
    y_data: np.ndarray,
    x_val: Optional[np.ndarray] = None,
    y_val: Optional[np.ndarray] = None,
    **kwargs,
) -> list[list]:
"""
Choose and return neural network which present the minimal average absolute deviation.
x_data and y_data is numpy 2d arrays (in case we don't have multiple-layer input/output).
Parameters
----------
x_data: np.ndarray
Array of inputs --- [input1, input2, ...]
y_data: np.ndarray
List of outputs --- [output1, output2, ...]
x_val: np.ndarray
Array of inputs for validate train
y_val: np.ndarray
Array of outputs for validate train
Returns
-------
net: network.INetwork
Best neural network for this dataset
"""
# default config
args = {
"loss_functions": [key for key in losses.get_all_loss_functions()],
"optimizers": [key for key in get_all_optimizers()],
"metrics": [key for key in get_all_metric_functions()],
"validation_metrics": [key for key in get_all_metric_functions()],
"net_shapes": _default_shapes,
"activations": [key for key in activations.get_all_activations()],
"rates": [1e-2, 5e-3, 1e-3],
"epochs": [50, 200],
"normalize": [False],
}
for arg in args:
if kwargs.get(arg) is not None:
args[arg] = kwargs[arg]
kwargs.pop(arg)
val_data = None
if x_val is not None and y_val is not None:
val_data = (x_val, y_val)
# Networks parameters --- shape and activation functions
nets_param = []
for shape in args["net_shapes"]:
if len(shape) != 0:
for activation in args["activations"]:
nets_param.append(
[
shape,
[activation] * len(shape) + ["linear"],
[None] * (len(shape) + 1),
]
)
else:
nets_param.append(
[
shape,
["linear"],
[None],
]
)
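    # Build the Cartesian product of all metaparameter values; the grid size is
    # |loss_functions| x |normalize| x |optimizers| x |epochs| x |rates|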
metaparams = []
for loss_func in args["loss_functions"]:
for normalize in args["normalize"]:
for optimizer in args["optimizers"]:
for epochs in args["epochs"]:
for rate in args["rates"]:
                        metaparams.append(
                            {
                                "loss_function": loss_func,
                                "normalize": normalize,
                                "optimizer": optimizer,
                                "epochs": epochs,
                                "eps": rate,
                                "metrics": args["metrics"],
                                "validation_metrics": args["validation_metrics"],
                                "nets_param": nets_param,
                            }
                        )
                        metaparams[-1].update(kwargs)
    best_nets: list[list] = []  # each entry: [metaparams, loss, val_loss, model]
if kwargs.get("debug"):
print(f"Grid search size {len(metaparams)}")
print("Amount of networks for each set of parameters", len(nets_param))
    for i, params in enumerate(metaparams):
        if kwargs.get("debug"):
            print(f"Parameter set {i}")
        # val_data is forwarded into train's config, but train validates via
        # validation_split rather than via this tuple
        trained, history = train(x_data, y_data, val_data=val_data, **params)
loss = history["loss"][-1]
val_loss = history["val_loss"][-1]
best_nets.append([params, loss, val_loss, trained])
best_nets.sort(key=lambda x: [x[1], x[2]])
return best_nets
Functions
def pattern_search(x_data: numpy.ndarray, y_data: numpy.ndarray, x_val: Optional[numpy.ndarray] = None, y_val: Optional[numpy.ndarray] = None, **kwargs) -> list[list]
Run a grid search over training metaparameters, training a set of candidate networks for every combination, and return the results sorted by loss. x_data and y_data are 2-D numpy arrays (assuming inputs and outputs are flat vectors).
Parameters
x_data : np.ndarray
    Array of inputs --- [input1, input2, ...]
y_data : np.ndarray
    Array of outputs --- [output1, output2, ...]
x_val : np.ndarray
    Array of inputs for validation during training
y_val : np.ndarray
    Array of outputs for validation during training
Returns
best_nets : list[list]
    One entry [metaparams, loss, val_loss, model] per metaparameter set, sorted ascending by (loss, val_loss)
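A minimal usage sketch (the dataset and the restricted grid below are illustrative assumptions, not library defaults):

import numpy as np
from degann.search_algorithms.pattern_search import pattern_search

# toy regression set: y = sin(10x), shaped (N, 1) as the docstring expects
x = np.linspace(0, 1, 128).reshape(-1, 1)
y = np.sin(10 * x)

# restrict the grid so only a handful of networks are trained
results = pattern_search(
    x,
    y,
    loss_functions=["MeanSquaredError"],
    optimizers=["Adam"],
    rates=[1e-3],
    epochs=[50],
    net_shapes=[[16, 16]],
    activations=["sigmoid"],
    use_rand_net=False,      # forwarded to train() via kwargs
    validation_split=0.2,    # forwarded to train() so val_loss is recorded
)
best_params, best_loss, best_val_loss, best_model = results[0]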
def train(x_data: numpy.ndarray, y_data: numpy.ndarray, **kwargs) -> tuple[IModel, dict[str, list]]
Train a set of candidate networks and return the one with the lowest final training loss. x_data and y_data are 2-D numpy arrays (assuming inputs and outputs are flat vectors).
Parameters
x_data : np.ndarray
    Array of inputs --- [input1, input2, ...]
y_data : np.ndarray
    Array of outputs --- [output1, output2, ...]
Returns
net : imodel.IModel
    Best neural network for this dataset
history : Dict[str, list]
    History of training for this network
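A minimal direct-call sketch (the data and keyword values are illustrative assumptions):

import numpy as np
from degann.search_algorithms.pattern_search import train

# toy linear data, shaped (N, 1)
x = np.linspace(0, 1, 64).reshape(-1, 1)
y = 2 * x + 1

net, history = train(
    x,
    y,
    epochs=50,
    optimizer="Adam",
    loss_function="MeanSquaredError",
    use_rand_net=False,     # skip the extra randomly shaped candidate
    validation_split=0.2,
)
print(history["loss"][-1])  # final training loss of the winning candidate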