# cli.py
  1. from __future__ import print_function
  2. from argparse import ArgumentParser
  3. from hashlib import md5
  4. from datetime import datetime
  5. from binascii import hexlify
  6. from threading import Thread
  7. from StringIO import StringIO
  8. from io import BufferedWriter
  9. from subprocess import PIPE
  10. import subprocess
  11. import sqlite3
  12. import signal
  13. import curses
  14. import time
  15. import json
  16. import sys
  17. import os
  18. import re
  19. strptime = datetime.strptime
  20. ARG_DATE_FMT = '%Y-%m-%d %H:%M:%S'
  21. simple_date = lambda s: strptime(s, ARG_DATE_FMT)
  22. re_station = re.compile(r'^/var/fourier/[^/]+/([^/]+)')
  23. def find_hash(hsh, db=None):
  24. cursor = db.cursor()
  25. cursor.execute("select * from file where hash = ?",
  26. (hsh, ),
  27. )
  28. return cursor.fetchone()
  29. def insert_file(hsh, filename, timestamp, db=None, uploaded=None):
  30. cursor = db.cursor()
  31. match = re_station.match(filename)
  32. station = match.group(1)
  33. was_uploaded = int(time.time()) if uploaded else None
  34. cursor.execute("""
  35. insert into file(hash, filename,
  36. timestamp, uploaded,
  37. station)
  38. values(?, ?, ?, ?, ?)
  39. """,
  40. (
  41. hsh,
  42. filename,
  43. int(time.mktime(timestamp.timetuple())),
  44. was_uploaded,
  45. station,
  46. )
  47. )
  48. cursor.close()
  49. def hash_file(filename):
  50. ahash = md5()
  51. with open(filename, 'rb') as fp:
  52. while True:
  53. data = fp.read(4096)
  54. if data:
  55. ahash.update(data)
  56. else:
  57. break
  58. return ahash.hexdigest()
  59. def parse_stations(string):
  60. return string.split(',')
  61. def main():
  62. parser = ArgumentParser()
  63. subparsers = parser.add_subparsers(dest='entity')
  64. dbparser = subparsers.add_parser('db')
  65. dbparser.add_argument('action')
  66. dbparser.add_argument('-from-date',
  67. dest='from_date',
  68. type=simple_date,
  69. )
  70. dbparser.add_argument('-to-date',
  71. dest='to_date',
  72. type=simple_date,
  73. )
  74. dbparser.add_argument('-station', dest='station')
  75. dbparser.add_argument('--dry-run', dest='dry_run',
  76. action='store_const',
  77. const=True,
  78. default=False
  79. )
  80. dbparser.add_argument('--uploaded', action='store_const',
  81. const=True, default=False,
  82. dest='uploaded',
  83. )
  84. dbparser.add_argument('-limit', type=int, dest='limit')
  85. stations_parser = subparsers.add_parser('station')
  86. stations_parser.add_argument('action')
  87. stations_parser.add_argument('stations', nargs='*', default=['98.5'])
  88. stations_parser.add_argument('--play-only', dest='play_only',
  89. action='store_const',
  90. const=True,
  91. default=False
  92. )
  93. stations_parser.add_argument('-output', dest='audio_output',
  94. type=int, default=0
  95. )
  96. args = parser.parse_args()
  97. action = args.action
  98. with open('/etc/fourier-config.json', 'r') as fp:
  99. config = json.loads(fp.read())
  100. device_id = config['device_id']
  101. dbpath = '/var/fourier/{}/files.db'.format(device_id)
  102. if args.entity == 'db':
  103. if action == 'stats':
  104. conn = sqlite3.connect(dbpath)
  105. cursor = conn.cursor()
  106. cursor.execute("select count(*), count(uploaded) from file")
  107. total, uploaded, = cursor.fetchone()
  108. print("total: {}".format(total))
  109. print("uploaded: {}".format(uploaded))
  110. print("pending: {}".format(total - uploaded))
  111. elif action == 'index-files':
  112. counter = 0
  113. already_indexed = 0
  114. multiplo_commit = 200
  115. conn = sqlite3.connect(dbpath)
  116. path = os.path.join('/var/fourier', device_id)
  117. if args.station:
  118. path = os.path.join(path, args.station)
  119. for folder, folders, files in os.walk(path):
  120. for file in files:
  121. if not file.endswith('.mp3'):
  122. continue
  123. filename = os.path.join(folder, file)
  124. dt = datetime.strptime(
  125. file[:19],
  126. '%Y-%m-%dT%H-%M-%S'
  127. )
  128. try:
  129. if args.from_date and args.to_date:
  130. do_insert = dt >= args.from_date\
  131. and dt <= args.to_date
  132. elif args.from_date:
  133. do_insert = dt >= args.from_date
  134. elif args.to_date:
  135. do_insert = dt <= args.to_date
  136. else:
  137. do_insert = True
  138. if do_insert:
  139. thehash = hash_file(filename)
  140. insert_file(thehash, filename, dt,
  141. db=conn,
  142. uploaded=args.uploaded,
  143. )
  144. counter += 1
  145. print(dt)
  146. if counter % multiplo_commit == 0:
  147. conn.commit()
  148. print('commited')
  149. except sqlite3.IntegrityError:
  150. already_indexed += 1
  151. print('already indexed: {}'.format(filename))
  152. except sqlite3.OperationalError:
  153. print('commit failed, retrying in next loop')
  154. except KeyboardInterrupt:
  155. break
  156. if not args.dry_run:
  157. conn.commit()
  158. else:
  159. conn.rollback()
  160. print('\n[WARNING] DRY RUN FINISHED')
  161. print('----------------------------------')
  162. print('total files indexed: {}'.format(counter))
  163. print('total files in existence: {}'.format(already_indexed))
  164. print('----------------------------------')
  165. elif action == 'setup':
  166. if not os.path.isfile(dbpath):
  167. conn = sqlite3.connect(dbpath)
  168. cursor = conn.cursor()
  169. sentences = [
  170. """create table file(
  171. hash text primary key,
  172. station text,
  173. timestamp int,
  174. filename text,
  175. uploaded int
  176. )""",
  177. "create index timestamp_index_desc on file (timestamp desc)",
  178. "create index timestamp_index_asc on file (timestamp desc)",
  179. ]
  180. for query in sentences:
  181. cursor.execute(query)
  182. conn.commit()
  183. else:
  184. print('database already installed')
  185. sys.exit(1)
  186. elif action == 'migrate':
  187. conn = sqlite3.connect(dbpath)
  188. cursor = conn.cursor()
  189. try:
  190. cursor.execute(("select value "
  191. "from config "
  192. "where name = 'version'"))
  193. row = cursor.fetchone()
  194. min_version = row[0] if row else ''
  195. except sqlite3.OperationalError as err:
  196. strerr = str(err)
  197. if "table" in strerr and "config" in strerr:
  198. min_version = ''
  199. else:
  200. print(err)
  201. from fourier.migrations import versions
  202. cursor = conn.cursor()
  203. for item in versions:
  204. v, qrs = item
  205. if v > min_version:
  206. try:
  207. for q in qrs:
  208. cursor.execute(q)
  209. except Exception as ex:
  210. print('failed: {}'.format(ex))
  211. conn.rollback()
  212. elif action == 'connect':
  213. subprocess.call(['sqlite3', dbpath])
  214. elif action == 'add-missing-stations':
  215. conn = sqlite3.connect(dbpath)
  216. cursor = conn.cursor()
  217. query = ('select hash, filename '
  218. 'from file '
  219. 'where station is null ')
  220. if args.limit:
  221. query += ' limit {}'.format(args.limit)
  222. cursor.execute(query)
  223. curup = conn.cursor()
  224. for row in cursor:
  225. the_hash, filename, = row
  226. match = re_station.match(filename)
  227. if match:
  228. station_code = match.group(1)
  229. params = station_code, the_hash,
  230. curup.execute(('update "file" '
  231. 'set "station" = ? '
  232. 'where "hash" = ?'
  233. ), params
  234. )
  235. conn.commit()
  236. elif action == 'align-dates':
  237. conn = sqlite3.connect(dbpath)
  238. cursor = conn.cursor()
  239. query = ('select hash, filename, timestamp '
  240. 'from "file"')
  241. cursor.execute(query)
  242. for row in cursor:
  243. fn = os.path.basename(row['filename'])
  244. ts = datetime.fromtimestamp(row['timestamp'])
  245. fl = datetime.strptime(fn[:19], '%Y-%m-%dT%H-%M-%S')
  246. if fl != ts:
  247. print('{}\t{}\t{}'.format(row['hash'],
  248. row['timestamp'],
  249. row['filename']
  250. ))
  251. elif args.entity == 'station':
  252. if action == 'list':
  253. stations_path = os.path.join('/var/fourier', device_id)
  254. dirs = os.listdir(stations_path)
  255. for dr in dirs:
  256. if '.' not in dr:
  257. print(dr)
  258. elif action == 'record' or action == 'play':
  259. import pyaudio, wave
  260. if action == 'play':
  261. args.play_only = True
  262. audio = pyaudio.PyAudio()
  263. stream = audio.open(format=pyaudio.paInt16,
  264. output_device_index=args.audio_output,
  265. channels=1,
  266. rate=44100,
  267. output=True)
  268. processes = []
  269. transcoders = []
  270. totals = []
  271. basepath = '/var/fourier/tests'
  272. env = os.environ
  273. stream_index = 0
  274. stations = args.stations
  275. receivers_count = len(stations)
  276. if not os.path.isdir(basepath):
  277. os.mkdir(basepath)
  278. test_id = hexlify(os.urandom(4))
  279. for index in range(receivers_count):
  280. filename = os.path.join(basepath, "{}-{}.mp3"\
  281. .format(test_id, index)
  282. )
  283. if not args.play_only:
  284. ffmpeg = subprocess.Popen([
  285. 'ffmpeg', '-f', 's16le', '-i', 'pipe:0',
  286. '-ac', '1', '-ar', '24000',
  287. '-nostdin',
  288. '-f', 'mp3',
  289. filename,
  290. ],
  291. stdin=PIPE,
  292. stdout=PIPE,
  293. stderr=PIPE,
  294. env=env,
  295. preexec_fn=os.setpgrp,
  296. close_fds=True,
  297. )
  298. transcoders.append(ffmpeg)
  299. rtl = subprocess.Popen([
  300. 'rtl_fm', '-M', 'wbfm', '-g', '10',
  301. '-d', str(index),
  302. '-f', '{}M'.format(stations[index]),
  303. '-o', '4',
  304. '-F', '8',
  305. '-p', '200',
  306. '-s', '88200',
  307. '-r', '44100',
  308. ],
  309. stdout=PIPE,
  310. stdin=PIPE,
  311. stderr=PIPE,
  312. env=env,
  313. preexec_fn=os.setpgrp,
  314. close_fds=True,
  315. )
  316. processes.append(rtl)
  317. totals.append(0)
  318. scr = curses.initscr()
  319. scr.addstr(0, 0, 'Sintonizadores')
  320. scr.addstr(receivers_count + 2, 0,
  321. 'Presiona un número para escuchar'
  322. )
  323. if args.play_only:
  324. scr.addstr(receivers_count + 3, 0,
  325. 'Solo reproduciendo'
  326. )
  327. else:
  328. scr.addstr(receivers_count + 4, 0,
  329. 'Reproduciendo y grabando'
  330. )
  331. scr.addstr(receivers_count + 4, 0,
  332. 'ID de prueba: /var/fourier/tests/{}-x.mp3'\
  333. .format(test_id)
  334. )
  335. scr.addstr(receivers_count + 5, 0,
  336. 'Usar --play-only para solo reproducir'
  337. )
  338. scr.addstr(stream_index + 1, 0, '>')
  339. scr.refresh()
  340. scr.timeout(0)
  341. curses.noecho()
  342. curses.curs_set(0)
  343. try:
  344. while 1:
  345. refresh = False
  346. for index in range(receivers_count):
  347. data = processes[index].stdout.read(4096)
  348. if data:
  349. if index == stream_index:
  350. stream.write(data)
  351. if not args.play_only:
  352. transcoders[index].stdin.write(data)
  353. totals[index] += len(data)
  354. if totals[index] % 100 == 0:
  355. scr.addstr(index + 1, 5, '{:01x} {:>4} {}\n\n'\
  356. .format(index, stations[index], totals[index])
  357. )
  358. refresh = True
  359. else:
  360. errout = processes[index].stderr.read(4096)
  361. refresh = True
  362. k = scr.getch()
  363. if k > -1:
  364. key = int(chr(k), 16) if k > 0 else 0
  365. if key >= 0 and key < receivers_count:
  366. scr.addstr(stream_index + 1, 0, ' ')
  367. stream_index = key
  368. refresh = True
  369. if refresh:
  370. scr.addstr(stream_index + 1, 0, '>')
  371. scr.refresh()
  372. except KeyboardInterrupt:
  373. curses.endwin()
  374. for index in range(receivers_count):
  375. if not args.play_only:
  376. transcoders[index].send_signal(signal.SIGINT)
  377. processes[index].send_signal(signal.SIGINT)
  378. except Exception as ex:
  379. curses.endwin()
  380. print(ex)
# Run the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == '__main__':
    main()