Изученая возможности MicroPython для своих целей натолкнулся на одну из реализаций библиотеки asyncio и, после недолгой переписки с Piter Hinch — автором библиотеки, понял, что мне необходимо глубже разобраться с принципами, базовыми понятиями и типичными ошибками использования методов асинхронного программирования. Тем более, что раздел для начинающих — как раз для меня.
Летом 2020 года вышло обновление этой библиотеки, описанное в asyncio v3, при наличии времени и заинтересованности непременно дополню перевод обновленного Руководства.
Это руководство предназначено для пользователей, имеющих разный уровень опыта работы с asyncio, в том числе содержит специальный раздел для начинающих.
Содержание
0. Введение
0.1.___Установка uasyncio на пустое устройство (hardware)
1. Планирование совместного исполнения программ
1.1.___Модули
2. Библиотека uasyncio
2.1.___Структура программы: цикл обработки событий
2.2.___Сопрограммы
2.2.1.______Постановки в очередь сопрограмм для участия в планировании
2.2.2.______Запуск обратного вызова функции (callback)
2.2.3.______Примечания: сопрограммы как связанные методы. Возвращаемые значения.
2.3.___Задержки
3. Синхронизация и ее классы
3.1.___Блокировка Lock
3.1.1.______Блокировки и тайм-ауты
3.2.___Событие Event
3.2.1.______Значение события
3.3.___Барьер Barrier
3.4.___Семафор Semaphore
3.4.1.______Ограниченный Семафор
3.5.___Очередь Queue
3.6.___Другие Классы синхронизации
4. Разработка классов для asyncio
4.1.___Классы с использованием ожидания await
4.1.1.______Использование в менеджерах контекста
4.1.2.______Await в сопрограмме
4.2.___Асинхронные итераторы
4.3.___Асинхронные менеджеры контекста
5. Исключения от тайм-аутов и из-за отмены задач
5.1.___Исключения
5.2.___Исключения вследствие тайм-аутов и из-за отмены задач
5.2.1.______Отмена задач
5.2.2.______Сопрограммы с таймаутами
6. Взаимодействие с оборудованием
6.1.___Проблемы синхронизации
6.2.___Опрос устройств с помощью сопрограмм
6.3.___Использование потокового механизма
6.3.1.______Пример драйвера UART
6.4.___Разработка драйвера для потокового (Stream) устройства
6.5.___Полный пример: aremote.py Драйвер для приемника ИК-пульта дистанционного управления.
6.6.___Драйвер для датчика температуры и влажности HTU21D.
7. Советы и подсказки
7.1.___Программа зависает
7.2.___uasyncio сохраняет состояние
7.3.___Сборка мусора
7.4.___Тестирование
7.5.___Распространенная ошибка. Это может быть трудно найти.
7.6.___Программирование с использованием сокетов (sockets)
7.6.1.______Проблемы с WiFi
7.7.___Аргументы конструктора цикла событий
8. Примечания для начинающих
8.1.___Проблема 1: циклы событий
8.2.___Проблема 2: методы блокировки
8.3.___Подход uasyncio
8.4.___Планирование в uasyncio
8.5.___Почему совместное, а не потоковое планирование (_thread)?
8.6.___Взаимодействие
8.7.___Опрос (polling)
0. Введение
Большая часть этого документа предполагает некоторое знакомство с асинхронным программированием. Для новичков введение можно найти в разделе 7.
Библиотека uasyncio для MicroPython включает подмножество asyncio библиотеки Python и предназначена для использования на микроконтроллерах. Как таковая она занимает небольшой объем оперативной памяти и настроена на быстрое переключение контекста с нулевым распределением оперативной памяти.
Этот документ описывает использование uasyncio с акцентом на создание драйверов для аппаратных устройств.
Цель состоит в том, чтобы спроектировать драйверы таким образом, чтобы приложение продолжало работать, пока драйвер ожидает ответа от устройства. При этом приложение остается чувствительным к другим событиям и взаимодействию с пользователем.
Еще одна важная область применения asyncio — сетевое программирование: в Интернете можно найти достаточно информации по этой теме.
Обратите внимание, что MicroPython основан на Python 3.4 с минимальными дополнениями Python 3.5. За исключением случаев, подробно описанных ниже, функции asyncio версий старше 3.4 не поддерживаются. Данный документ определяет функции, поддерживаемые в этом подмножестве.
Цель этого руководства — представить стиль программирования, совместимый с CPython V3.5 и выше.
0.1 Установка uasyncio на пустое устройство (hardware)
Рекомендуется использовать прошивку MicroPython V1.11 или новее. На многих платформах установка не требуется, так как uasyncioо уже скомпилирована в сборке. Для проверки достаточно набрать в REPL
import uasyncio
Следующие инструкции охватывают случаи, когда модули не установлены предварительно. Модули queues и synchro не являются обязательными, но необходимы для выполнения приводимых здесь примеров.
Устройство, подключенное к интернету
На устройстве, подключенном к Интернету и работающем встроенном программном обеспечении V1.11 или более поздней версии можно выполнить установку с использованием встроенной версии upip. Убедившись, что устройство подключено к вашей сети:
import upip
upip.install ( 'micropython-uasyncio' )
upip.install ( 'micropython-uasyncio.synchro' )
upip.install ( 'micropython-uasyncio.queues' )
Сообщения об ошибках от upip не слишком полезны. Если вы получили непонятную ошибку, лишний раз проверьте подключение к Интернету.
Аппаратное обеспечение без подключения к интернету (micropip)
Если на устройстве отсутствует подключение к Интернету (например, Pyboard V1.x), проще всего запустить на компьютере установку micropip.py в каталог по вашему выбору, а затем скопировать результирующую структуру каталогов на целевое устройство. Утилита micropip.py работает под Python 3.2 или выше и работает под управлением Linux, Windows и OSX. Подробнее информацию можно найти здесь.
Типичный вызов:
$ micropip.py install -p ~/rats micropython-uasyncio
$ micropip.py install -p ~/rats micropython-uasyncio.synchro
$ micropip.py install -p ~/rats micropython-uasyncio.queues
Устройство без подключения к Интернету (копирование исходников)
Если не использовать micropip.py, файлы должны быть скопированы из источника. В следующих инструкциях описывается копирование минимального количества файлов на целевое устройство, а также случай, когда uasyncio, для уменьшения занимаемого объема, необходимо сжать в скомпилированную сборку в виде байт-кода. Для последней версии, совместимой с официальными прошивками, файлы должны быть скопированы с официального сайта micropython-lib.
Клонируйте библиотеку на компьютер командой
$ git clone https://github.com/micropython/micropython-lib.git
На целевом устройстве создайте uasyncio каталог (необязательно в каталоге lib) и скопируйте в него следующие файлы:
• uasyncio/uasyncio/__init__.py
• uasyncio.core/uasyncio/core.py
• uasyncio.synchro/uasyncio/synchro.py
• uasyncio.queues/uasyncio/queues.py
Эти модули uasyncio могут быть сжаты в байткод путем размещения каталога uasyncio и его содержимое в порт каталога modules и перекомпилировав содержимое.
1. Совместное планирование
Техника совместного исполнения нескольких задач широко используется во встроенных системах, что предлагает меньше накладных расходов, чем потоковое (_thread) планирование, избегая многих ловушек, связанных с действительно асинхронными потоками выполнения.
1.1 Модули
Ниже приведен перечень модулей, которые могут исполняться на целевом устройстве.
Библиотеки
1. asyn.py Обеспечивает примитивы синхронизации Lock, Event, Barrier, Semaphore, BoundedSemaphore, Condition, gather. Обеспечивает поддержку отмены задач через классы NamedTask и Cancellable.
2. aswitch.py Представляет классы для сопряжения переключателей и кнопок, а также программный объект с возможностью повторной задержки. Кнопки представляют собой обобщение переключателей, обеспечивающих логическое, а не физическое состояние, а также события, вызываемые двойным и длительным нажатием.
Демонстрационные программы
Первые две наиболее полезны, так как они дают видимые результаты при доступе к оборудованию Pyboard.
- aledflash.py Мигает четырьмя индикаторами Pyboard асинхронно в течение 10 секунд. Простейшая демонстрация uasyncio. Импортируйте ее для запуска.
- apoll.py Драйвер устройства для акселерометра Pyboard. Демонстрирует использование сопрограммы для опроса устройства. Работает в течение 20 с. Импортируйте его для запуска. Требуется Pyboard V1.x.
- astests.py Тестовые / демонстрационные программы для модуля aswitch.
- asyn_demos.py Простые демонстрации отмены задач.
- roundrobin.py Демонстрация кругового планирования. Также эталон планирования производительности.
- awaitable.py Демонстрация класса с ожиданием. Один из способов реализации драйвера устройства, который опрашивает интерфейс.
- chain.py Скопировано из документации по Python. Демонстрация цепочки сопрограмм.
- aqtest.py Демонстрация класса Queue библиотеки uasyncio.
- aremote.py Пример драйвера устройства для ИК-протокола NEC.
- auart.py Демонстрация потокового ввода-вывода через Pyboard UART.
- auart_hd.py Использование Pyboard UART для связи с устройством с использованием полудуплексного протокола. Подходит для устройств, например, использующих набор команд модема «AT».
- iorw.py Демонстрация драйвера устройства чтения/записи с использованием механизма потокового ввода-вывода.
Тестовые программы
- asyntest.py Тесты для классов синхронизации в asyn.py .
- cantest.py Тесты отмены задания.
Утилита
1. check_async_code.py Утилита написана на Python3для обнаружения конкретных ошибок кодирования, которые может быть трудно найти. См. раздел 7.5.
Контроль
Каталог benchmarks содержит сценарии для проверки и характеристик планировщика uasyncio.
2. Библиотека uasyncio
Концепция asyncio основана на организации планирования совместного исполнения нескольких задач, которые в этом документе называются сопрограммами.
2.1 Структура программы: цикл событий
Рассмотрим следующий пример:
import uasyncio as asyncio
async def bar ():
count = 0,
while True :
count + = 1
print ( count )
await asyncio.sleep ( 1 ) # пауза 1с
loop = asyncio.get_event_loop ()
loop.create_task ( bar ()) # Запланировать как можно скорее
loop.run_forever ()
Выполнение программы продолжается до вызова loop.run_forever. На этом этапе выполнение контролируется планировщиком. Строка после loop.run_forever никогда не будет выполнена. Планировщик исполняет код bar потому что он был помещен в очередь планировщика loop.create_task. В этом тривиальном примере есть только одна сопрограмма bar. Если бы были другие, планировщик исполнял бы их в периоды, когда bar была приостановлена.
Большинство встроенных приложений имеют непрерывный цикл обработки событий. Цикл событий также может быть запущен способом, который разрешает завершение, используя метод цикла событий run_until_complete; это в основном используется в тестировании. Примеры можно найти в модуле astests.py.
Экземпляр цикла событий — это одиночный объект, созданный первым вызовом asyncio.get_event_loop() с двумя необязательными целочисленными аргументами, указывающих количество сопрограмм в двух очередях — стартовавшие и ожидающие. Обычно оба аргумента будут иметь одинаковое значение, равное как минимум числу одновременно исполняемых сопрограмм в приложении. Обычно достаточно значение по умолчанию — 16. Если используются значения не по умолчанию, см. Аргументы конструктора цикла событий ( раздел 7.7.).
Если сопрограмме необходимо вызвать метод цикла события (обычно create_task), вызов asyncio.get_event_loop() (без аргументов) эффективно вернет его.
2.2 Сопрограммы
Сопрограмма создается следующим образом:
async def foo ( delay_secs ):
await asyncio.sleep ( delay_secs )
print ( 'Hello' )
Сопрограмма может позволить другим сопрограммам запускаться с помощью await оператора. Сопрограмма должна содержать хотя бы одно await утверждение. Это побуждает сопрограмму выполняться до завершения, прежде чем выполнение перейдет к следующей инструкции. Рассмотрим пример:
await asyncio.sleep ( delay_secs )
await asyncio.sleep ( 0 )
Первая строка заставляет код приостанавливаться на время задержки, а другие сопрограммы используют это время для своего исполнения. Задержка, равная 0, приводит к тому, что все ожидающие сопрограммы планируются к исполнению в циклическом порядке до исполнения следующей строки. Смотрите пример roundrobin.py.
2.2.1. Очередь для планирования сопрограммы
- EventLoop.create_task Аргумент: Сопрограмма для запуска. Планировщик ставит сопрограмму в очередь для запуска как можно скорее. Вызов create_task немедленно возвращается. Сопрограмма в аргументе указывается с помощью синтаксиса вызова функции с необходимыми аргументами.
- EventLoop.run_until_complete Аргумент: Сопрограмма для запуска. Планировщик ставит сопрограмму в очередь для запуска как можно скорее. Сопрограмма в аргументе указывается с помощью синтаксиса вызова функции с необходимыми аргументами. Вызов un_until_complete возвращается, когда сопрограмма завершилась: этот метод обеспечивает способ выхода из планировщика.
- await Аргумент: Сопрограмма для запуска, указанная с помощью синтаксиса вызова функции. Запускает сопрограмму как можно скорее. Ожидающая сопрограмма блокируется до тех пор, пока одна из ожидаемых сопрограмм не завершится.
Выше cказанное совместимо с CPython. Дополнительные методы uasyncio обсуждаются в Примечаниях ( Раздел 2.2.3.).
2.2.2 Запуск функции обратного вызова (callback)
Обратные вызовы должны быть функциями Python, предназначенными для выполнения за короткий промежуток времени. Это связано с тем, что сопрограммы не будут иметь возможность работать в течение всего времени исполнения такой функции.
Следующие методы класса EventLoop используют обратные вызовы:
- call_soon — вызвать как можно скорее. Args: callback обратный вызов для запуска, *args любые позиционные аргументы могут следовать через запятую.
- call_later — вызвать после задержки в сек. Args: delay, callback, *args
- call_later_ms — вызвать после задержки в мс. Args: delay, callback, *args.
loop = asyncio.get_event_loop ()
loop.call_soon ( foo , 5 ) # Запланировать обратный вызов 'foo' как можно скорее с аргументом 5.
loop.call_later ( 2 , foo , 5 ) # запуститься через 2 секунды.
loop.call_later_ms ( 50 , foo , 5 ) # запуститься через 50 мс.
loop.run_forever ()
2.2.3 Примечания
Сопрограмма может содержать return инструкцию с произвольными возвращаемыми значениями. Чтобы получить это значение:
result = await my_coro ()
Сопрограмма может быть ограничена методами и должна содержать хотя бы одно await утверждение.
2.3 Задержки
Есть два варианта для организации задержек в сопрограммах. Для более длительных задержек и тех случаев, когда продолжительность не должна быть точной, можно использовать:
async def foo( delay_secs , delay_ms ):
await asyncio.sleep ( delay_secs )
print ( 'Hello' )
await asyncio.sleep_ms ( delay_ms )
Во время таких задержек планировщик будет исполнять других сопрограммы. Это может вносить неопределенность по времени, так как вызывающая сопрограмма будет запущена только тогда, когда будет выполнена та, которая выполняется в текущий момент. Величина задержки зависит от разработчика приложения, но, вероятно, будет порядка десятков или сотен мс; это обсуждается далее во Взаимодействии с аппаратными устройствами (Раздел 6).
Очень точные задержки могут быть исполнены с помощью функций utime — sleep_ms и sleep_us. Они лучше всего подходят для коротких задержек, так как планировщик не сможет исполнять другие сопрограммы, пока осуществляется задержка.
3.Синхронизация
Часто возникает необходимость обеспечить синхронизацию между сопрограммами. Распространенный пример — избегать так называемых «условий гонки», когда несколько сопрограмм одновременно требуют за доступ к одному ресурсу. Пример приведен в программе astests.py и обсуждается в документации. Другая опасность — «смертельные объятия», когда каждая сопрограмма ждет завершения другой.
В простых приложениях синхронизация может быть достигнута с помощью глобальных флагов или связанных переменных. Более элегантный подход заключается в использовании классов синхронизации. В модуле asyn.py предложены «микро» реализации классов Event, Barrier, Semaphore и Conditios, предназначеных для использования только с asyncio. Они не являются поточно-ориентированными и не должны использоваться с _thread модулем или обработчиком прерываний, если не указано иное. Так же реализован класс Lock, являющийся альтернативой официальной реализации.
Еще одна проблема синхронизации возникает с сопрограммами-производителями и сопрограммами-потребителями. Сопрограмма-производитель генерирует данные, которые использует сопрограмма-потребитель. Для решения подобных задач asyncio предоставляет класс Queue. Сопрограмма-производитель помещает данные в очередь, в то время как сопрограмма-потребитель ожидает его завершения (с другими операциями, запланированными на время). Класс Queue предоставляет гарантии удаления элементов в том порядке, в котором они были получены. В качестве альтернативы можно использовать класс Barrier, если сопрограмма-производитель должна ждать, пока сопрограмма-потребитель не будет готова получить доступ к данным.
Краткий обзор классов приведен ниже. Более подробно в полной документации.
3.1.Блокировка (Lock)
Lock гарантирует уникальный доступ к общему ресурсу. В следующем примере кода создается экземпляр lock класса Lock, который передается всем клиентам, желающим получить доступ к общему ресурсу. Каждая сопрограмма пытается захватить блокировку, приостанавливая выполнение до тех пор, пока не достигнет успеха:
import uasyncio as asyncio
from uasyncio.synchro import Lock
async def task(i, lock):
while 1:
await lock.acquire()
print("Acquired lock in task", i)
await asyncio.sleep(0.5)
lock.release()
async def killer():
await asyncio.sleep(10)
loop = asyncio.get_event_loop()
lock = Lock() # The global Lock instance
loop.create_task(task(1, lock))
loop.create_task(task(2, lock))
loop.create_task(task(3, lock))
loop.run_until_complete(killer()) # работать 10s
3.1.1.Блокировки (Lock) и таймауты
На момент написания (5 января 2018 г.) официально разработка uasycio класса Lock не закончена. Если для сопрограммы задан
тайм-аут(раздел 5.2.2.)
, в момент ожидания блокировки при срабатывании тайм-аут будет неэффективным. Он не получит TimeoutError пока не получит блокировку. То же самое относится и к отмене задачи.
Модуль asyn.py предлагает класс Lock, который работает в этих ситуациях. Данная реализация класса менее эффективна, чем официальный класс, но поддерживает дополнительные интерфейсы согласно версии CPython, включая использование менеджера контекста.
3.2 Событие (Event)
Event создает возможность одной или нескольким сопрограммам сделать паузу, пока другая не подаст сигнал об их продолжении. Экземпляр Event становится доступными для всех сопрограмм, использующих его:
import asyn
event = asyn.Event ()
Сопрограмма ждет события, объявив await event, после чего выполнение приостанавливается, пока другие сопрограммы не объявят event.set(). Полная информация.
Проблема может возникнуть если event.set() выдается в циклической конструкции; код должен подождать, пока все ожидающие объекты не получат доступ к событию, прежде чем устанавливать его снова. В случае, когда одна coro ожидает событие, это может быть достигнуто путем получения coro-события, очищающего событие:
async def eventwait ( event ):
await event
event.clear()
Сопрограмма, инициирующая событие, проверяет, что оно было обслужено:
async def foo ( event ):
while True :
# Получать данные откуда-то
while event.is_set ():
await asyncio.sleep ( 1 ) # Подождите, пока coro ответит на
event.set ()
В случае, когда несколько coros ждут синхронизации одного события, задачу можно решить с помощью события подтверждения. Каждой coro нужно отдельное событие.
async def eventwait ( событие , ack_event ):
await event
ack_event.set ()
Пример этого приведен в event_test функции в asyntest.py. Это громоздко В большинстве случаев — даже с одним ожидающим coro — класс Barrier, представленный ниже, предлагает более простой подход.
Событие также может обеспечить средство связи между обработчиком прерываний и coro. Обработчик обслуживает аппаратное обеспечение и устанавливает событие, которое проверяется coro уже в обычном режиме.
3.2.1 Значения события
Метод event.set() может принимать необязательное значение данных любого типа. Coro, ожидающая события, может получить его с помощью event.value(). Обратите внимание, что event.clear() будет установлено в значение None. Типичное использование этого для coro, устанавливающего событие — выпустить event.set(utime.ticks_ms()). Любая coro, ожидающая события, может определить возникшую задержку, например, чтобы выполнить компенсацию за это.
3.3.Барьер (Barrier)
Существует два варианта применения класса Barrier.
Во-первых, он может приостановить сопрограмму до тех пор, пока одина или несколько других сопрограмм не завершатся.
Во-вторых, он позволяет нескольким сопрограммам встречаться в определенной точке. Например, производитель и потребитель могут синхронизироваться в точке, где у производителя есть данные, и потребитель готов их использовать. В момент исполнения Barrierможет выполнить дополнительный обратный вызов до того, как барьер будет снят, и все ожидающие события могут продолжаться.
Обратный вызов может быть функцией или сопрограммой. В большинстве приложений, скорее всего, будет использоваться функция: она может быть гарантированно выполнена до завершения, прежде чем барьер будет снят.
Примером является функция barrier_test в asyntest.py. Во фрагменте кода этой программы:
import asyn
def callback(text):
print(text)
barrier = asyn.Barrier(3, callback, ('Synch',))
async def report():
for i in range(5):
print('{} '.format(i), end='')
await barrier
несколько экземпляров сопрограммы report печатают свой результат и делают паузу, пока другие экземпляры также не будут завершены и ожидают продолжения barrier. В этот момент выполняется обратный вызов. По его завершении возобновляется изначальная сопрограмма.
3.4 Семафор (Semaphore)
Семафор ограничивает число сопрограмм, которые могут получить доступ к ресурсу. Его можно использовать для ограничения количества экземпляров определенной сопрограммы, которые могут работать одновременно. Это выполняется с использованием счетчика доступа, который инициализируется конструктором и уменьшается каждый раз, когда сопрограмма получает семафор.
Самый простой способ использовать его в менеджере контекста:
import asyn
sema = asyn.Semaphore(3)
async def foo(sema):
async with sema:
# Здесь ограниченный доступ
Примером является функция semaphore_test в asyntest.py.
3.4.1 (Ограниченный) семафор
Работает аналогично классу Semaphore за исключением того, что, если release метод заставляет счетчик доступа превысить свое начальное значение, a ValueError устанавливается.
3.5 Очередь (Queye)
Класс Queue поддерживается официальной uasycio и пример программы aqtest.py демонстрирует его использование. Очередь создается следующим образом:
from uasyncio.queues import Queue
q = Queue ()
Типичная сопрограмма-производитель может работать следующим образом:
async def producer(q):
while True:
result = await slow_process() # так или иначе получить некоторые данные
await q.put(result) # сделать паузу, пока очередь с ограниченным размером не заполнится
и сопрограмма-потребитель может работать следующим образом:
async def consumer(q):
while True:
result = await(q.get()) # Остановиться, если q пуста
print('Result was {}'.format(result))
Класс Queue предоставляет значительные дополнительные функциональные возможности в том случае, когда размер очередей может быть ограничен и может быть опрошен статус. Поведение при пустой очереди (если размер ограничен) и поведение при полной очереди может контролироваться. Документация об этом есть в коде.
3.6 Другие Классы синхронизации
Библиотека asyn.py предоставляет «микро» реализации еще некоторых возможностей CPython.
Класс Condition позволяет сопрограмме уведомлять другие сопрограммы, ождидающие на заблокированном ресурсе. После получения уведомления они получат доступ к ресурсу и снимут блокировку по очереди. Уведомляющая сопрограмма может ограничивать количество сопрограмм, подлежащих уведомлению.
Класс Gather позволяет запустить список сопрограмм. После завершения последней будет возвращен список результатов. Эта «микро» реализация использует другой синтаксис. Тайм-ауты могут быть применены к любой из сопрограмм.
4. Разработка классов для asyncio
В контексте разработки драйверов устройств цель состоит в том, чтобы обеспечить их неблокирующую работу. Сопрограмма-драйвер должна гарантировать, что другие сопрограммы будут исполняться, пока драйвер ожидает исполнения устройством аппаратных операций. Например, задача, ожидающая данных, поступающих в UART, или пользователь, нажимающий кнопку, должен позволять планировать другие события, пока событие не произойдет.
4.1 Классы с использованием ожидания await
Сопрограмма может приостановить выполнение, ожидая объект awaitable. Под CPython пользовательский класс awaitable создается путем реализации специального метода __await__, который возвращает генератор. Класс awaitable используется следующим образом:
import uasyncio as asyncio
class Foo():
def __await__(self):
for n in range(5):
print('__await__ called')
yield from asyncio.sleep(1) # Другие сопрограммы запланированы здесь
return 42
__iter__ = __await__ # См. примечание ниже
async def bar():
foo = Foo() # Foo - awaitable Класс
print('waiting for foo')
res = await foo # Получает результат
print('done', res)
loop = asyncio.get_event_loop()
loop.run_until_complete(bar())
В настоящее время MicroPython не поддерживает __await__ (проблема # 2678) и для решения должна использоваться __iter__. Строка __iter__ = __await__ обеспечивает переносимость между CPython и MicroPython. Примеры кода смотрите в классах Event, Barrier, Cancellable, Condition в asyn.py.
4.1.1 Использование в контекстных менеджерах
Ожидаемые объекты могут использоваться в синхронных или асинхронных контекстных менеджерах, предоставляя необходимые специальные методы. Синтаксис:
with await awaitable as a: # Условие 'as' является необязательным
# код не приводится
async with awaitable as a: # Асинхронный контекстный менеджер (см.ниже)
# делать что-нибудь
Для достижения этого __await__ генератор должен вернуть self. Это передается любой переменной в as предложении, а также позволяет работать специальным методам. Посмотрите asyn.Condition и asyntest.condition_test где класс Condition использует await и при этом может использоваться в синхронном контекстном менеджере.
4.1.2 Await в сопрограмме
Язык Python требует, чтобы __await__ было функцией генератора. В MicroPython генераторы и сопрограммы идентичны, поэтому решение заключается в использовании yield from coro(args).
Цель этого руководства — предложить код, переносимый на CPython 3.5 или выше. В CPython генераторы и сопрограммы различны по смыслу. В CPython сопрограмма есть __await__ специальный метод, который извлекает генератор. Это переносимо:
up = False # Стартуем под MicroPython?
try:
import uasyncio as asyncio
up = True # Или можно использовать sys.implementation.name
except ImportError:
import asyncio
async def times_two(n): # Coro в ожидании
await asyncio.sleep(1)
return 2 * n
class Foo():
def __await__(self):
res = 1
for n in range(5):
print('__await__ called')
if up: # MicroPython
res = yield from times_two(res)
else: # CPython
res = yield from times_two(res).__await__()
return res
__iter__ = __await__
async def bar():
foo = Foo() # foo is awaitable
print('waiting for foo')
res = await foo # Получить значение
print('done', res)
loop = asyncio.get_event_loop()
loop.run_until_complete(bar())
Обратите внимание, что __await__, yield from asyncio.sleep(1) разрешены CPython. Я еще не понял, как это достигается.
4.2 Асинхронные итераторы
Асинхронные итераторы предоставляют средства для возврата конечной или бесконечной последовательности значений и могут использоваться в качестве средства извлечения последовательных элементов данных, когда они поступают с устройства только для чтения. Асинхронный итератор вызывает асинхронный код в своем методе next. Класс должен соответствовать следующим требованиям:
- У него есть метод __aiter__, определенный в async def и возвращающий асинхронный итератор.
- У него есть метод __anext__, который сам является сопрограммой — то есть определен через async def и содержащей, по крайней мере, один оператор await. Чтобы остановить итерацию, она должна вызвать StopAsyncIteration исключение.
Последовательные значения извлекаются c помощью async for как показано ниже:
class AsyncIterable:
def __init__(self):
self.data = (1, 2, 3, 4, 5)
self.index = 0
async def __aiter__(self):
return self
async def __anext__(self):
data = await self.fetch_data()
if data:
return data
else:
raise StopAsyncIteration
async def fetch_data(self):
await asyncio.sleep(0.1) # Другие сопрограммы могут исполняться
if self.index >= len(self.data):
return None
x = self.data[self.index]
self.index += 1
return x
async def run():
ai = AsyncIterable()
async for x in ai:
print(x)
4.3 Асинхронные контекстные менеджеры
Классы могут быть разработаны для поддержки асинхронных контекстных менеджеров, имеющих процедуры входа и выхода, которые являются сопрограмами. Примером является Класс Lock, описанный выше. Он имеет сопрограмму __aenter__, которая логически требуется для асинхронной работы. Для поддержки асинхронного протокола контекстного менеджера его __aexit__ метод также должен быть сопрограмой, что достигается путем включения await asyncio.sleep(0). Такие классы доступны изнутри сопрограммы со следующим синтаксисом:
async def bar ( lock ):
async with lock:
print ( «блокировка bar получена» )
Как и в случае с обычными менеджерами контекста, гарантированно будет вызван метод выхода, когда менеджер контекста завершит работу, как обычно, так и через исключение. Для достижения этой цели используются специальные методы __aenter__и __aexit__ которые должны быть определены, как сопрограммы, ожидающие другую сопрограмму или awaitable объект. Этот пример взят из класса Lock:
async def __aenter__(self):
await self.acquire() # a coro определена с помощью async def
return self
async def __aexit__(self, *args):
self.release() # Обычный метод
await asyncio.sleep_ms(0)
Если async with содержит предложение as variable, переменная получает значение, возвращаемое __aenter__.
Для обеспечения корректного поведения прошивка должна быть V1.9.10 или новее.
5.Исключения от тайм-аутов и из-за отмены задач
Эти темы связаны между собой: uasyncio включает отмену задач и применение тайм-аута к задаче, выдавая исключение для задачи особым образом.
5.1 Исключения
Если в сопрограмме возникает исключение (exeption), оно должно быть обработано либо в этой сопрограмме, либо в сопрограмме, ожидающей ее завершения. Это гарантирует, что исключение не распространяется на планировщик. Если исключение произойдет, планировщик прекратит работу, передав исключение в код, который запустил планировщик. Следовательно, чтобы избежать остановки планировщика, сопрограмма, запущенная с loop.create_task() должна перехватывать любые исключения внутри.
Использование throw или close чтобы запустить исключение в сопрограмме, неразумно. Это разрушает uasyncio, заставляя сопрограмму запускаться и, возможно, завершаться, когда он все еще находится в очереди на исполнение.
Приведенный пример иллюстрирует эту ситуацию. Если разрешено работать до конца, он работает как ожидалось.
import uasyncio as asyncio
async def foo():
await asyncio.sleep(3)
print('About to throw exception.')
1/0
async def bar():
try:
await foo()
except ZeroDivisionError:
print('foo прерывается из-за деления на 0') # Случилось!
raise # обеспечить выход путем распространения цикла.
except KeyboardInterrupt:
print('foo was interrupted by ctrl-c') # НИКОГДА НЕ СЛУЧИТСЯ!
raise
async def shutdown():
print('Shutdown is running.') # Случится в обоих случаях
await asyncio.sleep(1)
print('done')
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(bar())
except ZeroDivisionError:
loop.run_until_complete(shutdown())
except KeyboardInterrupt:
print('Keyboard interrupt at loop level.')
loop.run_until_complete(shutdown())
Однако выдача прерывания клавиатуры приводит к тому, что исключение переходит в цикл обработки событий. Это связано с тем, что выполнение uasyncio.sleep передается в цикл обработки событий. Следовательно, приложения, требующие кода очистки в ответ на прерывание клавиатуры, должны перехватывать исключение на уровне цикла событий.
5.2 Отмена и таймауты
Как указывалось выше, эти функции работают, вызывая исключение для задачи особым образом, используя специальный метод MicroPython сопрограмму pend_throw. Как это работает, зависит от версии. В официальной uasyncio v.2.0 исключение не обрабатывается до следующего запланированного задания. Это налагает задержку, если задача ожидает sleep ввода-вывода. Тайм-ауты могут выходить за пределы своего номинального периода. Задача отмены других задач не может определить, когда отмена завершена.
В настоящее время существует обходной путь и два решения.
- Обходной путь: библиотека asyn предоставляет средства ожидания отмены задач или групп задач. См.Отмена задания (раздел 5.2.1.) .
- Библиотека Paul Sokolovsky предоставляет uasyncio v2.4, но для этого требуется его прошивка Pycopy.
- Fast_io библиотека uasyncio решает эту проблему в Python (в менее элегантной манере) и работает под управлением официальной прошивки.
Иерархия исключений, используемая здесь Exception-CancelledError-TimeoutError.
5.2.1 Отмена задания
uasyncio обеспечивает функцию cancel(coro). Это работает, выбрасывая исключение для использования сопрограммы pend_throw. Так же это работает с вложенными сопрограммами. Использование заключается в следующем:
async def foo():
while True:
# делать что-то каждые 10 secs
await asyncio.sleep(10)
async def bar(loop):
foo_instance = foo() # Создать вхождение coro
loop.create_task(foo_instance)
# code omitted
asyncio.cancel(foo_instance)
Если этот пример запустить под uasyncio v2.0, то когда bar выдаст cancel он не вступит в силу до следующего запланированного foo и при отмене foo может возникнуть задержка до 10 секунд. Другой источник задержки возникнет, если foo ожидает ввода-вывода. Где бы задержка не возникла, bar не сможет определить, была ли foo отменена. Это имеет значение в некоторых случаях использования.
При использовании библиотек Paul Sokolovsky или fast_io достаточно применить sleep(0):
async def foo():
while True:
# делать что-то каждые 10 secs
await asyncio.sleep(10)
async def bar(loop):
foo_instance = foo() # Создать вхождение coro
loop.create_task(foo_instance)
# код не приводится
asyncio.cancel(foo_instance)
await asyncio.sleep(0)
# Теперь задача отменена
Это также будет работать в uasyncio v2.0, если foo (и любой ожидающей сопрограммы foo) никогда не выдавал sleep и не ождая ввода / вывода.
Поведение, которое может удивить неосторожного, возникает, когда ожидается отмена сопрограммы, запущенной create_task и находящеяся в режиме ожидания. Рассмотрим этот фрагмент:
async def foo():
while True:
# делать что-то каждые 10 secs
await asyncio.sleep(10)
async def foo_runner(foo_instance):
await foo_instance
print('Это не будет напечатано')
async def bar(loop):
foo_instance = foo()
loop.create_task(foo_runner(foo_instance))
# код не приводится
asyncio.cancel(foo_instance)
Когда foo отменяется, она удаляется из очереди планировщика; потому что в ней отсутствует return инструкция, вызывающая процедура foo_runner никогда не возобновляется. Рекомендуется всегда перехватывать исключение в самой внешней области действия функции, подлежащей отмене:
async def foo():
try:
while True:
await asyncio.sleep(10)
await my_coro
except asyncio.CancelledError:
return
В этом случае my_coro не нужно перехватывать исключение, так как оно будет распространено в вызывающий канал и захвачено там.
Примечание. Запрещается использовать методы close или throw методы сопрограмм, когда сопрограмма используется вне планировщика. Это подрывает планировщик, заставляя сопрограмму выполнять код, даже если он не запланирован. Это может иметь нежелательные последствия.
5.2.2 Сопрограммы с таймаутами
Таймауты реализуются с помощью uasyncio методов .wait_for() и .wait_for_ms(). Они принимают в качестве аргументов сопрограмму и время ожидания в секундах или мс соответственно. Если тайм-аут истекает, TimeoutError будет вброшен в сопрограмму с помощью pend_throw. Это исключение должно быть перехвачено либо пользователем, либо вызывающим абонентом. Это необходимо по причине, описанной выше: если время ожидания истекает, оно отменяется. Если только ошибка не будет перехвачена и не вернётся, единственный путь, по которому вызывающий может продолжить, — это перехват самого исключения.
Там, где исключение перехвачено сопрограммой, у меня возникли неясные сбои, если исключение не перехвачено во внешней области видимости, как показано ниже:
import uasyncio as asyncio
async def forever():
try:
print('Starting')
while True:
await asyncio.sleep_ms(300)
print('Got here')
except asyncio.TimeoutError:
print('Got timeout') # And return
async def foo():
await asyncio.wait_for(forever(), 5)
await asyncio.sleep(2)
loop = asyncio.get_event_loop()
loop.run_until_complete(foo())
В качестве альтернативы можно перехватить вызывающей функцией:
import uasyncio as asyncio
async def forever():
print('Starting')
while True:
await asyncio.sleep_ms(300)
print('Got here')
async def foo():
try:
await asyncio.wait_for(forever(), 5)
except asyncio.TimeoutError:
pass
print('Timeout elapsed.')
await asyncio.sleep(2)
loop = asyncio.get_event_loop()
loop.run_until_complete(foo())
Примечение для Uasyncio v2.0.
Это не применимо к библиотекам Paul Sokolovsky или fast_io.
Если сопрограмма запускает await asyncio.sleep(t), с большой задержкой t, сопрограмма не будет перезагружена до истечения t. Если тайм-аут истек до того, как завершится sleep, произойдет TimeoutError при перезагрузке сопрограммы — т.е. когда истечет t. В режиме реального времени и с точки зрения вызывающего абонента, его ответ TimeoutError будет отложен.
Если это важно для приложения, создайте длинную задержку, ожидая короткую в цикле. Сопрограмма asyn.sleep поддерживает это.
6. Взаимодействие с оборудованием
В основе взаимодействия между uasyncio и внешними асинхронными событиями лежат опросы (polling). Аппаратное обеспечение, требующее быстрого ответа, может использовать прерывание. Но взаимодействие между подпрограммой обработки прерываний (ISR) и пользовательской сопрограммой будет основано на опросах. Например, ISR может вызвать Event или установить глобальный флаг, в то время как сопрограмма, ожидающая результата, опрашивает объект каждый раз, когда запрос запланирован.
Опрос может осуществляться двумя способами, явным или неявным. Последнее выполняется с использованием stream I/O механизма, который представляет собой систему, предназначенную для потоковых устройств, таких как UART и сокеты. В самом простом явном опросе может состоять такой код:
async def poll_my_device():
global my_flag # Установлен аппаратным ISR
while True:
if my_flag:
my_flag = False
# service the device
await asyncio.sleep(0)
Вместо глобального флага можно использовать переменную экземпляра класса Event или экземпляр класса, использующего await. Явный опрос обсуждается ниже.
Неявный опрос состоит из разработки драйвера, который будет работать как потоковое устройство ввода-вывода, такое как UART или сокет stream I/O, который опрашивает устройства, используя систему select.poll Python: поскольку опрос выполняется в C, он быстрее и эффективнее, чем явный опрос. Использование stream I/O обсуждается в разделе 6.3.
Благодаря своей эффективности неявный опрос дает преимущество наиболее быстрым драйверам устройств ввода / вывода: потоковые драйверы могут быть созданы для многих устройств, которые обычно не рассматриваются как потоковые устройства. Более подробно это обсуждается в разделе 6.4.
6.1 Проблемы синхронизации
И явный, и неявный опрос в настоящее время основаны на циклическом планировании. Предположим, что ввод-вывод работает одновременно с N пользовательскими сопрограммами, каждая из которых исполняется с нулевой задержкой. Когда ввод-вывод будет обслужен, он будет затем опрошен, как только все пользовательские операции будут запланированы. Предполагаемая задержка должна учитываться при проектировании. Каналы ввода / вывода могут потребовать буферизации, при этом ISR обслуживает оборудование в режиме реального времени от буферов и сопрограмм, заполняя или освобождая буферы в более медленное время.
Также необходимо учитывать возможность выхода за пределы: это тот случай, когда что-то, опрашиваемое сопрограммой, происходит более одного раза до того, как на самом деле запланирована сопрограммой.
Другой проблемой синхронизации является точность задержек. Если сопрограмма издает
await asyncio.sleep_ms ( t )
# следующая строка
планировщик гарантирует, что выполнение будет приостановлено как минимум на t мс. Фактическая задержка может быть больше чем t, что зависит от текущей загрузки системы. Если в это время другие сопрограммы находятся в ожидании завершения ненулевых задержек, следующая строка будет немедленно запланирована к исполнению. Но если другие сопрограммы так же ожидают выполнения (либо потому, что они издали нулевую задержку, либо потому, что их время также истекло), они могут быть запланированы к исполнению раньше. Это вносит неопределенность синхронизации в sleep() и sleep_ms() функции. Значение наихудшего случая для этого переполнения может быть рассчитано путем суммирования значений времени выполнения всех таких сопрограмм для определения наихудшего времени передачи в планировщик.
Fast_io версии uasyncio в этом контексте обеспечивает способ гарантировать, что потоковый ввод/вывод будет опрашиваться на каждой итерации планировщика. Есть надежда, что официальное uasyncio примет соответствующие поправки в свое время.
6.2 Опрос устройств с помощью сопрограмм
Это простой подход, который наиболее подходит для устройств, которые могут опрашиваться с относительно низкой скоростью. Это связано прежде всего с тем, что опрос с коротким (или нулевым) интервалом опроса может привести к тому, что сопрограмма потребляет больше процессорного времени, чем желательно для попадания в интервал.
Пример apoll.py демонстрирует этот подход, опрашивая акселерометр Pyboard с интервалом 100 мс. Он выполняет простую фильтрацию, чтобы игнорировать шум, и печатает сообщение каждые две секунды, если перемещения не происходит.
Пример aswitch.py представляет драйверы для переключателей и кнопочных устройств.
Пример драйвера для устройства, способного считывать и записывать, показан ниже. Для удобства тестирования Pyboard UART 4 эмулирует условное устройство. Драйвер реализует Класс RecordOrientedUart, в котором данные поставляются в записях переменной длины, состоящих из байтовых экземпляров. Объект добавляет разделитель перед отправкой и буферизует входящие данные, пока не будет получен добавленный разделитель. Это лишь демонстрационная версия и неэффективный способ использования UART по сравнению с потоковым вводом/выводом.
В целях демонстрации асинхронной передачи мы предполагаем, что эмулируемое устройство имеет средство проверки того, что передача завершена и что приложение требует, чтобы мы подождали этого. Ни одно из предположений не является верным в этом примере, но код подделывает его путем await asyncio.sleep(0.1).
Для запуска не забудьте соединить выводы Pyboard X1 и X2 (UART Txd и Rxd)
import uasyncio as asyncio
from pyb import UART
class RecordOrientedUart():
DELIMITER = b''
def __init__(self):
self.uart = UART(4, 9600)
self.data = b''
def __iter__(self): # Not __await__ issue #2678
data = b''
while not data.endswith(self.DELIMITER):
yield from asyncio.sleep(0) # Необходимо, потому что:
while not self.uart.any():
yield from asyncio.sleep(0) # timing may mean this is never called
data = b''.join((data, self.uart.read(self.uart.any())))
self.data = data
async def send_record(self, data):
data = b''.join((data, self.DELIMITER))
self.uart.write(data)
await self._send_complete()
# В реальном драйвере устройства мы бы опрашивали аппаратное устройство
# на предмет завершения в цикле с помощью await asyncio.sleep(0)
async def _send_complete(self):
await asyncio.sleep(0.1)
def read_record(self): # Synchronous: await the object before calling
return self.data[0:-1] # Discard delimiter
async def run():
foo = RecordOrientedUart()
rx_data = b''
await foo.send_record(b'A line of text.')
for _ in range(20):
await foo # Другие coros исполняются планировщиком пока мы ожидаем готовности foo
rx_data = foo.read_record()
print('Got: {}'.format(rx_data))
await foo.send_record(rx_data)
rx_data = b''
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
6.3 Использование потокового механизма (Stream)
В примере демонстрируется одновременный ввод-вывод на одном UART микропроцессора Pyboard.
Для запуска следует соединить выводы Pyboard X1 и X2 (UART Txd и Rxd)
import uasyncio as asyncio
from pyb import UART
uart = UART(4, 9600)
async def sender():
swriter = asyncio.StreamWriter(uart, {})
while True:
await swriter.awrite('Hello uartn')
await asyncio.sleep(2)
async def receiver():
sreader = asyncio.StreamReader(uart)
while True:
res = await sreader.readline()
print('Received', res)
loop = asyncio.get_event_loop()
loop.create_task(sender())
loop.create_task(receiver())
loop.run_forever()
Поддерживающий код можно найти в __init__.py в uasyncio библиотеке. Механизм работает, так как драйвер устройства (написанный на C) реализует следующие методы: ioctl, read, readline и write. Раздел 6.4.Написание драйвера потокового устройства раскрывает подробности о том, как такие драйверы могут быть написаны на Python.
UART может получать данные в любое время. Механизм потокового ввода-вывода проверяет наличие ожидающих входящих символов всякий раз, когда планировщик получает контроль. Когда сопрограмма работает, программа обработки прерываний буферизует входящие символы; они будут удалены, когда сопрограмма уступит время планировщику. Следовательно, приложения UART должны быть спроектированы таким образом, чтобы сопрограммы минимизировали время между передачей в планировщик, чтобы избежать переполнения буфера и потери данных. Это можно улучшить, используя больший буфер чтения UART, либо меньшую скорость передачи данных. В качестве альтернативы аппаратное управление потоком обеспечит решение, если источник данных его поддерживает.
6.3.1 Пример драйвера UART
Программа auart_hd.py иллюстрирует способ связи с полудуплексным устройством, таким как устройство, отвечающее на набор команд модема «AT». Полудуплекс означает, что устройство никогда не отправляет незапрошенные данные: его передачи всегда выполняются в ответ на полученную команду от мастера.
Устройство эмулируется путем запуска тест на Pyboard с двумя проводными связями.
(Очень упрощенное) эмулируемое устройство реагирует на любую команду, посылая четыре строки данных с паузой между каждой, чтобы имитировать медленную обработку.
Мастер отправляет команду, но заранее не знает, сколько строк данных будет возвращено. Он запускает таймер повторного запуска, который перезапускается каждый раз при получении строки. Когда таймер истекает, предполагается, что устройство завершило передачу, и возвращается список принятых строк.
Также продемонстрирован случай отказа устройства, что достигается путем пропуска передачи перед ожиданием ответа. После истечения времени ожидания возвращается пустой список. Смотрите комментарии к коду для более подробной информации.
6.4 Разработка драйвера потокового (Stream) устройства
Механизм потокового ввода/вывода (stream I/O) предназначен для управления работой потоковых устройств ввода/вывода, таких как UART и сокеты(socket). Механизм может использоваться драйверами любого регулярно опрашиваемого устройства путем делегирования планировщику, который использует select, опроса готовности любых устройств в очереди. Это более эффективно, чем выполнение нескольких операций сопрограмм, каждый из которых опрашивает устройство, отчасти потому, что select написано на C, а также потому, что сопрограмма, выполняющая опрос, откладывается до тех пор, пока опрашиваемый объект не вернет состояние готовности.
Драйвер устройства, способный обслуживать механизм потокового ввода/вывода желательно должен поддерживать методы StreamReader, StreamWriter . Читаемое устройство должно обеспечивать как минимум один из следующих методов. Обратите внимание, что это синхронные методы. Метод ioctl (смотри ниже) гарантирует, что они называются только тогда, когда имеются данные. Методы должны возвращаться как можно быстрее, используя столько данных, сколько доступно.
readline() Вернуть столько символов, сколько доступно, вплоть до любого символа новой строки. Требуется, если использовать StreamReader.readline()
read(n) Вернуть столько символов, сколько доступно, но не более n. Требуется, если использовать StreamReader.read() или StreamReader.readexactly()
Создаваемый драйвер должен обеспечить следующий синхронный метод при этом с немедленным возвратом:
write с аргументами buf, off, sz.
Где:
buf — это буфер для записи.
off — смещение в буфер первого символа для записи.
sz — запрашиваемое количество символов для записи.
Возвращаемое значение — количество фактически написанных символов (может быть 1, если устройство работает медленно).
Метод ioctl гарантирует, что будет вызван только тогда, когда устройство будет готово к приему данных.
Все устройства должны обеспечивать метод ioctl, который опрашивает оборудование для определения его состояния готовности. Типичный пример для драйвера чтения/записи:
import io
MP_STREAM_POLL_RD = const(1)
MP_STREAM_POLL_WR = const(4)
MP_STREAM_POLL = const(3)
MP_STREAM_ERROR = const(-1)
class MyIO(io.IOBase):
# Методы на показаны
def ioctl(self, req, arg): # see ports/stm32/uart.c
ret = MP_STREAM_ERROR
if req == MP_STREAM_POLL:
ret = 0
if arg & MP_STREAM_POLL_RD:
if hardware_has_at_least_one_char_to_read:
ret |= MP_STREAM_POLL_RD
if arg & MP_STREAM_POLL_WR:
if hardware_can_accept_at_least_one_write_character:
ret |= MP_STREAM_POLL_WR
return ret
Ниже приведено описание Класса MillisecTimer ожидания задержки:
import uasyncio as asyncio
import utime
import io
MP_STREAM_POLL_RD = const(1)
MP_STREAM_POLL = const(3)
MP_STREAM_ERROR = const(-1)
class MillisecTimer(io.IOBase):
def __init__(self):
self.end = 0
self.sreader = asyncio.StreamReader(self)
def __iter__(self):
await self.sreader.readline()
def __call__(self, ms):
self.end = utime.ticks_add(utime.ticks_ms(), ms)
return self
def readline(self):
return b'n'
def ioctl(self, req, arg):
ret = MP_STREAM_ERROR
if req == MP_STREAM_POLL:
ret = 0
if arg & MP_STREAM_POLL_RD:
if utime.ticks_diff(utime.ticks_ms(), self.end) >= 0:
ret |= MP_STREAM_POLL_RD
return ret
который может быть использован следующим образом:
async def timer_test ( n ):
timer = ms_timer.MillisecTimer ()
await timer ( 30 ) # Пауза 30 мс
По сравнению с официальной uasyncio подобная реализация не дает никаких преимуществ по сравнению с await asyncio.sleep_ms(). Применение fast_io обеспечивает значительно более точные задержки при обычной схеме использования, когда сопрограммы ожидают нулевую задержку.
Можно использовать планирование ввода / вывода, чтобы связать событие с обратным вызовом. Это более эффективно, чем цикл опроса, потому что выполнение опроса не запланировано, пока ioctl не вернется состояние готовности. Далее выполняется обратный вызов, когда обратный вызов меняет состояние.
import uasyncio as asyncio
import io
MP_STREAM_POLL_RD = const(1)
MP_STREAM_POLL = const(3)
MP_STREAM_ERROR = const(-1)
class PinCall(io.IOBase):
def __init__(self, pin, *, cb_rise=None, cbr_args=(), cb_fall=None, cbf_args=()):
self.pin = pin
self.cb_rise = cb_rise
self.cbr_args = cbr_args
self.cb_fall = cb_fall
self.cbf_args = cbf_args
self.pinval = pin.value()
self.sreader = asyncio.StreamReader(self)
loop = asyncio.get_event_loop()
loop.create_task(self.run())
async def run(self):
while True:
await self.sreader.read(1)
def read(self, _):
v = self.pinval
if v and self.cb_rise is not None:
self.cb_rise(*self.cbr_args)
return b'n'
if not v and self.cb_fall is not None:
self.cb_fall(*self.cbf_args)
return b'n'
def ioctl(self, req, arg):
ret = MP_STREAM_ERROR
if req == MP_STREAM_POLL:
ret = 0
if arg & MP_STREAM_POLL_RD:
v = self.pin.value()
if v != self.pinval:
self.pinval = v
ret = MP_STREAM_POLL_RD
return ret
И вновь — на официальном uasyncio задержка может высокой. В зависимости от дизайна приложения версия fast_io может оказаться более эффективной.
Демонстрационная программа iorw.py иллюстрирует полный пример. Обратите внимание, что на момент написания статьи в официальном uasyncio есть ошибка, из-за которой это не работает. Есть два решения. Обходной путь — написать два отдельных драйвера, один только для чтения, а другой только для записи. Второй — использовать fast_io, который решает эту проблему.
В официальном uasyncio ввод/вывод планируется довольно редко.
6.5 Полный пример: aremote.py
Драйвер предназначен для приема/декодирования сигналов с инфракрасного пульта дистанционного управления. Сам драйвер aremote.py. Следующие примечания являются существенными касательно использования asyncio.
Прерывание на контакте записывает время изменения состояния (в мкс) и устанавливает событие, пропуская время, когда произошло первое изменение состояния. Сопрограмма ожидает события, выдает длительность пакета данных, затем декодирует сохраненные данные перед вызовом указанного пользователем обратного вызова.
Передача времени экземпляру Event позволяет сопрограмме компенсировать любую asyncio задержку при настройке периода задержки.
6.6 HTU21D датчик окружающей среды
Драйвер чипа HTU21D обеспечивает точные измерения температуры и влажности.
Чипу требуется порядка 120 мс, чтобы получить оба элемента данных. Драйвер работает асинхронно, инициируя получение и использование await asyncio.sleep(t) до чтения данных, обновляет переменные temperature и humidity, к которым можно получить доступ в любой момент, что позволяет другим сопрограммам запускаться во время работы драйвера чипа.
7. Советы и подсказки
7.1 Программа зависает
Зависание обычно происходит из-за того, что задача заблокирована без уступки: это приведет к зависанию всей системы. При разработке полезно иметь сопрограмму, которая периодически включает встроенный светодиод. Это обеспечивает подтверждение того, что планировщик еще работает.
7.2 uasyncio сохраняет состояние
При запуске программ, использующих uasyncio в REPL, выполните программный сброс (ctrl-D) между запусками. В связи с тем, что uasyncio сохраняет состояние между запусками, при очередном запуске может происходить непредсказуемое поведению.
7.3 Сборка мусора
Можно исполнить сопрограмму, предварительно указав import gc:
gc.collect ()
gc.treshold ( gc.mem_free () // 4 + gc.mem_alloc ())
Цель этого обсуждается здесь, в разделе о куче (heap).
7.4 Тестирование
Желательно убедиться в том, что драйвер устройства удерживает контроль, когда это необходимо, что можно сделать, запустив один или несколько экземпляров фиктивных сопрограмм, запускающих цикл печати сообщения и проверяя, что оно выполняется в периоды, когда драйвер находится в режиме ожидания:
async def rr(n):
while True:
print('Roundrobin ', n)
await asyncio.sleep(0)
В качестве примера типа опасности, которая может возникнуть, в приведенном выше примере RecordOrientedUart __await__ метод изначально был записан как:
def __await__(self):
data = b''
while not data.endswith(self.DELIMITER):
while not self.uart.any():
yield from asyncio.sleep(0)
data = b''.join((data, self.uart.read(self.uart.any())))
self.data = data
В результате исполнение растягивается до тех пор, пока не будет получена вся запись, а так же с тем, что uart.any() всегда возвращает ненулевое количество полученных символов. К моменту вызова все символы могут быть уже получены. Такую ситуацию можно разрешить использованием внешнего цикла:
def __await__(self):
data = b''
while not data.endswith(self.DELIMITER):
yield from asyncio.sleep(0) # Необходимо, так как:
while not self.uart.any():
yield from asyncio.sleep(0) # возможно по таймингу никогда не будет исполнено
data = b''.join((data, self.uart.read(self.uart.any())))
self.data = data
Возможно, стоит отметить, что эта ошибка не была бы очевидной, если бы данные отправлялись в UART с более низкой скоростью, а не с помощью теста с обратной связью. Добро пожаловать в радости программирования в реальном времени.
7.5 Распространенная ошибка
Если функция или метод определены async def и впоследствии вызваны так, как если бы они были обычными (синхронными) вызываемыми, MicroPython не выдает сообщение об ошибке. Это по замыслу. Обычно это приводит к тому, что программа молча не работает правильно:
async def foo():
# code
loop.create_task(foo) # Вариант 1 1: foo никогда не будет исполняться
foo() # Вариант 2: Аналогично.
У меня есть предложение, которое предлагает ситуацию исправить в 1 варианте с помощью fast_io.
Модуль check_async_code.py пытается обнаружить случаи сомнительного использования сопрограмм. Он написан на Python3 и предназначен для работы на ПК. Используется в сценариях, написанных в соответствии с рекомендациями, изложенными в этом руководстве с сопрограммами, объявленными с использованием async def. Модуль принимает один аргумент, путь к исходному файлу MicroPython (или —help).
Обратите внимание, что он несколько грубоват и предназначен для использования в синтаксически правильном файле, который по умолчанию не запускается. Используйте инструмент, например, pylint для общей проверки синтаксиса (в pylint в настоящее время эта ошибка отсутствует).
Сценарий выдает ложные срабатывания. По замыслу сопрограммы являются объектами первого уровня, их можно передавать функциям и хранить их в структурах данных. В зависимости от логики программы можно сохранить функцию или результат ее выполнения. Сценарий не может определить намерение. Он направлен на игнорирование случаев, которые кажутся правильными, при выявлении других случаев для рассмотрения. Предположим foo, где сопрограмма объявлена как async def:
loop.run_until_complete(foo()) # Нет предупреждения
bar(foo) # Эти строки могут вызвать предупреждение, но могут оказаться и неправильными
bar(foo())
z = (foo,)
z = (foo(),)
foo() # Получит предупреждение: безусловно неправильное использование.
Я нахожу это полезным как есть, но улучшения всегда приветствуются.
7.6 Программирование с использованием сокетов (sockets)
Существует два основных подхода к программированию сокетов uasyncio. По умолчанию, сокеты блокируются до завершения указанной операции чтения или записи. Uasyncio поддерживает блокировку сокетов, используя select.poll для предотвращения их блокирования планировщиком. В большинстве случаев проще всего использовать именно этот механизм. Пример клиентского и серверного кода можно найти в каталоге client_server. Userver использует приложение select.poll явно опрашивая сокет сервера.
Клиентские сокеты используют его неявно в том смысле, что потоковый механизм uasyncio непосредственно использует его.
Обратите внимание, что socket.getaddrinfo в настоящее время блокируемые. Время в примере кода будет минимальным, но если требуется поиск DNS, период блокировки может быть значительным.
Второй подход к программированию сокетов заключается в использовании неблокирующих сокетов. Это добавляет сложности, но необходимо в некоторых приложениях, особенно если подключение осуществляется через WiFi (см. Ниже).
На момент написания статьи (март 2019 г.) поддержка TLS для неблокирующих сокетов находилась в стадии разработки. Его точный статус неизвестен (мне).
Использование неблокирующих сокетов требует некоторого внимания к деталям. Если неблокирующее чтение выполняется из-за задержки сервера, нет гарантии, что все (или любые) запрошенные данные будут возвращены. Аналогичным образом записи могут не перейти к завершению.
Следовательно, асинхронные методы чтения и записи должны итеративно выполнять неблокирующую операцию, пока требуемые данные не будут прочитаны или записаны. На практике может потребоваться тайм-аут, чтобы справиться с перебоями в работе сервера.
Еще одним осложнением является то, что порт ESP32 имел проблемы, которые требовали довольно неприятных взломов для безошибочной работы. Я не проверял, так ли это до сих пор.
Модуль sock_nonblock.py иллюстрирует требуемые методы. Это не рабочая демонстрация и решения, вероятно, будут зависеть от приложения.
7.6.1 Проблемы с WiFi
Потоковый механизм uasyncio не лучший вариант при обнаружении WiFi отключений. Я счел необходимым использовать неблокирующие сокеты для обеспечения отказоустойчивой работы и переподключения клиента при наличии сбоев.
Этот документ описывает проблемы, с которыми я столкнулся в приложениях WiFi, которые держат сокеты открытыми в течение длительных периодов, и обрисовывает в общих чертах решение.
Pltcm предлагает устойчивый асинхронный MQTT-клиент, который обеспечивает целостность сообщений при сбоях WiFi. Описан простой асинхронный полнодуплексный последовательный канал между беспроводным клиентом и проводным сервером с гарантированной доставкой сообщений.
7.7 Аргументы конструктора цикла событий
Небольшая ошибка может возникнуть, если вам нужно создать цикл обработки событий со значениями, отличающимися от заданных по умолчанию. Такой цикл необходимо декларировать перед запуском любого другого кода с использованием asyncio потому, что эти значения могут потребоваться в этом коде. В противном случае код будет инициализирован значениями по умолчанию:
import uasyncio as asyncio
import some_module
bar = some_module.Bar() # Конструктор вызывает get_event_loop()
# задавая значения по умолчанию
loop = asyncio.get_event_loop(runq_len=40, waitq_len=40)
Учитывая, что импорт модуля может выполнять код, самый безопасный способ — создать экземпляр цикла событий сразу после импорта uasyncio.
import uasyncio as asyncio
loop = asyncio.get_event_loop(runq_len=40, waitq_len=40)
import some_module
bar = some_module.Bar() # get_event_loop() вызов теперь безопасен
Я предпочтитаю при написании модулей для использования другими программами избегать запуска кода uasyncio при импорте. Напишите функции и методы для ожидания цикла событий как аргумента. Затем убедитесь, что только приложения верхнего уровня вызывают get_event_loop:
import uasyncio as asyncio
import my_module # Не запускает код при загрузке
loop = asyncio.get_event_loop(runq_len=40, waitq_len=40)
bar = my_module.Bar(loop)
Этот вопрос обсуждается здесь.
8 заметок для начинающих
Эти заметки предназначены для новичков в асинхронном коде и начинаются с описания проблем, которые стараются решить планировщики, а так же дают обзор подхода uasyncio к решениям.
Раздел 8.5 обсуждает относительные достоинства модулей uasyncio и _thread, а так же почему вы можете предпочесть использование сопрограмм uasyncio упреждающему планированию ( _thread).
8.1 Проблема 1: циклы событий
Типичное приложение прошивки работает непрерывно и при этом должно реагировать на внешние события, которые могут включать в себя изменение напряжения на АЦП, появление аппаратного прерывания, или символа поступившего в UART, или данных доступных в сокете. Эти события происходят асинхронно, и код должен иметь возможность отвечать независимо от того, в каком порядке они происходят. Кроме того, может потребоваться выполнение задач, зависящих от времени, например, мигание светодиодов.
Очевидный способ сделать это с помощью цикла событий uasycio. Этот пример не является практическим кодом, но служит для иллюстрации общей формы цикла событий.
def event_loop():
led_1_time = 0
led_1_period = 20
led_2_time = 0
led_2_period = 30
switch_state = switch.state() # Текущее состояние переключателя
while True:
time_now = utime.time()
if time_now >= led_1_time: # Мигание LED #1
led1.toggle()
led_1_time = time_now + led_1_period
if time_now >= led_2_time: # Мигание LED #2
led2.toggle()
led_2_time = time_now + led_2_period
# Можно прицепить еще LEDs
if switch.value() != switch_state:
switch_state = switch.value()
# делать что-нибудь
if uart.any():
# обрабатывать ввод с UART
Такой цикл работает для простых примеров, но по мере увеличения количества событий код быстро становятся громоздкими. Они также нарушают принципы объектно-ориентированного программирования, объединяя большую часть программной логики в одном месте, а не связывая код с контролируемым объектом. Мы хотим разработать класс для светодиода, способного мигать, который можно вставить в модуль и импортировать. Подход ООП к миганию светодиода может выглядеть следующим образом:
import pyb
class LED_flashable():
def __init__(self, led_no):
self.led = pyb.LED(led_no)
def flash(self, period):
while True:
self.led.toggle()
# каким-то образом ждать завершения периода period,
# при этом позволяя другим задачам исполняться одновременно с ожиданием
Планировщик в uasyncio позволяет создавать такие классы.
8.2 Проблема 2: методы блокировки
Предположим, вам нужно прочитать некоторое количество байтов из сокета. Если вы вызываете socket.read(n) с блокирующим сокетом по умолчанию, он будет «блокироваться» (то есть не сможет завершиться), пока не будут получены n байтов. В течение этого периода приложение не будет реагировать на другие события.
С помощью неблокирующего сокета uasyncio вы можете написать метод асинхронного чтения. Задача, требующая данных, будет (обязательно) блокироваться до тех пор, пока они не будут получены, но в течение этого периода будут выполняться другие задачи, что позволит приложению оставаться отзывчивым.
8.3. Подходы uasyncio
В следующем классе предусмотрен светодиод, который можно включать и выключать, а также можно мигать с произвольной скоростью. Экземпляр LED_async использует метод run, который можно использовать для непрерывной работы. Поведением светодиодов можно управлять с помощью методов on(), off() и flash(secs).
import pyb
import uasyncio as asyncio
class LED_async():
def __init__(self, led_no):
self.led = pyb.LED(led_no)
self.rate = 0
loop = asyncio.get_event_loop()
loop.create_task(self.run())
async def run(self):
while True:
if self.rate <= 0:
await asyncio.sleep_ms(200)
else:
self.led.toggle()
await asyncio.sleep_ms(int(500 / self.rate))
def flash(self, rate):
self.rate = rate
def on(self):
self.led.on()
self.rate = 0
def off(self):
self.led.off()
self.rate = 0
Следует отметить, что on(), off() и flash() представляют из себя обычные синхронные методы. Они изменяют поведение светодиода, но возвращаются немедленно. Мигание происходит «в фоновом режиме». Это подробно объясняется в следующем разделе.
Класс соответствует принципу ООП, заключающемуся в сохранении логики, связанной с устройством, в классе. При этом, применение uasyncio гарантирует, что во время мигания светодиода приложение может реагировать на другие события. Программа, приведенная ниже, мигает четырьмя светодиодами Pyboard с различной частотой, а также реагирует на кнопку USR, которая ее завершает.
import pyb
import uasyncio as asyncio
from led_async import LED_async # Класс, описанный выше
async def killer(): # сопрограмма, необходимая для выхода из цикла
sw = pyb.Switch()
while not sw.value():
await asyncio.sleep_ms(100)
leds = [LED_async(n) for n in range(1, 4)]
for n, led in enumerate(leds):
led.flash(0.7 + n/4)
loop = asyncio.get_event_loop()
loop.run_until_complete(killer())
В отличие от первого примера цикла событий логика, связанная с переключателем, находится в функции, отдельной от функциональности светодиода. Обратите внимание на код, используемый для запуска планировщика:
loop = asyncio.get_event_loop()
loop.run_until_complete(killer()) # Выполнение передается сопрограммам
# Здесь будет продолжение только после завершения killer (),
# когда планировщик остановился.
8.4 Планирование в uasyncio
Python 3.5 и MicroPython поддерживают понятие асинхронной функции, также известной как сопрограмма или задача. Сопрограмма должна включать хотя бы одно утверждение await.
async def hello():
for _ in range(10):
print('Hello world.')
await asyncio.sleep(1)
Эта функция печатает сообщение десять раз с интервалом в одну секунду. Пока функция приостановлена в ожидании задержки, планировщик asyncio будет исполнять другие задачи, создавая иллюзию их одновременного выполнения.
Когда сопрограмма выдает await asyncio.sleep_ms() или await asyncio.sleep() текущая задача приостанавливается и помещается в очередь, которая упорядочена по времени и выполнение переходит к задаче, находящейся вверху очереди. Очередь спроектирована таким образом, что, даже если указанный спящий режим равен нулю, другие соответствующие задачи будут выполняться до возобновления текущей. Это «честное круговое» планирование. Обычной практикой является выполнение циклов await asyncio.sleep(0), чтобы задача не задерживала выполнение. Далее показан цикл «занято-ожидание», ожидающий другой задачи для установки глобальной переменной flag. Увы, он монополизирует процессор, предотвращая запуск других сопрограмм:
async def bad_code():
global flag
while not flag:
pass # Плохо
flag = False
# дальнейший код не важен
Проблема здесь в том, что до тех пор, пока цикл flagis False управление не будет передано планировщику, поэтому никакая другая задача не будет запущена. Правильный подход:
async def good_code():
global flag
while not flag:
await asyncio.sleep(0) # Хорошо
flag = False
# дальнейший код не важен
По той же причине плохая практика задавать задержки, например, utime.sleep(1) потому что это блокирует другие задачи на 1 с; правильнее использовать await asyncio.sleep(1).
Обратите внимание, что задержки, формируемые методами uasyncio sleep и sleep_ms в реальности могут превышать указанное время. Это связано с тем, что во время задержки будут выполняться другие задачи. По истечении периода задержки выполнение не возобновится до тех пор, пока запущенная задача не выдаст await или не завершит работу. Хорошо себя ведущая сопрограмма всегда будет декларировать await через регулярные промежутки времени. Там, где требуется точная задержка, особенно если одна меньше нескольких мс, возможно необходимо использовать utime.sleep_us(us).
8.5 Почему совместное, а не потоковое планирование (_thread)?
Первоначальная реакция начинающих на идею совместного планирования сопрограмм часто вызывает разочарование. Наверняка потоковое планирование лучше? Почему я должен явно уступать контроль, если виртуальная машина Python может сделать это для меня?
Когда речь идет о встроенных системах, модель совместной работы имеет два преимущества.
Во-первых, это легкий вес. Возможно иметь большое количество сопрограмм, потому что в отличие от запланированных потоков, приостановленные сопрограммы занимают меньше места.
Во-вторых, это позволяет избежать некоторых тонких проблем, связанных с потоковым планированием.
На практике совместная многозадачность используется широко, особенно в приложениях с пользовательским интерфейсом.
В защиту модели с потоковым планированием покажу одно преимущество: если кто-то пишет
for x in range ( 1000000 ):
# сделать что-то трудоёмкое
это не заблокирует другие задачи. В модели совместной работы предполагается, что цикл должен явно давать контроль каждой задаче определенное количество итераций, например, помещая код в сопрограмму и периодически выпуская await asyncio.sleep(0).
Увы, это преимущество бледнеет по сравнению с недостатками. Некоторые из них описаны в документации по написанию обработчиков прерываний. В модели c потоковым планированием каждый поток может прерывать любой другой поток, изменяя данные, которые могут использоваться в других потоках. Как правило, гораздо проще найти и исправить блокировку, возникающую из-за ошибки, которая не дает результата, чем обнаружение иногда очень тонких и редко встречающихся ошибок, которые могут возникнуть в коде, написанном в рамках модели c потоковым планированием.
Проще говоря, если вы напишите сопрограмму MicroPython, вы можете быть уверены, что переменные не будут внезапно изменены другой сопрограммой: ваша сопрограмма имеет полный контроль, пока не выдаст await asyncio.sleep(0).
Имейте в виду, что обработчики прерываний являются преимущественными. Это относится как к аппаратным, так и к программным прерываниям, которые могут произойти в любой точке вашего кода.
Красноречивое обсуждение проблем потокового планирования можно найти здесь.
8.6 Взаимодействие
В нетривиальных приложениях сопрограммы должны взаимодействовать. При этом могут быть использованы обычные методы Python. К ним относится использование глобальных переменных или объявление сопрограмм в качестве методов объекта: они могут совместно использовать переменные экземпляра. В качестве альтернативы изменяемый объект может быть передан в качестве аргумента сопрограммы.
Модель c потоковым планированием требуют от специалистов, чтобы классы обеспечивали безопасную связь; в модели совместной работы это редко требуется.
8.7. Опрос (Polling)
Некоторые аппаратные устройства, такие как акселерометр Pyboard, не поддерживают прерывания, и поэтому должны опрашиваться (то есть периодически проверяться). Опрос также может использоваться в сочетании с обработчиками прерываний: обработчик прерываний обслуживает оборудование и устанавливает флаг. Сопрограмма опрашивает флаг — если он установлен, происходит обработка данных и флаг сбрасывается. Лучшим подходом является использование класса Event.
1. Вводная
Fuse — это проект, предоставляющий программисту функционал для написания драйвера для произвольной файловой системы. Fuse (в Linux) включает в себя модуль ядра, используемый различными программами для связи с драйвером ФС (который является, по существу, исполняемым файлом).
Стратегия разработки драйвера проста — необходимо реализовать некоторый набор функций с определенными прототипами, указатели на которых передать модулю ядра (с помощью вызова некоторой функции — т.н. fuse_main) — после чего при обращении к ФС будут вызываться именно те функции из драйвера, указатели на которые были переданы в fuse_main. Нетрудно догадаться, что диапазон применения fuse очень и очень широк — с ее помощью можно разработать самые «неожиданные» ФС
Плюсы fuse:
- Код драйвера ФС выполняется в user-space — сразу исчезают многие проблемы, возникающие при отладке модуля ядра
- У fuse есть много «привязок» к разным ЯП (кроме C, это еще и Python, например)
- Простые принципы разработки драйверов (простое API)
- Стабильность, надежность и защищенность
Подробности разработки драйверов ФС на C/C++ для fuse можно узнать в статье: «Разработка собственной файловой системы с помощью FUSE» (ibm.com Россия).
Однако в этой статье я расскажу, как написать драйвер, используя python.
2. Установка «привязок» к fuse для python
Все просто. Для начала получим исходный код «привязок»:
Проследуйте на страницу FUSEWiki — FusePython и выберите понравившийся вам способ загрузки.
Я использовал Mercurial:
hg clone http://mercurial.creo.hu/repos/fuse-python-hg
После загрузки перейдите в каталог с исходниками fuse-python и сделайте:
python setup.py build
su -c 'python setup.py install'
Вы получите модуль fuse, в котором содержаться вся необходимые классы.
3. Небольшой пример
Давайте напишем драйвер для «игрушечной» виртуальной ФС.
В начале — каркас драйвера:
- Первым делом мы импортируем нужные модули:
import os,stat
import fuse - Теперь необходимо сообщить fuse-python какую ее версию мы используем:
fuse.fuse_python_api = (0, 2)
- Описываем базовый для нашего драйвера класс (он будет наследовать класс fuse.Fuse):
class simpleFS(fuse.Fuse):
#функции
#TODO - Описываем функцию, которая будет «запускать» наш драйвер:
def runSimpleFS():
usage='Simple FS ' + fuse.Fuse.fusage
fs = simpleFS(version="%prog " + fuse.__version__,usage=usage,dash_s_do='setsingle')
fs.parse(errex=1)
fs.main()Как вы могли заметить, принципиально важными являются вызов конструктора fuse.Fuse (с правильным списком аргументов), вызов fs.parse() и вызов fs.main(). fs.main() это и есть как раз та самая fuse_main. Указатели на функцию в fuse-python подменяются функциями-членами_класса fuse.Fuse.
Важное замечание: после вызова fs.main() драйверу перестает быть доступной консоль, из которой, собственно, драйвер и был запущен. В частности, теряется связь с STDIN, STDOUT, STDERR — т.е. мы сможем догадаться об исключенях только по результатам работы драйвера.
- Вызываем runSimpleFS():
runSimpleFS()
Каркас готов.
Теперь давайте определимся с тем, что будет представлять из себя наша ФС:
- 2 каталога — ‘/’ и ‘/simple’ — оба с правами 0555 и root.root в качестве пользователя.группы_пользователя
- 1 файл — ‘/README’ (0444 root.root)
Реализация нашего класса:
- Конструктор. В нем принципиально важно вызвать конструктор базового класса со всеми переданными в конструктор simpleFS аргументами (а self.README — содержимое файла ‘/README’):
def __init__(self, *args, **kw):
fuse.Fuse.__init__(self, *args, **kw)
self.README = 'This is simple FSn' - Пользовательские действия с ФС (системные вызовы и прочее). На каждой такой функции в fuse.Fuse стоит «заглушка». Нам остается только переопределить нужные нам функции:
# getattr вызывается при получении информации об объекте ФС. Например, при использовании команды ls
def getattr(self, path):
# В объекте fuse.Stat() вернем интересующую информацию
st = fuse.Stat()
# "Режим" - права доступа, тип объекта
st.st_mode = 0
# Номер inode
st.st_ino = 0
st.st_dev = 0
# Количество ссылок на объект
st.st_nlink = 0
# ID владельца объекта
st.st_uid = 0
# ID группы владельца объекта
st.st_gid = 0
# Размер объекта
st.st_size = 0
# Временные штампы
st.st_atime = 0
st.st_mtime = 0
st.st_ctime = 0
if path == '/' or path == '/simple':
# Каталоги
st.st_mode = stat.S_IFDIR | 0755
st.st_nlink = 3
elif path == '/README':
# Файлы
st.st_mode = stat.S_IFREG | 0444
st.st_nlink = 1
st.st_size = len(self.README)
else:
# path не существует
return -errno.ENOENT
return st# readdir вызывается при попытке просмотра содержимого каталога. Например, при использовании ls
def readdir(self, path, offset):
# В каждом каталоге есть '.' и '..'
yield fuse.Direntry('.')
yield fuse.Direntry('..')
if path == '/':
# Кроме того, в '/' есть еще и 'README' и 'simple'
yield fuse.Direntry('README')
yield fuse.Direntry('simple')# open вызывается при попытки открыть файл. Мы должны проверить флаги доступа - наш единственный файл '/README' доступен только на чтение
def open(self, path, flags):
if path != '/README':
return -errno.ENOENT
accmode = os.O_RDONLY | os.O_WRONLY | os.O_RDWR
if (flags & accmode) != os.O_RDONLY:
# Ошибка доступа
return -errno.EACCES# read вызывается при попытки прочитать данные из файла
# offset - смещение в читаемом файле
# size - размер считываемого ("запрощенного") блока
# read возвращает считанные символыdef read(self, path, size, offset):
if path != '/README':
return -errno.ENOENT
slen = len(self.README)
if offset < slen:
if offset + size > slen:
size = slen - offset
buf = self.README[offset:offset+size]
else:
buf = ''
return buf# statfs вызывается в ответ на запрос информации о ФС
def statfs(self):
# Вернем информацию в объекте класса fuse.StatVfs
st = fuse.StatVfs()
# Размер блока
st.f_bsize = 1024
st.f_frsize = 1024
st.f_bfree = 0
st.f_bavail = 0
# Количество файлов
st.f_files = 2
# Количество блоков
# Если f_blocks == 0, то 'df' не включит ФС в свой список - понадобится сделать 'df -a'
st.f_blocks = 4
st.f_ffree = 0
st.f_favail = 0
st.f_namelen = 255
return st
Вот и все. Драйвер готов (исходный код драйвера лежит здесь).
Проверка:
mkdir smpfs
python simplefs.py smpfs
ls -la smpfs/
ls -la smpfs/simple
cat smpfs/README
df
su -c 'umount smpfs'
Заметьте, что первым параметром в скрипт передается точка монтирования (каталог ‘smpfs’ в нашем случае).
В случае, если этого примера вам показалось мало — посмотрите в каталог ‘example’ дистрибутива fuse-python. Там есть кое-что интересное.
4. Почитать
- В первую очередь: Статья о fuse в Википедии
- Обязательно:
pydoc fuse
- Статья о fuse на ibm.com Россия
- Сайт fuse (там встречаются нерабочие примеры)
5. BashOrgFS
Здесь лежит моя поделка на заданную тему. bash.org.ru представлен в виде файловой системы.
Монтировать так:
./bashorg ТОЧКА_МОНТИРОВАНИЯ
.
Заранее — это просто пример. Используйте на свой страх и риск. Предварительно читайте README
6. За сим все. Успехов!
7 ответы
да. Вы не можете создавать «классические» драйверы режима ядра. Однако, начиная с XP, Windows предлагает Структура драйвера пользовательского режима. Очевидно, что они не могут делать все — любой драйвер, используемый при загрузке ОС, очевидно, должен быть в режиме ядра. Но с UMDF вам нужно только реализовать COM-компоненты.
Помимо драйверов времени загрузки, вы также не можете писать драйверы UMDF, которые:
- Обработка прерываний
- Прямой доступ к оборудованию, например прямой доступ к памяти (DMA)
- иметь строгие временные петли
- Использовать невыгружаемый пул или другие ресурсы, зарезервированные для режима ядра.
Создан 11 июн.
Окончательный ответ не возможен без встраивания интерпретатора в ваш драйвер C/assembly. Если у кого-то нет фреймворка, то ответ — нет. Когда у вас есть интерпретатор и привязки, остальная логика может быть выполнена на Python.
Однако написание драйверов — одна из тех задач, для которых С лучше всего подходит. Я предполагаю, что получившийся код Python будет очень похож на код C и перечеркнет смысл накладных расходов интерпретатора.
Создан 11 июн.
Хороший способ понять, почему это практически невозможно, — прочитать Совет Microsoft об использовании C++ в драйверах. Как производная от C, использование C++ кажется простым. На практике не так.
Например, вы должны решить для каждой функции (и действительно для каждой ассемблерной инструкции), находится ли она в страничной или нестраничной памяти. Это требует расширений для C, тщательного использования новых возможностей C++ или, в данном случае, специального расширения для языка Python и виртуальной машины. Кроме того, ваша виртуальная машина, совместимая с драйверами, также должна иметь дело с различными IRQL — существует иерархия «уровней», которые ограничивают то, что вы можете и что не можете делать.
Создан 11 июн.
Python работает на виртуальной машине, так что нет.
НО:
Вы можете написать компилятор, который переводит код Python на машинный язык. Как только вы это сделали, вы можете это сделать.
Создан 11 июн.
Я не знаю ограничений драйверов для Windows (схемы распределения памяти, динамическая загрузка библиотек и все такое), но вы можете встроить интерпретатор Python в свой драйвер, после чего вы можете делать все, что хотите. Не то, чтобы я думаю, что это хорошая идея
Создан 11 июн.
Никогда не говори никогда, но э… нет.
Возможно, вы сможете взломать что-нибудь вместе, чтобы запускать части драйверов пользовательского режима в python. Но что-то в режиме ядра можно делать только на C или ассемблере.
Создан 11 июн.
Нет, они не могут. Драйверы Windows должны быть написаны на языке,
- Интерфейс с API на основе C
- Компилировать в машинный код
Опять же, ничто не мешает вам написать компилятор, который переводит python в машинный код;)
Создан 11 июн.
Не тот ответ, который вы ищете? Просмотрите другие вопросы с метками
python
windows
drivers
or задайте свой вопрос.
Морфемный разбор слова:
Однокоренные слова к слову:
Selenium для Python. Глава 2. Первые Шаги
Продолжение перевода неофициальной документации Selenium для Python.
Перевод сделан с разрешения автора Baiju Muthukadan.
Оригинал можно найти здесь.
Содержание:
2. Первые шаги
2.1. Простое использование
Если вы установили привязку Selenium к Python, вы можете начать использовать ее с помощью интерпретатора Python.
Код выше может быть сохранен в файл (к примеру, python_org_search.py), и запущен:
Запускаемый вами Python должен содержать установленный модуль selenium.
2.2. Пошаговый разбор примера
Модуль selenium.webdriver предоставляет весь функционал WebDriver’а. На данный момент WebDriver поддерживает реализации Firefox, Chrome, Ie и Remote. Класс Keys обеспечивает взаимодействие с командами клавиатуры, такими как RETURN, F1, ALT и т.д…
Далее создается элемент класса Firefox WebDriver.
Метод driver.get перенаправляет к странице URL в параметре. WebDriver будет ждать пока страница не загрузится полностью (то есть, событие “onload” игнорируется), прежде чем передать контроль вашему тесту или скрипту. Стоит отметить, что если страница использует много AJAX-кода при загрузке, то WebDriver может не распознать, загрузилась ли она полностью:
Следующая строка — это утверждение (англ. assertion), что заголовок содержит слово “Python” [assert позволяет проверять предположения о значениях произвольных данных в произвольном месте программы. По своей сути assert напоминает констатацию факта, расположенную посреди кода программы. В случаях, когда произнесенное утверждение не верно, assert возбуждает исключение. Такое поведение позволяет контролировать выполнение программы в строго определенном русле. Отличие assert от условий заключается в том, что программа с assert не приемлет иного хода событий, считая дальнейшее выполнение программы или функции бессмысленным — Прим. пер.]:
WebDriver предоставляет ряд способов получения элементов с помощью методов find_element_by_*. Для примера, элемент ввода текста input может быть найден по его атрибуту name методом find_element_by_name. Подробное описание методов поиска элементов можно найти в главе Поиск Элементов:
После этого мы посылаем нажатия клавиш (аналогично введению клавиш с клавиатуры). Специальные команды могут быть переданы с помощью класса Keys импортированного из selenium.webdriver.common.keys:
После ответа страницы, вы получите результат, если таковой ожидается. Чтобы удостовериться, что мы получили какой-либо результат, добавим утверждение:
В завершение, окно браузера закрывается. Вы можете также вызывать метод quit вместо close. Метод quit закроет браузер полностью, в то время как close закроет одну вкладку. Однако, в случае, когда открыта только одна вкладка, по умолчанию большинство браузеров закрывается полностью:
2.3. Использование Selenium для написания тестов
Selenium чаще всего используется для написания тестовых ситуаций. Сам пакет selenium не предоставляет никаких тестовых утилит или инструментов разработки. Вы можете писать тесты с помощью модуля Python unittest. Другим вашим выбором в качестве тестовых утилит/инструментов разработки могут стать py.test и nose.
В этой главе, в качестве выбранной утилиты будет использоваться unittest. Ниже приводится видоизмененный пример с использованием этого модуля. Данный скрипт тестирует функциональность поиска на сайте python.org:
Вы можете запустить тест выше из командной строки следующей командой:
Результат сверху показывает, что тест завершился успешно.
2.4. Пошаговый разбор примера
Сначала были импортированы все основные необходимые модули. Модуль unittest встроен в Python и реализован на Java’s JUnit. Этот модуль предоставляет собой утилиту для организации тестов.
Модуль selenium.webdriver предоставляет весь функционал WebDriver’а. На данный момент WebDriver поддерживает реализации Firefox, Chrome, Ie и Remote. Класс Keys обеспечивает взаимодействие с командами клавиатуры, такими как RETURN, F1, ALT и т.д…
Класс теста унаследован от unittest.TestCase. Наследование класса TestCase является способом сообщения модулю unittest, что это тест:
setUp — это часть инициализации, этот метод будет вызываться перед каждым методом теста, который вы собираетесь написать внутри класса теста. Здесь мы создаем элемент класса Firefox WebDriver.
Далее описан метод нашего теста. Метод теста всегда должен начинаться с фразы test. Первая строка метода создает локальную ссылку на объект драйвера, созданный методом setUp.
Метод driver.get перенаправляет к странице URL в параметре. WebDriver будет ждать пока страница не загрузится полностью (то есть, событие “onload” игнорируется), прежде чем передать контроль вашему тесту или скрипту. Стоит отметить, что если страница использует много AJAX-кода при загрузке, то WebDriver может не распознать, загрузилась ли она полностью:
Следующая строка — это утверждение, что заголовок содержит слово “Python”:
WebDriver предоставляет ряд способов получения элементов с помощью методов find_element_by_*. Для примера, элемент ввода текста input может быть найден по его атрибуту name методом find_element_by_name. Подробное описание методов поиска элементов можно найти в главе Поиск Элементов:
После этого мы посылаем нажатия клавиш (аналогично введению клавиш с клавиатуры). Специальные команды могут быть переданы с помощью класса Keys импортированного из selenium.webdriver.common.keys:
После ответа страницы, вы получите результат, если таковой ожидается. Чтобы удостовериться, что мы получили какой-либо результат, добавим утверждение:
Метод tearDown будет вызван после каждого метода теста. Это метод для действий чистки. В текущем методе реализовано закрытие окна браузера. Вы можете также вызывать метод quit вместо close. Метод quit закроет браузер полностью, в то время как close закроет одну вкладку. Однако, в случае, когда открыта только одна вкладка, по умолчанию большинство браузеров закрывается полностью.:
Завершающий код — это стандартная вставка кода для запуска набора тестов [Сравнение __name__ с «__main__» означает, что модуль (файл программы) запущен как отдельная программа («main» (англ.) — «основная», «главная») (а не импортирован из другого модуля). Если вы импортируете модуль, атрибут модуля __name__ будет равен имени файла без каталога и расширения — Прим. пер.]:
2.5. Использование Selenium с remote WebDriver
Строка выше сообщает о том, что вы можете использовать указанный URL для подключения remote WebDriver. Ниже приводится несколько примеров:
Переменная desired_capabilities — это словарь. Вместо того, чтобы использовать словари по умолчанию, вы можете явно прописать значения:
Источник
Содержание статьи
С чего все началось?
Все началось с кампании на Kickstarter. Дэмьен Джордж (Damien George), разработчик из Англии, спроектировал микроконтроллерную плату, предназначенную специально для Python. И кампания «выстрелила». Изначально была заявлена сумма в 15 тысяч фунтов стерлингов, но в результате было собрано в шесть с половиной раз больше — 97 803 фунта стерлингов.
А чем эта плата лучше?
Автор проекта приводил целый ряд преимуществ своей платформы в сравнении с Raspberry Pi и Arduino:
Мощность — MP мощнее в сравнении с микроконтроллером Arduino, здесь используются 32-разрядные ARM-процессоры типа STM32F405 (168 МГц Cortex-M4, 1 Мбайт флеш-памяти, 192 Кбайт ОЗУ).
Простота в использовании — язык MicroPython основан на Python, но несколько упрощен, для того чтобы команды по управлению датчиками и моторами можно было писать буквально в несколько строк.
Отсутствие компилятора — чтобы запустить программу на платформе MicroPython, нет необходимости устанавливать на компьютер дополнительное ПО. Плата определяется ПК как обычный USB-накопитель — стоит закинуть на него текстовый файл с кодом и перезагрузить, программа тут же начнет исполняться. Для удобства все-таки можно установить на ПК эмулятор терминала, который дает возможность вписывать элементы кода с компьютера непосредственно на платформу. Если использовать его, тебе даже не придется перезагружать плату для проверки программы, каждая строка будет тут же исполняться микроконтроллером.
Низкая стоимость — в сравнении с Raspberry Pi платформа PyBoard несколько дешевле и, как следствие, доступнее.
И что, только официальная плата?
Нет. При всех своих достоинствах PyBoard (так называется плата от разработчика MicroPython) — довольно дорогое удовольствие. Но благодаря открытой платформе на многих популярных платах уже можно запустить MicroPython, собранный специально для нее. В данный момент существуют версии:
Подготовка к работе
Перед тем как писать программы, нужно настроить плату, установить на нее прошивку, а также установить на компьютер необходимые программы.
Все примеры проверялись и тестировались на следующем оборудовании:
Прошивка контроллера
Для прошивки платы нам понадобится Python. Точнее, даже не он сам, а утилита esptool, распространяемая с помощью pip. Если у тебя установлен Python (неважно, какой версии), открой терминал (командную строку) и набери:
После установки esptool надо сделать две вещи. Первое — скачать с официального сайта версию прошивки для ESP8266. И второе — определить адрес платы при подключении к компьютеру. Самый простой способ — подключиться к компьютеру, открыть Arduino IDE и посмотреть адрес в списке портов.
Открываем терминал (командную строку) и переходим на рабочий стол:
Форматируем флеш-память платы:
Если при форматировании возникли ошибки, значит, нужно включить режим прошивки вручную. Зажимаем на плате кнопки reset и flash. Затем отпускаем reset и, не отпуская flash, пытаемся отформатироваться еще раз.
И загружаем прошивку на плату:
Взаимодействие с платой
Все взаимодействие с платой может происходить несколькими способами:
При подключении через Serial-порт пользователь в своем терминале (в своей командной строке) видит практически обычный интерпретатор Python.
Подключение через SerialPort
Для подключения по Serial есть разные программы. Для Windows можно использовать PuTTY или TeraTerm. Для Linux — picocom или minicom. В качестве кросс-платформенного решения можно использовать монитор порта Arduino IDE. Главное — правильно определить порт и указать скорость передачи данных 115200.
Кроме этого, уже создано и выложено на GitHub несколько программ, облегчающих разработку, например EsPy. Кроме Serial-порта, он включает в себя редактор Python-файлов с подсветкой синтаксиса, а также файловый менеджер, позволяющий скачивать и загружать файлы на ESP.
Но все перечисленные способы хороши лишь тогда, когда у нас есть возможность напрямую подключиться к устройству с помощью кабеля. Но плата может быть интегрирована в какое-либо устройство, и разбирать его только для того, чтобы обновить программу, как-то неоптимально. Наверное, именно для таких случаев и был создан WebREPL. Это способ взаимодействия с платой через браузер с любого устройства, находящегося в той же локальной сети, если у платы нет статического IP, и с любого компьютера, если такой IP присутствует. Давай настроим WebREPL. Для этого необходимо, подключившись к плате, набрать
Появится сообщение о статусе автозапуска WebREPL и вопрос, включить или выключить его автозапуск.
После ввода q появляется сообщение о выставлении пароля доступа:
Вводим его, а затем подтверждаем. Теперь после перезагрузки мы сможем подключиться к плате по Wi-Fi.
Так как мы не настроили подключение платы к Wi-Fi-сети, она работает в качестве точки доступа. Имя Wi-Fi-сeти — MicroPython-******, где звездочками я заменил часть MAC-адреса. Подключаемся к ней (пароль — micropythoN ).
Открываем WebREPL и нажимаем на Connect. После ввода пароля мы попадаем в тот же интерфейс, что и при прямом подключении. Кроме этого, в WebREPL есть интерфейс для загрузки файлов на плату и скачивания файлов на компьютер.
Среди файлов, загруженных на плату, есть стандартные:
Начинаем разработку
Hello world
Принято, что первой написанной на новом языке программирования должна быть программа, выводящая Hello world. Не будем отходить от традиции и выведем это сообщение с помощью азбуки Морзе.
Итак, что же происходит? Сначала подключаются библиотеки: стандартная Python-библиотека time и специализированная machine. Эта библиотека отвечает за взаимодействие с GPIO. Стандартный встроенный светодиод располагается на втором пине. Подключаем его и указываем, что он работает на выход. Если бы у нас был подключен какой-нибудь датчик, то мы бы указали режим работы IN.
Следующие две функции отвечают за включение и выключение светодиода на определенный интервал времени. Наверное, интересно, почему я сначала выключаю светодиод, а потом включаю? Мне тоже очень интересно, почему сигнал для данного светодиода инвертирован. Оставим это на совести китайских сборщиков. На самом деле команда pin.off() включает светодиод, а pin.on() — отключает.
Ну а дальше все просто: заносим в переменную Hello_world нашу строчку, записанную кодом Морзе, и, пробегаясь по ней, вызываем ту или иную функцию.
Продолжение доступно только участникам
Вариант 1. Присоединись к сообществу «Xakep.ru», чтобы читать все материалы на сайте
Членство в сообществе в течение указанного срока откроет тебе доступ ко ВСЕМ материалам «Хакера», позволит скачивать выпуски в PDF, отключит рекламу на сайте и увеличит личную накопительную скидку! Подробнее
Источник
Могут ли драйверы Windows быть написаны на Python?
могут ли драйверы Windows быть написаны на Python?
7 ответов
помимо драйверов времени загрузки, вы также не можете писать драйверы UMDF, которые:
окончательный ответ не без встраивания интерпретатора в ваш в противном случае драйвер c/assembly. Если у кого-то есть рамки, то ответ нет. Как только у вас есть интерпретатор и привязки на месте, остальная часть логики может быть выполнена в Python.
однако написание драйверов является одной из вещей, для которых C лучше всего подходит. Я предполагаю, что полученный код Python будет выглядеть очень похоже на код C и победит цель интерпретатора накладные расходы.
например, вы должны решить для каждой функции (и действительно для каждой инструкции по сборке), находится ли она в подкачиваемой или не подкачиваемой памяти. Это требует расширения до C, тщательного использования новых функций C++ или в этом случае специального расширения для язык Python и виртуальная машина. Кроме того, ваша совместимая с драйверами виртуальная машина также должна иметь дело с различными IRQLs-существует иерархия «уровней», которые ограничивают то, что вы можете и не можете сделать.
Python работает на виртуальной машине, поэтому нет.
вы можете написать компилятор, который переводит код Python на машинный язык. Как только вы это сделаете, вы сможете это сделать.
Я не знаю ограничений на драйверы в windows (схемы выделения памяти, динамическая загрузка библиотек и все), но вы можете встроить интерпретатор python в свой драйвер, и в этот момент Вы можете делать все, что захотите. Не думаю, что это хорошая идея 🙂
никогда не говори никогда, Но да.. нет!—1—>
вы можете взломать что-то вместе, чтобы запустить пользовательские части драйверов в python. Но материал в режиме ядра может быть выполнен только в C или сборке.
нет они не могут. Драйверы Windows должны быть написаны на языке, который может
опять же, ничто не мешает вам написать компилятор, который переводит python в машинный код;)
Источник
Программирование с PyUSB 1.0
От переводчика:
Это перевод руководства Programming with PyUSB 1.0
Данное руководство написано силами разработчиков PyUSB, однако быстро пробежавшись по коммитам я полагаю, что основной автор руководства — walac.
Позвольте мне представиться
PyUSB 1.0 — это библиотека Python обеспечивающая легкий доступ к USB. PyUSB предоставляет различные функции:
Довольно разговоров, давайте писать код!
Кто есть кто
Для начала, давайте дадим описание модулям PyUSB. Все модули PyUSB находятся под пекетом usb, с последующими модулями:
Модуль | Описание |
---|---|
core | Основной модуль USB. |
util | Вспомогательные функции. |
control | Стандартные запросы управления. |
legacy | Слой совместимости с версиями 0.x. |
backend | Субпакет содержащий встроенные бэкенды. |
К примеру, чтобы импортировать модуль core, введите следующее:
Ну что ж начнём
Далее следует простенькая программа, которая посылает строку ‘test’ в первый найденный источник данных (endpoint OUT):
Первые две строки импортируют модули пакета PyUSB. usb.core — основной модуль, а usb.util содержит вспомогательные функции. Следующая команда ищет наше устройство и возвращает экземпляр объекта, если находит. Если нет, возвращается None. Далее, мы устанавливаем конфигурацию, которую будем использовать. Заметьте: отсутствие аргументов означает, что нужная конфигурация была проставлена по-умолчанию. Как Вы увидите, во многих функциях PyUSB есть настройки по-умолчанию для большинства распространенных устройств. В этом случае, ставится первая найденная конфигурация.
Затем, мы ищем конечную точку в которой заинтересованы. Мы ищем ее внутри первого интерфейса, который у нас есть. После того как нашли эту точку мы посылаем в неё данные.
Если нам заранее известен адрес конечной точки, мы можем просто вызвать функцию write объекта device:
Здесь мы пишем строку ‘test’ в контрольную точку под адресом 1. Все эти функции будут разобраны лучше в последующих разделах.
Что не так?
Каждая функция в PyUSB вызывает исключение в случае ошибки. Помимо стандартных исключений Python, PyUSB определяет usb.core.USBError для ошибок связанных с USB.
Вы также можете использовать функции лога PyUSB. Он использует модуль logging. Для его использования определите переменную окружения PYUSB_DEBUG с одним из следующих уровней логирования: critical, error, warning, info или debug.
По-умолчанию сообщения посылаются в sys.stderr. Если хотите, Вы можете перенаправить сообщения лога в файл определив переменную окружения PYUSB_LOG_FILENAME. Если её значение — корректный путь к файлу, сообщения будут записываться туда, иначе они будут посылаться в sys.stderr.
Где ты?
Функция find() в модуле core используется чтобы найти и пронумеровать устройства присоединенные к системе. К примеру, скажем что у нашего устройства есть vendor ID со значением 0xfffe и product ID равный 0x0001. Если нам нужно найти это устройство мы сделаем так:
Вот и всё, функция возвратит объект usb.core.Device, который представляет наше устройство. Если устройство не найдено оно возвратит None. На самом деле Вы можете использовать любое поле класса Device Descriptor, которое хотите. К примеру, что если мы захотим узнать есть ли USB-принтер подключенный к системе? Это очень легко:
7 — это код для класса принтеров в соответствии со спецификацией USB. О, постойте, что если я хочу пронумеровать все имеющиеся принтеры? Без проблем:
Что случилось? Что ж, время для небольшого объяснения… у find есть параметр, который называется find_all и по-умолчанию имеет значение False. Когда он имеет ложное значение [1], find будет возвращать первое устройство, которое подходит под указанные критерии (скоро об этом поговорим). Если Вы передадите параметру истинное значение, find вместо этого возвратит список из всех устройств подходящих по критериям. Вот и всё! Просто, не правда ли?
Мы закончили? Нет! Я ещё не всё рассказал: многие устройства на самом деле ставят свою информацию о классе в Interface Descriptor вместо Device Descriptor. Так что, чтобы по-настоящему найти все принтеры подключенные к системе, нам нужно будет перебрать все конфигурации, а также все интерфейсы и проверить — выставлено ли у одного из интерфейсов в bInterfaceClass значение 7. Если Вы программист как и я Вы можете задаться вопросом: есть ли путь полегче с помощью которого это можно реализовать. Ответ: да, он есть. Для начала давайте посмотрим на готовый код нахождения всех подключенных принтеров:
Параметр custom_match принимает любой вызываемый объект, который получает объект устройства. Он должен возвращать истинное значение для подходящего устройства и ложное — для неподходящего. Вы также можете скомбинировать custom_match с полями устройства, если захотите:
Здесь нас интересуют принтеры поставщика 0xfffe.
Опиши себя
Ок, мы нашли наше устройство, но перед тем как с ним взаимодействовать, нам хотелось бы узнать больше о нём. Ну Вы знаете, конфигурации, интерфейсы, конечные точки, типы потоков данных…
Если у Вас есть устройство, Вы можете получить доступ к любому полю дескриптора устройства как к свойствам объекта:
Для доступа к имеющимся конфигурациям в устройстве, Вы можете итерировать устройство:
Таким же образом Вы можете итерировать конфигурацию для доступа к интерфейсам, а также итерировать интерфейсы для доступа к их контрольным точкам. У каждого типа объекта есть поля соответствующего дескриптора как атрибуты. Взглянем на пример:
Вы также можете использовать индексы для произвольного доступа к дескрипторам, как здесь:
Как вы можете увидеть индексы отсчитываются с 0. Но постойте! Есть что-то странное в том, как я получаю доступ к интерфейсу… Да, Вы правы, индекс для Configuration принимает ряд из двух значений, из которых первый — это индекс Interface’а, а второй — альтернативная настройка. В общем, чтобы получить доступ к первому интерфейсу, но со второй настройкой, мы напишем cfg[(0,1)].
Теперь время научиться мощному способу поиска дескрипторов — полезной функции find_descriptor. Мы уже видели её в примере поиска принтеров. find_descriptor работает практически также как и find, с двумя исключениями:
Заметьте, что find_descriptor находится в модуле usb.util. Он также принимает описанный ранее параметр custom_match.
Имеем дело с множественными идентичными устройствами
Иногда у Вас может быть два идентичных устройства подсоединенных к компьютеру. Как Вы можете различать их? Объекты Device идут с двумя дополнительными атрибутами, которые не являются частью спецификации USB, но очень полезны: атрибуты bus и address. Прежде всего, стоит сказать, что эти атрибуты идут от бэкенда, а бэкенд может и не поддерживать их — в этом случае они выставлены на None. Тем не менее эти атрибуты представляют номер и адрес шины устройства и, как Вы могли уже догадаться, могут быть использованы для того, чтобы различать два устройства с одинаковыми значениями атрибутов idVendor и idProduct.
Как я должен работать?
Устройства USB после подсоединения должны конфигурироваться с помощью нескольких стандартных запросов. Когда я начал изучать спецификацию USB, я был обескуражен дексрипторами, конфигурациями, интерфейсами, альтернативными настройками, типами передачи и всем этим… И что самое худшее — Вы не можете просто игнорировать их: устройство не работает без установки конфигурации, даже если оно одно! PyUSB пытается сделать Вашу жизнь настолько проще, насколько это возможно. К примеру, после получения Вашего объекта устройства, первым делом, перед тем как взаимодействовать с ним, нужно отправить запрос set_configuration. Параметр конфигурации для этого запроса, который Вас интересует — bConfigurationValue. У большинства устройств есть не более одной конфигурации, а отслеживание значения конфигурации для использования раздражает (хотя большинство кода, который я видел просто жестко кодировали это). Следовательно, в PyUSB, Вы можете просто отправить запрос set_configuration без аргументов. В этом случае он установит первую найденную конфигурацию (если у Вашего устройства она всего одна, Вам вообще не надо беспокоиться о значении конфигурации). К примеру, представим, что у Вас устройство с одним декриптором конфигурации, а его поле bConfigurationValue равно 5 [3], последующие запросы будут работать одинаково:
Вау! Вы можете использовать объект Configuration как параметр для set_configuration! Да, также у него есть метод set для конфигурации самого себя в текущую конфигурацию.
Другая опция, которую Вам нужно или не нужно будет настроить — опция смены интерфейсов. Каждое устройство может иметь только одну активированную конфигурацию в один момент, и у каждой конфигурации может быть больше чем один интерфейс, а Вы можете использовать все интерфейсы в одно и то же время. Вам лучше понять эту концепцию, если Вы думаете о интерфейсе как о логическом устройстве. К примеру, давайте представим многофункциональный принтер, который в одно и то же время и принтер, и сканер. Чтобы не усложнять (или по крайней мере делать настолько просто насколько возможно), давайте будем считать, что у него есть всего одна конфигурация. Т.к. у нас есть принтер и сканер у конфигурации есть 2 интерфейса: один для принтера и один для сканера. Устройство с более чем одним интерфейсом называется композитным устройством. Когда Вы подключаете Ваш многофункциональный принтер к Вашему компьютеру, Операционная Система загрузит два разных драйвера: один для каждого «логического» периферического устройства, которое у Вас есть [4].
Что насчёт альтернытивных настроек интерфейса? Хорошо, что Вы спросили. У интерфейса есть один или более альтернытивных настроек. Интерфейс у которого только одна альтернативная настройка рассматривается как не имеющий альтернативных настроек [5]. Альтернативные настройки для интерфейсов как конфигурации для устройств, то есть на каждый интерфейс у Вас может быть только одна активная альтернативная настройка. К примеру, спецификация USB говорит о том, что у устройства не может быть изохронной контрольной точки в его основной альтернативной настройке [6], так что потоковое устройство должно иметь как минимум две альтернативные настройки, со второй настройкой, имеющей изохронную контрольную точку. Но, в отличии от конфигураций, интерфейсы только с одной альтернативной настройкой не нуждается в настройке [7]. Вы выбираете альтернативную настройку интерфейса с помощью функции set_interface_altsetting:
Спецификация USB говорит, что устройству позволяется возвращать ошибку в случае, если оно получает запрос SET_INTERFACE к интерфейсу у которого нет дополнительных альтернативных настроек. Так что, если Вы не уверены в том, что интерфейс имеет более одной альтернативной настройки или в том, что он принимает запрос SET_INTERFACE, наиболее безопасным методом будет вызвать set_interface_altsetting внутри блока try-except, как здесь:
Вы также можете использовать объект Interface как параметр функции, параметры interface и alternate_setting автоматически наследуются от полей bInterfaceNumber и bAlternateSetting. Пример:
Объект Interface должен принадлежать активному дескриптору конфигурации.
Поговори со мной, милая
А теперь для нас настало время понять как взаимодействовать c USB устройствами. У USB есть четыре типа потоков данных: массовый (bulk transfer), прерывающийся (interrupt transfer), изохронный (isochronous transfer) и управляющий (control transfer). Я не планирую объяснять назначение каждого потока и различия между ними. Поэтому я предполагаю, что у Вас есть по крайней мере базовые знания о потоках данных USB.
Управляющий поток данных — единственный поток, структура которого описана в спецификации, остальные просто отправляют и получают необработанные данные с точки зрения USB. Поэтому у Вас есть различные функции для работы с управляющими потоками, а остальные потоки обрабатываются одними и теми же функциями.
Вы можете обратиться к управляющему потоку данных посредством метода ctrl_transfer. Он используется как для исходящих (OUT) так и для входящих (IN) потоков. Направление потока определяет параметр bmRequestType.
Параметры ctrl_transfer практически совпадают со структурой управляющего запроса. Далее следует пример того, как организовывать управляющий поток данных [8]:
В этом примере предполагается, что наше устройство включает в себя два пользовательских управляющих запроса, которые действуют как loopback pipe. То, что Вы пишете с сообщением CTRL_LOOPBACK_WRITE, Вы можете прочитать с сообщением CTRL_LOOPBACK_READ.
Первые четыре параметра — bmRequestType, bmRequest, wValue и wIndex — поля стандартной структуры управляющего потока. Пятый параметр — это либо пересылаемые данные для исходящего потока данных или кол-во считываемых данных во входящем потоке. Пересылаемые данные могут быть любым типом последовательности, которая может быть подана в качестве параметра на вход метода __init__ для массива. Если нет пересылаемых данных параметр должен иметь значение None (или 0 в случае входящего потока данных). Есть ещё один опциональный параметр указывающий таймаут операции. Если Вы не передаете его, будет использоваться таймаут по-умолчанию (больше об этом дальше). В исходящем потоке данных возвращаемое значение — это количество байтов, реально посылаемое устройству. Во входящем потоке возвращаемое значение — массива со считанными данными.
Для других потоков Вы можете использовать методы write и read, соответственно, чтобы записывать и считывать данные. Вам не нужно беспокоиться о типе потока — он автоматически определяется по адресу контрольной точки. Вот наш пример loopback при условии, что у нас есть loopback pipe в контрольной точке 1:
Первый и третий параметры одинаковы для обоих методов — это адрес контрольной точки и таймаут, соответственно. Второй параметр — пересылаемые данные (write) или количество байтов для считывания (read). Возвращенными данными будут либо экземпляр объекта массива для метода read, либо количество записанных байтов для метода write.
С бета 2 версии вместо количества байтов, Вы можете передать для read или ctrl_transfer объект массива, в который данные будут считываться. В этом случае, количество байтов для считывания будет длиной массива умноженной на значение array.itemsize.
В ctrl_transfer, параметр timeout опционален. Когда timeout опущено, используется свойство Device.default_timeout как операционный таймаут.
Контролируй себя
Кроме функций потоков данных модуль usb.control предоставляет функции, которые включают в себя стандартные управляющие запросы USB, а в модуле usb.util есть удобная функция get_string специально выводящая дескрипторы строк.
Дополнительные темы
За каждой великой абстракцией стоит великая реализация
Раньше был только libusb. Потом пришел libusb 1.0 и у нас были libusb 0.1 и 1.0. После этого мы создали OpenUSB и сейчас мы живем в Вавилонской Башне USB-библиотек [9]. Как PyUSB справляется с этим? Что ж, PyUSB — демократичная библиотека, Вы можете выбрать какую хотите бибилиотеку. На самом деле Вы можете написать вашу собственную библиотеку USB с нуля и сказать PyUSB использовать её.
Функция find имеет ещё один параметр, о котором я Вам не рассказал. Это параметр backend. Если Вы его не передаете — будет использоваться один из встроенных бэкендов. Бэкенд — это объект унаследованный от usb.backend.IBackend, ответственный за введение специфического для операционной системы хлама USB. Как Вы могли догадаться, встроенные libusb 0.1, libusb 1.0 и OpenUSB — бэкенды.
Вы можете написать свой собственный бэкенд и использовать его. Просто наследуйте от IBackend и включите необходимые методы. Вам может понадобиться посмотреть в документацию usb.backend, чтобы понять как это делается.
Не будьте эгоистичны
У Python есть то, что мы называем автоматическое управление памятью. Это значит, что виртуальная машина будет решать когда выгрузить объекты из памяти. Под капотом PyUSB управляет всеми низко-уровневыми ресурсами, с которыми необходимо работать (утверждение интерфейса, регулировки устройства, и т.д.) и большинству пользователей не нужно беспокоиться об этом. Но, из-за непредопределенной природы автоматического уничтожения объектов Python’ом, пользователи не могут предсказать когда выделенные ресурсы будут освобождены. Некоторые приложения нуждаются в том, чтобы выделить и освободить ресурсы детерминировано. Для таких приложений модуль usb.util предоставляет функции для взаимодействия с управлением ресурсами.
Если Вы хотите запрашивать и освобождать интерфейсы вручную, Вы можете использовать функции claim_interface и release_interface. Функция claim_interface будет запрашивать указанный интерфейс, если устройство до сих пор этого не сделало. Если устройство уже запросило интерфейс, она ничего не делает. Так же release_interface будет освобождать указанный интерфейс, если он запрошен. Если интерфейс не запрошен, она ничего не делает. Вы можете использовать ручное запрашивание интерфейсов, чтобы решить описанную в документации libusb проблему выбора конфигурации.
Если Вы хотите освободить все ресурсы выделенные объектом устройства (включая запрошенные интерфейсы), Вы можете использовать функцию dispose_resources. Она освобождает все выделенные ресурсы и переводит объект устройства (но не в аппаратные средства устройства самого по себе) в то состояние, в котором оно было возвращено после использования функции find.
Определение библиотек вручную
В общем, бэкенд — это обертка над общей библиотекой, которая реализует API для доступа к USB. По-умолчанию, бэкенд использует ctypes функцию find_library(). На Linux и других Unix-подобных Операционных Системах, find_library пытается запустить внешние программы (такие как /sbin/ldconfig, gcc и objdump) в целях нахождения файла библиотеки.
В системах в которых эти программы отсутствуют и/или кэш библиотек отключен, эта функция не может использоваться. Чтобы преодолеть ограничения, PyUSB позволяет Вам подавать пользовательскую функцию find_library() на бэкенд.
Примером такого сценария будет:
Заметьте, что find_library — аргумент для функции get_backend(), в котором Вы поставляете функцию, которая ответственная за поиск правильной библиотеки для бэкенда.
Правила старой школы
Если Вы пишите приложение используя старые API PyUSB (0.что-то-там), Вы можете спрашивать себя, нужно ли Вам обновить Ваш код, чтобы использовать новый API. Что ж, Вам стоит это сделать, но это не обязательно. PyUSB 1.0 идет вместе с модулем совместимости usb.legacy. Он включает в себя старое API на основе нового API. «Что ж, должен ли я просто заменить мою строчку import usb на import usb.legacy as usb чтобы заставить моё приложение работать?», спросите Вы. Ответ — да, это будет работать, но это не обязательно. Если Вы запустите свое приложение неизмененным оно будет работать, потому что строчка import usb импортирует все публичные символы из usb.legacy. Если Вы сталкиваетесь с проблемой — скорее всего Вы нашли баг.
Помогите мне, пожалуйста
Если Вас нужна помощь, не пишите мне на e-mail, для этого есть список рассылки. Инструкции по подписке могут быть найдены на сайте PyUSB.
[1] Когда я пишу True или False (с большой буквы), я имею ввиду соответственные значения языка Python. А когда я говорю истинно (true) или ложно (false), я имею ввиду любое выражение Python, которое расценивается как истинное или ложное. (Данное сходство имело место в оригинале и помогает понять понятия истинного и ложного в переводе. — Прим.пер.):
[2] Смотрите конкретную документацию бэкенда.
[3] Спецификация USB не навязывает какое-либо определенное значение для значения конфигурации. То же истинно для номеров интерфейса и альтернативной настройки.
[4] На самом деле всё немного сложнее, но этого просто объяснения для нас хватит.
[5] Я знаю, что это звучит странно.
[6] Это потому, что если нету пропускной способности для изохронных потоков данных во время конфигурации устройства, оно может быть успешно пронумеровано.
[7] Этого не происходит для конфигурации, потому что устройству разрешено быть в несконфигурированном состоянии.
[8] В PyUSB управляющие потоки данных обращаются к контрольной точке 0. Очень очень очень редко устройство имеет альтернативную управляющую контрольную точку (Я никогда не встречал такого устройства).
[9] Это просто шутка, не принимайте это всерьез. Большой выбор лучше, чем без выбора.
Источник
Теперь вы знаете какие однокоренные слова подходят к слову Как написать драйвер на python, а так же какой у него корень, приставка, суффикс и окончание. Вы можете дополнить список однокоренных слов к слову «Как написать драйвер на python», предложив свой вариант в комментариях ниже, а также выразить свое несогласие проведенным с морфемным разбором.
Уведомления
- Начало
- » Python для новичков
- » Можно писать драйвера или нет
#1 Июль 5, 2014 21:23:59
Можно писать драйвера или нет
Добрый вечер !
Скажите можно ли написать на питоне драйвера на 4G модем.
Деля Ubuntu если да то где надо копать спасибо заранее
Офлайн
- Пожаловаться
#2 Июль 6, 2014 03:18:27
Можно писать драйвера или нет
драйвера пишут на C потому что они на низком уровне связанны с ОС. Даже если на Питоне может быть физически и можно, но это делать никто не будет. Вы, точно не напишите, хотя бы потому что задаете такой вопрос. Возьмите драйвера официальные на модем, загляните в исходники, сделайте выводы.
Тема флейм.
_________________________________________________________________________________
полезный блог о python john16blog.blogspot.com
Офлайн
- Пожаловаться
#3 Июль 6, 2014 09:09:40
Можно писать драйвера или нет
Я задал такой вопрос потому что все говорят что на питоне можно писать все.
Можно тогда написать флешь плеер как Adobe Flash Player. Или нет
Офлайн
- Пожаловаться
#4 Июль 6, 2014 09:44:23
Можно писать драйвера или нет
Можно, но не нужно.
Офлайн
- Пожаловаться
#5 Июль 6, 2014 10:19:25
Можно писать драйвера или нет
Можете показать пример программы типа Adobe Flash Player .
Чтобы знать как можно такую программу написать на питоне .
Была пример что и как работает.
Да посоветуйте книги какие по читать.
Если у кого не будь книга Майкд Доусон Программируем на Python.
Дайте ссылку чтобы скачать спасибо заранее
Офлайн
- Пожаловаться
- Начало
- » Python для новичков
- » Можно писать драйвера или нет
7 ответов
Да. Вы не можете создавать «классические» драйверы режима ядра. Однако, начиная с XP, Windows предлагает «Рамка драйверов пользовательского режима» . Они не могут делать все, очевидно — любой драйвер, используемый при загрузке ОС, должен быть в ядре. Но с UMDF вам нужно только реализовать COM-компоненты.
Помимо драйверов загрузки, вы также не можете записывать драйверы UMDF, которые:
- Обработка прерываний
- Непосредственный доступ к оборудованию, например, прямой доступ к памяти (DMA)
- имеют строгие циклы синхронизации.
- Использовать невыгружаемый пул или другие ресурсы, зарезервированные для режима ядра.
MSalters
11 июнь 2009, в 15:43
Поделиться
Хороший способ получить представление о том, почему это практически невозможно, — это прочитать совет Microsoft по использованию С++ в драйверах. Как производная от C, использование С++ представляется простым. На практике это не так.
Например, вы должны решить для каждой функции (и действительно каждой инструкции сборки), будь то в доступной для страниц или нестранимой памяти. Для этого требуются расширения для C, тщательное использование новых возможностей С++ или в этом случае специальное расширение для языка Python и виртуальной машины. Кроме того, ваша совместимая с драйвером VM также должна иметь дело с разными IRQL — там есть иерархия уровней, которые ограничивают то, что вы можете и чего не можете сделать.
MSalters
11 июнь 2009, в 15:16
Поделиться
Окончательный ответ не исключает встраивания интерпретатора в ваш другой драйвер C/assembly. Если у кого-то нет рамки, тогда ответ будет отрицательным. Когда у вас есть интерпретатор и привязки на месте, остальная часть логики может быть выполнена в Python.
Однако писать драйверы — одна из тех вещей, для которых C лучше всего подходит. Я предполагаю, что полученный в результате код Python будет выглядеть очень похож на код C и превзойти цель служебных данных интерпретатора.
Judge Maygarden
11 июнь 2009, в 14:44
Поделиться
Я не знаю ограничений на драйверы для Windows (схемы распределения памяти, динамическая загрузка библиотек и все), но вы можете внедрить интерпретатор python в свой драйвер, после чего вы можете делать все, что хотите, Не то, чтобы я думаю, что это хорошая идея:)
David Cournapeau
11 июнь 2009, в 15:30
Поделиться
Python запускается на виртуальной машине, поэтому нет.
НО:
Вы можете написать компилятор, который переводит код Python на машинный язык. Как только вы это сделаете, вы можете это сделать.
Mark Lutton
11 июнь 2009, в 15:11
Поделиться
Никогда не говори никогда, но эх.. нет
Возможно, вы сможете взломать что-то вместе, чтобы запустить части драйверов в режиме python. Но материал в режиме ядра можно выполнять только в C или сборке.
Mendelt
11 июнь 2009, в 15:32
Поделиться
Нет, они не могут. Драйверы Windows должны быть написаны на языке, который может
- Интерфейс с API на основе C
- Скомпилировать до машинного кода
Тогда опять же ничего не мешает вам писать компилятор, который переводит python на машинный код;)
JaredPar
11 июнь 2009, в 15:12
Поделиться
Ещё вопросы
- 1Преобразовать строку в объект JSON
- 0vim новая строка в конце файла во время ввода ifstream
- 0Как добавить вход формы в переменную без обновления?
- 1Массовая вставка данных из списка словарей в базу данных Postgresql [Faster Way]?
- 0Назовите JS-функцию по clientId
- 1Как доказать, что Java не поддерживает множественное наследование?
- 0Выберите имена для массива из объекта
- 1Нельзя использовать анимированный шейдер смещения с OBJ Loader (TypeError: сетка не определена)
- 1Показать то же изображение из базы данных на следующей странице
- 0C ++ указатели возвращаются назад
- 1Java Regex для соответствия пустой строке
- 0Я хочу получить разные значения для 2 столбцов в одной таблице и связанных столбцов уникальных значений для этих уникальных значений в SQL
- 1Какой атрибут в XSD будет аннотировать поле с @XmlElement (обязательно = false) в сгенерированных JAXB классах
- 1Как выровнять JLabel к нижней части JPanel
- 1Выполнение процесса в Java с паролем
- 0Безопасность MediaWiki: альтернативный файл
- 0Правильна ли моя логика? Это дает ошибку сегментации
- 0C ++: преобразование const uint8_t * в std :: stringstream
- 1Android SQLite Проблема: сбой программы при попытке запроса!
- 0Codeigniter игнорировать настройки электронной почты
- 0Передача переменной текстового поля
- 0Как решить формулу из поля базы данных MySQL в PHP?
- 0хотите отображать контент в материализованных вкладках с помощью mysql и php js
- 0Как я могу перебрать CSV-файл в RUBY, чтобы создать неупорядоченный список HTML?
- 0C ++, как разбить код на несколько файлов?
- 1Конфигурация Android изменена
- 1Как преобразовать шестнадцатеричную строку в цветное изображение в Python?
- 1Невозможно получить значения из входного текста в DOM
- 1Категориальные переменные панды
- 1Как получить ресурсы из ссылочного проекта в VisualSstudio
- 1Привязка DataTable к полям шаблона GridView
- 0мод-переписать простой редирект внутри папки
- 1Как избежать обратной косой черты в xpath webdriver java
- 0Проблемы настройки высоты изображения
- 1Ultralite поддерживается для Android?
- 0На файл .lib все еще ссылаются, когда тип проекта изменяется на dll
- 0неопределенная переменная при отображении данных
- 0php груша почта не может отправлять HTML отформатированную почту
- 1Как создать диспетчер потоков для приложения для Android?
- 0Трудность обработки данных из поста jQuery в pg_query ();
- 1Javascript: Как проверить, есть ли в массиве объекта один и только один элемент с заданным значением?
- 1Как установить нижний колонтитул в верхнем слое приложения в Android?
- 1Комета без AJAX
- 0Opencv: Hough Circles Автоматизировать параметры?
- 1В python Dataframes, Как добавить несколько строк, связанных с одной строкой в другом dataframe, как дополнительные столбцы?
- 0PHP заменяет {replace_me} на <? Php include?> В выходном буфере
- 1Имитация прикосновений к активности другого приложения
- 1хотел бы извлечь составные пары существительное-прилагательное из предложения. Итак, в основном я хочу что-то вроде:
- 1Как запустить файл Java, когда файл не создан
Кровавый, ещё ASM может потребоваться и знание низкоуровневых функций и вызовов ядра Linux.
A.J, на питоне системный софт нельзя писать. Топать надо в сторону изучения C. Потом тебе потребуется промониторить USB, т.е. собрать кучу статистики о том, как родная программа для модема работает с ним и разобраться в том, какие команды модем может принимать и получать, а также догадаться, что они обозначают. Это самая сложная часть работы (реверс-инженеринг). Так как маловероятно, что производители модема поделятся с тобой этой информацией. Для собирания информации можно пойти двумя путями, подробно расписывать не буду, найдешь в гугле сам.
1) Можно поставить винду в виртуалку, настроить проброс портов USB, подключить модем, и собирать статистику этого проброса (тогда гарантированно ты получишь все данные, как бы не старался защитить это производитель). А второй вариант заключаетя в том, что в винду устанавливается такая программа, которая собирает эту статистику (в 99% случаев этого достаточно). После того, как ты разобрался, что к чему надо приступать к кодингу. Пример реверс-инженеринга и разработки простого драйвера можешь поглядеть тут.