#!/usr/bin/env python """ usage: python myaverager.py """ import sys import os import time import math import numpy as np from typing import List, Tuple import argparse import torch import torch.nn.functional as F import nedc_image_tools class SimpleAnalyzerWithBins: def __init__(self, gpu_id: int = 0, batch_size: int = 32, hist_bins: int = 16, hist_min: float = -6.0, hist_max: float = 6.0): self.device = torch.device(f'cuda:{gpu_id}') self.batch_size = batch_size self.hist_bins = hist_bins self.hist_min = hist_min self.hist_max = hist_max # Gaussian kernel self.gaussian_kernel = self._create_gaussian_kernel().to(self.device) # Hamming window hamming_y = torch.hamming_window(256, periodic=False, dtype=torch.float32) hamming_x = torch.hamming_window(256, periodic=False, dtype=torch.float32) hamming_2d = hamming_y.unsqueeze(1) * hamming_x.unsqueeze(0) self.hamming_window = hamming_2d.unsqueeze(0).unsqueeze(0).to(self.device) # Bin edges for display self.bin_edges = np.linspace(hist_min, hist_max, hist_bins + 1) print(f"Initialized on {self.device}") print(f"Histogram bins: {hist_bins}, Range: [{hist_min}, {hist_max}]") print("Bin edges (log magnitude):") for i in range(min(5, hist_bins)): print(f" Bin {i}: [{self.bin_edges[i]:.2f}, {self.bin_edges[i+1]:.2f})") if hist_bins > 5: print(f" ... ({hist_bins - 5} more bins)") def _create_gaussian_kernel(self, size: int = 7, sigma: float = 1.0): """Create 7x7 Gaussian kernel""" ax = torch.arange(size, dtype=torch.float32) - size // 2 xx, yy = torch.meshgrid(ax, ax, indexing='ij') kernel = torch.exp(-(xx**2 + yy**2) / (2 * sigma**2)) return (kernel / kernel.sum()).view(1, 1, size, size) def process_image_with_bins(self, image_path: str, max_windows: int = 100) -> torch.Tensor: """Process image with bin information in progress updates""" print(f"\nProcessing: {os.path.basename(image_path)}") start = time.time() try: # Open image reader = nedc_image_tools.Nil() reader.open(image_path) width, height = reader.get_dimension() print(f" Image size: {width}x{height}") # Generate coordinates coords = [] frame_x, frame_y = 128, 128 for x in range(0, width, frame_x): for y in range(0, height, frame_y): coords.append((x, y)) if max_windows and len(coords) >= max_windows: break if max_windows and len(coords) >= max_windows: break print(f" Total windows to process: {len(coords)}") # Process in batches total_hist = torch.zeros(self.hist_bins) total_windows = 0 num_batches = max(1, (len(coords) + self.batch_size - 1) // self.batch_size) for batch_idx in range(num_batches): batch_start = batch_idx * self.batch_size batch_end = min((batch_idx + 1) * self.batch_size, len(coords)) batch_coords = coords[batch_start:batch_end] # Read batch windows = reader.read_data_multithread( batch_coords, npixy=256, npixx=256, color_mode="RGB" ) # Convert to tensors batch_tensors = [] for w in windows: if w is not None: if w.shape[0] == 3: tensor = torch.from_numpy(w.astype(np.float32)) else: tensor = torch.from_numpy(w.astype(np.float32)).permute(2, 0, 1) batch_tensors.append(tensor) if batch_tensors: # Process batch batch = torch.stack(batch_tensors).to(self.device) batch = batch / 255.0 # Gaussian blur blurred = F.conv2d(batch, self.gaussian_kernel.repeat(3, 1, 1, 1), padding=3, groups=3) # FFT windowed = blurred * self.hamming_window fft = torch.fft.fft2(windowed) magnitude = torch.abs(fft) / math.sqrt(256 * 256) magnitude = torch.fft.fftshift(magnitude, dim=(-2, -1)) # Log magnitude magnitude_avg = magnitude.mean(dim=1) log_mag = torch.log10(magnitude_avg + 1e-12) # Histogram scaled = (log_mag - self.hist_min) / (self.hist_max - self.hist_min) bin_indices = torch.clamp((scaled * self.hist_bins).long(), 0, self.hist_bins - 1) # Compute batch histogram batch_hist = torch.zeros(self.hist_bins, device=self.device) for b in range(len(batch_tensors)): flat = bin_indices[b].flatten() counts = torch.bincount(flat, minlength=self.hist_bins) batch_hist += counts.float() total_hist += batch_hist.cpu() total_windows += len(batch_tensors) # Progress update with bin information if (batch_idx + 1) % max(1, num_batches // 5) == 0 or batch_idx == num_batches - 1: elapsed = time.time() - start windows_per_sec = total_windows / elapsed if elapsed > 0 else 0 print(f"\n Batch {batch_idx+1}/{num_batches} complete:") print(f" Windows: {total_windows}/{len(coords)} ({windows_per_sec:.1f}/sec)") reader.close() elapsed = time.time() - start windows_per_sec = total_windows / elapsed if elapsed > 0 else 0 print(f"\n Completed: {total_windows} windows in {elapsed:.1f}s ({windows_per_sec:.1f} windows/sec)") # Final histogram information final_hist_np = total_hist.numpy() total_counts = final_hist_np.sum() #print(f"\n Final histogram for {os.path.basename(image_path)}:") #print(f" Total counts: {total_counts:.0f}") #print(f" Average per window: {total_counts/total_windows if total_windows > 0 else 0:.1f}") # Show distribution #print(f" Distribution across bins:") #for i in range(min(6, self.hist_bins)): # bin_start = self.bin_edges[i] # bin_end = self.bin_edges[i + 1] # count = final_hist_np[i] # percent = (count / total_counts * 100) if total_counts > 0 else 0 # print(f" Bin {i:2d}: [{bin_start:6.2f}, {bin_end:6.2f}) = {count:10.0f} ({percent:5.1f}%)") #if self.hist_bins > 6: # print(f" ...") # for i in range(self.hist_bins - 3, self.hist_bins): # bin_start = self.bin_edges[i] # bin_end = self.bin_edges[i + 1] # count = final_hist_np[i] # percent = (count / total_counts * 100) if total_counts > 0 else 0 # print(f" Bin {i:2d}: [{bin_start:6.2f}, {bin_end:6.2f}) = {count:10.0f} ({percent:5.1f}%)") return total_hist except Exception as e: print(f" Error: {e}") import traceback traceback.print_exc() return torch.zeros(self.hist_bins) def main(): parser = argparse.ArgumentParser(description='Simple Analyzer with Bin Information') parser.add_argument('file_list', help='List of image files') parser.add_argument('--gpu_id', type=int, default=0, help='GPU ID') parser.add_argument('--batch_size', type=int, default=64, help='Batch size') parser.add_argument('--max_windows', type=int, default=300, help='Max windows per image') parser.add_argument('--hist_bins', type=int, default=16, help='Number of histogram bins') parser.add_argument('--hist_min', type=float, default=-6.0, help='Minimum log magnitude') parser.add_argument('--hist_max', type=float, default=6.0, help='Maximum log magnitude') args = parser.parse_args() # Read files with open(args.file_list, 'r') as f: files = [line.strip() for line in f if line.strip()] print(f"\nProcessing {len(files)} images") print(f"GPU: {args.gpu_id}") print(f"Batch size: {args.batch_size}") print(f"Max windows per image: {args.max_windows}") # Initialize analyzer analyzer = SimpleAnalyzerWithBins( gpu_id=args.gpu_id, batch_size=args.batch_size, hist_bins=args.hist_bins, hist_min=args.hist_min, hist_max=args.hist_max ) # Process start = time.time() hists = [] for i, f in enumerate(files): if not os.path.exists(f): print(f"File not found: {f}") continue print(f"\n{'='*60}") print(f"[Image {i+1}/{len(files)}]") hist = analyzer.process_image_with_bins(f, args.max_windows) hists.append(hist) # Results if hists: total_time = time.time() - start stacked = torch.stack(hists) data = stacked.numpy() # Overall statistics avg = np.mean(data) std = np.std(data) # Combined histogram combined_hist = data.sum(axis=0) total_counts = combined_hist.sum() print(f"\n{'='*60}") print("FINAL RESULTS") print(f"{'='*60}") print(f"Images processed: {len(hists)}") print(f"Total time: {total_time:.1f}s") print(f"Time per image: {total_time/len(hists):.1f}s") print(f"Images/sec: {len(hists)/total_time:.3f}") print(f"\nOverall statistics:") print(f" Average histogram count: {avg:.2f}") print(f" Standard deviation: {std:.2f}") # Combined histogram information print(f"\nCombined histogram (all images):") print(f" Total counts: {total_counts:,.0f}") # Show bin distribution bin_edges = np.linspace(args.hist_min, args.hist_max, args.hist_bins + 1) print(f"\n Bin distribution:") for i in range(args.hist_bins): bin_start = bin_edges[i] bin_end = bin_edges[i + 1] count = combined_hist[i] percent = (count / total_counts * 100) if total_counts > 0 else 0 if i < 16: #or i >= args.hist_bins - 5: # Show first and last 5 bins print(f" Bin {i:2d}: [{bin_start:6.2f}, {bin_end:6.2f}) = {count:12,.0f} ({percent:5.1f}%)") elif i == 5: print(f" ... ({args.hist_bins - 10} bins omitted)") print(f"{'='*60}") if __name__ == "__main__": main()