Autoscaling
This commit is contained in:
parent
c865a0672b
commit
35b9398c52
175
main.py
175
main.py
@ -1,4 +1,6 @@
|
||||
from rigol_dho_lib.rigol import RigolOsc
|
||||
from siglent_sdg.siglent import SiglentGen
|
||||
from asyncio import sleep
|
||||
from rigol_dho_lib.rigol import RigolOsc, CHANNEL_COUNT
|
||||
import time
|
||||
from rigol_dho_lib import rigol
|
||||
from siglent_sdg import siglent
|
||||
@ -12,6 +14,113 @@ CYCLE_COUNT = 1000
|
||||
|
||||
DIV_COUNT = 10
|
||||
|
||||
OSC_CHANNEL_A = 1 # GEN signal
|
||||
|
||||
OSC_CHANNEL_B = 2 # OUT
|
||||
GEN_CHANNEL = siglent.ChannelID.CH1
|
||||
|
||||
UPPER_BOUND_DIV = 7
|
||||
LOWER_BOUND_DIV = 6
|
||||
TARGET_DIV = (UPPER_BOUND_DIV + LOWER_BOUND_DIV) / 2
|
||||
|
||||
|
||||
class ImpedanceAnalyzer:
|
||||
def __init__(self, gen_addr: str, osc_addr: str):
|
||||
self.gen: siglent.SiglentGen = siglent.SiglentGen(gen_addr)
|
||||
self.osc: rigol.RigolOsc = rigol.RigolOsc(osc_addr)
|
||||
|
||||
self.mem_depth = self.osc.getMemoryDepth()
|
||||
self.osc.setPoints(self.mem_depth)
|
||||
|
||||
self.amplitude = 10
|
||||
self.debug_voltages = []
|
||||
|
||||
self.dc = 0
|
||||
|
||||
self.scales = [10] * (CHANNEL_COUNT + 1)
|
||||
return
|
||||
|
||||
def getImpedance(self, freq: float):
|
||||
setWindowSize(self.osc, CYCLE_COUNT, freq)
|
||||
|
||||
self.gen.channels[GEN_CHANNEL].apply_sine(freq, self.amplitude, self.dc)
|
||||
self.gen.channels[GEN_CHANNEL].set_output(True)
|
||||
|
||||
self.osc.run()
|
||||
time.sleep(1)
|
||||
|
||||
self.osc.single()
|
||||
|
||||
time.sleep(0.1)
|
||||
|
||||
channel_A_data = self.getScaledWaveform(OSC_CHANNEL_A)
|
||||
channel_B_data = self.getScaledWaveform(OSC_CHANNEL_B)
|
||||
|
||||
time_array = self.osc.getChannel(OSC_CHANNEL_A).genTimeArray(channel_A_data)
|
||||
|
||||
v_a = dft(channel_A_data, time_array, freq)
|
||||
v_o = dft(channel_B_data, time_array, freq)
|
||||
|
||||
print(f"{freq} V_a {v_a} V_o {v_o}")
|
||||
|
||||
R0 = 10
|
||||
z = R0 * (v_a - v_o) / v_o
|
||||
|
||||
self.debug_voltages.append([np.abs(v_a), np.abs(v_o)])
|
||||
|
||||
return z
|
||||
|
||||
def getScaledWaveform(self, ch):
|
||||
data = self.osc.getChannel(ch).getWaveform()
|
||||
vpp = self.calculateVRMS(data) * 2 * np.sqrt(2)
|
||||
if not (LOWER_BOUND_DIV < vpp / self.scales[ch] < UPPER_BOUND_DIV):
|
||||
self.autoscale(ch)
|
||||
time.sleep(1)
|
||||
self.osc.single()
|
||||
data = self.osc.getChannel(ch).getWaveform()
|
||||
return data
|
||||
|
||||
def getSweep(self, start: float, stop: float, samples):
|
||||
f = np.logspace(start, stop, samples, endpoint=True, base=10.0)
|
||||
z_array = []
|
||||
for i in f:
|
||||
z_array.append(self.getImpedance(i))
|
||||
print(z_array)
|
||||
return z_array, f
|
||||
|
||||
def calculateVRMS(self, array):
|
||||
return np.sqrt(np.sum(np.pow(array, 2)) / len(array))
|
||||
|
||||
def autoscale(self, channel):
|
||||
|
||||
vpp = 0
|
||||
|
||||
self.osc.run()
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
while True:
|
||||
rms = self.osc.getChannel(channel).getVrms()[0]
|
||||
vpp = rms * 2 * np.sqrt(2)
|
||||
|
||||
if LOWER_BOUND_DIV < vpp / self.scales[channel] < UPPER_BOUND_DIV:
|
||||
break
|
||||
elif vpp / self.scales[channel] > UPPER_BOUND_DIV:
|
||||
self.scales[channel] = self.osc.getChannel(channel).clampVscale(
|
||||
self.scales[channel] * 2
|
||||
)
|
||||
else:
|
||||
self.scales[channel] = self.osc.getChannel(channel).clampVscale(
|
||||
vpp / TARGET_DIV
|
||||
)
|
||||
self.osc.getChannel(channel).setVScale(self.scales[channel])
|
||||
print(
|
||||
f"Autoscaled channel: {channel}, RMS: {rms}, Vpp:{vpp}, Scale:{self.scales[channel]}"
|
||||
)
|
||||
time.sleep(1)
|
||||
|
||||
return
|
||||
|
||||
|
||||
def setWindowSize(osc, cycles, frequency):
|
||||
period = 1 / frequency * cycles / DIV_COUNT
|
||||
@ -21,45 +130,15 @@ def setWindowSize(osc, cycles, frequency):
|
||||
def dft(data, time_array, freq):
|
||||
carrier = np.exp(2j * np.pi * freq * time_array)
|
||||
x = carrier * data
|
||||
print(carrier)
|
||||
|
||||
return sum(x) / len(carrier) * 2
|
||||
|
||||
|
||||
# uv
|
||||
def main():
|
||||
|
||||
try:
|
||||
gen = siglent.SiglentGen("TCPIP::10.112.1.17::INSTR")
|
||||
except ():
|
||||
print("Can't connent!")
|
||||
|
||||
try:
|
||||
osc = rigol.RigolOsc("TCPIP::10.112.1.19::INSTR")
|
||||
except ():
|
||||
print("Can't connent!")
|
||||
|
||||
mem_depth = osc.getMemoryDepth()
|
||||
|
||||
osc.setPoints(mem_depth)
|
||||
|
||||
setWindowSize(osc, CYCLE_COUNT, SINE_FREQ)
|
||||
|
||||
osc.channels[1].setVScale(0.5)
|
||||
gen.channels[siglent.ChannelID.CH1].apply_sine(SINE_FREQ, 10, 0)
|
||||
gen.channels[siglent.ChannelID.CH1].set_output(True)
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
osc.single()
|
||||
|
||||
# time.sleep(0.2)
|
||||
channel_A_data = osc.channels[0].getWaveform()
|
||||
channel_B_data = osc.channels[1].getWaveform()
|
||||
|
||||
time_array = osc.channels[0].genTimeArray(channel_A_data)
|
||||
|
||||
v_a = dft(channel_A_data, time_array, SINE_FREQ)
|
||||
|
||||
v_o = dft(channel_B_data, time_array, SINE_FREQ)
|
||||
|
||||
# plt.plot(time_array, channel_A_data)
|
||||
# plt.plot(time_array, channel_B_data)
|
||||
@ -69,13 +148,33 @@ def main():
|
||||
# plt.plot(time_array, np.real(x))
|
||||
# plt.plot(time_array, np.imag(x))
|
||||
|
||||
print(f"v_a: {v_a}")
|
||||
print(f"v_o: {v_o}")
|
||||
imp = ImpedanceAnalyzer("TCPIP::10.112.1.15::INSTR", "TCPIP::10.112.1.9::INSTR")
|
||||
|
||||
R0 = 10
|
||||
z = R0 * (v_a - v_o) / v_o
|
||||
imp.gen.channels[GEN_CHANNEL.CH1].set_output(True)
|
||||
|
||||
print(f"impedance: {z}")
|
||||
z, f = imp.getSweep(3, 7, 20)
|
||||
|
||||
z_real = np.real(z)
|
||||
z_imag = np.imag(z)
|
||||
fig, ax = plt.subplots(3)
|
||||
ax[0].plot(f, z_real, label="Real")
|
||||
ax[0].plot(f, z_imag, label="Imag")
|
||||
ax[0].set_yscale("log")
|
||||
ax[1].plot(f, np.abs(z), label="Magnitude")
|
||||
ax[1].set_yscale("log")
|
||||
ax_phase = ax[1].twinx()
|
||||
ax_phase.plot(f, np.angle(z), label="Phase", color="orange", linestyle="--")
|
||||
ax_phase.legend()
|
||||
ax[0].set_xscale("log")
|
||||
ax[1].set_xscale("log")
|
||||
|
||||
ax[2].plot(f, imp.debug_voltages, label="V_a")
|
||||
ax[2].set_xscale("log")
|
||||
|
||||
ax[0].legend()
|
||||
ax[1].legend()
|
||||
|
||||
plt.show()
|
||||
|
||||
return
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user