"""Swept impedance measurement using a signal generator and an oscilloscope.

The generator drives a sine at each sweep frequency; two oscilloscope
channels are captured and reduced to single-frequency complex amplitudes,
from which a complex impedance estimate is computed.
"""

import time
from asyncio import sleep

import matplotlib.pyplot as plt
import numpy as np

from rigol_dho_lib import rigol
from rigol_dho_lib.rigol import RigolOsc, CHANNEL_COUNT
from siglent_sdg import siglent
from siglent_sdg.siglent import SiglentGen

SINE_FREQ = 100e3
CYCLE_COUNT = 1000  # signal periods shown across the whole screen
DIV_COUNT = 10  # horizontal divisions used to derive the timescale
OSC_CHANNEL_A = 1  # GEN signal
OSC_CHANNEL_B = 2  # OUT
GEN_CHANNEL = siglent.ChannelID.CH1
# Accepted window for the estimated peak-to-peak amplitude, expressed as a
# multiple of the channel's vertical scale.
UPPER_BOUND_DIV = 7
LOWER_BOUND_DIV = 6
TARGET_DIV = (UPPER_BOUND_DIV + LOWER_BOUND_DIV) / 2


class ImpedanceAnalyzer:
    """Measure complex impedance versus frequency with a generator/scope pair.

    Attributes:
        gen: Signal generator handle.
        osc: Oscilloscope handle.
        mem_depth: Memory depth read from the oscilloscope at start-up.
        amplitude: Amplitude argument passed to ``apply_sine``.
        dc: Offset argument passed to ``apply_sine``.
        scales: Last vertical scale this object assumes for each channel,
            indexed by channel number.
        debug_voltages: ``[|V_a|, |V_o|]`` pairs, one per measurement.
    """

    def __init__(self, gen_addr: str, osc_addr: str):
        """Connect to both instruments and set the scope's point count.

        Args:
            gen_addr: Resource address of the generator.
            osc_addr: Resource address of the oscilloscope.
        """
        self.gen: siglent.SiglentGen = siglent.SiglentGen(gen_addr)
        self.osc: rigol.RigolOsc = rigol.RigolOsc(osc_addr)
        self.mem_depth = self.osc.getMemoryDepth()
        self.osc.setPoints(self.mem_depth)
        self.amplitude = 10
        self.debug_voltages = []
        self.dc = 0
        # One slot per channel number; index 0 is unused because channels
        # are addressed starting at 1.
        # NOTE(review): 10 is an assumed starting scale; the instrument's
        # actual scale is never read back here -- confirm.
        self.scales = [10] * (CHANNEL_COUNT + 1)

    def getImpedance(self, freq: float, r0: float = 10):
        """Measure the complex impedance at a single frequency.

        Args:
            freq: Sine frequency applied by the generator.
            r0: Reference resistance used in ``z = r0 * (V_a - V_o) / V_o``.
                Defaults to the previously hard-coded value of 10.

        Returns:
            The complex impedance estimate.
        """
        setWindowSize(self.osc, CYCLE_COUNT, freq)
        gen_channel = self.gen.channels[GEN_CHANNEL]
        gen_channel.apply_sine(freq, self.amplitude, self.dc)
        gen_channel.set_output(True)

        # Free-run for a moment, then take one single-shot acquisition.
        self.osc.run()
        time.sleep(1)
        self.osc.single()
        time.sleep(0.1)

        # NOTE(review): if only channel B gets rescaled below, its data comes
        # from a later acquisition than channel A's. The relative phase then
        # relies on both captures sharing the same trigger reference --
        # confirm this holds for the trigger setup in use.
        channel_a_data = self.getScaledWaveform(OSC_CHANNEL_A)
        channel_b_data = self.getScaledWaveform(OSC_CHANNEL_B)
        time_array = self.osc.getChannel(OSC_CHANNEL_A).genTimeArray(channel_a_data)

        v_a = dft(channel_a_data, time_array, freq)
        v_o = dft(channel_b_data, time_array, freq)
        print(f"{freq} V_a {v_a} V_o {v_o}")

        z = r0 * (v_a - v_o) / v_o
        self.debug_voltages.append([np.abs(v_a), np.abs(v_o)])
        return z

    def getScaledWaveform(self, ch):
        """Return a channel's waveform, rescaling and re-acquiring if needed.

        The peak-to-peak amplitude is estimated from the waveform RMS as
        ``2 * sqrt(2) * rms`` (exact for a sine without offset). If it falls
        outside the ``LOWER_BOUND_DIV``..``UPPER_BOUND_DIV`` window relative
        to the stored scale, the channel is autoscaled and captured again.

        Args:
            ch: Oscilloscope channel number.

        Returns:
            The waveform samples as returned by the oscilloscope channel.
        """
        data = self.osc.getChannel(ch).getWaveform()
        vpp = self.calculateVRMS(data) * 2 * np.sqrt(2)
        in_window = LOWER_BOUND_DIV < vpp / self.scales[ch] < UPPER_BOUND_DIV
        if not in_window:
            self.autoscale(ch)
            time.sleep(1)
            self.osc.single()
            # NOTE(review): no wait between single() and the read-back --
            # confirm getWaveform() blocks until the acquisition completes.
            data = self.osc.getChannel(ch).getWaveform()
        return data

    def getSweep(self, start: float, stop: float, samples: int):
        """Measure impedance at logarithmically spaced frequencies.

        Args:
            start: Base-10 exponent of the first frequency.
            stop: Base-10 exponent of the last frequency (inclusive).
            samples: Number of frequency points.

        Returns:
            ``(z_array, f)``: the list of complex impedances and the array of
            frequencies they were measured at.
        """
        f = np.logspace(start, stop, samples, endpoint=True, base=10.0)
        z_array = [self.getImpedance(freq) for freq in f]
        print(z_array)
        return z_array, f

    def calculateVRMS(self, array):
        """Return the root-mean-square value of ``array``.

        Samples are converted to float first so integer sample types cannot
        overflow when squared. ``np.square`` is used instead of ``np.pow``
        because the latter only exists in NumPy 2.0 and later.
        """
        samples = np.asarray(array, dtype=float)
        return np.sqrt(np.mean(np.square(samples)))

    def autoscale(self, channel):
        """Adjust a channel's vertical scale until the signal fits the window.

        Repeatedly reads the channel RMS, estimates the peak-to-peak value,
        and changes the scale until ``vpp / scale`` lies strictly between
        ``LOWER_BOUND_DIV`` and ``UPPER_BOUND_DIV``. The loop also stops when
        the clamped scale can no longer change, which previously caused an
        endless loop at the instrument's range limits.

        Args:
            channel: Oscilloscope channel number.
        """
        self.osc.run()
        time.sleep(1)
        while True:
            osc_channel = self.osc.getChannel(channel)
            rms = osc_channel.getVrms()[0]
            vpp = rms * 2 * np.sqrt(2)
            current_scale = self.scales[channel]
            divisions = vpp / current_scale

            if LOWER_BOUND_DIV < divisions < UPPER_BOUND_DIV:
                break

            if divisions > UPPER_BOUND_DIV:
                # Over range: the measured RMS may be clipped, so back off by
                # a fixed factor instead of trusting the estimate.
                new_scale = osc_channel.clampVscale(current_scale * 2)
            else:
                # Under range: jump straight to the scale that would centre
                # the signal in the accepted window.
                new_scale = osc_channel.clampVscale(vpp / TARGET_DIV)

            self.scales[channel] = new_scale
            osc_channel.setVScale(new_scale)
            print(
                f"Autoscaled channel: {channel}, RMS: {rms}, Vpp:{vpp}, Scale:{self.scales[channel]}"
            )

            if new_scale == current_scale:
                # The clamp returned the scale already in use, so another
                # pass cannot make progress; give up rather than spin.
                print(
                    f"Autoscale stalled on channel {channel}: "
                    f"scale limit reached at {new_scale}"
                )
                break
            time.sleep(1)


def setWindowSize(osc, cycles, frequency):
    """Set the scope timescale so ``cycles`` periods span ``DIV_COUNT`` divisions.

    Args:
        osc: Oscilloscope handle providing ``setTimescale``.
        cycles: Number of signal periods to fit across the screen.
        frequency: Signal frequency.
    """
    time_per_div = 1 / frequency * cycles / DIV_COUNT
    osc.setTimescale(time_per_div)


def dft(data, time_array, freq):
    """Return the complex amplitude of ``data`` at the single frequency ``freq``.

    The samples are multiplied by ``exp(+2j*pi*freq*t)`` and averaged; the
    factor 2 restores the amplitude of a real sinusoid.

    NOTE(review): with the ``+j`` exponent the result is the complex
    conjugate of the usual ``exp(-j...)`` phasor, so phases (and the sign of
    the imaginary part of any ratio built from these values) are mirrored.
    Left unchanged to preserve existing results -- confirm the intended
    convention.

    NOTE(review): the original return line carried a bare ``# uv`` marker,
    possibly a unit hint -- confirm.

    Args:
        data: Waveform samples.
        time_array: Sample times, same length as ``data``.
        freq: Analysis frequency.

    Returns:
        The complex amplitude at ``freq``.
    """
    carrier = np.exp(2j * np.pi * freq * np.asarray(time_array))
    return 2 * np.mean(carrier * np.asarray(data))


def main():
    """Run a 1 kHz - 10 MHz sweep (20 points) and plot the results."""
    imp = ImpedanceAnalyzer("TCPIP::10.112.1.15::INSTR", "TCPIP::10.112.1.9::INSTR")
    # GEN_CHANNEL already is the channel identifier; index with it directly.
    imp.gen.channels[GEN_CHANNEL].set_output(True)

    z, f = imp.getSweep(3, 7, 20)
    z_real = np.real(z)
    z_imag = np.imag(z)

    _fig, ax = plt.subplots(3)

    ax[0].plot(f, z_real, label="Real")
    ax[0].plot(f, z_imag, label="Imag")
    ax[0].set_xscale("log")
    ax[0].set_yscale("log")
    ax[0].legend()

    ax[1].plot(f, np.abs(z), label="Magnitude")
    ax[1].set_xscale("log")
    ax[1].set_yscale("log")
    ax[1].legend()
    ax_phase = ax[1].twinx()
    ax_phase.plot(f, np.angle(z), label="Phase", color="orange", linestyle="--")
    ax_phase.legend()

    # One column per recorded magnitude so each trace gets its own label.
    voltages = np.asarray(imp.debug_voltages, dtype=float).reshape(-1, 2)
    ax[2].plot(f, voltages[:, 0], label="V_a")
    ax[2].plot(f, voltages[:, 1], label="V_o")
    ax[2].set_xscale("log")
    ax[2].legend()

    plt.show()


if __name__ == "__main__":
    main()