# Source code for data_processing.processor.zeiss_FCS_processor

import json
import os
import re
from datetime import datetime, timezone

import numpy as np

from IO.read_raw_corr_file import read_confo_cor3


class ZeissFCSProcessor:
    """
    Processes Zeiss ConfoCor3 .raw files and identifies the file with the
    highest mean photon intensity.

    At the moment the folder must also contain 'FCS_points.json', which is
    used to look up the stage position that corresponds to the chosen file.
    """

    def __init__(self, folder_path):
        """
        Initialize the FCS processor.

        :param folder_path: Path to the folder containing .raw files and
            FCS_points.json
        :raises OSError: if the folder cannot be listed or FCS_points.json
            cannot be opened
        :raises json.JSONDecodeError: if FCS_points.json is not valid JSON
        """
        self.folder_path = folder_path

        # Collect all .raw files from the folder (extension matched
        # case-insensitively). Sorted so that the processing order, and the
        # winner when two files tie on intensity, do not depend on the
        # arbitrary order returned by os.listdir.
        self.raw_files = sorted(
            os.path.join(folder_path, name)
            for name in os.listdir(folder_path)
            if name.lower().endswith(".raw")
        )

        # Explicit UTF-8 so reading does not depend on the platform default
        # encoding (save_measurement_points also writes UTF-8).
        points_path = os.path.join(self.folder_path, 'FCS_points.json')
        with open(points_path, 'r', encoding='utf-8') as file:
            self.FCS_measurements_points = json.load(file)

        if not self.raw_files:
            print("No .raw files found in:", folder_path)

    def mean_intensity_from_photon_data(self, photon_data, bin_width=0.1):
        """
        Calculate mean photon intensity (photons per second) from photon
        arrival time data.

        The arrival times are histogrammed into bins of ``bin_width`` seconds
        starting at t = 0 and the per-bin count rates are averaged.

        :param photon_data: Dictionary containing photon timing data; the
            keys "ph_sync" (photon tick times) and "TTResult_SyncRate"
            (clock frequency [Hz]) are read
        :param bin_width: Histogram bin width in seconds (default 0.1)
        :return: Mean photon intensity [photons/s]
        :raises ValueError: if there are no photon events, or if the sync
            rate or bin width is not positive
        """
        ticks = np.asarray(photon_data["ph_sync"], dtype=float)  # photon tick times
        sync_rate = photon_data["TTResult_SyncRate"]  # clock frequency [Hz]

        if ticks.size == 0:
            raise ValueError("Photon data contains no photon events")
        if sync_rate <= 0:
            raise ValueError(
                "TTResult_SyncRate must be positive, got {}".format(sync_rate)
            )
        if bin_width <= 0:
            raise ValueError(
                "bin_width must be positive, got {}".format(bin_width)
            )

        # Convert ticks to seconds
        time_s = ticks / sync_rate

        # The last entry is taken as the end of the measurement.
        # NOTE(review): this assumes ph_sync is in chronological order - confirm.
        t_end = time_s[-1]

        # Build the edges from an integer bin count instead of np.arange with
        # a float step, whose length is sensitive to rounding. At least one
        # bin is kept so a measurement ending at t == 0 does not yield NaN.
        n_bins = max(1, int(np.ceil(t_end / bin_width)))
        edges = np.arange(n_bins + 1) * bin_width
        # Guard against rounding leaving the last edge just below t_end,
        # which would drop the final photon(s) from the histogram.
        edges[-1] = max(edges[-1], t_end)

        counts, _ = np.histogram(time_s, bins=edges)

        # Mean photon rate (counts per second)
        return np.mean(counts / bin_width)

    def find_highest_intensity_file(self):
        """
        Process all .raw files, compute their mean intensity, and return the
        file with the highest intensity.

        Files that cannot be read or evaluated are reported on stdout and
        skipped.

        :return: (best_file_path, max_intensity), or None if no file yielded
            a valid intensity
        """
        results = []

        for raw_path in self.raw_files:
            try:
                ph_data = read_confo_cor3(raw_path)
                mean_intensity = self.mean_intensity_from_photon_data(ph_data)
            except Exception as e:
                # Deliberately broad: one unreadable file must not abort the
                # scan of the remaining files.
                print("Error reading file:", raw_path, ":", str(e))
                continue

            results.append((raw_path, mean_intensity))
            print(os.path.basename(raw_path), ":", round(mean_intensity, 2))

        if not results:
            print("No valid photon data found.")
            return None

        # Select the file with the maximum mean intensity
        best_file, max_intensity = max(results, key=lambda item: item[1])

        print("\nHighest mean intensity:", round(max_intensity, 2), "photons/s")
        print("File:", os.path.basename(best_file))

        return best_file, max_intensity

    def get_measurement_points(self):
        """
        Connects the highest-intensity file with its position from
        FCS_points.json.

        The point tag ("P" followed by digits) is taken from the file name
        and used as the key into the loaded FCS_points.json data.

        :return: Dictionary mapping the point tag to an entry with the keys
            "position", "intensity", "source" and "timestamp"
        :raises ValueError: if no file yielded a valid intensity, or if no
            point tag can be found in the chosen file's path
        :raises KeyError: if the point tag is not present in FCS_points.json
        """
        best = self.find_highest_intensity_file()
        if best is None:
            raise ValueError(
                "No valid photon data found; cannot determine a measurement point"
            )
        best_file, max_intensity = best

        pattern = r"P(\d+)"
        # Look at the file name first so that a directory component that
        # happens to contain "P<number>" cannot shadow the real tag; fall
        # back to the full path to stay compatible with the old behaviour.
        match = re.search(pattern, os.path.basename(best_file)) or re.search(
            pattern, best_file
        )
        if match is None:
            raise ValueError(
                "Could not find a point tag of the form 'P<number>' in: {}".format(
                    best_file
                )
            )
        p_tag = match.group()

        try:
            positions = self.FCS_measurements_points[p_tag]
        except KeyError:
            raise KeyError(
                "Point {!r} (from file {}) is not listed in FCS_points.json".format(
                    p_tag, os.path.basename(best_file)
                )
            ) from None

        # datetime.utcnow() is deprecated; build the same naive-UTC ISO string
        # from an aware timestamp so the stored format ("...Z") is unchanged.
        utc_now = datetime.now(timezone.utc).replace(tzinfo=None)

        # build dictionary for that point
        point_entry = {
            "position": [positions["x"], positions["y"], positions["z"]],
            "intensity": max_intensity,
            "source": self.folder_path,
            "timestamp": utc_now.isoformat() + "Z",
        }

        measurement_points = {p_tag: point_entry}
        return measurement_points

    def save_measurement_points(self, saving_path):
        """
        Saves the measurement_points dict to a JSON file.

        :param saving_path: Path of the JSON file to write
        :raises ValueError: propagated from get_measurement_points
        :raises KeyError: propagated from get_measurement_points
        """
        data = self.get_measurement_points()

        with open(saving_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

        print("Saved measurement points to:", saving_path)