mirror of
https://github.com/paparazzi/paparazzi.git
synced 2026-06-06 16:58:48 +08:00
[dfu] pep8: 4spaces, print function for python3 compat
This commit is contained in:
Regular → Executable
+113
-110
@@ -1,7 +1,7 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# dfu.py: Access USB DFU class devices
|
||||
# Copyright (C) 2009 Black Sphere Technologies
|
||||
# Copyright (C) 2009 Black Sphere Technologies
|
||||
# Copyright (C) 2012 Transition Robotics Inc.
|
||||
# Written by Gareth McMullin <gareth@blacksphere.co.nz>
|
||||
# Modified by Piotr Esden-Tempski <piotr@esden.net>
|
||||
@@ -19,12 +19,13 @@
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
import usb
|
||||
|
||||
DFU_DETACH_TIMEOUT = 1000
|
||||
|
||||
# DFU Requests
|
||||
# DFU Requests
|
||||
DFU_DETACH = 0x00
|
||||
DFU_DNLOAD = 0x01
|
||||
DFU_UPLOAD = 0x02
|
||||
@@ -65,136 +66,138 @@ DFU_STATUS_ERROR_UNKNOWN = 0x0e
|
||||
DFU_STATUS_ERROR_STALLEDPKT = 0x0f
|
||||
|
||||
class dfu_status(object):
|
||||
def __init__(self, buf):
|
||||
self.bStatus = buf[0]
|
||||
self.bwPollTimeout = buf[1] + (buf[2]<<8) + (buf[3]<<16)
|
||||
self.bState = buf[4]
|
||||
self.iString = buf[5]
|
||||
def __init__(self, buf):
|
||||
self.bStatus = buf[0]
|
||||
self.bwPollTimeout = buf[1] + (buf[2]<<8) + (buf[3]<<16)
|
||||
self.bState = buf[4]
|
||||
self.iString = buf[5]
|
||||
|
||||
|
||||
class dfu_device(object):
|
||||
def __init__(self, dev, conf, iface):
|
||||
self.dev = dev
|
||||
self.conf = conf
|
||||
self.iface = iface
|
||||
self.handle = self.dev.open()
|
||||
try:
|
||||
self.handle.setConfiguration(conf)
|
||||
except: pass
|
||||
self.handle.claimInterface(iface.interfaceNumber)
|
||||
if type(self.iface) is usb.Interface:
|
||||
self.index = self.iface.interfaceNumber
|
||||
else: self.index = self.iface
|
||||
def __init__(self, dev, conf, iface):
|
||||
self.dev = dev
|
||||
self.conf = conf
|
||||
self.iface = iface
|
||||
self.handle = self.dev.open()
|
||||
try:
|
||||
self.handle.setConfiguration(conf)
|
||||
except:
|
||||
pass
|
||||
self.handle.claimInterface(iface.interfaceNumber)
|
||||
if type(self.iface) is usb.Interface:
|
||||
self.index = self.iface.interfaceNumber
|
||||
else:
|
||||
self.index = self.iface
|
||||
|
||||
def detach(self, wTimeout=255):
|
||||
self.handle.controlMsg(usb.ENDPOINT_OUT | usb.TYPE_CLASS |
|
||||
usb.RECIP_INTERFACE, DFU_DETACH,
|
||||
None, value=wTimeout, index=self.index)
|
||||
def detach(self, wTimeout=255):
|
||||
self.handle.controlMsg(usb.ENDPOINT_OUT | usb.TYPE_CLASS |
|
||||
usb.RECIP_INTERFACE, DFU_DETACH,
|
||||
None, value=wTimeout, index=self.index)
|
||||
|
||||
def download(self, wBlockNum, data):
|
||||
self.handle.controlMsg(usb.ENDPOINT_OUT | usb.TYPE_CLASS |
|
||||
usb.RECIP_INTERFACE, DFU_DNLOAD,
|
||||
data, value=wBlockNum, index=self.index)
|
||||
def download(self, wBlockNum, data):
|
||||
self.handle.controlMsg(usb.ENDPOINT_OUT | usb.TYPE_CLASS |
|
||||
usb.RECIP_INTERFACE, DFU_DNLOAD,
|
||||
data, value=wBlockNum, index=self.index)
|
||||
|
||||
def upload(self, wBlockNum, length):
|
||||
return self.handle.controlMsg(usb.ENDPOINT_IN |
|
||||
usb.TYPE_CLASS | usb.RECIP_INTERFACE, DFU_UPLOAD,
|
||||
length, value=wBlockNum, index=self.index)
|
||||
def upload(self, wBlockNum, length):
|
||||
return self.handle.controlMsg(usb.ENDPOINT_IN | usb.TYPE_CLASS |
|
||||
usb.RECIP_INTERFACE, DFU_UPLOAD,
|
||||
length, value=wBlockNum,
|
||||
index=self.index)
|
||||
|
||||
def get_status(self):
|
||||
buf = self.handle.controlMsg(usb.ENDPOINT_IN |
|
||||
usb.TYPE_CLASS | usb.RECIP_INTERFACE, DFU_GETSTATUS,
|
||||
6, index=self.index)
|
||||
return dfu_status(buf)
|
||||
|
||||
def clear_status(self):
|
||||
self.handle.controlMsg(usb.ENDPOINT_OUT | usb.TYPE_CLASS |
|
||||
usb.RECIP_INTERFACE, DFU_CLRSTATUS,
|
||||
"", index=0)
|
||||
def get_status(self):
|
||||
buf = self.handle.controlMsg(usb.ENDPOINT_IN | usb.TYPE_CLASS |
|
||||
usb.RECIP_INTERFACE, DFU_GETSTATUS,
|
||||
6, index=self.index)
|
||||
return dfu_status(buf)
|
||||
|
||||
def get_state(self):
|
||||
buf = self.handle.controlMsg(usb.ENDPOINT_IN |
|
||||
usb.TYPE_CLASS | usb.RECIP_INTERFACE, DFU_GETSTATE,
|
||||
1, index=self.index)
|
||||
return buf[0]
|
||||
def clear_status(self):
|
||||
self.handle.controlMsg(usb.ENDPOINT_OUT | usb.TYPE_CLASS |
|
||||
usb.RECIP_INTERFACE, DFU_CLRSTATUS,
|
||||
"", index=0)
|
||||
|
||||
def abort(self):
|
||||
self.handle.controlMsg(usb.ENDPOINT_OUT | usb.TYPE_CLASS |
|
||||
usb.RECIP_INTERFACE, DFU_ABORT,
|
||||
None, index=self.index)
|
||||
def get_state(self):
|
||||
buf = self.handle.controlMsg(usb.ENDPOINT_IN | usb.TYPE_CLASS |
|
||||
usb.RECIP_INTERFACE, DFU_GETSTATE,
|
||||
1, index=self.index)
|
||||
return buf[0]
|
||||
|
||||
def abort(self):
|
||||
self.handle.controlMsg(usb.ENDPOINT_OUT | usb.TYPE_CLASS |
|
||||
usb.RECIP_INTERFACE, DFU_ABORT,
|
||||
None, index=self.index)
|
||||
|
||||
|
||||
def make_idle(self):
|
||||
retries = 3
|
||||
while retries:
|
||||
try:
|
||||
status = self.get_status()
|
||||
except:
|
||||
self.clear_status()
|
||||
continue
|
||||
def make_idle(self):
|
||||
retries = 3
|
||||
while retries:
|
||||
try:
|
||||
status = self.get_status()
|
||||
except:
|
||||
self.clear_status()
|
||||
continue
|
||||
|
||||
retries -= 1
|
||||
retries -= 1
|
||||
|
||||
if status.bState == STATE_DFU_IDLE:
|
||||
return True
|
||||
if status.bState == STATE_DFU_IDLE:
|
||||
return True
|
||||
|
||||
if ((status.bState == STATE_DFU_DOWNLOAD_SYNC) or
|
||||
(status.bState == STATE_DFU_DOWNLOAD_IDLE) or
|
||||
(status.bState == STATE_DFU_MANIFEST_SYNC) or
|
||||
(status.bState == STATE_DFU_UPLOAD_IDLE) or
|
||||
(status.bState == STATE_DFU_DOWNLOAD_BUSY) or
|
||||
(status.bState == STATE_DFU_MANIFEST)):
|
||||
self.abort()
|
||||
continue
|
||||
if ((status.bState == STATE_DFU_DOWNLOAD_SYNC) or
|
||||
(status.bState == STATE_DFU_DOWNLOAD_IDLE) or
|
||||
(status.bState == STATE_DFU_MANIFEST_SYNC) or
|
||||
(status.bState == STATE_DFU_UPLOAD_IDLE) or
|
||||
(status.bState == STATE_DFU_DOWNLOAD_BUSY) or
|
||||
(status.bState == STATE_DFU_MANIFEST)):
|
||||
self.abort()
|
||||
continue
|
||||
|
||||
if status.bState == STATE_DFU_ERROR:
|
||||
self.clear_status()
|
||||
continue
|
||||
if status.bState == STATE_DFU_ERROR:
|
||||
self.clear_status()
|
||||
continue
|
||||
|
||||
if status.bState == STATE_APP_IDLE:
|
||||
self.detach(DFU_DETACH_TIMEOUT)
|
||||
continue
|
||||
if status.bState == STATE_APP_IDLE:
|
||||
self.detach(DFU_DETACH_TIMEOUT)
|
||||
continue
|
||||
|
||||
if ((status.bState == STATE_APP_DETACH) or
|
||||
(status.bState == STATE_DFU_MANIFEST_WAIT_RESET)):
|
||||
usb.reset(self.handle)
|
||||
return False
|
||||
if ((status.bState == STATE_APP_DETACH) or
|
||||
(status.bState == STATE_DFU_MANIFEST_WAIT_RESET)):
|
||||
usb.reset(self.handle)
|
||||
return False
|
||||
|
||||
raise Exception
|
||||
raise Exception
|
||||
|
||||
def finddevs():
|
||||
devs = []
|
||||
for bus in usb.busses():
|
||||
for dev in bus.devices:
|
||||
for conf in dev.configurations:
|
||||
for ifaces in conf.interfaces:
|
||||
for iface in ifaces:
|
||||
if ((iface.interfaceClass == 0xFE) and
|
||||
(iface.interfaceSubClass == 0x01)):
|
||||
devs.append((dev, conf, iface))
|
||||
return devs
|
||||
devs = []
|
||||
for bus in usb.busses():
|
||||
for dev in bus.devices:
|
||||
for conf in dev.configurations:
|
||||
for ifaces in conf.interfaces:
|
||||
for iface in ifaces:
|
||||
if ((iface.interfaceClass == 0xFE) and
|
||||
(iface.interfaceSubClass == 0x01)):
|
||||
devs.append((dev, conf, iface))
|
||||
return devs
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
devs = finddevs()
|
||||
if not devs:
|
||||
print "No devices found!"
|
||||
exit(-1)
|
||||
|
||||
for dfu in devs:
|
||||
handle = dfu[0].open()
|
||||
try:
|
||||
man = handle.getString(dfu[0].iManufacturer, 30)
|
||||
product = handle.getString(dfu[0].iProduct, 30)
|
||||
serial = handle.getString(dfu[0].iSerialNumber, 40)
|
||||
except:
|
||||
print "Could not access descriptions strings of a DFU device. Maybe the OS driver is claiming it?"
|
||||
continue
|
||||
|
||||
print "Device %s: ID %04x:%04x %s - %s - %s" % (dfu[0].filename,
|
||||
dfu[0].idVendor, dfu[0].idProduct, man, product, serial)
|
||||
print "%r, %r" % (dfu[1], dfu[2])
|
||||
print "Finished scanning for devices."
|
||||
devs = finddevs()
|
||||
if not devs:
|
||||
print("No devices found!")
|
||||
exit(-1)
|
||||
|
||||
for dfu in devs:
|
||||
handle = dfu[0].open()
|
||||
try:
|
||||
man = handle.getString(dfu[0].iManufacturer, 30)
|
||||
product = handle.getString(dfu[0].iProduct, 30)
|
||||
serial = handle.getString(dfu[0].iSerialNumber, 40)
|
||||
except:
|
||||
print("Could not access descriptions strings of a DFU device. " +
|
||||
"Maybe the OS driver is claiming it?")
|
||||
continue
|
||||
|
||||
print("Device %s: ID %04x:%04x %s - %s - %s" % (dfu[0].filename,
|
||||
dfu[0].idVendor, dfu[0].idProduct, man, product, serial))
|
||||
print("%r, %r" % (dfu[1], dfu[2]))
|
||||
|
||||
print("Finished scanning for devices.")
|
||||
|
||||
+82
-79
@@ -1,7 +1,7 @@
|
||||
#!/usr/bin/python
|
||||
#
|
||||
# stm32_mem.py: STM32 memory access using USB DFU class
|
||||
# Copyright (C) 2011 Black Sphere Technologies
|
||||
# Copyright (C) 2011 Black Sphere Technologies
|
||||
# Written by Gareth McMullin <gareth@blacksphere.co.nz>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
@@ -17,6 +17,8 @@
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
from time import sleep
|
||||
import struct
|
||||
from sys import stdout, argv
|
||||
@@ -27,97 +29,98 @@ import dfu
|
||||
APP_ADDRESS = 0x08002000
|
||||
SECTOR_SIZE = 2048
|
||||
|
||||
CMD_GETCOMMANDS = 0x00
|
||||
CMD_SETADDRESSPOINTER = 0x21
|
||||
CMD_ERASE = 0x41
|
||||
CMD_GETCOMMANDS = 0x00
|
||||
CMD_SETADDRESSPOINTER = 0x21
|
||||
CMD_ERASE = 0x41
|
||||
|
||||
def stm32_erase(dev, addr):
|
||||
erase_cmd = struct.pack("<BL", CMD_ERASE, addr)
|
||||
dev.download(0, erase_cmd)
|
||||
while True:
|
||||
status = dev.get_status()
|
||||
if status.bState == dfu.STATE_DFU_DOWNLOAD_BUSY:
|
||||
sleep(status.bwPollTimeout / 1000.0)
|
||||
if status.bState == dfu.STATE_DFU_DOWNLOAD_IDLE:
|
||||
break
|
||||
erase_cmd = struct.pack("<BL", CMD_ERASE, addr)
|
||||
dev.download(0, erase_cmd)
|
||||
while True:
|
||||
status = dev.get_status()
|
||||
if status.bState == dfu.STATE_DFU_DOWNLOAD_BUSY:
|
||||
sleep(status.bwPollTimeout / 1000.0)
|
||||
if status.bState == dfu.STATE_DFU_DOWNLOAD_IDLE:
|
||||
break
|
||||
|
||||
def stm32_write(dev, data):
|
||||
dev.download(2, data)
|
||||
while True:
|
||||
status = dev.get_status()
|
||||
if status.bState == dfu.STATE_DFU_DOWNLOAD_BUSY:
|
||||
sleep(status.bwPollTimeout / 1000.0)
|
||||
if status.bState == dfu.STATE_DFU_DOWNLOAD_IDLE:
|
||||
break
|
||||
|
||||
dev.download(2, data)
|
||||
while True:
|
||||
status = dev.get_status()
|
||||
if status.bState == dfu.STATE_DFU_DOWNLOAD_BUSY:
|
||||
sleep(status.bwPollTimeout / 1000.0)
|
||||
if status.bState == dfu.STATE_DFU_DOWNLOAD_IDLE:
|
||||
break
|
||||
|
||||
def stm32_manifest(dev):
|
||||
dev.download(0, "")
|
||||
while True:
|
||||
try:
|
||||
status = dev.get_status()
|
||||
except:
|
||||
return
|
||||
sleep(status.bwPollTimeout / 1000.0)
|
||||
if status.bState == dfu.STATE_DFU_MANIFEST:
|
||||
break
|
||||
dev.download(0, "")
|
||||
while True:
|
||||
try:
|
||||
status = dev.get_status()
|
||||
except:
|
||||
return
|
||||
sleep(status.bwPollTimeout / 1000.0)
|
||||
if status.bState == dfu.STATE_DFU_MANIFEST:
|
||||
break
|
||||
|
||||
if __name__ == "__main__":
|
||||
print
|
||||
print "USB Device Firmware Upgrade - Host Utility -- version 1.2"
|
||||
print "Copyright (C) 2011 Black Sphere Technologies"
|
||||
print "Copyright (C) 2012 Transition Robotics Inc."
|
||||
print "License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>"
|
||||
print
|
||||
print("")
|
||||
print("USB Device Firmware Upgrade - Host Utility -- version 1.2")
|
||||
print("Copyright (C) 2011 Black Sphere Technologies")
|
||||
print("Copyright (C) 2012 Transition Robotics Inc.")
|
||||
print("License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>")
|
||||
print("")
|
||||
|
||||
devs = dfu.finddevs()
|
||||
if not devs:
|
||||
print "No devices found!"
|
||||
exit(-1)
|
||||
devs = dfu.finddevs()
|
||||
if not devs:
|
||||
print("No devices found!")
|
||||
exit(-1)
|
||||
|
||||
for dev in devs:
|
||||
dfudev = dfu.dfu_device(*dev)
|
||||
try:
|
||||
man = dfudev.handle.getString(dfudev.dev.iManufacturer, 30)
|
||||
product = dfudev.handle.getString(dfudev.dev.iProduct, 30)
|
||||
serial = dfudev.handle.getString(dfudev.dev.iSerialNumber, 40)
|
||||
except:
|
||||
print "Could not access the description strings of a DFU device. Maybe the OS driver is claiming it?"
|
||||
continue
|
||||
if man == "Black Sphere Technologies": break
|
||||
if man == "Transition Robotics Inc.": break
|
||||
if man == "STMicroelectronics": break
|
||||
for dev in devs:
|
||||
dfudev = dfu.dfu_device(*dev)
|
||||
try:
|
||||
man = dfudev.handle.getString(dfudev.dev.iManufacturer, 30)
|
||||
product = dfudev.handle.getString(dfudev.dev.iProduct, 30)
|
||||
serial = dfudev.handle.getString(dfudev.dev.iSerialNumber, 40)
|
||||
except:
|
||||
print("Could not access the description strings of a DFU device. " +
|
||||
"Maybe the OS driver is claiming it?")
|
||||
continue
|
||||
if man == "Black Sphere Technologies": break
|
||||
if man == "Transition Robotics Inc.": break
|
||||
if man == "STMicroelectronics": break
|
||||
|
||||
print "Device %s: ID %04x:%04x %s - %s - %s" % (dfudev.dev.filename,
|
||||
dfudev.dev.idVendor, dfudev.dev.idProduct, man, product, serial)
|
||||
print("Device %s: ID %04x:%04x %s - %s - %s" % (dfudev.dev.filename,
|
||||
dfudev.dev.idVendor, dfudev.dev.idProduct, man, product, serial))
|
||||
|
||||
try:
|
||||
state = dfudev.get_state()
|
||||
except:
|
||||
print "Failed to read device state! Assuming APP_IDLE"
|
||||
state = dfu.STATE_APP_IDLE
|
||||
if state == dfu.STATE_APP_IDLE:
|
||||
dfudev.detach()
|
||||
print "Run again to upgrade firmware."
|
||||
exit(0)
|
||||
|
||||
dfudev.make_idle()
|
||||
try:
|
||||
state = dfudev.get_state()
|
||||
except:
|
||||
print("Failed to read device state! Assuming APP_IDLE")
|
||||
state = dfu.STATE_APP_IDLE
|
||||
if state == dfu.STATE_APP_IDLE:
|
||||
dfudev.detach()
|
||||
print("Run again to upgrade firmware.")
|
||||
exit(0)
|
||||
|
||||
try:
|
||||
bin = open(argv[1], "rb").read()
|
||||
except:
|
||||
print "Could not open binary file."
|
||||
raise
|
||||
dfudev.make_idle()
|
||||
|
||||
addr = APP_ADDRESS
|
||||
while bin:
|
||||
print ("Programming memory at 0x%08X\r" % addr),
|
||||
stdout.flush()
|
||||
stm32_erase(dfudev, addr)
|
||||
stm32_write(dfudev, bin[:SECTOR_SIZE])
|
||||
try:
|
||||
bin = open(argv[1], "rb").read()
|
||||
except:
|
||||
print("Could not open binary file.")
|
||||
raise
|
||||
|
||||
bin = bin[SECTOR_SIZE:]
|
||||
addr += SECTOR_SIZE
|
||||
addr = APP_ADDRESS
|
||||
while bin:
|
||||
print("Programming memory at 0x%08X\r" % addr)
|
||||
stdout.flush()
|
||||
stm32_erase(dfudev, addr)
|
||||
stm32_write(dfudev, bin[:SECTOR_SIZE])
|
||||
|
||||
stm32_manifest(dfudev)
|
||||
bin = bin[SECTOR_SIZE:]
|
||||
addr += SECTOR_SIZE
|
||||
|
||||
print "\nAll operations complete!\n"
|
||||
stm32_manifest(dfudev)
|
||||
|
||||
print("\nAll operations complete!\n")
|
||||
|
||||
Reference in New Issue
Block a user