Refactors DEPP interface for modularity and enhancements

Reorganizes DEPP functionality into a dedicated package for clarity
Introduces new features like block and stream operations
Improves usability with context manager support and timeout setting
Adds library loader for flexible shared library discovery
Includes detailed examples and register dump for debugging
This commit is contained in:
2025-04-23 16:55:40 +02:00
parent 0645d70dac
commit a00f3cc592
6 changed files with 317 additions and 77 deletions

54
depp.py
View File

@@ -1,54 +0,0 @@
import ctypes
from ctypes import c_bool, c_char_p, c_ubyte, c_void_p, POINTER
# Pfade zu den Digilent Libraries
libdmgr = ctypes.CDLL("/usr/lib64/digilent/adept/libdmgr.so")
libdepp = ctypes.CDLL("/usr/lib64/digilent/adept/libdepp.so")
# Typdefinition für das Handle
HIF = c_void_p
# Funktions-Signaturen definieren
libdmgr.DmgrOpen.argtypes = [POINTER(HIF), c_char_p]
libdmgr.DmgrOpen.restype = c_bool
libdmgr.DmgrClose.argtypes = [HIF]
libdmgr.DmgrClose.restype = c_bool
libdepp.DeppEnable.argtypes = [HIF]
libdepp.DeppEnable.restype = c_bool
libdepp.DeppDisable.argtypes = [HIF]
libdepp.DeppDisable.restype = c_bool
libdepp.DeppGetReg.argtypes = [HIF, c_ubyte, POINTER(c_ubyte), c_bool]
libdepp.DeppGetReg.restype = c_bool
libdepp.DeppPutReg.argtypes = [HIF, c_ubyte, c_ubyte, c_bool]
libdepp.DeppPutReg.restype = c_bool
class AdeptDepp:
def __init__(self, device_name: str):
self.hif = HIF()
ok = libdmgr.DmgrOpen(ctypes.byref(self.hif), device_name.encode())
if not ok:
raise RuntimeError(f"DmgrOpen failed for device '{device_name}'")
if not libdepp.DeppEnable(self.hif):
libdmgr.DmgrClose(self.hif)
raise RuntimeError("DeppEnable failed")
def get_reg(self, reg: int) -> int:
value = c_ubyte()
if not libdepp.DeppGetReg(self.hif, c_ubyte(reg), ctypes.byref(value), False):
raise RuntimeError(f"DeppGetReg failed at reg {reg}")
return value.value
def set_reg(self, reg: int, value: int):
if not libdepp.DeppPutReg(self.hif, c_ubyte(reg), c_ubyte(value), False):
raise RuntimeError(f"DeppPutReg failed at reg {reg}")
def close(self):
libdepp.DeppDisable(self.hif)
libdmgr.DmgrClose(self.hif)

View File

@@ -1,23 +0,0 @@
from depp import AdeptDepp
DEVICE_NAME = "DOnbUsb"
depp = None
depp = AdeptDepp(DEVICE_NAME)
print("[✓] Verbindung erfolgreich")
print("Register 0 lesen...")
val = depp.get_reg(0)
print(f"Wert: {val}")
print("Register 0 auf 42 setzen...")
depp.set_reg(0, 42)
print("Zurücklesen...")
val = depp.get_reg(0)
print(f"Neuer Wert: {val}")
if depp is not None:
depp.close()
print("[✓] Verbindung geschlossen")

157
pydepp/depp.py Normal file
View File

@@ -0,0 +1,157 @@
from typing import List
import ctypes
from ctypes import c_ubyte, c_uint32
from .ffi import libdepp, libdmgr, HIF
class AdeptDepp:
"""
High-level object-oriented wrapper for the Digilent DEPP interface.
Provides convenient methods for register access and data streaming
over a DEPP-compatible USB-FPGA connection using the Adept SDK.
"""
def __init__(self, device_name: str):
"""
Initialize the DEPP interface and open a handle to the given device.
:param device_name: The name of the connected device (e.g., "Basys3")
:raises RuntimeError: If device cannot be opened or enabled
"""
self.hif = HIF()
if not libdmgr.DmgrOpen(ctypes.byref(self.hif), device_name.encode()):
raise RuntimeError(f"DmgrOpen failed for device '{device_name}'")
if not libdepp.DeppEnable(self.hif):
libdmgr.DmgrClose(self.hif)
raise RuntimeError("DeppEnable failed")
def __enter__(self):
"""
Context manager entry: returns self for use with 'with' statement.
"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""
Context manager exit: automatically closes the DEPP connection.
"""
self.close()
def get_reg(self, reg: int) -> int:
"""
Read a single byte from a specified register address.
:param reg: Register address (0-255)
:return: Byte value read from the register
:raises RuntimeError: If the read fails
"""
value = c_ubyte()
if not libdepp.DeppGetReg(self.hif, c_ubyte(reg), ctypes.byref(value), False):
raise RuntimeError(f"DeppGetReg failed at reg {reg}")
return value.value
def set_reg(self, reg: int, value: int):
"""
Write a single byte to a specified register address.
:param reg: Register address (0-255)
:param value: Byte value to write
:raises RuntimeError: If the write fails
"""
if not libdepp.DeppPutReg(self.hif, c_ubyte(reg), c_ubyte(value), False):
raise RuntimeError(f"DeppPutReg failed at reg {reg}")
def set_reg_block(self, addr_data: List[int]):
"""
Write multiple address-value pairs in one transaction.
:param addr_data: List of alternating register addresses and values
:raises ValueError: If list length is not even
:raises RuntimeError: If the write fails
"""
if len(addr_data) % 2 != 0:
raise ValueError("List must contain address-data pairs")
buf = (c_ubyte * len(addr_data))(*addr_data)
if not libdepp.DeppPutRegSet(self.hif, buf, len(addr_data) // 2, False):
raise RuntimeError("DeppPutRegSet failed")
def get_reg_block(self, addresses: List[int]) -> List[int]:
"""
Read multiple registers in one transaction.
:param addresses: List of register addresses
:return: List of read values
:raises RuntimeError: If the read fails
"""
addr_buf = (c_ubyte * len(addresses))(*addresses)
data_buf = (c_ubyte * len(addresses))()
if not libdepp.DeppGetRegSet(self.hif, addr_buf, data_buf, len(addresses), False):
raise RuntimeError("DeppGetRegSet failed")
return list(data_buf)
def put_stream(self, reg: int, data: bytes):
"""
Stream a sequence of bytes to a single register address.
:param reg: Register address
:param data: Byte stream to write
:raises RuntimeError: If the write fails
"""
buf = (c_ubyte * len(data)).from_buffer_copy(data)
if not libdepp.DeppPutRegRepeat(self.hif, c_ubyte(reg), buf, len(data), False):
raise RuntimeError("DeppPutRegRepeat failed")
def get_stream(self, reg: int, count: int) -> bytes:
"""
Stream a sequence of bytes from a single register address.
:param reg: Register address
:param count: Number of bytes to read
:return: Byte stream read
:raises RuntimeError: If the read fails
"""
buf = (c_ubyte * count)()
if not libdepp.DeppGetRegRepeat(self.hif, c_ubyte(reg), buf, count, False):
raise RuntimeError("DeppGetRegRepeat failed")
return bytes(buf)
def set_timeout(self, timeout_ns: int):
"""
Set the DEPP communication timeout in nanoseconds.
:param timeout_ns: Desired timeout in nanoseconds
:raises RuntimeError: If setting the timeout fails
"""
actual = c_uint32()
if not libdepp.DeppSetTimeout(self.hif, timeout_ns, ctypes.byref(actual)):
raise RuntimeError("DeppSetTimeout failed")
def __repr__(self) -> str:
"""
Return a detailed string representation of the DEPP interface, including
register values from 0 to 255, shown in both decimal and hexadecimal format.
This method is useful for debugging or inspection, especially in interactive
environments or when printing the object directly.
:return: Formatted string showing register contents.
"""
lines = ["AdeptDepp – Register dump (0-255):"]
lines.append("+--------+-----------+-----------+")
lines.append("| Addr | Value (d) | Value (h) |")
lines.append("+--------+-----------+-----------+")
for i in range(256):
try:
val = self.get_reg(i)
lines.append(f"| {i:6} | {val:9} | 0x{val:02X} |")
except RuntimeError:
lines.append(f"| {i:6} | <fail> | <fail> |")
lines.append("+--------+-----------+-----------+")
return "\n".join(lines)
def close(self):
"""
Cleanly close the DEPP interface and release the device handle.
"""
libdepp.DeppDisable(self.hif)
libdmgr.DmgrClose(self.hif)

58
pydepp/example.py Normal file
View File

@@ -0,0 +1,58 @@
from .depp import AdeptDepp
DEVICE_NAME = "DOnbUsb"
print("[✓] Starting DEPP interface test")
with AdeptDepp(DEVICE_NAME) as depp:
print("[✓] Connection established")
# Single register read/write
print("Reading register 0...")
val = depp.get_reg(0)
print(f"Value: {val}")
print("Writing 42 to register 0...")
depp.set_reg(0, 42)
print("Reading back register 0...")
val = depp.get_reg(0)
print(f"New value: {val}")
# Block write (address-data pairs: [reg, val, reg, val, ...])
print("Writing block to registers 1, 2, 3...")
depp.set_reg_block([1, 11, 2, 22, 3, 33])
# Block read
print("Reading block from registers 1, 2, 3...")
values = depp.get_reg_block([1, 2, 3])
print(f"Read values: {values}")
# Streaming write
print("Streaming bytes to register 4...")
stream_data = bytes([10, 20, 30, 40, 50])
depp.put_stream(4, stream_data)
# Streaming read
print("Reading streamed bytes from register 4...")
received_data = depp.get_stream(4, len(stream_data))
print(f"Received stream: {list(received_data)}")
# Set timeout (e.g., 1ms)
print("Setting timeout to 1 ms...")
depp.set_timeout(1_000_000)
# Dump all 256 8-bit registers with decimal and hex representation
print("\nRegister dump (0-255):")
print("+--------+-----------+-----------+")
print("| Addr | Value (d) | Value (h) |")
print("+--------+-----------+-----------+")
for i in range(256):
try:
val = depp.get_reg(i)
print(f"| {i:6} | {val:9} | 0x{val:02X} |")
except RuntimeError:
print(f"| {i:6} | <fail> | <fail> |")
print("+--------+-----------+-----------+")
print("[✓] DEPP interface test complete")

57
pydepp/ffi.py Normal file
View File

@@ -0,0 +1,57 @@
import ctypes
from ctypes import c_bool, c_char_p, c_ubyte, c_void_p, POINTER, c_uint32
# Load Digilent Adept shared libraries
from .loader import load_library
libdmgr = load_library("libdmgr.so")
libdepp = load_library("libdepp.so")
# Type alias for device handle
HIF = c_void_p
# Configure function signatures for DMGR API
libdmgr.DmgrOpen.argtypes = [POINTER(HIF), c_char_p]
libdmgr.DmgrOpen.restype = c_bool
libdmgr.DmgrClose.argtypes = [HIF]
libdmgr.DmgrClose.restype = c_bool
# Configure function signatures for DEPP API
libdepp.DeppEnable.argtypes = [HIF]
libdepp.DeppEnable.restype = c_bool
libdepp.DeppEnableEx.argtypes = [HIF, ctypes.c_int32]
libdepp.DeppEnableEx.restype = c_bool
libdepp.DeppDisable.argtypes = [HIF]
libdepp.DeppDisable.restype = c_bool
libdepp.DeppGetVersion.argtypes = [c_char_p]
libdepp.DeppGetVersion.restype = c_bool
libdepp.DeppGetPortCount.argtypes = [HIF, POINTER(ctypes.c_int32)]
libdepp.DeppGetPortCount.restype = c_bool
libdepp.DeppGetPortProperties.argtypes = [HIF, ctypes.c_int32, POINTER(ctypes.c_uint32)]
libdepp.DeppGetPortProperties.restype = c_bool
libdepp.DeppSetTimeout.argtypes = [HIF, c_uint32, POINTER(c_uint32)]
libdepp.DeppSetTimeout.restype = c_bool
libdepp.DeppPutReg.argtypes = [HIF, c_ubyte, c_ubyte, c_bool]
libdepp.DeppPutReg.restype = c_bool
libdepp.DeppGetReg.argtypes = [HIF, c_ubyte, POINTER(c_ubyte), c_bool]
libdepp.DeppGetReg.restype = c_bool
libdepp.DeppPutRegSet.argtypes = [HIF, POINTER(c_ubyte), c_uint32, c_bool]
libdepp.DeppPutRegSet.restype = c_bool
libdepp.DeppGetRegSet.argtypes = [HIF, POINTER(c_ubyte), POINTER(c_ubyte), c_uint32, c_bool]
libdepp.DeppGetRegSet.restype = c_bool
libdepp.DeppPutRegRepeat.argtypes = [HIF, c_ubyte, POINTER(c_ubyte), c_uint32, c_bool]
libdepp.DeppPutRegRepeat.restype = c_bool
libdepp.DeppGetRegRepeat.argtypes = [HIF, c_ubyte, POINTER(c_ubyte), c_uint32, c_bool]
libdepp.DeppGetRegRepeat.restype = c_bool

45
pydepp/loader.py Normal file
View File

@@ -0,0 +1,45 @@
import ctypes
import os
def load_library(libname: str) -> ctypes.CDLL:
"""
Load a shared library from system or user-defined locations.
Priority:
1. DEPP_LIB_PATH environment variable
2. Common system paths
3. Rely on LD_LIBRARY_PATH or system defaults
:param libname: The filename of the shared library (e.g. 'libdepp.so')
:return: Loaded ctypes CDLL object
:raises FileNotFoundError: If the library could not be found
"""
search_paths = []
# 1. Environment override
env_path = os.environ.get("DEPP_LIB_PATH")
if env_path:
search_paths.append(os.path.join(env_path, libname))
# 2. Common fallback paths
search_paths += [
f"/usr/lib64/digilent/adept/{libname}",
f"/usr/lib/digilent/adept/{libname}",
f"/usr/local/lib/digilent/adept/{libname}",
f"/opt/digilent/adept/{libname}"
]
# 3. Final fallback – system-wide discovery via LD_LIBRARY_PATH
search_paths.append(libname)
for path in search_paths:
if os.path.exists(path) or '/' not in path:
try:
return ctypes.CDLL(path)
except OSError:
continue
raise FileNotFoundError(
f"Could not locate shared library '{libname}'. Tried paths:\n" +
"\n".join(search_paths)
)