linux - Request authentication on gatt server with BlueZ 5.37 -
I am running a GATT server (a modified example-gatt-server) on a Raspberry Pi using the BlueZ BLE stack. It runs fine, but I am wondering whether anyone has managed to set up PIN authentication on the server. I haven't found any information online about this, and I don't know whether the GATT profile supports PIN authentication.
#!/usr/bin/python
"""Interactive BlueZ pairing agent.

Registers an ``org.bluez.Agent1`` object on the system bus and answers the
agent callbacks by prompting on the console.  When a device is given on the
command line the script also starts pairing with it; otherwise it asks to
become the default agent and waits in the main loop.

NOTE(review): this file was reconstructed from a copy whose identifiers had
been lower-cased and whose ``from``/``as``/``and`` keywords were missing.
D-Bus interface, method, property and capability names were restored to the
spelling used by the BlueZ D-Bus API; confirm against the original script.
"""
from __future__ import absolute_import, print_function, unicode_literals

from optparse import OptionParser
import sys

import dbus
import dbus.service
import dbus.mainloop.glib
import gobject as GObject

import bluezutils
import config_file
import config_def

BUS_NAME = 'org.bluez'
AGENT_INTERFACE = 'org.bluez.Agent1'
AGENT_PATH = "/test/agent"

# Module-level state shared between the __main__ section and the callbacks.
bus = None
device_obj = None
dev_path = None


def ask(prompt):
    """Read one line from the console, on Python 2 or Python 3.

    ``raw_input`` only exists on Python 2; only its absence (NameError)
    triggers the fallback to ``input`` so that unrelated errors such as
    EOFError or KeyboardInterrupt are no longer swallowed.
    """
    try:
        return raw_input(prompt)
    except NameError:
        return input(prompt)


def set_trusted(path):
    """Set the ``Trusted`` property of the device at *path* to True."""
    props = dbus.Interface(bus.get_object("org.bluez", path),
                           "org.freedesktop.DBus.Properties")
    props.Set("org.bluez.Device1", "Trusted", True)


def dev_connect(path):
    """Call ``Connect`` on the ``org.bluez.Device1`` object at *path*."""
    dev = dbus.Interface(bus.get_object("org.bluez", path),
                         "org.bluez.Device1")
    dev.Connect()


class Rejected(dbus.DBusException):
    """Error raised from agent callbacks to refuse a request."""

    _dbus_error_name = "org.bluez.Error.Rejected"


class Agent(dbus.service.Object):
    """Console-driven implementation of the ``org.bluez.Agent1`` interface."""

    # When True, Release() stops the main loop (and thereby the script).
    exit_on_release = True

    def set_exit_on_release(self, exit_on_release):
        """Choose whether Release() should quit the main loop."""
        self.exit_on_release = exit_on_release

    @dbus.service.method(AGENT_INTERFACE,
                         in_signature="", out_signature="")
    def Release(self):
        """Log the release and quit the main loop if configured to."""
        print("release")
        if self.exit_on_release:
            mainloop.quit()

    @dbus.service.method(AGENT_INTERFACE,
                         in_signature="os", out_signature="")
    def AuthorizeService(self, device, uuid):
        """Ask the operator whether *device* may use service *uuid*.

        Raises:
            Rejected: if the answer is anything other than "yes".
        """
        print("authorizeservice (%s, %s)" % (device, uuid))
        authorize = ask("authorize connection (yes/no): ")
        if authorize == "yes":
            return
        raise Rejected("connection rejected by user")

    @dbus.service.method(AGENT_INTERFACE,
                         in_signature="o", out_signature="s")
    def RequestPinCode(self, device):
        """Mark *device* trusted and return a PIN typed by the operator."""
        print("requestpincode (%s)" % (device))
        set_trusted(device)
        # Alternative left by the author: return a fixed PIN from the
        # configuration instead of prompting, e.g.
        #   config_file.get(config_def.ble_section, config_def.ble_pin)
        # NOTE(review): the constant names may have lost their casing.
        return ask("enter pin code: ")

    @dbus.service.method(AGENT_INTERFACE,
                         in_signature="o", out_signature="u")
    def RequestPasskey(self, device):
        """Mark *device* trusted and return a passkey typed by the operator."""
        print("requestpasskey (%s)" % (device))
        set_trusted(device)
        # Same configuration-based alternative as in RequestPinCode applies.
        passkey = ask("enter passkey: ")
        return dbus.UInt32(passkey)

    @dbus.service.method(AGENT_INTERFACE,
                         in_signature="ouq", out_signature="")
    def DisplayPasskey(self, device, passkey, entered):
        """Print the passkey and the *entered* counter for *device*."""
        print("displaypasskey (%s, %06u entered %u)" %
              (device, passkey, entered))

    @dbus.service.method(AGENT_INTERFACE,
                         in_signature="os", out_signature="")
    def DisplayPinCode(self, device, pincode):
        """Print the PIN code for *device*."""
        print("displaypincode (%s, %s)" % (device, pincode))

    @dbus.service.method(AGENT_INTERFACE,
                         in_signature="ou", out_signature="")
    def RequestConfirmation(self, device, passkey):
        """Ask the operator to confirm *passkey*; trust the device on "yes".

        Raises:
            Rejected: if the answer is anything other than "yes".
        """
        print("requestconfirmation (%s, %06d)" % (device, passkey))
        confirm = ask("confirm passkey (yes/no): ")
        if confirm == "yes":
            set_trusted(device)
            return
        raise Rejected("passkey doesn't match")

    @dbus.service.method(AGENT_INTERFACE,
                         in_signature="o", out_signature="")
    def RequestAuthorization(self, device):
        """Ask the operator whether to authorize *device*.

        Raises:
            Rejected: if the answer is anything other than "yes".
        """
        print("requestauthorization (%s)" % (device))
        auth = ask("authorize? (yes/no): ")
        if auth == "yes":
            return
        raise Rejected("pairing rejected")

    @dbus.service.method(AGENT_INTERFACE,
                         in_signature="", out_signature="")
    def Cancel(self):
        """Log that the pending request was cancelled."""
        print("cancel")


def pair_reply():
    """Pairing succeeded: trust and connect the device, then stop the loop."""
    print("device paired")
    set_trusted(dev_path)
    dev_connect(dev_path)
    mainloop.quit()


def pair_error(error):
    """Pairing failed: cancel it on a NoReply timeout, then stop the loop."""
    err_name = error.get_dbus_name()
    if err_name == "org.freedesktop.DBus.Error.NoReply" and device_obj:
        print("timed out. cancelling pairing")
        device_obj.CancelPairing()
    else:
        print("creating device failed: %s" % (error))
    mainloop.quit()


if __name__ == '__main__':
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    bus = dbus.SystemBus()

    capability = "KeyboardDisplay"

    parser = OptionParser()
    parser.add_option("-i", "--adapter", action="store",
                      type="string",
                      dest="adapter_pattern",
                      default=None)
    parser.add_option("-c", "--capability", action="store",
                      type="string", dest="capability")
    parser.add_option("-t", "--timeout", action="store",
                      type="int", dest="timeout",
                      default=60000)
    (options, args) = parser.parse_args()
    if options.capability:
        capability = options.capability

    path = AGENT_PATH
    # Keep the class name intact: bind the instance to a different name.
    agent = Agent(bus, path)

    mainloop = GObject.MainLoop()
    print("capability:" + capability)

    obj = bus.get_object(BUS_NAME, "/org/bluez")
    manager = dbus.Interface(obj, "org.bluez.AgentManager1")
    manager.RegisterAgent(path, capability)

    print("agent registered")

    # Fix-up old style invocation (BlueZ 4): a leading "hciN" argument
    # selects the adapter.
    if args and args[0].startswith("hci"):
        options.adapter_pattern = args[0]
        del args[:1]

    if args:
        print("custom agent registered")
        device = bluezutils.find_device(args[0], options.adapter_pattern)
        dev_path = device.object_path
        agent.set_exit_on_release(False)
        # Honour --timeout instead of a hard-coded value; the default is
        # unchanged (60000).
        # NOTE(review): dbus-python appears to interpret this value in
        # seconds, not milliseconds - confirm the intended unit.
        device.Pair(reply_handler=pair_reply, error_handler=pair_error,
                    timeout=options.timeout)
        device_obj = device
    else:
        manager.RequestDefaultAgent(path)
        print("default agent registered")

    mainloop.run()
Comments
Post a Comment