# Source code for periphery.gpio

import collections
import ctypes
import errno
import fcntl
import os
import os.path
import platform
import select
import time


# Leading fields of the running kernel release (normally major, minor), used
# for feature checks. Falls back to (0, 0) when one of the first two
# dot-separated fields of the release string is not an integer.
try:
    KERNEL_VERSION = tuple(
        int(field) for field in platform.release().split(".")[:2]
    )
except ValueError:
    KERNEL_VERSION = (0, 0)


class GPIOError(IOError):
    """Base class for GPIO errors.

    Raised throughout this module as ``GPIOError(errno, message)``; `errno`
    is ``None`` when the failure did not originate from an OS call.
    """
    pass
class EdgeEvent(collections.namedtuple('EdgeEvent', ['edge', 'timestamp'])):
    """Named tuple ``(edge, timestamp)`` describing a single GPIO edge event."""

    def __new__(cls, edge, timestamp):
        """EdgeEvent containing the event edge and event time reported by Linux.

        Args:
            edge (str): event edge, either "rising" or "falling".
            timestamp (int): event time in nanoseconds.

        """
        return super(EdgeEvent, cls).__new__(cls, edge, timestamp)
class GPIO(object):
    """Common GPIO interface and factory for the concrete GPIO backends.

    Instantiating ``GPIO(...)`` dispatches in ``__new__`` to ``CdevGPIO``
    (character device) or ``SysfsGPIO`` (sysfs). The instance methods and
    properties declared here document the shared contract and raise
    ``NotImplementedError``; the backends override them.
    """

    def __new__(cls, *args, **kwargs):
        # A character device GPIO is opened with (path, line, direction, ...),
        # a sysfs GPIO with (line, direction). More than two positional
        # arguments, or an explicit `path` keyword, therefore selects the
        # character device backend.
        if len(args) > 2 or "path" in kwargs:
            return CdevGPIO.__new__(cls, *args, **kwargs)
        return SysfsGPIO.__new__(cls, *args, **kwargs)

    def __del__(self):
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, t, value, traceback):
        self.close()

    # Methods

    def read(self):
        """Read the state of the GPIO.

        Returns:
            bool: ``True`` for high state, ``False`` for low state.

        Raises:
            GPIOError: if an I/O or OS error occurs.

        """
        raise NotImplementedError()

    def write(self, value):
        """Set the state of the GPIO to `value`.

        Args:
            value (bool): ``True`` for high state, ``False`` for low state.

        Raises:
            GPIOError: if an I/O or OS error occurs.
            TypeError: if `value` type is not bool.

        """
        raise NotImplementedError()

    def poll(self, timeout=None):
        """Poll a GPIO for the edge event configured with the .edge property
        with an optional timeout.

        For character device GPIOs, the edge event should be consumed with
        `read_event()`. For sysfs GPIOs, the edge event should be consumed with
        `read()`.

        `timeout` can be a positive number for a timeout in seconds, zero for a
        non-blocking poll, or negative or None for a blocking poll. Default is
        a blocking poll.

        Args:
            timeout (int, float, None): timeout duration in seconds.

        Returns:
            bool: ``True`` if an edge event occurred, ``False`` on timeout.

        Raises:
            GPIOError: if an I/O or OS error occurs.
            TypeError: if `timeout` type is not None, int, or float.

        """
        raise NotImplementedError()

    def read_event(self):
        """Read the edge event that occurred with the GPIO.

        This method is intended for use with character device GPIOs and is
        unsupported by sysfs GPIOs.

        Returns:
            EdgeEvent: a namedtuple containing the string edge event that
            occurred (either ``"rising"`` or ``"falling"``), and the event time
            reported by Linux in nanoseconds.

        Raises:
            GPIOError: if an I/O or OS error occurs.
            NotImplementedError: if called on a sysfs GPIO.

        """
        raise NotImplementedError()

    @staticmethod
    def poll_multiple(gpios, timeout=None):
        """Poll multiple GPIOs for the edge event configured with the .edge
        property with an optional timeout.

        For character device GPIOs, the edge event should be consumed with
        `read_event()`. For sysfs GPIOs, the edge event should be consumed with
        `read()`.

        `timeout` can be a positive number for a timeout in seconds, zero for a
        non-blocking poll, or negative or None for a blocking poll. Default is
        a blocking poll.

        Args:
            gpios (list): list of GPIO objects to poll.
            timeout (int, float, None): timeout duration in seconds.

        Returns:
            list: list of GPIO objects for which an edge event occurred.

        Raises:
            GPIOError: if an I/O or OS error occurs.
            TypeError: if `timeout` type is not None, int, or float.

        """
        if not isinstance(timeout, (int, float, type(None))):
            raise TypeError("Invalid timeout type, should be integer, float, or None.")

        # Setup poll
        p = select.poll()

        # Register GPIO file descriptors and build map of fd to object
        fd_gpio_map = {}
        for gpio in gpios:
            if isinstance(gpio, SysfsGPIO):
                p.register(gpio.fd, select.POLLPRI | select.POLLERR)
            else:
                p.register(gpio.fd, select.POLLIN | select.POLLRDNORM)

            fd_gpio_map[gpio.fd] = gpio

        # Scale timeout to milliseconds
        if isinstance(timeout, (int, float)) and timeout > 0:
            timeout *= 1000

        # Poll
        events = p.poll(timeout)

        # Gather GPIOs that had edge events occur
        results = []
        for (fd, _) in events:
            gpio = fd_gpio_map[fd]

            results.append(gpio)

            if isinstance(gpio, SysfsGPIO):
                # Rewind for read
                try:
                    os.lseek(fd, 0, os.SEEK_SET)
                except OSError as e:
                    raise GPIOError(e.errno, "Rewinding GPIO: " + e.strerror)

        return results

    def close(self):
        """Close the GPIO.

        Raises:
            GPIOError: if an I/O or OS error occurs.

        """
        raise NotImplementedError()

    # Immutable properties

    @property
    def devpath(self):
        """Get the device path of the underlying GPIO device.

        :type: str
        """
        raise NotImplementedError()

    @property
    def fd(self):
        """Get the line file descriptor of the GPIO object.

        :type: int
        """
        raise NotImplementedError()

    @property
    def line(self):
        """Get the GPIO object's line number.

        :type: int
        """
        raise NotImplementedError()

    @property
    def name(self):
        """Get the line name of the GPIO.

        This method is intended for use with character device GPIOs and always
        returns the empty string for sysfs GPIOs.

        :type: str
        """
        raise NotImplementedError()

    @property
    def label(self):
        """Get the line consumer label of the GPIO.

        This method is intended for use with character device GPIOs and always
        returns the empty string for sysfs GPIOs.

        :type: str
        """
        raise NotImplementedError()

    @property
    def chip_fd(self):
        """Get the GPIO chip file descriptor of the GPIO object.

        This method is intended for use with character device GPIOs and is
        unsupported by sysfs GPIOs.

        Raises:
            NotImplementedError: if accessed on a sysfs GPIO.

        :type: int
        """
        raise NotImplementedError()

    @property
    def chip_name(self):
        """Get the name of the GPIO chip associated with the GPIO.

        :type: str
        """
        raise NotImplementedError()

    @property
    def chip_label(self):
        """Get the label of the GPIO chip associated with the GPIO.

        :type: str
        """
        raise NotImplementedError()

    # Mutable properties

    def _get_direction(self):
        raise NotImplementedError()

    def _set_direction(self, direction):
        raise NotImplementedError()

    direction = property(_get_direction, _set_direction)
    """Get or set the GPIO's direction. Can be "in", "out", "high", "low".

    Direction "in" is input; "out" is output, initialized to low; "high" is
    output, initialized to high; and "low" is output, initialized to low.

    Raises:
        GPIOError: if an I/O or OS error occurs.
        TypeError: if `direction` type is not str.
        ValueError: if `direction` value is invalid.

    :type: str
    """

    def _get_edge(self):
        raise NotImplementedError()

    def _set_edge(self, edge):
        raise NotImplementedError()

    edge = property(_get_edge, _set_edge)
    """Get or set the GPIO's interrupt edge. Can be "none", "rising",
    "falling", "both".

    Raises:
        GPIOError: if an I/O or OS error occurs.
        TypeError: if `edge` type is not str.
        ValueError: if `edge` value is invalid.

    :type: str
    """

    def _get_bias(self):
        raise NotImplementedError()

    def _set_bias(self, bias):
        raise NotImplementedError()

    bias = property(_get_bias, _set_bias)
    """Get or set the GPIO's line bias. Can be "default", "pull_up",
    "pull_down", "disable".

    This property is not supported by sysfs GPIOs.

    Raises:
        GPIOError: if an I/O or OS error occurs.
        TypeError: if `bias` type is not str.
        ValueError: if `bias` value is invalid.

    :type: str
    """

    def _get_drive(self):
        raise NotImplementedError()

    def _set_drive(self, drive):
        raise NotImplementedError()

    drive = property(_get_drive, _set_drive)
    """Get or set the GPIO's line drive. Can be "default" (for push-pull),
    "open_drain", "open_source".

    This property is not supported by sysfs GPIOs.

    Raises:
        GPIOError: if an I/O or OS error occurs.
        TypeError: if `drive` type is not str.
        ValueError: if `drive` value is invalid.

    :type: str
    """

    def _get_inverted(self):
        raise NotImplementedError()

    def _set_inverted(self, inverted):
        raise NotImplementedError()

    inverted = property(_get_inverted, _set_inverted)
    """Get or set the GPIO's inverted (active low) property.

    Raises:
        GPIOError: if an I/O or OS error occurs.
        TypeError: if `inverted` type is not bool.

    :type: bool
    """

    # String representation

    def __str__(self):
        """Get the string representation of the GPIO.

        :type: str
        """
        raise NotImplementedError()
class _CGpiochipInfo(ctypes.Structure):
    """Argument structure for the GPIO chip info ioctl."""
    _fields_ = [
        ('name', ctypes.c_char * 32),
        ('label', ctypes.c_char * 32),
        ('lines', ctypes.c_uint32),
    ]


class _CGpiolineInfo(ctypes.Structure):
    """Argument structure for the GPIO line info ioctl."""
    _fields_ = [
        ('line_offset', ctypes.c_uint32),
        ('flags', ctypes.c_uint32),
        ('name', ctypes.c_char * 32),
        ('consumer', ctypes.c_char * 32),
    ]


class _CGpiohandleRequest(ctypes.Structure):
    """Argument structure for the GPIO line handle request ioctl."""
    _fields_ = [
        ('lineoffsets', ctypes.c_uint32 * 64),
        ('flags', ctypes.c_uint32),
        ('default_values', ctypes.c_uint8 * 64),
        ('consumer_label', ctypes.c_char * 32),
        ('lines', ctypes.c_uint32),
        ('fd', ctypes.c_int),
    ]


class _CGpiohandleData(ctypes.Structure):
    """Argument structure for the get/set line values ioctls."""
    _fields_ = [
        ('values', ctypes.c_uint8 * 64),
    ]


class _CGpioeventRequest(ctypes.Structure):
    """Argument structure for the GPIO line event request ioctl."""
    _fields_ = [
        ('lineoffset', ctypes.c_uint32),
        ('handleflags', ctypes.c_uint32),
        ('eventflags', ctypes.c_uint32),
        ('consumer_label', ctypes.c_char * 32),
        ('fd', ctypes.c_int),
    ]


class _CGpioeventData(ctypes.Structure):
    """Layout of one edge event record read from a line event fd."""
    _fields_ = [
        ('timestamp', ctypes.c_uint64),
        ('id', ctypes.c_uint32),
    ]


def _property_text(gpio, attribute):
    """Return ``str(getattr(gpio, attribute))`` or ``"<error>"`` on failure.

    Used by the ``__str__`` implementations. TypeError is handled in addition
    to GPIOError because a closed GPIO holds ``None`` for its descriptors and
    line, which makes the property getters fail with TypeError.
    """
    try:
        return str(getattr(gpio, attribute))
    except (GPIOError, TypeError):
        return "<error>"


class CdevGPIO(GPIO):
    # Constants scraped from <linux/gpio.h>
    _GPIOHANDLE_GET_LINE_VALUES_IOCTL = 0xc040b408
    _GPIOHANDLE_SET_LINE_VALUES_IOCTL = 0xc040b409
    _GPIO_GET_CHIPINFO_IOCTL = 0x8044b401
    _GPIO_GET_LINEINFO_IOCTL = 0xc048b402
    _GPIO_GET_LINEHANDLE_IOCTL = 0xc16cb403
    _GPIO_GET_LINEEVENT_IOCTL = 0xc030b404
    _GPIOHANDLE_REQUEST_INPUT = 0x1
    _GPIOHANDLE_REQUEST_OUTPUT = 0x2
    _GPIOHANDLE_REQUEST_ACTIVE_LOW = 0x4
    _GPIOHANDLE_REQUEST_OPEN_DRAIN = 0x8
    _GPIOHANDLE_REQUEST_OPEN_SOURCE = 0x10
    _GPIOHANDLE_REQUEST_BIAS_PULL_UP = 0x20
    _GPIOHANDLE_REQUEST_BIAS_PULL_DOWN = 0x40
    _GPIOHANDLE_REQUEST_BIAS_DISABLE = 0x80
    _GPIOEVENT_REQUEST_RISING_EDGE = 0x1
    _GPIOEVENT_REQUEST_FALLING_EDGE = 0x2
    _GPIOEVENT_REQUEST_BOTH_EDGES = 0x3
    _GPIOEVENT_EVENT_RISING_EDGE = 0x1
    _GPIOEVENT_EVENT_FALLING_EDGE = 0x2

    _SUPPORTS_LINE_BIAS = KERNEL_VERSION >= (5, 5)

    def __init__(self, path, line, direction, edge="none", bias="default", drive="default", inverted=False, label=None):
        """**Character device GPIO**

        Instantiate a GPIO object and open the character device GPIO with the
        specified line and direction at the specified GPIO chip path (e.g.
        "/dev/gpiochip0"). Default properties can be overridden with keyword
        arguments.

        Args:
            path (str): GPIO chip character device path.
            line (int, str): GPIO line number or name.
            direction (str): GPIO direction, can be "in", "out", "high", or
                             "low".
            edge (str): GPIO interrupt edge, can be "none", "rising",
                        "falling", or "both".
            bias (str): GPIO line bias, can be "default", "pull_up",
                        "pull_down", or "disable".
            drive (str): GPIO line drive, can be "default", "open_drain", or
                         "open_source".
            inverted (bool): GPIO is inverted (active low).
            label (str, None): GPIO line consumer label.

        Returns:
            CdevGPIO: GPIO object.

        Raises:
            GPIOError: if an I/O or OS error occurs.
            TypeError: if `path`, `line`, `direction`, `edge`, `bias`, `drive`,
                       `inverted`, or `label` types are invalid.
            ValueError: if `direction`, `edge`, `bias`, or `drive` value is
                        invalid.
            LookupError: if the GPIO line was not found by the provided name.

        """
        # Initialise every attribute first so close() (also run from __del__)
        # is safe even when _open() fails part-way.
        self._devpath = None
        self._line = None
        self._line_fd = None
        self._chip_fd = None
        self._direction = None
        self._edge = None
        self._bias = None
        self._drive = None
        self._inverted = None
        self._label = None

        self._open(path, line, direction, edge, bias, drive, inverted, label)

    def __new__(self, path, line, direction, **kwargs):
        return object.__new__(CdevGPIO)

    def _open(self, path, line, direction, edge, bias, drive, inverted, label):
        """Validate the arguments, open the GPIO chip and request the line."""
        if not isinstance(path, str):
            raise TypeError("Invalid path type, should be string.")

        if not isinstance(line, (int, str)):
            raise TypeError("Invalid line type, should be integer or string.")

        if not isinstance(direction, str):
            raise TypeError("Invalid direction type, should be string.")
        elif direction not in ["in", "out", "high", "low"]:
            raise ValueError("Invalid direction, can be: \"in\", \"out\", \"high\", \"low\".")

        if not isinstance(edge, str):
            raise TypeError("Invalid edge type, should be string.")
        elif edge not in ["none", "rising", "falling", "both"]:
            raise ValueError("Invalid edge, can be: \"none\", \"rising\", \"falling\", \"both\".")

        if not isinstance(bias, str):
            raise TypeError("Invalid bias type, should be string.")
        elif bias not in ["default", "pull_up", "pull_down", "disable"]:
            raise ValueError("Invalid bias, can be: \"default\", \"pull_up\", \"pull_down\", \"disable\".")

        if not isinstance(drive, str):
            raise TypeError("Invalid drive type, should be string.")
        elif drive not in ["default", "open_drain", "open_source"]:
            raise ValueError("Invalid drive, can be: \"default\", \"open_drain\", \"open_source\".")

        if not isinstance(inverted, bool):
            raise TypeError("Invalid inverted type, should be bool.")

        if not isinstance(label, (type(None), str)):
            raise TypeError("Invalid label type, should be None or str.")

        # Resolve a line name to its offset on the chip
        if isinstance(line, str):
            line = self._find_line_by_name(path, line)

        # Open GPIO chip
        try:
            self._chip_fd = os.open(path, 0)
        except OSError as e:
            raise GPIOError(e.errno, "Opening GPIO chip: " + e.strerror)

        self._devpath = path
        self._line = line
        self._label = label.encode() if label is not None else b"periphery"

        self._reopen(direction, edge, bias, drive, inverted)

    def _reopen(self, direction, edge, bias, drive, inverted):
        """Close any existing line fd and request the line again with the
        given configuration, then record that configuration on the object."""
        flags = 0

        if bias != "default" and not CdevGPIO._SUPPORTS_LINE_BIAS:
            raise GPIOError(None, "Line bias configuration not supported by kernel version {}.{}.".format(*KERNEL_VERSION))
        elif bias == "pull_up":
            flags |= CdevGPIO._GPIOHANDLE_REQUEST_BIAS_PULL_UP
        elif bias == "pull_down":
            flags |= CdevGPIO._GPIOHANDLE_REQUEST_BIAS_PULL_DOWN
        elif bias == "disable":
            flags |= CdevGPIO._GPIOHANDLE_REQUEST_BIAS_DISABLE

        if drive == "open_drain":
            flags |= CdevGPIO._GPIOHANDLE_REQUEST_OPEN_DRAIN
        elif drive == "open_source":
            flags |= CdevGPIO._GPIOHANDLE_REQUEST_OPEN_SOURCE

        if inverted:
            flags |= CdevGPIO._GPIOHANDLE_REQUEST_ACTIVE_LOW

        # FIXME this should really use GPIOHANDLE_SET_CONFIG_IOCTL instead of
        # closing and reopening, especially to preserve output value on
        # configuration changes

        # Close existing line
        if self._line_fd is not None:
            try:
                os.close(self._line_fd)
            except OSError as e:
                raise GPIOError(e.errno, "Closing existing GPIO line: " + e.strerror)

            self._line_fd = None

        if direction == "in":
            if edge == "none":
                # Plain input: request a line handle
                request = _CGpiohandleRequest()

                request.lineoffsets[0] = self._line
                request.flags = flags | CdevGPIO._GPIOHANDLE_REQUEST_INPUT
                request.consumer_label = self._label
                request.lines = 1

                try:
                    fcntl.ioctl(self._chip_fd, CdevGPIO._GPIO_GET_LINEHANDLE_IOCTL, request)
                except (OSError, IOError) as e:
                    raise GPIOError(e.errno, "Opening input line handle: " + e.strerror)

                self._line_fd = request.fd
            else:
                # Input with edge detection: request a line event handle
                if edge == "rising":
                    event_flags = CdevGPIO._GPIOEVENT_REQUEST_RISING_EDGE
                elif edge == "falling":
                    event_flags = CdevGPIO._GPIOEVENT_REQUEST_FALLING_EDGE
                else:
                    event_flags = CdevGPIO._GPIOEVENT_REQUEST_BOTH_EDGES

                request = _CGpioeventRequest()

                request.lineoffset = self._line
                request.handleflags = flags | CdevGPIO._GPIOHANDLE_REQUEST_INPUT
                request.eventflags = event_flags
                request.consumer_label = self._label

                try:
                    fcntl.ioctl(self._chip_fd, CdevGPIO._GPIO_GET_LINEEVENT_IOCTL, request)
                except (OSError, IOError) as e:
                    raise GPIOError(e.errno, "Opening input line event handle: " + e.strerror)

                self._line_fd = request.fd
        else:
            # Output: "high" starts high, "out"/"low" start low; the requested
            # initial value is XORed with `inverted` before being handed over.
            request = _CGpiohandleRequest()
            initial_value = (direction == "high") ^ inverted

            request.lineoffsets[0] = self._line
            request.flags = flags | CdevGPIO._GPIOHANDLE_REQUEST_OUTPUT
            request.default_values[0] = initial_value
            request.consumer_label = self._label
            request.lines = 1

            try:
                fcntl.ioctl(self._chip_fd, CdevGPIO._GPIO_GET_LINEHANDLE_IOCTL, request)
            except (OSError, IOError) as e:
                raise GPIOError(e.errno, "Opening output line handle: " + e.strerror)

            self._line_fd = request.fd

        self._direction = "in" if direction == "in" else "out"
        self._edge = edge
        self._bias = bias
        self._drive = drive
        self._inverted = inverted

    def _find_line_by_name(self, path, line):
        """Return the offset of the line named `line` on the chip at `path`.

        Raises:
            GPIOError: if opening, querying, or closing the chip fails.
            LookupError: if no line on the chip has that name.

        """
        # Open GPIO chip
        try:
            fd = os.open(path, 0)
        except OSError as e:
            raise GPIOError(e.errno, "Opening GPIO chip: " + e.strerror)

        found = None
        close_error = None
        try:
            # Get chip info for number of lines
            chip_info = _CGpiochipInfo()
            try:
                fcntl.ioctl(fd, CdevGPIO._GPIO_GET_CHIPINFO_IOCTL, chip_info)
            except (OSError, IOError) as e:
                raise GPIOError(e.errno, "Querying GPIO chip info: " + e.strerror)

            # Get each line info
            line_info = _CGpiolineInfo()
            for i in range(chip_info.lines):
                line_info.line_offset = i
                try:
                    fcntl.ioctl(fd, CdevGPIO._GPIO_GET_LINEINFO_IOCTL, line_info)
                except (OSError, IOError) as e:
                    raise GPIOError(e.errno, "Querying GPIO line info: " + e.strerror)

                if line_info.name.decode() == line:
                    found = i
                    break
        finally:
            # Always release the temporary chip fd, including when a query
            # above failed. A close failure is only reported when the search
            # itself succeeded, so it never masks the original error.
            try:
                os.close(fd)
            except OSError as e:
                close_error = e

        if close_error is not None:
            raise GPIOError(close_error.errno, "Closing GPIO chip: " + close_error.strerror)

        if found is not None:
            return found

        raise LookupError("Opening GPIO line: GPIO line \"{:s}\" not found by name.".format(line))

    # Methods

    def read(self):
        data = _CGpiohandleData()

        try:
            fcntl.ioctl(self._line_fd, CdevGPIO._GPIOHANDLE_GET_LINE_VALUES_IOCTL, data)
        except (OSError, IOError) as e:
            raise GPIOError(e.errno, "Getting line value: " + e.strerror)

        return bool(data.values[0])

    def write(self, value):
        if not isinstance(value, bool):
            raise TypeError("Invalid value type, should be bool.")
        elif self._direction != "out":
            raise GPIOError(None, "Invalid operation: cannot write to input GPIO")

        data = _CGpiohandleData()

        data.values[0] = value

        try:
            fcntl.ioctl(self._line_fd, CdevGPIO._GPIOHANDLE_SET_LINE_VALUES_IOCTL, data)
        except (OSError, IOError) as e:
            raise GPIOError(e.errno, "Setting line value: " + e.strerror)

    def poll(self, timeout=None):
        if not isinstance(timeout, (int, float, type(None))):
            raise TypeError("Invalid timeout type, should be integer, float, or None.")
        elif self._direction != "in":
            raise GPIOError(None, "Invalid operation: cannot poll output GPIO")

        # Setup poll
        p = select.poll()
        p.register(self._line_fd, select.POLLIN | select.POLLPRI | select.POLLERR)

        # Scale timeout to milliseconds
        if isinstance(timeout, (int, float)) and timeout > 0:
            timeout *= 1000

        # Poll
        events = p.poll(timeout)

        return len(events) > 0

    def read_event(self):
        if self._direction != "in":
            raise GPIOError(None, "Invalid operation: cannot read event of output GPIO")
        elif self._edge == "none":
            raise GPIOError(None, "Invalid operation: GPIO edge not set")

        try:
            buf = os.read(self._line_fd, ctypes.sizeof(_CGpioeventData))
        except OSError as e:
            raise GPIOError(e.errno, "Reading GPIO event: " + e.strerror)

        event_data = _CGpioeventData.from_buffer_copy(buf)

        if event_data.id == CdevGPIO._GPIOEVENT_EVENT_RISING_EDGE:
            edge = "rising"
        elif event_data.id == CdevGPIO._GPIOEVENT_EVENT_FALLING_EDGE:
            edge = "falling"
        else:
            edge = "none"

        timestamp = event_data.timestamp

        return EdgeEvent(edge, timestamp)

    def close(self):
        # Detach the descriptors and reset state up front, so a failure while
        # closing the line fd neither leaks the chip fd nor leaves stale
        # descriptors that a later close() (e.g. from __del__) would reuse.
        line_fd, self._line_fd = self._line_fd, None
        chip_fd, self._chip_fd = self._chip_fd, None
        self._edge = "none"
        self._direction = "in"
        self._line = None

        first_error = None
        for fd, message in ((line_fd, "Closing GPIO line: "), (chip_fd, "Closing GPIO chip: ")):
            if fd is None:
                continue
            try:
                os.close(fd)
            except OSError as e:
                if first_error is None:
                    first_error = GPIOError(e.errno, message + e.strerror)

        if first_error is not None:
            raise first_error

    # Immutable properties

    @property
    def devpath(self):
        return self._devpath

    @property
    def fd(self):
        return self._line_fd

    @property
    def line(self):
        return self._line

    @property
    def name(self):
        line_info = _CGpiolineInfo()
        line_info.line_offset = self._line

        try:
            fcntl.ioctl(self._chip_fd, CdevGPIO._GPIO_GET_LINEINFO_IOCTL, line_info)
        except (OSError, IOError) as e:
            raise GPIOError(e.errno, "Querying GPIO line info: " + e.strerror)

        return line_info.name.decode()

    @property
    def label(self):
        line_info = _CGpiolineInfo()
        line_info.line_offset = self._line

        try:
            fcntl.ioctl(self._chip_fd, CdevGPIO._GPIO_GET_LINEINFO_IOCTL, line_info)
        except (OSError, IOError) as e:
            raise GPIOError(e.errno, "Querying GPIO line info: " + e.strerror)

        return line_info.consumer.decode()

    @property
    def chip_fd(self):
        return self._chip_fd

    @property
    def chip_name(self):
        chip_info = _CGpiochipInfo()

        try:
            fcntl.ioctl(self._chip_fd, CdevGPIO._GPIO_GET_CHIPINFO_IOCTL, chip_info)
        except (OSError, IOError) as e:
            raise GPIOError(e.errno, "Querying GPIO chip info: " + e.strerror)

        return chip_info.name.decode()

    @property
    def chip_label(self):
        chip_info = _CGpiochipInfo()

        try:
            fcntl.ioctl(self._chip_fd, CdevGPIO._GPIO_GET_CHIPINFO_IOCTL, chip_info)
        except (OSError, IOError) as e:
            raise GPIOError(e.errno, "Querying GPIO chip info: " + e.strerror)

        return chip_info.label.decode()

    # Mutable properties

    def _get_direction(self):
        return self._direction

    def _set_direction(self, direction):
        if not isinstance(direction, str):
            raise TypeError("Invalid direction type, should be string.")
        if direction not in ["in", "out", "high", "low"]:
            raise ValueError("Invalid direction, can be: \"in\", \"out\", \"high\", \"low\".")

        if self._direction == direction:
            return

        # Changing direction always drops any configured edge
        self._reopen(direction, "none", self._bias, self._drive, self._inverted)

    direction = property(_get_direction, _set_direction)

    def _get_edge(self):
        return self._edge

    def _set_edge(self, edge):
        if not isinstance(edge, str):
            raise TypeError("Invalid edge type, should be string.")
        if edge not in ["none", "rising", "falling", "both"]:
            raise ValueError("Invalid edge, can be: \"none\", \"rising\", \"falling\", \"both\".")

        if self._direction != "in":
            raise GPIOError(None, "Invalid operation: cannot set edge on output GPIO")

        if self._edge == edge:
            return

        self._reopen(self._direction, edge, self._bias, self._drive, self._inverted)

    edge = property(_get_edge, _set_edge)

    def _get_bias(self):
        return self._bias

    def _set_bias(self, bias):
        if not isinstance(bias, str):
            raise TypeError("Invalid bias type, should be string.")
        if bias not in ["default", "pull_up", "pull_down", "disable"]:
            raise ValueError("Invalid bias, can be: \"default\", \"pull_up\", \"pull_down\", \"disable\".")

        if self._bias == bias:
            return

        self._reopen(self._direction, self._edge, bias, self._drive, self._inverted)

    bias = property(_get_bias, _set_bias)

    def _get_drive(self):
        return self._drive

    def _set_drive(self, drive):
        if not isinstance(drive, str):
            raise TypeError("Invalid drive type, should be string.")
        if drive not in ["default", "open_drain", "open_source"]:
            raise ValueError("Invalid drive, can be: \"default\", \"open_drain\", \"open_source\".")

        if self._direction != "out" and drive != "default":
            raise GPIOError(None, "Invalid operation: cannot set line drive on input GPIO")

        if self._drive == drive:
            return

        self._reopen(self._direction, self._edge, self._bias, drive, self._inverted)

    drive = property(_get_drive, _set_drive)

    def _get_inverted(self):
        return self._inverted

    def _set_inverted(self, inverted):
        if not isinstance(inverted, bool):
            raise TypeError("Invalid inverted type, should be bool.")

        if self._inverted == inverted:
            return

        self._reopen(self._direction, self._edge, self._bias, self._drive, inverted)

    inverted = property(_get_inverted, _set_inverted)

    # String representation

    def __str__(self):
        # Untyped "{}" fields render ints and strings exactly like the former
        # "{:d}"/"{:s}" fields, but do not raise once close() has reset the
        # line and descriptors to None.
        return "GPIO {} (name=\"{}\", label=\"{}\", device={}, line_fd={}, chip_fd={}, direction={}, edge={}, bias={}, drive={}, inverted={}, chip_name=\"{}\", chip_label=\"{}\", type=cdev)" \
            .format(self._line,
                    _property_text(self, "name"),
                    _property_text(self, "label"),
                    self._devpath,
                    self._line_fd,
                    self._chip_fd,
                    _property_text(self, "direction"),
                    _property_text(self, "edge"),
                    _property_text(self, "bias"),
                    _property_text(self, "drive"),
                    _property_text(self, "inverted"),
                    _property_text(self, "chip_name"),
                    _property_text(self, "chip_label"))


class SysfsGPIO(GPIO):
    # Number of retries to check for GPIO export or direction write on open
    GPIO_OPEN_RETRIES = 10
    # Delay between check for GPIO export or direction write on open (100ms)
    GPIO_OPEN_DELAY = 0.1

    def __init__(self, line, direction):
        """**Sysfs GPIO**

        Instantiate a GPIO object and open the sysfs GPIO with the specified
        line and direction.

        `direction` can be "in" for input; "out" for output, initialized to
        low; "high" for output, initialized to high; or "low" for output,
        initialized to low.

        Args:
            line (int): GPIO line number.
            direction (str): GPIO direction, can be "in", "out", "high", or
                             "low".

        Returns:
            SysfsGPIO: GPIO object.

        Raises:
            GPIOError: if an I/O or OS error occurs.
            TypeError: if `line` or `direction` types are invalid.
            ValueError: if `direction` value is invalid.
            TimeoutError: if waiting for GPIO export times out.

        """
        # Initialise every attribute first so close() and the properties are
        # safe even when _open() fails part-way.
        self._fd = None
        self._line = None
        self._path = None
        self._exported = False

        self._open(line, direction)

    def __new__(self, line, direction):
        return object.__new__(SysfsGPIO)

    def _open(self, line, direction):
        """Export the line if needed, set its direction and open its value
        file."""
        if not isinstance(line, int):
            raise TypeError("Invalid line type, should be integer.")
        if not isinstance(direction, str):
            raise TypeError("Invalid direction type, should be string.")
        if direction.lower() not in ["in", "out", "high", "low"]:
            raise ValueError("Invalid direction, can be: \"in\", \"out\", \"high\", \"low\".")

        gpio_path = "/sys/class/gpio/gpio{:d}".format(line)

        if not os.path.isdir(gpio_path):
            # Export the line
            try:
                with open("/sys/class/gpio/export", "w") as f_export:
                    f_export.write("{:d}\n".format(line))
            except IOError as e:
                raise GPIOError(e.errno, "Exporting GPIO: " + e.strerror)

            # Loop until GPIO is exported
            for _ in range(SysfsGPIO.GPIO_OPEN_RETRIES):
                if os.path.isdir(gpio_path):
                    self._exported = True
                    break

                time.sleep(SysfsGPIO.GPIO_OPEN_DELAY)

            if not self._exported:
                raise TimeoutError("Exporting GPIO: waiting for \"{:s}\" timed out".format(gpio_path))

            # Write direction, looping in case of EACCES errors due to delayed udev
            # permission rule application after export
            for i in range(SysfsGPIO.GPIO_OPEN_RETRIES):
                try:
                    with open(os.path.join(gpio_path, "direction"), "w") as f_direction:
                        f_direction.write(direction.lower() + "\n")
                    break
                except IOError as e:
                    # Give up on any other error, or on EACCES at the last try
                    if e.errno != errno.EACCES or i == SysfsGPIO.GPIO_OPEN_RETRIES - 1:
                        raise GPIOError(e.errno, "Setting GPIO direction: " + e.strerror)

                time.sleep(SysfsGPIO.GPIO_OPEN_DELAY)

        # Open value
        try:
            self._fd = os.open(os.path.join(gpio_path, "value"), os.O_RDWR)
        except OSError as e:
            raise GPIOError(e.errno, "Opening GPIO: " + e.strerror)

        self._line = line
        self._path = gpio_path

        # Initialize direction (already written above when we exported it)
        if not self._exported:
            self.direction = direction

        # Initialize inverted
        self.inverted = False

    # Methods

    def read(self):
        # Read value
        try:
            buf = os.read(self._fd, 2)
        except OSError as e:
            raise GPIOError(e.errno, "Reading GPIO: " + e.strerror)

        # Rewind
        try:
            os.lseek(self._fd, 0, os.SEEK_SET)
        except OSError as e:
            raise GPIOError(e.errno, "Rewinding GPIO: " + e.strerror)

        # Slice rather than index so that an empty read falls through to the
        # GPIOError below instead of raising IndexError.
        first = buf[:1]
        if first == b"0":
            return False
        elif first == b"1":
            return True

        raise GPIOError(None, "Unknown GPIO value: {}".format(buf))

    def write(self, value):
        if not isinstance(value, bool):
            raise TypeError("Invalid value type, should be bool.")

        # Write value
        try:
            if value:
                os.write(self._fd, b"1\n")
            else:
                os.write(self._fd, b"0\n")
        except OSError as e:
            raise GPIOError(e.errno, "Writing GPIO: " + e.strerror)

        # Rewind
        try:
            os.lseek(self._fd, 0, os.SEEK_SET)
        except OSError as e:
            raise GPIOError(e.errno, "Rewinding GPIO: " + e.strerror)

    def poll(self, timeout=None):
        if not isinstance(timeout, (int, float, type(None))):
            raise TypeError("Invalid timeout type, should be integer, float, or None.")

        # Setup poll
        p = select.poll()
        p.register(self._fd, select.POLLPRI | select.POLLERR)

        # Scale timeout to milliseconds
        if isinstance(timeout, (int, float)) and timeout > 0:
            timeout *= 1000

        # Poll
        events = p.poll(timeout)

        # If GPIO edge interrupt occurred
        if events:
            # Rewind
            try:
                os.lseek(self._fd, 0, os.SEEK_SET)
            except OSError as e:
                raise GPIOError(e.errno, "Rewinding GPIO: " + e.strerror)

            return True

        return False

    def read_event(self):
        raise NotImplementedError()

    def close(self):
        if self._fd is None:
            return

        try:
            os.close(self._fd)
        except OSError as e:
            raise GPIOError(e.errno, "Closing GPIO: " + e.strerror)

        self._fd = None

        if self._exported:
            # Unexport the line
            try:
                unexport_fd = os.open("/sys/class/gpio/unexport", os.O_WRONLY)
                try:
                    os.write(unexport_fd, "{:d}\n".format(self._line).encode())
                finally:
                    # Release the unexport fd even when the write fails
                    os.close(unexport_fd)
            except OSError as e:
                raise GPIOError(e.errno, "Unexporting GPIO: " + e.strerror)

    # Immutable properties

    @property
    def devpath(self):
        return self._path

    @property
    def fd(self):
        return self._fd

    @property
    def line(self):
        return self._line

    @property
    def name(self):
        return ""

    @property
    def label(self):
        return ""

    @property
    def chip_fd(self):
        raise NotImplementedError("Sysfs GPIO does not have a gpiochip file descriptor.")

    @property
    def chip_name(self):
        gpio_path = os.path.join(self._path, "device")

        # Report a failed symlink read as GPIOError, like every other I/O
        # failure in this class, so callers catching GPIOError see it.
        try:
            gpiochip_path = os.readlink(gpio_path)
        except OSError as e:
            raise GPIOError(e.errno, "Reading gpiochip name: " + e.strerror)

        if '/' not in gpiochip_path:
            raise GPIOError(None, "Reading gpiochip name: invalid device symlink \"{:s}\"".format(gpiochip_path))

        return gpiochip_path.split('/')[-1]

    @property
    def chip_label(self):
        gpio_path = "/sys/class/gpio/{:s}/label".format(self.chip_name)

        try:
            with open(gpio_path, "r") as f_label:
                label = f_label.read()
        except IOError as e:
            raise GPIOError(e.errno, "Reading gpiochip label: " + e.strerror)

        return label.strip()

    # Mutable properties

    def _get_direction(self):
        # Read direction
        try:
            with open(os.path.join(self._path, "direction"), "r") as f_direction:
                direction = f_direction.read()
        except IOError as e:
            raise GPIOError(e.errno, "Getting GPIO direction: " + e.strerror)

        return direction.strip()

    def _set_direction(self, direction):
        if not isinstance(direction, str):
            raise TypeError("Invalid direction type, should be string.")
        if direction.lower() not in ["in", "out", "high", "low"]:
            raise ValueError("Invalid direction, can be: \"in\", \"out\", \"high\", \"low\".")

        # Write direction
        try:
            with open(os.path.join(self._path, "direction"), "w") as f_direction:
                f_direction.write(direction.lower() + "\n")
        except IOError as e:
            raise GPIOError(e.errno, "Setting GPIO direction: " + e.strerror)

    direction = property(_get_direction, _set_direction)

    def _get_edge(self):
        # Read edge
        try:
            with open(os.path.join(self._path, "edge"), "r") as f_edge:
                edge = f_edge.read()
        except IOError as e:
            raise GPIOError(e.errno, "Getting GPIO edge: " + e.strerror)

        return edge.strip()

    def _set_edge(self, edge):
        if not isinstance(edge, str):
            raise TypeError("Invalid edge type, should be string.")
        if edge.lower() not in ["none", "rising", "falling", "both"]:
            raise ValueError("Invalid edge, can be: \"none\", \"rising\", \"falling\", \"both\".")

        # Write edge
        try:
            with open(os.path.join(self._path, "edge"), "w") as f_edge:
                f_edge.write(edge.lower() + "\n")
        except IOError as e:
            raise GPIOError(e.errno, "Setting GPIO edge: " + e.strerror)

    edge = property(_get_edge, _set_edge)

    def _get_bias(self):
        raise NotImplementedError("Sysfs GPIO does not support line bias property.")

    def _set_bias(self, bias):
        raise NotImplementedError("Sysfs GPIO does not support line bias property.")

    bias = property(_get_bias, _set_bias)

    def _get_drive(self):
        raise NotImplementedError("Sysfs GPIO does not support line drive property.")

    def _set_drive(self, drive):
        raise NotImplementedError("Sysfs GPIO does not support line drive property.")

    drive = property(_get_drive, _set_drive)

    def _get_inverted(self):
        # Read active_low
        try:
            with open(os.path.join(self._path, "active_low"), "r") as f_inverted:
                inverted = f_inverted.read().strip()
        except IOError as e:
            raise GPIOError(e.errno, "Getting GPIO active_low: " + e.strerror)

        if inverted == "0":
            return False
        elif inverted == "1":
            return True

        raise GPIOError(None, "Unknown GPIO active_low value: {}".format(inverted))

    def _set_inverted(self, inverted):
        if not isinstance(inverted, bool):
            raise TypeError("Invalid inverted type, should be bool.")

        # Write active_low
        try:
            with open(os.path.join(self._path, "active_low"), "w") as f_active_low:
                f_active_low.write("1\n" if inverted else "0\n")
        except IOError as e:
            raise GPIOError(e.errno, "Setting GPIO active_low: " + e.strerror)

    inverted = property(_get_inverted, _set_inverted)

    # String representation

    def __str__(self):
        # Untyped "{}" fields render ints and strings exactly like the former
        # "{:d}"/"{:s}" fields, but do not raise once close() has reset the
        # file descriptor to None.
        return "GPIO {} (device={}, fd={}, direction={}, edge={}, inverted={}, chip_name=\"{}\", chip_label=\"{}\", type=sysfs)" \
            .format(self._line,
                    self._path,
                    self._fd,
                    _property_text(self, "direction"),
                    _property_text(self, "edge"),
                    _property_text(self, "inverted"),
                    _property_text(self, "chip_name"),
                    _property_text(self, "chip_label"))