from __future__ import print_function from argparse import ArgumentParser from hashlib import md5 from datetime import datetime from binascii import hexlify from threading import Thread from StringIO import StringIO from io import BufferedWriter from subprocess import PIPE import subprocess import sqlite3 import signal import curses import time import json import sys import os import re strptime = datetime.strptime ARG_DATE_FMT = '%Y-%m-%d %H:%M:%S' simple_date = lambda s: strptime(s, ARG_DATE_FMT) re_station = re.compile(r'^/var/fourier/[^/]+/([^/]+)') def find_hash(hsh, db=None): cursor = db.cursor() cursor.execute("select * from file where hash = ?", (hsh, ), ) return cursor.fetchone() def insert_file(hsh, filename, timestamp, db=None, uploaded=None): cursor = db.cursor() match = re_station.match(filename) station = match.group(1) was_uploaded = int(time.time()) if uploaded else None cursor.execute(""" insert into file(hash, filename, timestamp, uploaded, station) values(?, ?, ?, ?, ?) """, ( hsh, filename, int(time.mktime(timestamp.timetuple())), was_uploaded, station, ) ) cursor.close() def hash_file(filename): ahash = md5() with open(filename, 'rb') as fp: while True: data = fp.read(4096) if data: ahash.update(data) else: break return ahash.hexdigest() def parse_stations(string): return string.split(',') def main(): parser = ArgumentParser() subparsers = parser.add_subparsers(dest='entity') dbparser = subparsers.add_parser('db') dbparser.add_argument('action') dbparser.add_argument('-from-date', dest='from_date', type=simple_date, ) dbparser.add_argument('-to-date', dest='to_date', type=simple_date, ) dbparser.add_argument('-station', dest='station') dbparser.add_argument('--dry-run', dest='dry_run', action='store_const', const=True, default=False ) dbparser.add_argument('--uploaded', action='store_const', const=True, default=False, dest='uploaded', ) dbparser.add_argument('-limit', type=int, dest='limit') stations_parser = subparsers.add_parser('station') stations_parser.add_argument('action') stations_parser.add_argument('stations', nargs='*', default=['98.5']) stations_parser.add_argument('--play-only', dest='play_only', action='store_const', const=True, default=False ) stations_parser.add_argument('-output', dest='audio_output', type=int, default=0 ) args = parser.parse_args() action = args.action with open('/etc/fourier-config.json', 'r') as fp: config = json.loads(fp.read()) device_id = config['device_id'] dbpath = '/var/fourier/{}/files.db'.format(device_id) if args.entity == 'db': if action == 'stats': conn = sqlite3.connect(dbpath) cursor = conn.cursor() cursor.execute("select count(*), count(uploaded) from file") total, uploaded, = cursor.fetchone() print("total: {}".format(total)) print("uploaded: {}".format(uploaded)) print("pending: {}".format(total - uploaded)) elif action == 'index-files': counter = 0 already_indexed = 0 multiplo_commit = 200 conn = sqlite3.connect(dbpath) path = os.path.join('/var/fourier', device_id) if args.station: path = os.path.join(path, args.station) for folder, folders, files in os.walk(path): for file in files: if not file.endswith('.mp3'): continue filename = os.path.join(folder, file) dt = datetime.strptime( file[:19], '%Y-%m-%dT%H-%M-%S' ) try: if args.from_date and args.to_date: do_insert = dt >= args.from_date\ and dt <= args.to_date elif args.from_date: do_insert = dt >= args.from_date elif args.to_date: do_insert = dt <= args.to_date else: do_insert = True if do_insert: thehash = hash_file(filename) insert_file(thehash, filename, dt, db=conn, uploaded=args.uploaded, ) counter += 1 print(dt) if counter % multiplo_commit == 0: conn.commit() print('commited') except sqlite3.IntegrityError: already_indexed += 1 print('already indexed: {}'.format(filename)) except sqlite3.OperationalError: print('commit failed, retrying in next loop') except KeyboardInterrupt: break if not args.dry_run: conn.commit() else: conn.rollback() print('\n[WARNING] DRY RUN FINISHED') print('----------------------------------') print('total files indexed: {}'.format(counter)) print('total files in existence: {}'.format(already_indexed)) print('----------------------------------') elif action == 'setup': if not os.path.isfile(dbpath): conn = sqlite3.connect(dbpath) cursor = conn.cursor() sentences = [ """create table file( hash text primary key, station text, timestamp int, filename text, uploaded int )""", "create index timestamp_index_desc on file (timestamp desc)", "create index timestamp_index_asc on file (timestamp desc)", ] for query in sentences: cursor.execute(query) conn.commit() else: print('database already installed') sys.exit(1) elif action == 'migrate': conn = sqlite3.connect(dbpath) cursor = conn.cursor() try: cursor.execute(("select value " "from config " "where name = 'version'")) row = cursor.fetchone() min_version = row[0] if row else '' except sqlite3.OperationalError as err: strerr = str(err) if "table" in strerr and "config" in strerr: min_version = '' else: print(err) from fourier.migrations import versions cursor = conn.cursor() for item in versions: v, qrs = item if v > min_version: try: for q in qrs: cursor.execute(q) except Exception as ex: print('failed: {}'.format(ex)) conn.rollback() elif action == 'connect': subprocess.call(['sqlite3', dbpath]) elif action == 'add-missing-stations': conn = sqlite3.connect(dbpath) cursor = conn.cursor() query = ('select hash, filename ' 'from file ' 'where station is null ') if args.limit: query += ' limit {}'.format(args.limit) cursor.execute(query) curup = conn.cursor() for row in cursor: the_hash, filename, = row match = re_station.match(filename) if match: station_code = match.group(1) params = station_code, the_hash, curup.execute(('update "file" ' 'set "station" = ? ' 'where "hash" = ?' ), params ) conn.commit() elif args.entity == 'station': if action == 'list': stations_path = os.path.join('/var/fourier', device_id) dirs = os.listdir(stations_path) for dr in dirs: if '.' not in dr: print(dr) elif action == 'record' or action == 'play': import pyaudio, wave if action == 'play': args.play_only = True audio = pyaudio.PyAudio() stream = audio.open(format=pyaudio.paInt16, output_device_index=args.audio_output, channels=1, rate=44100, output=True) processes = [] transcoders = [] totals = [] basepath = '/var/fourier/tests' env = os.environ stream_index = 0 stations = args.stations receivers_count = len(stations) if not os.path.isdir(basepath): os.mkdir(basepath) test_id = hexlify(os.urandom(4)) for index in range(receivers_count): filename = os.path.join(basepath, "{}-{}.mp3"\ .format(test_id, index) ) if not args.play_only: ffmpeg = subprocess.Popen([ 'ffmpeg', '-f', 's16le', '-i', 'pipe:0', '-ac', '1', '-ar', '24000', '-nostdin', '-f', 'mp3', filename, ], stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env, preexec_fn=os.setpgrp, close_fds=True, ) transcoders.append(ffmpeg) rtl = subprocess.Popen([ 'rtl_fm', '-M', 'wbfm', '-g', '10', '-d', str(index), '-f', '{}M'.format(stations[index]), '-o', '4', '-F', '8', '-p', '200', '-s', '88200', '-r', '44100', ], stdout=PIPE, stdin=PIPE, stderr=PIPE, env=env, preexec_fn=os.setpgrp, close_fds=True, ) processes.append(rtl) totals.append(0) scr = curses.initscr() scr.addstr(0, 0, 'Sintonizadores') scr.addstr(receivers_count + 2, 0, 'Presiona un nĂºmero para escuchar' ) if args.play_only: scr.addstr(receivers_count + 3, 0, 'Solo reproduciendo' ) else: scr.addstr(receivers_count + 4, 0, 'Reproduciendo y grabando' ) scr.addstr(receivers_count + 4, 0, 'ID de prueba: /var/fourier/tests/{}-x.mp3'\ .format(test_id) ) scr.addstr(receivers_count + 5, 0, 'Usar --play-only para solo reproducir' ) scr.addstr(stream_index + 1, 0, '>') scr.refresh() scr.timeout(0) curses.noecho() curses.curs_set(0) try: while 1: refresh = False for index in range(receivers_count): data = processes[index].stdout.read(4096) if data: if index == stream_index: stream.write(data) if not args.play_only: transcoders[index].stdin.write(data) totals[index] += len(data) if totals[index] % 100 == 0: scr.addstr(index + 1, 5, '{:01x} {:>4} {}\n\n'\ .format(index, stations[index], totals[index]) ) refresh = True else: errout = processes[index].stderr.read(4096) refresh = True k = scr.getch() if k > -1: key = int(chr(k), 16) if k > 0 else 0 if key >= 0 and key < receivers_count: scr.addstr(stream_index + 1, 0, ' ') stream_index = key refresh = True if refresh: scr.addstr(stream_index + 1, 0, '>') scr.refresh() except KeyboardInterrupt: curses.endwin() for index in range(receivers_count): if not args.play_only: transcoders[index].send_signal(signal.SIGINT) processes[index].send_signal(signal.SIGINT) except Exception as ex: curses.endwin() print(ex) if __name__ == '__main__': main()