from __future__ import print_function from argparse import ArgumentParser from hashlib import md5 from datetime import datetime from binascii import hexlify from threading import Thread from StringIO import StringIO from io import BufferedWriter from subprocess import PIPE import subprocess import sqlite3 import signal import curses import time import json import sys import os strptime = datetime.strptime ARG_DATE_FMT = '%Y-%m-%d %H:%M:%S' simple_date = lambda s: strptime(s, ARG_DATE_FMT) def find_hash(hsh, db=None): cursor = db.cursor() cursor.execute("select * from file where hash = ?", (hsh, ), ) return cursor.fetchone() def insert_file(hsh, filename, timestamp, db=None): cursor = db.cursor() cursor.execute(""" insert into file(hash, filename, timestamp) values(?, ?, ?) """, (hsh, filename, int(time.mktime(timestamp.timetuple())), ) ) def hash_file(filename): ahash = md5() with open(filename, 'rb') as fp: while True: data = fp.read(4096) if data: ahash.update(data) else: break return ahash.hexdigest() def parse_stations(string): return string.split(',') def main(): parser = ArgumentParser() subparsers = parser.add_subparsers(dest='entity') dbparser = subparsers.add_parser('db') dbparser.add_argument('action') dbparser.add_argument('-from-date', dest='from_date', type=simple_date, ) dbparser.add_argument('-station', dest='station') dbparser.add_argument('--dry-run', dest='dry_run', action='store_const', const=True, default=False ) stations_parser = subparsers.add_parser('station') stations_parser.add_argument('action') stations_parser.add_argument('stations', nargs='*', default=['98.5']) stations_parser.add_argument('--play-only', dest='play_only', action='store_const', const=True, default=False ) args = parser.parse_args() action = args.action with open('/etc/fourier-config.json', 'r') as fp: config = json.loads(fp.read()) device_id = config['device_id'] dbpath = '/var/fourier/{}/files.db'.format(device_id) if args.entity == 'db': if action == 'stats': conn = sqlite3.connect(dbpath) cursor = conn.cursor() cursor.execute("select count(*), count(uploaded) from file") total, uploaded, = cursor.fetchone() print("total: {}".format(total)) print("uploaded: {}".format(uploaded)) print("pending: {}".format(total - uploaded)) elif action == 'index-files': counter = 0 already_indexed = 0 conn = sqlite3.connect(dbpath) path = os.path.join('/var/fourier', device_id) if args.station: path = os.path.join(path, args.station) for folder, folders, files in os.walk(path): for file in files: if not file.endswith('.mp3'): continue filename = os.path.join(folder, file) dt = datetime.strptime( file[:19], '%Y-%m-%dT%H-%M-%S' ) try: if args.from_date: do_insert = dt >= args.from_date else: do_insert = True if do_insert: thehash = hash_file(filename) insert_file(thehash, filename, dt, db=conn) counter += 1 print(dt) except sqlite3.IntegrityError: already_indexed += 1 print('already indexed: {}'.format(filename)) if not args.dry_run: conn.commit() else: conn.rollback() print('\n[WARNING] DRY RUN FINISHED') print('----------------------------------') print('total files indexed: {}'.format(counter)) print('total files in existence: {}'.format(already_indexed)) print('----------------------------------') elif action == 'setup': if not os.path.isfile(dbpath): conn = sqlite3.connect(dbpath) cursor = conn.cursor() sentences = [ """create table file( hash text primary key, station text, timestamp int, filename text, uploaded int )""", "create index timestamp_index_desc on file (timestamp desc)", "create index timestamp_index_asc on file (timestamp desc)", ] for query in sentences: cursor.execute(query) conn.commit() else: print('database already installed') sys.exit(1) elif args.entity == 'station': if action == 'list': stations_path = os.path.join('/var/fourier', device_id) dirs = os.listdir(stations_path) for dr in dirs: if '.' not in dr: print(dr) elif action == 'record' or action == 'play': import pyaudio, wave if action == 'play': args.play_only = True audio = pyaudio.PyAudio() stream = audio.open(format=pyaudio.paInt16, channels=1, rate=44100, output=True) processes = [] transcoders = [] totals = [] basepath = '/var/fourier/tests' env = os.environ stream_index = 0 stations = args.stations receivers_count = len(stations) if not os.path.isdir(basepath): os.mkdir(basepath) test_id = hexlify(os.urandom(4)) for index in range(receivers_count): filename = os.path.join(basepath, "{}-{}.mp3"\ .format(test_id, index) ) if not args.play_only: ffmpeg = subprocess.Popen([ 'ffmpeg', '-f', 's16le', '-i', 'pipe:0', '-ac', '1', '-ar', '24000', '-ab', '64k', '-nostdin', '-f', 'mp3', filename, ], stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env, preexec_fn=os.setpgrp, close_fds=True, ) transcoders.append(ffmpeg) rtl = subprocess.Popen([ 'rtl_fm', '-M', 'wbfm', '-g', '10', '-d', str(index), '-f', '{}M'.format(stations[index]), '-o', '4', '-F', '8', '-p', '200', '-s', '88200', '-r', '44100', ], stdout=PIPE, stdin=PIPE, stderr=PIPE, env=env, preexec_fn=os.setpgrp, close_fds=True, ) processes.append(rtl) totals.append(0) scr = curses.initscr() scr.addstr(0, 0, 'Sintonizadores') scr.addstr(receivers_count + 2, 0, 'Presiona un nĂºmero para escuchar' ) if args.play_only: scr.addstr(receivers_count + 3, 0, 'Solo reproduciendo' ) else: scr.addstr(receivers_count + 4, 0, 'Reproduciendo y grabando' ) scr.addstr(receivers_count + 4, 0, 'ID de prueba: /var/fourier/tests/{}-x.mp3'\ .format(test_id) ) scr.addstr(receivers_count + 5, 0, 'Usar --play-only para solo reproducir' ) scr.addstr(stream_index + 1, 0, '>') scr.refresh() scr.timeout(0) curses.noecho() curses.curs_set(0) try: while 1: refresh = False for index in range(receivers_count): data = processes[index].stdout.read(4096) if data: if index == stream_index: stream.write(data) if not args.play_only: transcoders[index].stdin.write(data) totals[index] += len(data) if totals[index] % 10 == 0: scr.addstr(index + 1, 5, '{:01x} {:>4} {}\n\n'\ .format(index, stations[index], totals[index]) ) refresh = True k = scr.getch() if k > -1: key = int(chr(k), 16) if k > 0 else 0 if key >= 0 and key < receivers_count: scr.addstr(stream_index + 1, 0, ' ') stream_index = key refresh = True if refresh: scr.addstr(stream_index + 1, 0, '>') scr.refresh() except KeyboardInterrupt: curses.endwin() for index in range(receivers_count): if not args.play_only: transcoders[index].send_signal(signal.SIGINT) processes[index].send_signal(signal.SIGINT) except Exception as ex: curses.endwin() print(ex) if __name__ == '__main__': main()