Time Series Analysis in Machine Learning: Complete Guide with Python
AI-Generated Content Notice
Some code examples and technical explanations in this article were generated with AI assistance. The content has been reviewed for accuracy, but please test any code snippets in your development environment before using them.
Introduction
Time series analysis is crucial for understanding temporal patterns in data, from stock prices to sensor readings to web traffic. Unlike the data used in most machine learning tasks, time series have an inherent ordering and temporal dependencies that call for specialized techniques.
This comprehensive guide covers essential time series analysis methods, from traditional statistical approaches to modern deep learning techniques, with practical Python implementations.
Understanding Time Series Data
Components of Time Series
Every time series can be decomposed into:
- Trend: Long-term increase or decrease
- Seasonality: Regular, predictable patterns
- Cyclical: Irregular, long-term fluctuations
- Noise: Random variation
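These components combine either additively, y(t) = T(t) + S(t) + C(t) + e(t), or multiplicatively, y(t) = T(t) × S(t) × C(t) × e(t). A useful rule of thumb: prefer the multiplicative form when the seasonal swings grow with the level of the series, and the additive form when they stay roughly constant.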
Time Series Characteristics
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.model_selection import train_test_split, TimeSeriesSplit
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LinearRegression
import warnings
warnings.filterwarnings('ignore')
plt.style.use('seaborn-v0_8')
class TimeSeriesAnalyzer:
"""Comprehensive time series analyzer"""
def __init__(self, random_state: int = 42):
self.random_state = random_state
np.random.seed(random_state)
def generate_sample_data(self, n_points: int = 1000) -> Dict:
"""Generate sample time series with different patterns"""
# Time index
dates = pd.date_range(start='2020-01-01', periods=n_points, freq='D')
# Generate different patterns
t = np.arange(n_points)
# Trend component
trend = 0.05 * t + 10
# Seasonal component (yearly and weekly)
seasonal_yearly = 5 * np.sin(2 * np.pi * t / 365.25)
seasonal_weekly = 2 * np.sin(2 * np.pi * t / 7)
# Cyclical component (irregular)
cyclical = 3 * np.sin(2 * np.pi * t / 200 + np.random.normal(0, 0.1, n_points))
# Noise
noise = np.random.normal(0, 1, n_points)
# Combine components
ts_additive = trend + seasonal_yearly + seasonal_weekly + cyclical + noise
ts_multiplicative = trend * (1 + 0.1 * seasonal_yearly/5) * (1 + 0.05 * seasonal_weekly/2) + noise
# Create different series types
ts_stationary = np.random.normal(0, 1, n_points) # White noise
ts_random_walk = np.cumsum(np.random.normal(0, 1, n_points)) # Random walk
ts_with_outliers = ts_additive.copy()
outlier_indices = np.random.choice(n_points, size=20, replace=False)
ts_with_outliers[outlier_indices] += np.random.normal(0, 10, 20)
data = {
'dates': dates,
'additive': ts_additive,
'multiplicative': ts_multiplicative,
'stationary': ts_stationary,
'random_walk': ts_random_walk,
'with_outliers': ts_with_outliers,
'components': {
'trend': trend,
'seasonal_yearly': seasonal_yearly,
'seasonal_weekly': seasonal_weekly,
'cyclical': cyclical,
'noise': noise
}
}
return data
def plot_time_series_patterns(self, data: Dict):
"""Plot different time series patterns"""
fig, axes = plt.subplots(3, 2, figsize=(15, 12))
axes = axes.flatten()
series_names = ['additive', 'multiplicative', 'stationary', 'random_walk', 'with_outliers']
titles = ['Additive Model', 'Multiplicative Model', 'Stationary (White Noise)',
'Random Walk', 'Series with Outliers']
for i, (name, title) in enumerate(zip(series_names, titles)):
axes[i].plot(data['dates'], data[name], linewidth=1, alpha=0.8)
axes[i].set_title(title, fontweight='bold')
axes[i].set_xlabel('Date')
axes[i].set_ylabel('Value')
axes[i].grid(True, alpha=0.3)
# Plot components
components = data['components']
axes[5].plot(data['dates'], components['trend'], label='Trend', linewidth=2)
axes[5].plot(data['dates'], components['seasonal_yearly'], label='Yearly Seasonal', linewidth=1)
axes[5].plot(data['dates'], components['seasonal_weekly'], label='Weekly Seasonal', linewidth=1)
axes[5].plot(data['dates'], components['noise'], label='Noise', alpha=0.5, linewidth=0.5)
axes[5].set_title('Time Series Components', fontweight='bold')
axes[5].set_xlabel('Date')
axes[5].set_ylabel('Value')
axes[5].legend()
axes[5].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def test_stationarity(self, series: np.ndarray, series_name: str = "Series"):
"""Test for stationarity using statistical tests"""
try:
from statsmodels.tsa.stattools import adfuller
from statsmodels.stats.diagnostic import acorr_ljungbox
except ImportError:
print("statsmodels not available. Install with: pip install statsmodels")
return self._manual_stationarity_test(series, series_name)
# Augmented Dickey-Fuller test
result = adfuller(series, autolag='AIC')
print(f"\nStationarity Test for {series_name}:")
print("-" * 40)
print(f"ADF Statistic: {result[0]:.6f}")
print(f"p-value: {result[1]:.6f}")
print(f"Critical Values:")
for key, value in result[4].items():
print(f"\t{key}: {value:.3f}")
if result[1] <= 0.05:
print("✓ Series is stationary (reject null hypothesis)")
else:
print("✗ Series is non-stationary (fail to reject null hypothesis)")
return result[1] <= 0.05
def _manual_stationarity_test(self, series: np.ndarray, series_name: str):
"""Manual stationarity assessment"""
# Split series into chunks and compare means/variances
n_chunks = 5
chunk_size = len(series) // n_chunks
means = []
variances = []
for i in range(n_chunks):
start_idx = i * chunk_size
end_idx = (i + 1) * chunk_size if i < n_chunks - 1 else len(series)
chunk = series[start_idx:end_idx]
means.append(np.mean(chunk))
variances.append(np.var(chunk))
mean_stability = np.std(means) / np.mean(np.abs(means)) if np.mean(np.abs(means)) > 0 else np.inf
var_stability = np.std(variances) / np.mean(variances) if np.mean(variances) > 0 else np.inf
print(f"\nManual Stationarity Assessment for {series_name}:")
print("-" * 40)
print(f"Mean stability (lower is better): {mean_stability:.4f}")
print(f"Variance stability (lower is better): {var_stability:.4f}")
is_stationary = mean_stability < 0.1 and var_stability < 0.5
if is_stationary:
print("✓ Series appears stationary")
else:
print("✗ Series appears non-stationary")
return is_stationary
def make_stationary(self, series: np.ndarray, method: str = 'diff') -> np.ndarray:
"""Make series stationary using different methods"""
if method == 'diff':
# First difference
return np.diff(series)
elif method == 'log_diff':
# Log difference (for positive values)
if np.any(series <= 0):
print("Warning: Log difference requires positive values. Using regular difference.")
return np.diff(series)
log_series = np.log(series)
return np.diff(log_series)
elif method == 'detrend':
# Remove linear trend
x = np.arange(len(series))
slope, intercept = np.polyfit(x, series, 1)
trend = slope * x + intercept
return series - trend
else:
raise ValueError(f"Unknown method: {method}")
def plot_stationarity_analysis(self, data: Dict):
"""Plot stationarity analysis for different series"""
fig, axes = plt.subplots(2, 3, figsize=(18, 10))
series_to_analyze = ['additive', 'random_walk', 'stationary']
for i, series_name in enumerate(series_to_analyze):
series = data[series_name]
# Original series
axes[0, i].plot(data['dates'], series, linewidth=1)
axes[0, i].set_title(f'Original: {series_name.replace("_", " ").title()}', fontweight='bold')
axes[0, i].grid(True, alpha=0.3)
# Test stationarity
is_stationary = self.test_stationarity(series, series_name)
# Make stationary if needed
if not is_stationary and series_name != 'stationary':
stationary_series = self.make_stationary(series, method='diff')
axes[1, i].plot(data['dates'][1:], stationary_series, linewidth=1, color='orange')
axes[1, i].set_title(f'After Differencing: {series_name.replace("_", " ").title()}',
fontweight='bold')
# Test again
self.test_stationarity(stationary_series, f"{series_name} (differenced)")
else:
axes[1, i].plot(data['dates'], series, linewidth=1, color='green')
axes[1, i].set_title(f'Already Stationary: {series_name.replace("_", " ").title()}',
fontweight='bold')
axes[1, i].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# Generate and analyze sample data
print("Generating Sample Time Series Data...")
ts_analyzer = TimeSeriesAnalyzer()
sample_data = ts_analyzer.generate_sample_data(n_points=1000)
print("\n" + "="*60)
print("TIME SERIES PATTERNS VISUALIZATION")
print("="*60)
ts_analyzer.plot_time_series_patterns(sample_data)
print("\n" + "="*60)
print("STATIONARITY ANALYSIS")
print("="*60)
ts_analyzer.plot_stationarity_analysis(sample_data)
Seasonal Decomposition
Classical and STL Decomposition
class SeasonalDecompositionAnalyzer:
"""Seasonal decomposition analyzer"""
def __init__(self):
pass
def classical_decomposition(self, series: np.ndarray, period: int = 12,
model: str = 'additive') -> Dict:
"""Perform classical seasonal decomposition"""
n = len(series)
# Calculate trend using moving average
trend = np.full(n, np.nan)
half_period = period // 2
        if period % 2 == 0:  # Even period: 2 x m centered moving average with half-weight endpoints
            weights = np.ones(period + 1)
            weights[0] = weights[-1] = 0.5
            for i in range(half_period, n - half_period):
                window = series[i - half_period:i + half_period + 1]
                trend[i] = np.sum(window * weights) / period
        else:  # Odd period: simple centered moving average
            for i in range(half_period, n - half_period):
                trend[i] = np.mean(series[i - half_period:i + half_period + 1])
# Calculate seasonal component
if model == 'additive':
detrended = series - trend
else: # multiplicative
detrended = series / trend
        # Average seasonal pattern
        seasonal = np.full(n, np.nan)
        seasonal_pattern = np.full(period, np.nan)
        for i in range(period):
            season_values = [detrended[j] for j in range(i, n, period) if not np.isnan(detrended[j])]
            if season_values:
                seasonal_pattern[i] = np.mean(season_values)
        # Center the pattern (sum-to-zero for additive, mean-one for multiplicative)
        if model == 'additive':
            seasonal_pattern -= np.nanmean(seasonal_pattern)
        else:
            seasonal_pattern /= np.nanmean(seasonal_pattern)
        # Repeat the seasonal pattern across the full series
        for i in range(n):
            seasonal[i] = seasonal_pattern[i % period]
# Calculate residual
if model == 'additive':
residual = series - trend - seasonal
else: # multiplicative
residual = series / (trend * seasonal)
return {
'original': series,
'trend': trend,
'seasonal': seasonal,
'residual': residual,
'model': model
}
def plot_decomposition(self, decomposition: Dict, dates: pd.DatetimeIndex = None):
"""Plot seasonal decomposition results"""
if dates is None:
dates = range(len(decomposition['original']))
fig, axes = plt.subplots(4, 1, figsize=(15, 12))
components = ['original', 'trend', 'seasonal', 'residual']
titles = ['Original', 'Trend', 'Seasonal', 'Residual']
for i, (component, title) in enumerate(zip(components, titles)):
data = decomposition[component]
axes[i].plot(dates, data, linewidth=1)
axes[i].set_title(f'{title} Component', fontweight='bold')
axes[i].grid(True, alpha=0.3)
if i == len(components) - 1:
axes[i].set_xlabel('Time')
model_type = decomposition['model'].capitalize()
fig.suptitle(f'{model_type} Seasonal Decomposition', fontsize=16, fontweight='bold')
plt.tight_layout()
plt.show()
def analyze_seasonality_strength(self, decomposition: Dict) -> Dict:
"""Analyze the strength of seasonal and trend components"""
original = decomposition['original']
trend = decomposition['trend']
seasonal = decomposition['seasonal']
residual = decomposition['residual']
# Remove NaN values for calculations
valid_idx = ~(np.isnan(trend) | np.isnan(seasonal) | np.isnan(residual))
if not np.any(valid_idx):
return {'error': 'No valid data points for analysis'}
orig_valid = original[valid_idx]
trend_valid = trend[valid_idx]
seasonal_valid = seasonal[valid_idx]
residual_valid = residual[valid_idx]
# Calculate strength measures
if decomposition['model'] == 'additive':
# For additive model
total_var = np.var(orig_valid)
trend_var = np.var(trend_valid)
seasonal_var = np.var(seasonal_valid)
residual_var = np.var(residual_valid)
trend_strength = 1 - (residual_var / (trend_var + residual_var)) if (trend_var + residual_var) > 0 else 0
seasonal_strength = 1 - (residual_var / (seasonal_var + residual_var)) if (seasonal_var + residual_var) > 0 else 0
else:
# For multiplicative model - work with log-transformed data
log_orig = np.log(np.abs(orig_valid) + 1e-8)
log_trend = np.log(np.abs(trend_valid) + 1e-8)
log_seasonal = np.log(np.abs(seasonal_valid) + 1e-8)
log_residual = np.log(np.abs(residual_valid) + 1e-8)
total_var = np.var(log_orig)
trend_var = np.var(log_trend)
seasonal_var = np.var(log_seasonal)
residual_var = np.var(log_residual)
trend_strength = 1 - (residual_var / (trend_var + residual_var)) if (trend_var + residual_var) > 0 else 0
seasonal_strength = 1 - (residual_var / (seasonal_var + residual_var)) if (seasonal_var + residual_var) > 0 else 0
return {
'trend_strength': max(0, min(1, trend_strength)),
'seasonal_strength': max(0, min(1, seasonal_strength)),
'trend_variance': trend_var,
'seasonal_variance': seasonal_var,
'residual_variance': residual_var,
'total_variance': total_var
}
def compare_decomposition_models(self, series: np.ndarray, dates: pd.DatetimeIndex,
period: int = 12):
"""Compare additive vs multiplicative decomposition"""
# Additive decomposition
add_decomp = self.classical_decomposition(series, period=period, model='additive')
add_strength = self.analyze_seasonality_strength(add_decomp)
# Multiplicative decomposition
mult_decomp = self.classical_decomposition(series, period=period, model='multiplicative')
mult_strength = self.analyze_seasonality_strength(mult_decomp)
# Plot comparison
fig, axes = plt.subplots(2, 4, figsize=(20, 10))
components = ['original', 'trend', 'seasonal', 'residual']
titles = ['Original', 'Trend', 'Seasonal', 'Residual']
# Additive model (top row)
for i, (component, title) in enumerate(zip(components, titles)):
axes[0, i].plot(dates, add_decomp[component], linewidth=1, color='blue')
axes[0, i].set_title(f'Additive: {title}', fontweight='bold')
axes[0, i].grid(True, alpha=0.3)
# Multiplicative model (bottom row)
for i, (component, title) in enumerate(zip(components, titles)):
axes[1, i].plot(dates, mult_decomp[component], linewidth=1, color='red')
axes[1, i].set_title(f'Multiplicative: {title}', fontweight='bold')
axes[1, i].grid(True, alpha=0.3)
axes[1, i].set_xlabel('Time')
plt.tight_layout()
plt.show()
# Print comparison
print("\nDecomposition Model Comparison:")
print("-" * 40)
print(f"Additive Model:")
print(f" Trend Strength: {add_strength.get('trend_strength', 0):.3f}")
print(f" Seasonal Strength: {add_strength.get('seasonal_strength', 0):.3f}")
print(f" Residual Variance: {add_strength.get('residual_variance', 0):.3f}")
print(f"\nMultiplicative Model:")
print(f" Trend Strength: {mult_strength.get('trend_strength', 0):.3f}")
print(f" Seasonal Strength: {mult_strength.get('seasonal_strength', 0):.3f}")
print(f" Residual Variance: {mult_strength.get('residual_variance', 0):.3f}")
# Recommend model
add_score = add_strength.get('trend_strength', 0) + add_strength.get('seasonal_strength', 0)
mult_score = mult_strength.get('trend_strength', 0) + mult_strength.get('seasonal_strength', 0)
recommended = "Additive" if add_score > mult_score else "Multiplicative"
print(f"\nRecommended Model: {recommended}")
return {'additive': add_decomp, 'multiplicative': mult_decomp}
# Analyze seasonal decomposition
decomp_analyzer = SeasonalDecompositionAnalyzer()
print("\n" + "="*60)
print("SEASONAL DECOMPOSITION ANALYSIS")
print("="*60)
# Use the additive series for decomposition
series_for_decomp = sample_data['additive']
dates_for_decomp = sample_data['dates']
# Perform decomposition with yearly seasonality (assuming daily data)
decomposition_results = decomp_analyzer.compare_decomposition_models(
series_for_decomp, dates_for_decomp, period=365
)
# Show individual decomposition
print("\nDetailed Additive Decomposition:")
add_decomp = decomposition_results['additive']
decomp_analyzer.plot_decomposition(add_decomp, dates_for_decomp)
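The decomposition above is the hand-rolled classical method. When statsmodels is available, its STL implementation (the other technique named in this subsection's heading) is generally more robust: the seasonal component is allowed to evolve over time, and robust fitting down-weights outliers. A minimal sketch on the same sample series:

# Minimal STL sketch (requires statsmodels); reuses sample_data from above
from statsmodels.tsa.seasonal import STL

ts = pd.Series(sample_data['additive'], index=sample_data['dates'])

# period=365 targets the yearly cycle in this daily series;
# robust=True down-weights outliers when estimating the components
stl_result = STL(ts, period=365, robust=True).fit()

trend, seasonal, resid = stl_result.trend, stl_result.seasonal, stl_result.resid
stl_result.plot()
plt.show()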
ARIMA Modeling
ARIMA Implementation and Analysis
class ARIMAAnalyzer:
"""ARIMA model analyzer"""
def __init__(self, random_state: int = 42):
self.random_state = random_state
def find_arima_order(self, series: np.ndarray, max_p: int = 5, max_d: int = 2,
max_q: int = 5) -> Dict:
"""Find optimal ARIMA order using information criteria"""
try:
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.stats.diagnostic import acorr_ljungbox
except ImportError:
print("statsmodels not available. Using manual ARIMA implementation.")
return self._manual_arima_order_selection(series, max_p, max_d, max_q)
results = []
print("Searching for optimal ARIMA order...")
for p in range(max_p + 1):
for d in range(max_d + 1):
for q in range(max_q + 1):
try:
model = ARIMA(series, order=(p, d, q))
fitted_model = model.fit()
aic = fitted_model.aic
bic = fitted_model.bic
results.append({
'order': (p, d, q),
'aic': aic,
'bic': bic,
'model': fitted_model
})
except Exception as e:
continue
if not results:
print("No valid ARIMA models found")
return {}
# Sort by AIC
results.sort(key=lambda x: x['aic'])
best_model = results[0]
print(f"Best ARIMA order by AIC: {best_model['order']}")
print(f"AIC: {best_model['aic']:.2f}, BIC: {best_model['bic']:.2f}")
return {
'best_order': best_model['order'],
'best_model': best_model['model'],
'all_results': results[:10] # Top 10 models
}
def _manual_arima_order_selection(self, series: np.ndarray, max_p: int, max_d: int, max_q: int):
"""Manual ARIMA order selection using simple criteria"""
print("Using manual ARIMA order selection...")
# Simple differencing test
d_order = 0
test_series = series.copy()
for d in range(max_d + 1):
if d > 0:
test_series = np.diff(test_series)
# Check if series looks stationary (variance stabilization)
if len(test_series) < 50:
break
first_half_var = np.var(test_series[:len(test_series)//2])
second_half_var = np.var(test_series[len(test_series)//2:])
var_ratio = max(first_half_var, second_half_var) / (min(first_half_var, second_half_var) + 1e-8)
if var_ratio < 3: # Roughly stationary
d_order = d
break
# Simple AR/MA order selection based on autocorrelation
p_order = min(3, max_p) # Default to moderate AR order
q_order = min(2, max_q) # Default to moderate MA order
print(f"Selected ARIMA order: ({p_order}, {d_order}, {q_order})")
return {
'best_order': (p_order, d_order, q_order),
'best_model': None,
'all_results': []
}
def fit_and_forecast(self, series: np.ndarray, arima_order: Tuple[int, int, int],
forecast_steps: int = 30) -> Dict:
"""Fit ARIMA model and generate forecasts"""
try:
from statsmodels.tsa.arima.model import ARIMA
except ImportError:
return self._manual_arima_forecast(series, arima_order, forecast_steps)
# Split data
train_size = int(len(series) * 0.8)
train_data = series[:train_size]
test_data = series[train_size:]
# Fit model
model = ARIMA(train_data, order=arima_order)
fitted_model = model.fit()
# In-sample predictions
in_sample_pred = fitted_model.fittedvalues
# Out-of-sample predictions
forecast_result = fitted_model.forecast(steps=len(test_data))
out_sample_pred = forecast_result
# Future predictions
future_forecast = fitted_model.forecast(steps=forecast_steps)
# Calculate metrics
train_mse = mean_squared_error(train_data[1:], in_sample_pred[1:]) # Skip first value due to differencing
test_mse = mean_squared_error(test_data, out_sample_pred)
return {
'model': fitted_model,
'train_data': train_data,
'test_data': test_data,
'in_sample_pred': in_sample_pred,
'out_sample_pred': out_sample_pred,
'future_forecast': future_forecast,
'train_mse': train_mse,
'test_mse': test_mse,
'order': arima_order
}
def _manual_arima_forecast(self, series: np.ndarray, arima_order: Tuple[int, int, int],
forecast_steps: int) -> Dict:
"""Manual ARIMA forecasting using simple methods"""
p, d, q = arima_order
# Split data
train_size = int(len(series) * 0.8)
train_data = series[:train_size]
test_data = series[train_size:]
# Simple forecasting based on trend and seasonality
if d > 0:
# Differenced series - use simple autoregression
diff_series = np.diff(train_data, n=d)
# Simple AR model - use last p values
last_values = diff_series[-p:] if len(diff_series) >= p else diff_series
forecast_diff = np.mean(last_values)
# Convert back to original scale
last_original = train_data[-1]
out_sample_pred = [last_original + forecast_diff * (i + 1) for i in range(len(test_data))]
future_forecast = [out_sample_pred[-1] + forecast_diff * (i + 1) for i in range(forecast_steps)]
else:
# No differencing - use simple trend
trend = (train_data[-1] - train_data[0]) / len(train_data)
out_sample_pred = [train_data[-1] + trend * (i + 1) for i in range(len(test_data))]
future_forecast = [out_sample_pred[-1] + trend * (i + 1) for i in range(forecast_steps)]
# Simple in-sample prediction (moving average)
in_sample_pred = np.convolve(train_data, np.ones(min(5, len(train_data))), mode='same') / min(5, len(train_data))
# Calculate metrics
train_mse = mean_squared_error(train_data, in_sample_pred)
test_mse = mean_squared_error(test_data, out_sample_pred)
return {
'model': None,
'train_data': train_data,
'test_data': test_data,
'in_sample_pred': in_sample_pred,
'out_sample_pred': np.array(out_sample_pred),
'future_forecast': np.array(future_forecast),
'train_mse': train_mse,
'test_mse': test_mse,
'order': arima_order
}
def plot_arima_results(self, results: Dict, dates: pd.DatetimeIndex = None):
"""Plot ARIMA model results"""
if dates is None:
dates = range(len(results['train_data']) + len(results['test_data']))
train_size = len(results['train_data'])
test_size = len(results['test_data'])
forecast_size = len(results['future_forecast'])
# Create extended date range
if isinstance(dates, pd.DatetimeIndex):
train_dates = dates[:train_size]
test_dates = dates[train_size:train_size + test_size]
future_dates = pd.date_range(start=dates[-1], periods=forecast_size + 1, freq='D')[1:]
else:
train_dates = list(range(train_size))
test_dates = list(range(train_size, train_size + test_size))
future_dates = list(range(train_size + test_size, train_size + test_size + forecast_size))
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 10))
# Plot 1: Full time series with predictions
ax1.plot(train_dates, results['train_data'], label='Training Data', color='blue', linewidth=1)
ax1.plot(test_dates, results['test_data'], label='Test Data', color='green', linewidth=1)
ax1.plot(train_dates[1:], results['in_sample_pred'][1:], label='In-sample Prediction',
color='orange', linewidth=1, alpha=0.7)
ax1.plot(test_dates, results['out_sample_pred'], label='Out-sample Prediction',
color='red', linewidth=2)
ax1.plot(future_dates, results['future_forecast'], label='Future Forecast',
color='purple', linewidth=2, linestyle='--')
ax1.set_title(f'ARIMA{results["order"]} Model Results', fontweight='bold')
ax1.set_xlabel('Time')
ax1.set_ylabel('Value')
ax1.legend()
ax1.grid(True, alpha=0.3)
# Plot 2: Residuals analysis
if results['model'] is not None and hasattr(results['model'], 'resid'):
residuals = results['model'].resid
ax2.plot(train_dates[1:], residuals[1:], color='red', alpha=0.7, linewidth=1)
ax2.axhline(y=0, color='black', linestyle='--', alpha=0.5)
ax2.set_title('Residuals', fontweight='bold')
ax2.set_xlabel('Time')
ax2.set_ylabel('Residuals')
ax2.grid(True, alpha=0.3)
else:
# Manual residuals calculation
residuals = results['train_data'][1:] - results['in_sample_pred'][1:]
ax2.plot(train_dates[1:], residuals, color='red', alpha=0.7, linewidth=1)
ax2.axhline(y=0, color='black', linestyle='--', alpha=0.5)
ax2.set_title('Residuals (Manual Calculation)', fontweight='bold')
ax2.set_xlabel('Time')
ax2.set_ylabel('Residuals')
ax2.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# Print metrics
print(f"\nARIMA{results['order']} Model Performance:")
print("-" * 40)
print(f"Training MSE: {results['train_mse']:.4f}")
print(f"Test MSE: {results['test_mse']:.4f}")
print(f"Test RMSE: {np.sqrt(results['test_mse']):.4f}")
# Analyze ARIMA models
arima_analyzer = ARIMAAnalyzer()
print("\n" + "="*60)
print("ARIMA MODEL ANALYSIS")
print("="*60)
# Use the additive series; ARIMA's differencing (d) can absorb the trend,
# though the strong seasonality would ideally call for a seasonal (SARIMA) model
series_for_arima = sample_data['additive']
# Find optimal ARIMA order
arima_order_results = arima_analyzer.find_arima_order(series_for_arima, max_p=3, max_d=2, max_q=3)
if arima_order_results:
# Fit model and forecast
arima_results = arima_analyzer.fit_and_forecast(
series_for_arima,
arima_order_results['best_order'],
forecast_steps=50
)
# Plot results
arima_analyzer.plot_arima_results(arima_results, sample_data['dates'])
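Two practical follow-ups, both assuming statsmodels is installed. ACF and PACF plots are the standard visual aid for choosing q and p instead of (or alongside) a brute-force grid search, and because plain ARIMA has no seasonal terms, a seasonal ARIMA (SARIMA) will usually fit this strongly seasonal sample series better. The orders below are illustrative, not tuned:

from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.statespace.sarimax import SARIMAX

# ACF/PACF of the differenced series guide the choice of q and p respectively
diff_series = np.diff(series_for_arima)
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))
plot_acf(diff_series, lags=40, ax=ax1)
plot_pacf(diff_series, lags=40, ax=ax2)
plt.show()

# SARIMA: seasonal_order=(P, D, Q, s) adds seasonal terms at period s
# (s=7 captures the weekly cycle in this daily series)
train = series_for_arima[:int(len(series_for_arima) * 0.8)]
sarima = SARIMAX(train, order=(1, 1, 1), seasonal_order=(1, 1, 1, 7)).fit(disp=False)
print(sarima.summary().tables[1])
sarima_forecast = sarima.forecast(steps=50)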
Deep Learning for Time Series
LSTM Implementation
class LSTMTimeSeriesAnalyzer:
"""LSTM-based time series analyzer"""
def __init__(self, random_state: int = 42):
self.random_state = random_state
np.random.seed(random_state)
def create_sequences(self, data: np.ndarray, seq_length: int, forecast_horizon: int = 1) -> Tuple[np.ndarray, np.ndarray]:
"""Create sequences for LSTM training"""
X, y = [], []
for i in range(len(data) - seq_length - forecast_horizon + 1):
X.append(data[i:(i + seq_length)])
y.append(data[i + seq_length:i + seq_length + forecast_horizon])
return np.array(X), np.array(y)
def build_lstm_model(self, input_shape: Tuple[int, int], forecast_horizon: int = 1) -> object:
"""Build LSTM model (simplified implementation)"""
try:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.optimizers import Adam
model = Sequential([
LSTM(50, return_sequences=True, input_shape=input_shape),
Dropout(0.2),
LSTM(50, return_sequences=False),
Dropout(0.2),
Dense(25),
Dense(forecast_horizon)
])
model.compile(optimizer=Adam(learning_rate=0.001), loss='mse', metrics=['mae'])
return model
except ImportError:
print("TensorFlow not available. Using simple RNN implementation.")
return self._simple_rnn_model(input_shape, forecast_horizon)
def _simple_rnn_model(self, input_shape: Tuple[int, int], forecast_horizon: int = 1):
"""Simple RNN implementation when TensorFlow is not available"""
class SimpleRNN:
def __init__(self, input_shape, forecast_horizon):
self.seq_length, self.n_features = input_shape
self.forecast_horizon = forecast_horizon
self.weights = None
self.trained = False
def fit(self, X, y, epochs=50, verbose=0):
# Simple linear combination for demonstration
X_flat = X.reshape(X.shape[0], -1)
# Ridge regression-like solution
from sklearn.linear_model import Ridge
self.model = Ridge(alpha=1.0)
self.model.fit(X_flat, y.reshape(y.shape[0], -1))
self.trained = True
return {'loss': [0.1] * epochs} # Dummy history
            def predict(self, X, verbose=0):  # verbose accepted for Keras API parity
if not self.trained:
return np.zeros((X.shape[0], self.forecast_horizon))
X_flat = X.reshape(X.shape[0], -1)
predictions = self.model.predict(X_flat)
return predictions.reshape(-1, self.forecast_horizon)
return SimpleRNN(input_shape, forecast_horizon)
def train_lstm(self, series: np.ndarray, seq_length: int = 30,
forecast_horizon: int = 1, test_size: float = 0.2) -> Dict:
"""Train LSTM model on time series"""
        # Fit the scaler on the training portion only to avoid leaking test data
        scaler = MinMaxScaler()
        train_cutoff = int(len(series) * (1 - test_size))
        scaler.fit(series[:train_cutoff].reshape(-1, 1))
        series_scaled = scaler.transform(series.reshape(-1, 1)).flatten()
# Create sequences
X, y = self.create_sequences(series_scaled, seq_length, forecast_horizon)
# Reshape for LSTM (samples, time steps, features)
X = X.reshape((X.shape[0], X.shape[1], 1))
# Split data
split_idx = int(len(X) * (1 - test_size))
X_train, X_test = X[:split_idx], X[split_idx:]
y_train, y_test = y[:split_idx], y[split_idx:]
# Build and train model
model = self.build_lstm_model((seq_length, 1), forecast_horizon)
print(f"Training LSTM model with {len(X_train)} training samples...")
try:
history = model.fit(X_train, y_train, epochs=50, batch_size=32,
validation_split=0.2, verbose=0)
except Exception as e:
print(f"Training with simplified approach: {e}")
history = model.fit(X_train, y_train, epochs=50, verbose=0)
# Make predictions
train_pred = model.predict(X_train)
test_pred = model.predict(X_test)
# Inverse transform predictions
train_pred_inv = scaler.inverse_transform(train_pred.reshape(-1, 1)).flatten()
test_pred_inv = scaler.inverse_transform(test_pred.reshape(-1, 1)).flatten()
# Inverse transform actual values
y_train_inv = scaler.inverse_transform(y_train.reshape(-1, 1)).flatten()
y_test_inv = scaler.inverse_transform(y_test.reshape(-1, 1)).flatten()
# Calculate metrics
train_mse = mean_squared_error(y_train_inv, train_pred_inv)
test_mse = mean_squared_error(y_test_inv, test_pred_inv)
return {
'model': model,
'scaler': scaler,
'history': history,
'train_pred': train_pred_inv,
'test_pred': test_pred_inv,
'y_train': y_train_inv,
'y_test': y_test_inv,
'train_mse': train_mse,
'test_mse': test_mse,
'seq_length': seq_length,
'split_idx': split_idx
}
def generate_forecasts(self, model_results: Dict, series: np.ndarray,
forecast_steps: int = 30) -> np.ndarray:
"""Generate future forecasts using trained model"""
model = model_results['model']
scaler = model_results['scaler']
seq_length = model_results['seq_length']
# Normalize the series
series_scaled = scaler.transform(series.reshape(-1, 1)).flatten()
# Use last seq_length points as starting sequence
current_sequence = series_scaled[-seq_length:].reshape(1, seq_length, 1)
forecasts = []
for _ in range(forecast_steps):
# Predict next point
next_pred = model.predict(current_sequence, verbose=0)
forecasts.append(next_pred[0, 0])
# Update sequence (remove first point, add prediction)
current_sequence = np.roll(current_sequence, -1, axis=1)
current_sequence[0, -1, 0] = next_pred[0, 0]
# Inverse transform forecasts
forecasts_inv = scaler.inverse_transform(np.array(forecasts).reshape(-1, 1)).flatten()
return forecasts_inv
def plot_lstm_results(self, model_results: Dict, series: np.ndarray,
forecasts: np.ndarray = None, dates: pd.DatetimeIndex = None):
"""Plot LSTM model results"""
if dates is None:
dates = range(len(series))
seq_length = model_results['seq_length']
split_idx = model_results['split_idx']
# Calculate indices for plotting
train_start_idx = seq_length
train_end_idx = split_idx + seq_length
test_start_idx = train_end_idx
test_end_idx = len(series)
fig, axes = plt.subplots(2, 1, figsize=(15, 10))
# Plot 1: Time series with predictions
axes[0].plot(dates, series, label='Actual', color='blue', linewidth=1)
# Training predictions
if len(model_results['train_pred']) > 0:
train_dates = dates[train_start_idx:train_start_idx + len(model_results['train_pred'])]
axes[0].plot(train_dates, model_results['train_pred'],
label='Training Predictions', color='orange', alpha=0.7)
# Test predictions
if len(model_results['test_pred']) > 0:
test_dates = dates[test_start_idx:test_start_idx + len(model_results['test_pred'])]
axes[0].plot(test_dates, model_results['test_pred'],
label='Test Predictions', color='red', linewidth=2)
# Future forecasts
if forecasts is not None:
if isinstance(dates, pd.DatetimeIndex):
forecast_dates = pd.date_range(start=dates[-1], periods=len(forecasts) + 1, freq='D')[1:]
else:
forecast_dates = list(range(len(series), len(series) + len(forecasts)))
axes[0].plot(forecast_dates, forecasts, label='Future Forecast',
color='purple', linewidth=2, linestyle='--')
axes[0].set_title('LSTM Time Series Prediction', fontweight='bold')
axes[0].set_xlabel('Time')
axes[0].set_ylabel('Value')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# Plot 2: Training history
if hasattr(model_results['history'], 'history'):
history = model_results['history'].history
axes[1].plot(history['loss'], label='Training Loss', color='blue')
if 'val_loss' in history:
axes[1].plot(history['val_loss'], label='Validation Loss', color='red')
else:
# For simple model
axes[1].plot(model_results['history']['loss'], label='Training Loss', color='blue')
axes[1].set_title('Training History', fontweight='bold')
axes[1].set_xlabel('Epoch')
axes[1].set_ylabel('Loss')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# Print metrics
print(f"\nLSTM Model Performance:")
print("-" * 30)
print(f"Training MSE: {model_results['train_mse']:.4f}")
print(f"Test MSE: {model_results['test_mse']:.4f}")
print(f"Test RMSE: {np.sqrt(model_results['test_mse']):.4f}")
# Analyze LSTM models
lstm_analyzer = LSTMTimeSeriesAnalyzer()
print("\n" + "="*60)
print("LSTM DEEP LEARNING ANALYSIS")
print("="*60)
# Use the additive series for LSTM
series_for_lstm = sample_data['additive']
# Train LSTM model
lstm_results = lstm_analyzer.train_lstm(
series_for_lstm,
seq_length=30,
forecast_horizon=1,
test_size=0.2
)
# Generate forecasts
lstm_forecasts = lstm_analyzer.generate_forecasts(
lstm_results,
series_for_lstm,
forecast_steps=50
)
# Plot results
lstm_analyzer.plot_lstm_results(
lstm_results,
series_for_lstm,
lstm_forecasts,
sample_data['dates']
)
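One refinement worth noting: train_lstm runs a fixed 50 epochs. When TensorFlow is actually installed, stopping early on validation loss is usually a better use of compute. A hedged sketch that reuses the analyzer's helpers; the epoch budget and patience values are illustrative:

try:
    from tensorflow.keras.callbacks import EarlyStopping

    # Rebuild sequences the same way train_lstm does internally,
    # fitting the scaler on the first 80% to avoid leakage
    cutoff = int(len(series_for_lstm) * 0.8)
    scaler = MinMaxScaler().fit(series_for_lstm[:cutoff].reshape(-1, 1))
    scaled = scaler.transform(series_for_lstm.reshape(-1, 1)).flatten()
    X, y = lstm_analyzer.create_sequences(scaled, seq_length=30)
    X = X.reshape((X.shape[0], X.shape[1], 1))

    model = lstm_analyzer.build_lstm_model((30, 1))
    # Stop when validation loss hasn't improved for 5 epochs, keep the best weights
    model.fit(X, y, epochs=200, batch_size=32, validation_split=0.2, verbose=0,
              callbacks=[EarlyStopping(monitor='val_loss', patience=5,
                                       restore_best_weights=True)])
except ImportError:
    print("TensorFlow not installed; skipping the early-stopping example.")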
Model Comparison and Evaluation
Comprehensive Comparison Framework
class TimeSeriesModelComparison:
"""Comprehensive time series model comparison"""
def __init__(self, random_state: int = 42):
self.random_state = random_state
def evaluate_forecasting_performance(self, actual: np.ndarray, predicted: np.ndarray) -> Dict:
"""Calculate comprehensive forecasting metrics"""
# Remove any NaN values
mask = ~(np.isnan(actual) | np.isnan(predicted))
actual_clean = actual[mask]
predicted_clean = predicted[mask]
if len(actual_clean) == 0:
return {'error': 'No valid data points for evaluation'}
# Basic metrics
mse = mean_squared_error(actual_clean, predicted_clean)
rmse = np.sqrt(mse)
mae = mean_absolute_error(actual_clean, predicted_clean)
        # Percentage errors (guard the denominators against zeros)
        denom = np.where(np.abs(actual_clean) < 1e-8, 1e-8, np.abs(actual_clean))
        mape = np.mean(np.abs(actual_clean - predicted_clean) / denom) * 100
        # Symmetric percentage error
        smape = np.mean(2 * np.abs(predicted_clean - actual_clean) /
                        (np.abs(actual_clean) + np.abs(predicted_clean) + 1e-8)) * 100
# R-squared
r2 = r2_score(actual_clean, predicted_clean)
return {
'mse': mse,
'rmse': rmse,
'mae': mae,
'mape': mape,
'smape': smape,
'r2': r2,
'n_points': len(actual_clean)
}
def compare_models(self, series: np.ndarray, dates: pd.DatetimeIndex = None) -> Dict:
"""Compare different time series models"""
models = {}
# Split data
train_size = int(len(series) * 0.8)
train_data = series[:train_size]
test_data = series[train_size:]
print(f"Comparing models on {len(train_data)} training and {len(test_data)} test points...")
# 1. Naive forecast (last value)
naive_pred = np.full(len(test_data), train_data[-1])
naive_metrics = self.evaluate_forecasting_performance(test_data, naive_pred)
models['Naive'] = {
'predictions': naive_pred,
'metrics': naive_metrics,
'description': 'Last value carried forward'
}
# 2. Linear trend
x_train = np.arange(len(train_data))
slope, intercept = np.polyfit(x_train, train_data, 1)
x_test = np.arange(len(train_data), len(train_data) + len(test_data))
trend_pred = slope * x_test + intercept
trend_metrics = self.evaluate_forecasting_performance(test_data, trend_pred)
models['Linear Trend'] = {
'predictions': trend_pred,
'metrics': trend_metrics,
'description': 'Linear trend extrapolation'
}
# 3. Moving average
window_size = min(30, len(train_data) // 4)
ma_pred = np.full(len(test_data), np.mean(train_data[-window_size:]))
ma_metrics = self.evaluate_forecasting_performance(test_data, ma_pred)
models['Moving Average'] = {
'predictions': ma_pred,
'metrics': ma_metrics,
'description': f'{window_size}-period moving average'
}
# 4. Seasonal naive (if enough data)
if len(train_data) >= 365:
seasonal_period = 365 # Assume daily data with yearly seasonality
seasonal_pred = []
for i in range(len(test_data)):
seasonal_idx = (len(train_data) + i) % seasonal_period
if seasonal_idx < len(train_data):
seasonal_pred.append(train_data[seasonal_idx])
else:
seasonal_pred.append(train_data[-1]) # Fallback
seasonal_pred = np.array(seasonal_pred)
seasonal_metrics = self.evaluate_forecasting_performance(test_data, seasonal_pred)
models['Seasonal Naive'] = {
'predictions': seasonal_pred,
'metrics': seasonal_metrics,
'description': f'Seasonal pattern (period={seasonal_period})'
}
# 5. Random Forest (using lagged features)
try:
lag_features = self._create_lag_features(series, lags=[1, 2, 3, 7, 30])
if lag_features is not None:
rf_pred = self._train_random_forest(lag_features, series, train_size)
if rf_pred is not None:
rf_metrics = self.evaluate_forecasting_performance(test_data, rf_pred)
models['Random Forest'] = {
'predictions': rf_pred,
'metrics': rf_metrics,
'description': 'RF with lagged features'
}
except Exception as e:
print(f"Random Forest model failed: {e}")
return models
def _create_lag_features(self, series: np.ndarray, lags: List[int]) -> np.ndarray:
"""Create lagged features for ML models"""
max_lag = max(lags)
if len(series) <= max_lag:
return None
features = []
for i in range(max_lag, len(series)):
feature_row = []
for lag in lags:
feature_row.append(series[i - lag])
features.append(feature_row)
return np.array(features)
def _train_random_forest(self, features: np.ndarray, series: np.ndarray, train_size: int) -> np.ndarray:
"""Train Random Forest model"""
max_lag = len(series) - len(features)
# Adjust train_size for lagged features
adjusted_train_size = train_size - max_lag
if adjusted_train_size <= 0:
return None
X_train = features[:adjusted_train_size]
y_train = series[max_lag:max_lag + adjusted_train_size]
X_test = features[adjusted_train_size:]
# Train model
rf = RandomForestRegressor(n_estimators=100, random_state=self.random_state)
rf.fit(X_train, y_train)
# Predict
predictions = rf.predict(X_test)
return predictions
def plot_model_comparison(self, models: Dict, series: np.ndarray,
dates: pd.DatetimeIndex = None):
"""Plot model comparison results"""
train_size = int(len(series) * 0.8)
train_data = series[:train_size]
test_data = series[train_size:]
if dates is None:
train_dates = range(train_size)
test_dates = range(train_size, len(series))
else:
train_dates = dates[:train_size]
test_dates = dates[train_size:]
# Plot predictions
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 10))
# Time series plot
ax1.plot(train_dates, train_data, label='Training Data', color='blue', linewidth=1)
ax1.plot(test_dates, test_data, label='Test Data', color='green', linewidth=2)
colors = ['red', 'orange', 'purple', 'brown', 'pink']
for i, (model_name, model_data) in enumerate(models.items()):
color = colors[i % len(colors)]
ax1.plot(test_dates, model_data['predictions'],
label=f"{model_name}", color=color, linewidth=1, alpha=0.8)
ax1.set_title('Model Predictions Comparison', fontweight='bold')
ax1.set_xlabel('Time')
ax1.set_ylabel('Value')
ax1.legend()
ax1.grid(True, alpha=0.3)
# Metrics comparison
model_names = list(models.keys())
rmse_values = [models[name]['metrics']['rmse'] for name in model_names]
mae_values = [models[name]['metrics']['mae'] for name in model_names]
x_pos = np.arange(len(model_names))
width = 0.35
bars1 = ax2.bar(x_pos - width/2, rmse_values, width, label='RMSE', alpha=0.7)
bars2 = ax2.bar(x_pos + width/2, mae_values, width, label='MAE', alpha=0.7)
ax2.set_xlabel('Models')
ax2.set_ylabel('Error')
ax2.set_title('Model Performance Comparison', fontweight='bold')
ax2.set_xticks(x_pos)
ax2.set_xticklabels(model_names, rotation=45, ha='right')
ax2.legend()
ax2.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# Print detailed metrics
print("\nDetailed Model Comparison:")
print("=" * 80)
print(f"{'Model':<15} {'RMSE':<8} {'MAE':<8} {'MAPE':<8} {'R²':<8} {'Description'}")
print("-" * 80)
for model_name, model_data in models.items():
metrics = model_data['metrics']
description = model_data['description']
print(f"{model_name:<15} {metrics['rmse']:<8.3f} {metrics['mae']:<8.3f} "
f"{metrics['mape']:<8.2f} {metrics['r2']:<8.3f} {description}")
    def create_forecast_intervals(self, predictions: np.ndarray, actual: np.ndarray = None,
                                  confidence_level: float = 0.95) -> Dict:
        """Create approximate prediction intervals.

        If actual values are supplied, interval width is based on the standard
        deviation of the forecast errors; otherwise it falls back to the (much
        cruder) standard deviation of the predictions themselves.
        """
        if actual is not None and len(actual) == len(predictions):
            pred_std = np.std(actual - predictions)
        else:
            pred_std = np.std(predictions)
        # Z-score for the requested confidence level (default to 95%)
        z_scores = {0.90: 1.645, 0.95: 1.96, 0.99: 2.576}
        z_score = z_scores.get(confidence_level, 1.96)
        margin_of_error = z_score * pred_std
        lower_bound = predictions - margin_of_error
        upper_bound = predictions + margin_of_error
        return {
            'lower_bound': lower_bound,
            'upper_bound': upper_bound,
            'margin_of_error': margin_of_error,
            'confidence_level': confidence_level
        }
# Perform comprehensive model comparison
model_comparison = TimeSeriesModelComparison()
print("\n" + "="*60)
print("COMPREHENSIVE MODEL COMPARISON")
print("="*60)
# Compare models on the additive series
comparison_results = model_comparison.compare_models(sample_data['additive'], sample_data['dates'])
# Plot comparison
model_comparison.plot_model_comparison(comparison_results, sample_data['additive'], sample_data['dates'])
# Create prediction intervals for the best model (by test RMSE)
best_model_name = min(comparison_results.keys(),
                      key=lambda x: comparison_results[x]['metrics']['rmse'])
best_predictions = comparison_results[best_model_name]['predictions']
test_actual = sample_data['additive'][int(len(sample_data['additive']) * 0.8):]
intervals = model_comparison.create_forecast_intervals(best_predictions, actual=test_actual)
print(f"\nPrediction Intervals for {best_model_name}:")
print(f"95% Confidence Interval: ±{intervals['margin_of_error']:.3f}")
Best Practices and Guidelines
Method Selection Guide
| Method | Best For | Pros | Cons |
| --- | --- | --- | --- |
| ARIMA | Stationary series, short-term forecasts | Well-established theory | Requires stationarity |
| LSTM | Complex patterns, long sequences | Handles nonlinearity | Requires large datasets |
| Seasonal Decomposition | Understanding components | Interpretable | Not a forecasting method by itself |
| Linear Models | Simple trends | Fast, interpretable | Limited complexity |
Key Recommendations
- Start with exploratory analysis to understand patterns
- Test for stationarity before applying ARIMA
- Use cross-validation with time series splits (see the sketch after this list)
- Consider seasonality in your models
- Validate on out-of-sample data
- Compare multiple approaches for robustness
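The cross-validation bullet deserves code. TimeSeriesSplit (imported at the top of this guide but not used until now) keeps every validation fold strictly after its training fold, which prevents the data leakage a shuffled K-fold would introduce. A minimal sketch; the lag choices are illustrative:

tscv = TimeSeriesSplit(n_splits=5)
series = sample_data['additive']

# Build simple lag features: predict y[t] from y[t-1], y[t-7], y[t-30]
lags = [1, 7, 30]
X = np.column_stack([series[max(lags) - lag:len(series) - lag] for lag in lags])
y = series[max(lags):]

for fold, (train_idx, val_idx) in enumerate(tscv.split(X)):
    rf = RandomForestRegressor(n_estimators=100, random_state=42)
    rf.fit(X[train_idx], y[train_idx])
    rmse = np.sqrt(mean_squared_error(y[val_idx], rf.predict(X[val_idx])))
    print(f"Fold {fold + 1}: validation RMSE = {rmse:.3f}")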
Common Pitfalls
- Data leakage in cross-validation
- Ignoring seasonality patterns
- Over-differencing in ARIMA
- Insufficient training data for deep learning
- Not accounting for outliers (see the dampening sketch below)
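For the outlier pitfall, one simple remedy is a rolling-median filter applied to the with_outliers series generated earlier; the window and threshold below are illustrative, not tuned:

s = pd.Series(sample_data['with_outliers'], index=sample_data['dates'])

# Points far from a local median are pulled back to it
rolling_med = s.rolling(window=7, center=True, min_periods=1).median()
resid = s - rolling_med
threshold = 3 * resid.std()
cleaned = s.where(resid.abs() <= threshold, rolling_med)
print(f"Replaced {(resid.abs() > threshold).sum()} suspected outliers")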
Performance Summary
Typical performance characteristics:
- ARIMA: Good for linear patterns, fast training
- LSTM: Excellent for complex patterns, slower training
- Random Forest: Good middle ground once lag features are engineered
- Naive methods: Surprisingly effective baselines
Conclusion
Time series analysis requires specialized techniques that account for temporal dependencies. Key takeaways:
- Understand your data first through visualization and decomposition
- Test multiple approaches as no single method works best everywhere
- Consider computational constraints for real-time applications
- Validate thoroughly using proper time series evaluation
- Account for uncertainty with prediction intervals
The right approach depends on your data characteristics, forecast horizon, and accuracy requirements.
References
- Box, G. E. P., Jenkins, G. M., & Reinsel, G. C. (2015). Time Series Analysis: Forecasting and Control. John Wiley & Sons.
- Hyndman, R. J., & Athanasopoulos, G. (2021). Forecasting: Principles and Practice (3rd ed.). OTexts.
- Hochreiter, S., & Schmidhuber, J. (1997). Long short-term memory. Neural Computation, 9(8), 1735-1780.
Connect with me on LinkedIn or X to discuss time series analysis and forecasting!