impedance_analyzer/main.py
2026-04-07 11:24:59 +02:00

252 lines
6.5 KiB
Python

from PIL.ImageChops import offset
from siglent_sdg.siglent import SiglentGen
from asyncio import sleep
from rigol_dho_lib.rigol import RigolOsc, CHANNEL_COUNT
import time
from datetime import datetime
from rigol_dho_lib import rigol
from siglent_sdg import siglent
import numpy as np
import csv
from scipy.integrate import cumulative_trapezoid
import matplotlib.pyplot as plt
# --- Measurement configuration -------------------------------------------
# Sine frequency constant (presumably Hz); not referenced by the code
# visible in this part of the file.
SINE_FREQ = 100e3
# Number of signal cycles the capture window is sized for when measuring
# impedance (passed to setWindowSize by ImpedanceAnalyzer.getImpedance).
CYCLE_COUNT = 100
# Divisor that setWindowSize applies to the total window length; presumably
# the number of horizontal divisions on the scope screen -- TODO confirm.
DIV_COUNT = 10
OSC_CHANNEL_A = 1  # oscilloscope channel carrying the generator (GEN) signal
OSC_CHANNEL_B = 2  # oscilloscope channel carrying the output (OUT) signal
# Generator channel that drives the measurement.
GEN_CHANNEL = siglent.ChannelID.CH1
# Acceptable range for (estimated Vpp / vertical scale); a trace outside this
# range is treated as badly scaled by getScaledWaveform and autoscale.
UPPER_BOUND_DIV = 8
LOWER_BOUND_DIV = 3
# Midpoint of the acceptable range; autoscale aims for this ratio when it
# picks a new vertical scale.
TARGET_DIV = (UPPER_BOUND_DIV + LOWER_BOUND_DIV) / 2
# Filename prefix for exported CSV files; exportZtoCSV appends a timestamp
# and ".csv" directly to it (no path separator is inserted).
PATH = "/home/zychlix/Desktop/pomiary/out"
# Reference resistance used in the impedance and H calculations
# (presumably ohms -- TODO confirm against the test fixture).
R0 = 99.4
# Generator amplitude passed to apply_sine (units defined by the generator
# driver -- TODO confirm whether this is Vpp or peak).
AMPLITUDE = 3
# Number of frequency points requested from getSweep in main().
COUNT = 200
class ImpedanceAnalyzer:
    """Drive a signal generator and an oscilloscope to measure impedance.

    The generator output is captured on OSC_CHANNEL_A and the output node on
    OSC_CHANNEL_B. Both captures are reduced to complex phasors with ``dft``
    and combined with the reference resistance ``R0``.
    """

    def __init__(self, gen_addr: str, osc_addr: str):
        """Open both instruments and initialise the measurement state.

        Args:
            gen_addr: Resource address handed to ``siglent.SiglentGen``.
            osc_addr: Resource address handed to ``rigol.RigolOsc``.
        """
        self.gen: siglent.SiglentGen = siglent.SiglentGen(gen_addr)
        self.osc: rigol.RigolOsc = rigol.RigolOsc(osc_addr)
        # Request as many points per capture as the scope reports it can hold.
        self.mem_depth = self.osc.getMemoryDepth()
        self.osc.setPoints(self.mem_depth)
        self.amplitude = AMPLITUDE
        # One [|V_a|, |V_o|] pair is appended per getImpedance call.
        self.debug_voltages = []
        self.span_a = 0
        self.span_b = 0
        self.dc = 0
        # Vertical scale per channel, indexed by channel number; channels are
        # addressed as 1, 2, ... in this file, hence the extra slot at 0.
        self.scales = [10] * (CHANNEL_COUNT + 1)

    def _capture(self, freq: float, amplitude: float, cycles: int, settle_s: float):
        """Apply a sine, let the scope settle and grab both channels.

        Shared acquisition sequence used by getImpedance and PlotBH.

        Args:
            freq: Sine frequency applied to the generator.
            amplitude: Amplitude passed to the generator's ``apply_sine``.
            cycles: Number of signal cycles the capture window should span.
            settle_s: Seconds to free-run the scope before the single capture.

        Returns:
            Tuple ``(channel_a_data, channel_b_data, time_array)``.
        """
        setWindowSize(self.osc, cycles, freq)
        gen_channel = self.gen.channels[GEN_CHANNEL]
        gen_channel.apply_sine(freq, amplitude, self.dc)
        gen_channel.set_output(True)
        self.osc.run()
        time.sleep(settle_s)
        self.osc.single()
        # Short pause after arming the single capture before reading it back.
        time.sleep(0.3)
        channel_a_data = self.getScaledWaveform(OSC_CHANNEL_A)
        channel_b_data = self.getScaledWaveform(OSC_CHANNEL_B)
        time_array = self.osc.getChannel(OSC_CHANNEL_A).genTimeArray(channel_a_data)
        return channel_a_data, channel_b_data, time_array

    def _estimate_vpp(self, data):
        """Estimate peak-to-peak voltage from RMS, assuming a pure sine."""
        return self.calculateVRMS(data) * 2 * np.sqrt(2)

    def _within_bounds(self, channel, vpp) -> bool:
        """Return True when ``vpp`` over the stored scale lies strictly inside the bounds."""
        return LOWER_BOUND_DIV < vpp / self.scales[channel] < UPPER_BOUND_DIV

    def getImpedance(self, freq: float):
        """Measure the complex impedance at a single frequency.

        Args:
            freq: Measurement frequency.

        Returns:
            Complex impedance ``R0 * (v_a - v_o) / v_o`` where ``v_a`` and
            ``v_o`` are the phasors of channel A and channel B at ``freq``.
        """
        channel_a_data, channel_b_data, time_array = self._capture(
            freq, self.amplitude, CYCLE_COUNT, 1
        )
        v_a = dft(channel_a_data, time_array, freq)
        v_o = dft(channel_b_data, time_array, freq)
        print(f"{freq} V_a {v_a} V_o {v_o}")
        # Voltage across the unknown is (v_a - v_o); the current is v_o / R0.
        # NOTE(review): presumably R0 sits between the OUT node and ground --
        # confirm against the fixture wiring.
        z = R0 * (v_a - v_o) / v_o
        self.debug_voltages.append([np.abs(v_a), np.abs(v_o)])
        return z

    def getScaledWaveform(self, ch):
        """Read a channel's waveform, re-capturing once if it looks badly scaled.

        Args:
            ch: Oscilloscope channel number.

        Returns:
            The waveform data as returned by the channel's ``getWaveform``.
        """
        data = self.osc.getChannel(ch).getWaveform()
        if not self._within_bounds(ch, self._estimate_vpp(data)):
            # Autoscaling is currently disabled (was: self.autoscale(ch));
            # only wait, trigger a fresh single capture and read again.
            time.sleep(2)  # change back to 1
            self.osc.single()
            data = self.osc.getChannel(ch).getWaveform()
        return data

    def getSweep(self, start: float, stop: float, samples):
        """Measure impedance over a logarithmically spaced frequency sweep.

        Args:
            start: Base-10 exponent of the first frequency (10**start).
            stop: Base-10 exponent of the last frequency (10**stop), inclusive.
            samples: Number of frequency points.

        Returns:
            Tuple ``(z_array, f)``: list of complex impedances and the
            frequency array they were measured at.
        """
        f = np.logspace(start, stop, samples, endpoint=True, base=10.0)
        z_array = [self.getImpedance(freq) for freq in f]
        print(z_array)
        return z_array, f

    def calculateVRMS(self, array):
        """Return the root-mean-square value of ``array``.

        The input is converted to float before squaring so that integer
        sample arrays (e.g. int16) cannot overflow, and ``np.square`` is used
        instead of ``np.pow``, which only exists in NumPy >= 2.0.
        """
        samples = np.asarray(array, dtype=float)
        return np.sqrt(np.mean(np.square(samples)))

    def autoscale(self, channel):
        """Adjust a channel's vertical scale until the signal fits the bounds.

        Repeats capture -> estimate Vpp -> rescale until Vpp divided by the
        stored scale lies strictly between LOWER_BOUND_DIV and
        UPPER_BOUND_DIV.

        NOTE(review): the loop has no iteration limit; if the clamped scale
        can never bring the signal into range this will not terminate.

        Args:
            channel: Oscilloscope channel number.
        """
        while True:
            self.osc.single()
            data = self.osc.getChannel(channel).getWaveform()
            time.sleep(1)
            rms = self.calculateVRMS(data)
            vpp = rms * 2 * np.sqrt(2)
            if self._within_bounds(channel, vpp):
                break
            if (
                vpp / self.scales[channel] > UPPER_BOUND_DIV
                # NOTE(review): presumably the scope reports a huge Vrms when
                # the trace is clipped -- confirm with the driver.
                or self.osc.getChannel(channel).getVrms()[0] > 1000
            ):
                # Too large (or out of range): zoom out by a factor of two.
                requested_scale = self.scales[channel] * 2
            else:
                # Too small: jump straight to the scale that centres the
                # signal in the acceptable range.
                requested_scale = vpp / TARGET_DIV
            self.scales[channel] = self.osc.getChannel(channel).clampVscale(
                requested_scale
            )
            self.osc.getChannel(channel).setVScale(self.scales[channel])
            print(
                f"Autoscaled channel: {channel}, RMS: {rms}, Vpp:{vpp}, Scale:{self.scales[channel]}"
            )

    def PlotBH(self, freq, amplitude):
        """Capture one signal cycle and plot the integrated voltage against current.

        Channel A is treated as the voltage induced in the inductor and
        channel B (divided by R0) as the current. Both axes are unscaled
        placeholders, not physical B and H values.

        Args:
            freq: Excitation frequency.
            amplitude: Amplitude passed to the generator's ``apply_sine``.
        """
        voltage_data, current_data, time_array = self._capture(freq, amplitude, 1, 5)
        # Remove the DC offset so the integral does not drift.
        v_l = voltage_data - np.mean(voltage_data)
        B = (
            cumulative_trapezoid(v_l, time_array, initial=0) / 1
        )  # Correct to proper values
        H = current_data / R0  # As well as in here
        plt.plot(H, B)
        plt.show()

    def exportZtoCSV(self, z, filename: str):
        """Save ``z`` as CSV to ``filename`` + timestamp + ".csv".

        Args:
            z: Array-like data to write; every value is formatted with "%s".
            filename: Path prefix; the timestamp is appended directly to it.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        np.savetxt(f"{filename}{timestamp}.csv", z, delimiter=",", fmt="%s")
def setWindowSize(osc, cycles, frequency):
    """Set the scope timebase so the capture window spans ``cycles`` periods.

    The total window length (``cycles`` signal periods) is divided by
    DIV_COUNT and the result is handed to ``osc.setTimescale``.

    Args:
        osc: Oscilloscope object exposing ``setTimescale``.
        cycles: Number of signal periods the window should contain.
        frequency: Signal frequency; must be non-zero.
    """
    signal_period = 1 / frequency
    window_length = signal_period * cycles
    # presumably setTimescale expects time per division -- TODO confirm
    osc.setTimescale(window_length / DIV_COUNT)
# def dft(data, time_array, freq):
# carrier = np.exp(-2j * np.pi * freq * time_array)
# x = carrier * data
# print(carrier)
# return sum(x) / len(carrier) * 2
def dft(data, time_array, freq):
    """Return the complex phasor of ``data`` at a single frequency.

    The samples are correlated with a cosine and a sine reference at
    ``freq``. For ``data = A * cos(2*pi*freq*t + phi)`` sampled over a whole
    number of cycles the result is ``A * exp(1j * phi)``.

    Args:
        data: Sampled signal; any array-like (ndarray, list, ...).
        time_array: Sample times matching ``data`` element for element.
        freq: Analysis frequency, in the reciprocal unit of ``time_array``.

    Returns:
        Complex amplitude ``i - 1j * q`` with ``i = 2 * mean(data * cos)``
        and ``q = 2 * mean(data * sin)``.
    """
    # Coerce to arrays so plain Python sequences work as well as ndarrays.
    samples = np.asarray(data)
    # Phase of the reference oscillator, computed once for both references.
    phase = 2 * np.pi * freq * np.asarray(time_array)
    i = 2 * np.mean(samples * np.cos(phase))
    q = 2 * np.mean(samples * np.sin(phase))
    return i - 1j * q
# uv
def _plot_impedance_sweep(imp):
    """Run a 10**2 .. 10**7 sweep with COUNT points, plot it and export it to CSV."""
    z, f = imp.getSweep(2, 7, COUNT)

    _, ax = plt.subplots(3)

    # Top panel: real and imaginary parts on a linear y axis.
    ax[0].plot(f, np.real(z), label="Real")
    ax[0].plot(f, np.imag(z), label="Imag")
    ax[0].set_yscale("linear")
    ax[0].set_xscale("log")
    ax[0].legend()

    # Middle panel: magnitude (log y) with the phase in degrees on a twin axis.
    ax[1].plot(f, np.abs(z), label="Magnitude")
    ax[1].set_yscale("log")
    ax[1].set_xscale("log")
    ax_phase = ax[1].twinx()
    ax_phase.plot(
        f, np.angle(z) / np.pi * 180, label="Phase", color="orange", linestyle="--"
    )
    ax_phase.legend()
    ax[1].legend()

    # Bottom panel: phasor magnitudes recorded by getImpedance, one labelled
    # trace per column so the legend distinguishes V_a from V_o.
    voltages = np.asarray(imp.debug_voltages)
    ax[2].plot(f, voltages[:, 0], label="V_a")
    ax[2].plot(f, voltages[:, 1], label="V_o")
    ax[2].set_xscale("log")
    ax[2].legend()

    imp.exportZtoCSV(np.array([f, z]).T, PATH)
    plt.show()


def main(run_sweep: bool = False):
    """Connect to the instruments and run one measurement.

    Args:
        run_sweep: When False (default) a B-H loop is plotted at 3 kHz with
            AMPLITUDE. When True the impedance sweep is measured, plotted
            and exported to CSV instead; that code used to sit behind an
            unconditional ``return 0`` and could never run.

    Returns:
        0 on completion.
    """
    # imp = ImpedanceAnalyzer("TCPIP::10.112.1.2::INSTR", "TCPIP::10.112.1.3::INSTR")
    imp = ImpedanceAnalyzer("TCPIP::192.168.1.4::INSTR", "TCPIP::192.168.1.5::INSTR")
    # GEN_CHANNEL already is the channel id; index with it directly, as the
    # ImpedanceAnalyzer methods do, instead of going through GEN_CHANNEL.CH1.
    imp.gen.channels[GEN_CHANNEL].set_output(True)
    # imp.autoscale(1)
    # imp.autoscale(2)
    if run_sweep:
        _plot_impedance_sweep(imp)
    else:
        imp.PlotBH(3e3, AMPLITUDE)
    return 0


if __name__ == "__main__":
    main()