-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy patha2dp.py
More file actions
126 lines (107 loc) · 4.37 KB
/
a2dp.py
File metadata and controls
126 lines (107 loc) · 4.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
from gi.repository import Gio,GLib
import dbus
import dbus.exceptions
import dbus.service
from dbus.mainloop.glib import DBusGMainLoop
from socket import *
import sys
from _Pump import datapump
# D-Bus object path of the local Bluetooth adapter as exported by BlueZ.
ADAPTER_PATH = "/org/bluez/hci0"
# Edit the next line so the address matches your remote Bluetooth device.
DEVICE_ADDR = "AA:BB:CC:DD:EE:FF"
# BlueZ device object paths use '_' where the address has ':'.
DEVICE_PATH = ADAPTER_PATH + "/dev_" + DEVICE_ADDR.replace(':', '_')

# 128-bit forms of the A2DP service-class UUIDs
# (0x110A = audio source, 0x110B = audio sink).
SOURCE_UUID = "0000110a-0000-1000-8000-00805f9b34fb"
SINK_UUID = "0000110b-0000-1000-8000-00805f9b34fb"

# Well-known D-Bus service name of the BlueZ daemon.
bluez_name = "org.bluez"

# D-Bus interface names used by this script.
INTROSPECT_IFACE = "org.freedesktop.DBus.Introspectable"
OM_IFACE = "org.freedesktop.DBus.ObjectManager"
PROPS_IFACE = "org.freedesktop.DBus.Properties"
ADAPTER_IFACE = "org.bluez.Adapter1"
ENDPOINT_IFACE = "org.bluez.MediaEndpoint1"
TRANSPORT_IFACE = "org.bluez.MediaTransport1"
# Make the GLib main loop dbus-python's default before the bus connection is
# created, so the connection below is attached to it.
DBusGMainLoop(set_as_default=True)
mainloop = GLib.MainLoop()
system_bus = dbus.SystemBus()
# Object path of the media transport; filled in by Endpoint.SetConfiguration.
gtransport = None

# Read the adapter's advertised service UUIDs and refuse to continue when an
# A2DP source is already present (the message suggests PulseAudio is the
# usual owner).
adpt = system_bus.get_object(bluez_name, ADAPTER_PATH)
adapter = dbus.Interface(adpt, PROPS_IFACE)
uuids = adapter.Get(ADAPTER_IFACE, 'UUIDs')
if SOURCE_UUID in uuids:
    print("A2DP source found. First disable pulseaudio with\npulseaudio --kill")
    # sys.exit() instead of the interactive-only quit() helper; a non-zero
    # status reports that the script could not start.
    sys.exit(1)
# Command line: exactly one argument, the WAV file to stream.
if len(sys.argv) != 2:
    print("usage: python a2dp.py <wavfilename>")
    sys.exit(1)

filename = sys.argv[1]
# Only the start of the header is needed; 'with' guarantees the file is
# closed even if the read fails.
with open(filename, "rb") as wavf:
    h = wavf.read(40)
if len(h) < 28:
    # Without this check a truncated file would fail with a bare IndexError.
    print(filename, "is too short to contain a WAV header")
    sys.exit(1)

# Little-endian fields of the canonical WAV header: channel count in bytes
# 22-23, sample rate in bytes 24-27.
numchans = int.from_bytes(h[22:24], "little")
freq = int.from_bytes(h[24:28], "little")
print("numchans=", numchans, "samplefreq=", freq)
class Endpoint(dbus.service.Object):
    """D-Bus object implementing org.bluez.MediaEndpoint1 for this script.

    The instance is exported on ``bus`` at ``path``. ``properties`` is the
    dictionary returned by ``GetAll`` and ``configuration`` is the byte array
    returned from ``SelectConfiguration``.
    """

    def __init__(self, bus, path, properties, configuration):
        """Store the endpoint data and export the object on the bus."""
        self.bus = bus
        self.path = path
        self.properties = properties
        self.configuration = configuration
        dbus.service.Object.__init__(self, bus, self.path)

    def get_path(self):
        """Return the endpoint's object path as a dbus.ObjectPath."""
        return dbus.ObjectPath(self.path)

    def get_properties(self):
        """Return the properties dictionary given at construction."""
        return self.properties

    # BlueZ documents SetConfiguration as (object transport, dict properties),
    # so the declared signature is 'oa{sv}'; the body iterates the dict.
    @dbus.service.method(ENDPOINT_IFACE, in_signature='oa{sv}', out_signature='')
    def SetConfiguration(self, transport, properties):
        """Remember the transport path and start watching the remote device.

        Stores ``transport`` in the module-level ``gtransport`` and installs
        ``sig_handler`` for PropertiesChanged signals on DEVICE_PATH.
        """
        global gtransport
        print("SET CONFIG")
        gtransport = transport
        print("transport=", transport)
        for key, value in properties.items():
            print(key, value)
        # Cannot acquire the transport immediately; wait for the connection.
        system_bus.add_signal_receiver(sig_handler,
                                       signal_name="PropertiesChanged",
                                       dbus_interface=PROPS_IFACE,
                                       path=DEVICE_PATH)

    @dbus.service.method(ENDPOINT_IFACE, in_signature='ay', out_signature='ay')
    def SelectConfiguration(self, capabilities):
        """Log the offered capabilities and return the fixed configuration."""
        print("SELECT CONFIG", capabilities)
        return self.configuration

    @dbus.service.method(ENDPOINT_IFACE, in_signature='o', out_signature='')
    def ClearConfiguration(self, transport):
        """Log that the configuration was cleared; no other action is taken."""
        print("CLEAR")

    @dbus.service.method(PROPS_IFACE, in_signature='s', out_signature='a{sv}')
    def GetAll(self, iface):
        """Return the endpoint properties, logging an unexpected interface."""
        # Was misspelled 'EDNPOINT_IFACE', which raised NameError on every call.
        if iface != ENDPOINT_IFACE:
            print("GETALL WRONG IFACE")
        return self.properties
# The Media1 interface on the adapter object is where endpoints are registered.
adapter = system_bus.get_object(bluez_name, ADAPTER_PATH)
media = dbus.Interface(adapter, "org.bluez.Media1")
eppath = "/Endpoint/A2DPSource/sbc"

# First SBC codec byte: sampling-frequency bits plus channel-mode bits,
# chosen from the channel count read from the WAV header.
if numchans == 1:
    b0 = 0x28  # 44100 mono
else:
    b0 = 0x22  # 44100 stereo

# Capabilities advertised for the endpoint. The remaining bytes are
# presumably block/subband/allocation flags and min/max bitpool -- confirm
# against the A2DP SBC codec information element.
SBC_CAPS = dbus.Array([dbus.Byte(b0), dbus.Byte(0xff), dbus.Byte(10), dbus.Byte(53)])
# The selected configuration must lie within the advertised capabilities, so
# it uses the same first byte; it used to be hard-coded to 0x28 (mono) even
# when the capabilities offered stereo only.
SBC_CONFIG = dbus.Array([dbus.Byte(b0), dbus.Byte(0x15), dbus.Byte(2), dbus.Byte(32)])
SBC_CODEC = dbus.Byte(0)

properties = dbus.Dictionary({
    "UUID": SOURCE_UUID,
    "Codec": SBC_CODEC,
    "DelayReporting": False,
    "Capabilities": SBC_CAPS,
})

# Export the endpoint object first, then tell BlueZ about it.
ep = Endpoint(system_bus, eppath, properties, SBC_CONFIG)
media.RegisterEndpoint(eppath, properties)
print(eppath, "registered")
def sig_handler(iface, changedprops, invalidated):
    """Start streaming once the remote device reports Connected.

    Receiver for PropertiesChanged signals on the device path. When
    ``changedprops`` carries ``Connected`` equal to 1, the transport stored
    in ``gtransport`` is acquired, the main loop is asked to quit, and
    ``datapump`` is run on the acquired descriptor.

    Args:
        iface: Name of the interface whose properties changed (unused).
        changedprops: Mapping of changed property names to new values.
        invalidated: Names of invalidated properties (unused).
    """
    print("sig received")
    if changedprops.get('Connected') != 1:
        return

    transport_proxy = system_bus.get_object(bluez_name, gtransport)
    ufd, rmtu, wmtu = transport_proxy.Acquire(dbus_interface=TRANSPORT_IFACE)
    print("rmtu=", rmtu, "wmtu=", wmtu)

    # take() hands ownership of the descriptor to us. Wrap that descriptor
    # directly instead of fromfd(), which duplicates it and left the
    # original open forever.
    fd = ufd.take()
    sock = socket(AF_BLUETOOTH, SOCK_SEQPACKET, fileno=fd)
    with sock:
        # Ask the main loop to stop; it exits after this handler returns.
        mainloop.quit()
        datapump(sock.fileno(), filename, numchans)
# Run the GLib main loop so incoming D-Bus method calls and signals are
# dispatched; it returns after sig_handler has called mainloop.quit().
mainloop.run()