# Miloto AI — lottery-prediction demo script (Python, ~150 lines, 5.0 KiB)
import socket
import sys

import numpy as np
import tensorflow as tf
from tensorflow import keras
from keras import layers

from art import text2art
# Block Internet access: replace socket creation with a function that
# always raises, so any network attempt fails loudly.
def _blocked_socket(*args, **kwargs):
    raise Exception("Internet access is disabled")

socket.socket = _blocked_socket
# === Custom Callback for Stylized Training Output ===
class CustomTrainingCallback(tf.keras.callbacks.Callback):
    """Draw a single-line ANSI progress bar that updates in place each epoch."""

    def __init__(self):
        super().__init__()
        self.total_epochs = None  # filled in at on_train_begin from Keras params
        self.bar_length = 40      # width of the bar in characters

    def on_train_begin(self, logs=None):
        # Keras exposes the configured epoch count through self.params.
        self.total_epochs = self.params['epochs']
        print("\033[1;36m" + "=" * 60 + "\033[0m")
        print("\033[1;35mTraining Progress:\033[0m")

    def on_epoch_end(self, epoch, logs=None):
        done = epoch + 1
        fraction = done / self.total_epochs
        filled = int(self.bar_length * fraction)
        bar = '█' * filled + '·' * (self.bar_length - filled)
        # '\r' rewrites the same terminal line instead of scrolling.
        line = f"\r\033[1;34m[Epoch {done}/{self.total_epochs}] [{bar}] {fraction*100:.1f}%\033[0m"
        sys.stdout.write(line)
        sys.stdout.flush()

        # Close the bar with a separator once the final epoch finishes.
        if done == self.total_epochs:
            print("\n\033[1;36m" + "=" * 60 + "\033[0m\n")
# === Interface Functions ===
def print_banner():
    """Render the ASCII-art title, centered subtitle, and separator line."""
    title = text2art("Miloto AI", font="block")
    subtitle = "Lottery Prediction Artificial Intelligence".center(60)
    print("\033[1;34m" + title)
    print("\033[1;33m" + subtitle + "\033[0m")
    print("\033[1;36m" + "=" * 60 + "\033[0m")
def print_status(message):
    """Echo *message* as a bright status line prefixed with a blue bullet."""
    bullet = "\033[1;34m[•]\033[0m"
    body = f"\033[1;37m{message}\033[0m"
    print(bullet + " " + body)
def print_intro():
    """Show the banner, an initial status message, and a closing separator."""
    separator = "\033[1;36m" + "=" * 60 + "\033[0m"
    print_banner()
    print_status("Starting LotteryAI Prediction System...")
    print(separator)
# === Data Loading and Training Functions ===
def load_data():
    """Load draws from data.txt and split them 80/20 into train/validation.

    Returns (train_data, val_data, max_value). On any failure — missing
    file, empty file, or a dataset too small to split — prints a red
    error message and terminates the process with exit code 1.
    """
    try:
        if not tf.io.gfile.exists('data.txt'):
            raise FileNotFoundError("data.txt not found")

        rows = np.genfromtxt('data.txt', delimiter=',', dtype=int)
        if rows.size == 0:
            raise ValueError("data.txt is empty")

        # -1 entries are normalized to 0 before training.
        # NOTE(review): assumed -1 marks missing values — confirm with data source.
        rows[rows == -1] = 0

        split = int(0.8 * len(rows))
        if split == 0:
            raise ValueError("Dataset too small to split")

        return rows[:split], rows[split:], np.max(rows)
    except Exception as e:
        print(f"\033[1;31mError loading data: {str(e)}\033[0m")
        sys.exit(1)
def create_model(num_features, max_value):
    """Build and compile the Embedding → LSTM → Dense Keras model.

    num_features: how many numbers each draw contains (output width).
    max_value: largest number in the dataset (sizes the embedding table).
    Returns the compiled model; prints an error and exits on failure.
    """
    try:
        model = keras.Sequential()
        model.add(layers.Embedding(input_dim=max_value + 1, output_dim=32))
        model.add(layers.LSTM(64, return_sequences=False))
        model.add(layers.Dense(num_features, activation='softmax'))
        model.compile(
            optimizer='adam',
            loss='categorical_crossentropy',
            metrics=['accuracy'],
        )
        return model
    except Exception as e:
        print(f"\033[1;31mError creating model: {str(e)}\033[0m")
        sys.exit(1)
def train_model(model, train_data, val_data):
    """Fit the model with inputs predicting themselves (autoencoder-style).

    Runs 100 epochs with standard Keras output suppressed; progress is
    rendered by CustomTrainingCallback instead. Returns the History
    object; prints an error and exits the process on failure.
    """
    try:
        progress = CustomTrainingCallback()
        return model.fit(
            x=train_data,
            y=train_data,
            validation_data=(val_data, val_data),
            epochs=100,
            verbose=0,  # silence built-in logging; the callback draws the bar
            callbacks=[progress],
        )
    except Exception as e:
        print(f"\033[1;31mError training model: {str(e)}\033[0m")
        sys.exit(1)
def predict_numbers(model, val_data, num_features):
    """Select, per validation row, the entries with the highest predicted scores.

    Returns an array of shape (rows, num_features) holding values taken
    from val_data at the top-scoring column positions (ascending score
    order). Prints an error and exits the process on failure.
    """
    try:
        scores = model.predict(val_data)
        # Column indices of the num_features largest scores per row.
        top_cols = np.argsort(scores, axis=1)[:, -num_features:]
        return np.take_along_axis(val_data, top_cols, axis=1)
    except Exception as e:
        print(f"\033[1;31mError predicting numbers: {str(e)}\033[0m")
        sys.exit(1)
def print_predicted_numbers(predicted_numbers):
    """Render the first row of predictions inside a decorated ANSI frame.

    predicted_numbers: 2-D array with a .size attribute; when empty, a
    red "no predictions" message is shown instead of numbers.
    """
    heavy = "\033[1;36m" + "=" * 60 + "\033[0m"
    light = "\033[1;36m" + "-" * 60 + "\033[0m"

    print_status("Generating predictions...")
    print(heavy)
    print("\033[1;32m" + "🎯 PREDICTED NUMBERS 🎯".center(60) + "\033[0m")
    print(light)

    # Only the first predicted row is displayed to the user.
    if predicted_numbers.size > 0:
        joined = ', '.join(map(str, predicted_numbers[0]))
        print(f"\033[1;37m{joined}\033[0m")
    else:
        print("\033[1;31mNo predictions available\033[0m")

    print(heavy)
def main():
    """Run the full pipeline: load data, build, train, predict, report.

    Any uncaught error is printed in red and terminates the process
    with exit code 1.
    """
    try:
        print("\x1b[H\x1b[2J\x1b[3J")  # clear the terminal screen
        print_intro()

        train_data, val_data, max_value = load_data()

        # Downstream code indexes shape[1], so a 1-D dataset is unusable.
        if train_data.ndim < 2:
            raise ValueError("Training data has invalid dimensions")

        num_features = train_data.shape[1]
        model = create_model(num_features, max_value)
        train_model(model, train_data, val_data)
        predictions = predict_numbers(model, val_data, num_features)
        print_predicted_numbers(predictions)
    except Exception as e:
        print("\033[1;31m[!] Error:\033[0m", str(e))
        sys.exit(1)
# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()