| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444 |
- from __future__ import print_function
- from argparse import ArgumentParser
- from hashlib import md5
- from datetime import datetime
- from binascii import hexlify
- from threading import Thread
- from StringIO import StringIO
- from io import BufferedWriter
- from subprocess import PIPE
- import subprocess
- import sqlite3
- import signal
- import curses
- import time
- import json
- import sys
- import os
- import re
# Short module-level alias for ``datetime.strptime``.
strptime = datetime.strptime

# Date format understood by ``simple_date``.
ARG_DATE_FMT = '%Y-%m-%d %H:%M:%S'


def simple_date(s):
    """Parse ``s`` using ``ARG_DATE_FMT`` and return a naive ``datetime``.

    This is a named function rather than a lambda so that callers which
    report the converter by name (argparse does this for ``type=``
    callables) show ``simple_date`` instead of ``<lambda>``.

    Raises:
        ValueError: if ``s`` does not match ``ARG_DATE_FMT``.
    """
    return strptime(s, ARG_DATE_FMT)


# Group 1 captures the second path component below /var/fourier/, i.e.
# /var/fourier/<first>/<captured>/...; the rest of the file stores that
# value as the "station" of a recording.
re_station = re.compile(r'^/var/fourier/[^/]+/([^/]+)')
def find_hash(hsh, db=None):
    """Return the row of table ``file`` whose ``hash`` equals ``hsh``.

    Args:
        hsh: hash value to look up.
        db: open DB-API connection (e.g. ``sqlite3.Connection``). Despite
            the ``None`` default it is required; ``None`` fails on the
            first attribute access.

    Returns:
        The first matching row as returned by ``cursor.fetchone()``, or
        ``None`` when no row matches.
    """
    cursor = db.cursor()
    try:
        cursor.execute("select * from file where hash = ?", (hsh,))
        return cursor.fetchone()
    finally:
        # Release the cursor even when the query raises.
        cursor.close()
def insert_file(hsh, filename, timestamp, db=None, uploaded=None):
    """Insert one row describing a recording into table ``file``.

    The transaction is not committed here; committing or rolling back
    is left to the caller.

    Args:
        hsh: value stored in the ``hash`` column.
        filename: path of the recording. The station is taken from it
            with ``re_station``; when the path does not match, the
            station is stored as NULL instead of failing.
        timestamp: ``datetime`` of the recording. It is converted to an
            integer epoch with ``time.mktime``, which interprets it as
            local time.
        db: open DB-API connection. Required despite the ``None`` default.
        uploaded: when truthy, the row is marked as uploaded using the
            current time; otherwise the ``uploaded`` column is NULL.

    Raises:
        Whatever ``cursor.execute`` raises, for example
        ``sqlite3.IntegrityError`` when the insert violates a constraint.
    """
    match = re_station.match(filename)
    station = match.group(1) if match else None
    was_uploaded = int(time.time()) if uploaded else None
    params = (
        hsh,
        filename,
        int(time.mktime(timestamp.timetuple())),
        was_uploaded,
        station,
    )
    cursor = db.cursor()
    try:
        cursor.execute("""
            insert into file(hash, filename,
                             timestamp, uploaded,
                             station)
            values(?, ?, ?, ?, ?)
            """,
            params
        )
    finally:
        # Close the cursor even when the insert is rejected.
        cursor.close()
def hash_file(filename, chunk_size=4096):
    """Return the hexadecimal MD5 digest of the contents of ``filename``.

    The file is read in binary mode, ``chunk_size`` bytes at a time, so
    it is never loaded into memory as a whole.

    Args:
        filename: path of the file to hash.
        chunk_size: number of bytes requested per read. Defaults to 4096.

    Returns:
        The digest as a hexadecimal string.

    Raises:
        ValueError: if ``chunk_size`` is smaller than 1. A size of 0
            would read nothing and silently return the digest of empty
            input.
    """
    if chunk_size < 1:
        raise ValueError('chunk_size must be a positive integer')
    ahash = md5()
    with open(filename, 'rb') as fp:
        # iter() with a sentinel stops at the first empty read (EOF).
        for data in iter(lambda: fp.read(chunk_size), b''):
            ahash.update(data)
    return ahash.hexdigest()
def parse_stations(string):
    """Split a comma-separated list of station codes into a list.

    Whitespace around each code is removed and empty entries are
    dropped, so an empty string yields ``[]`` and a trailing comma or
    spaces after commas produce no blank or padded codes.

    Args:
        string: text such as ``'98.5,101.1'``.

    Returns:
        A list of the non-empty, stripped codes in their original order.
    """
    codes = (code.strip() for code in string.split(','))
    return [code for code in codes if code]
def _build_parser():
    """Build the argument parser with the ``db`` and ``station`` commands."""
    parser = ArgumentParser()
    subparsers = parser.add_subparsers(dest='entity')
    # Python 3 treats sub-commands as optional unless told otherwise;
    # main() always needs one, so require it explicitly.
    subparsers.required = True

    dbparser = subparsers.add_parser('db')
    dbparser.add_argument('action')
    dbparser.add_argument('-from-date', dest='from_date', type=simple_date)
    dbparser.add_argument('-to-date', dest='to_date', type=simple_date)
    dbparser.add_argument('-station', dest='station')
    dbparser.add_argument('--dry-run', dest='dry_run', action='store_true')
    dbparser.add_argument('--uploaded', dest='uploaded', action='store_true')
    dbparser.add_argument('-limit', type=int, dest='limit')

    stations_parser = subparsers.add_parser('station')
    stations_parser.add_argument('action')
    stations_parser.add_argument('stations', nargs='*', default=['98.5'])
    stations_parser.add_argument('--play-only', dest='play_only',
                                 action='store_true')
    stations_parser.add_argument('-output', dest='audio_output',
                                 type=int, default=0)
    return parser


def _iter_recordings(root):
    """Yield ``(full_path, base_name)`` for every ``.mp3`` file below root."""
    for folder, _folders, files in os.walk(root):
        for name in files:
            if name.endswith('.mp3'):
                yield os.path.join(folder, name), name


def _db_stats(args, device_id, dbpath):
    """Print the total, uploaded and pending counts of table ``file``."""
    conn = sqlite3.connect(dbpath)
    cursor = conn.cursor()
    # count(uploaded) only counts rows whose ``uploaded`` is not NULL.
    cursor.execute("select count(*), count(uploaded) from file")
    total, uploaded = cursor.fetchone()
    print("total: {}".format(total))
    print("uploaded: {}".format(uploaded))
    print("pending: {}".format(total - uploaded))


def _db_index_files(args, device_id, dbpath):
    """Hash every recording on disk and insert it into table ``file``.

    Honors ``-station``, ``-from-date``, ``-to-date``, ``--uploaded`` and
    ``--dry-run``. With ``--dry-run`` nothing is committed, including the
    periodic commits made every ``commit_every`` inserted files.
    """
    counter = 0
    already_indexed = 0
    commit_every = 200
    conn = sqlite3.connect(dbpath)
    path = os.path.join('/var/fourier', device_id)
    if args.station:
        path = os.path.join(path, args.station)

    # A single flat loop, so that ``break`` on Ctrl-C stops the whole walk.
    for filename, name in _iter_recordings(path):
        try:
            dt = datetime.strptime(name[:19], '%Y-%m-%dT%H-%M-%S')
        except ValueError:
            # A stray .mp3 without a timestamp must not abort the run.
            print('skipping, no timestamp in name: {}'.format(filename))
            continue
        if args.from_date and dt < args.from_date:
            continue
        if args.to_date and dt > args.to_date:
            continue
        try:
            thehash = hash_file(filename)
            insert_file(thehash, filename, dt,
                        db=conn,
                        uploaded=args.uploaded)
            counter += 1
            print(dt)
            if counter % commit_every == 0 and not args.dry_run:
                conn.commit()
                print('commited')
        except sqlite3.IntegrityError:
            already_indexed += 1
            print('already indexed: {}'.format(filename))
        except sqlite3.OperationalError:
            print('commit failed, retrying in next loop')
        except KeyboardInterrupt:
            break

    if not args.dry_run:
        conn.commit()
    else:
        conn.rollback()
        print('\n[WARNING] DRY RUN FINISHED')
    print('----------------------------------')
    print('total files indexed: {}'.format(counter))
    print('total files in existence: {}'.format(already_indexed))
    print('----------------------------------')


def _db_setup(args, device_id, dbpath):
    """Create the database file and its schema; exit 1 if it exists."""
    if os.path.isfile(dbpath):
        print('database already installed')
        sys.exit(1)
    conn = sqlite3.connect(dbpath)
    cursor = conn.cursor()
    sentences = [
        """create table file(
            hash text primary key,
            station text,
            timestamp int,
            filename text,
            uploaded int
        )""",
        "create index timestamp_index_desc on file (timestamp desc)",
        # The index named *_asc is created ascending, as its name says.
        "create index timestamp_index_asc on file (timestamp asc)",
    ]
    for query in sentences:
        cursor.execute(query)
    conn.commit()


def _db_migrate(args, device_id, dbpath):
    """Apply every migration newer than the version stored in ``config``.

    Each version is committed once all of its queries succeed. The first
    failing version is rolled back and stops the run, so later versions
    are never applied on top of a missing one.
    """
    conn = sqlite3.connect(dbpath)
    cursor = conn.cursor()
    try:
        cursor.execute(("select value "
                        "from config "
                        "where name = 'version'"))
        row = cursor.fetchone()
        min_version = row[0] if row else ''
    except sqlite3.OperationalError as err:
        strerr = str(err)
        if "table" in strerr and "config" in strerr:
            # The error names the config table (presumably it is
            # missing), so start from the empty version.
            min_version = ''
        else:
            # Unknown database problem: there is no version to compare to.
            print(err)
            sys.exit(1)

    from fourier.migrations import versions
    cursor = conn.cursor()
    for v, qrs in versions:
        if not v > min_version:
            continue
        try:
            for q in qrs:
                cursor.execute(q)
        except Exception as ex:
            print('failed: {}'.format(ex))
            conn.rollback()
            break
        conn.commit()


def _db_connect(args, device_id, dbpath):
    """Open the database in the interactive ``sqlite3`` shell."""
    subprocess.call(['sqlite3', dbpath])


def _db_add_missing_stations(args, device_id, dbpath):
    """Fill in ``station`` for rows where it is NULL, from the file path."""
    conn = sqlite3.connect(dbpath)
    cursor = conn.cursor()
    query = ('select hash, filename '
             'from file '
             'where station is null ')
    if args.limit:
        query += ' limit {}'.format(args.limit)
    cursor.execute(query)
    # Read everything first: updating the table while the SELECT is still
    # being stepped on the same connection has undefined visibility.
    rows = cursor.fetchall()
    curup = conn.cursor()
    for the_hash, filename in rows:
        match = re_station.match(filename)
        if match:
            curup.execute(('update "file" '
                           'set "station" = ? '
                           'where "hash" = ?'),
                          (match.group(1), the_hash))
    conn.commit()


def _db_align_dates(args, device_id, dbpath):
    """Print rows whose stored timestamp differs from the file name's."""
    conn = sqlite3.connect(dbpath)
    # Rows are accessed by column name below, which needs sqlite3.Row.
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()
    cursor.execute('select hash, filename, timestamp '
                   'from "file"')
    for row in cursor:
        fn = os.path.basename(row['filename'])
        ts = datetime.fromtimestamp(row['timestamp'])
        fl = datetime.strptime(fn[:19], '%Y-%m-%dT%H-%M-%S')
        if fl != ts:
            print('{}\t{}\t{}'.format(row['hash'],
                                      row['timestamp'],
                                      row['filename']))


def _station_list(args, device_id, dbpath):
    """Print each entry of the device folder whose name has no dot."""
    stations_path = os.path.join('/var/fourier', device_id)
    for dr in os.listdir(stations_path):
        if '.' not in dr:
            print(dr)


def _spawn_transcoder(filename, env, sink):
    """Start ffmpeg encoding the s16le audio on its stdin to ``filename``.

    ffmpeg's stdout and stderr go to ``sink`` instead of pipes: nothing
    reads them, and an unread pipe eventually fills and blocks the child.
    """
    return subprocess.Popen(
        [
            'ffmpeg', '-f', 's16le', '-i', 'pipe:0',
            '-ac', '1', '-ar', '24000',
            '-nostdin',
            '-f', 'mp3',
            filename,
        ],
        stdin=PIPE,
        stdout=sink,
        stderr=sink,
        env=env,
        preexec_fn=os.setpgrp,
        close_fds=True
    )


def _spawn_receiver(index, station, env):
    """Start ``rtl_fm`` on device ``index`` tuned to ``station`` MHz."""
    return subprocess.Popen(
        [
            'rtl_fm', '-M', 'wbfm', '-g', '10',
            '-d', str(index),
            '-f', '{}M'.format(station),
            '-o', '4',
            '-F', '8',
            '-p', '200',
            '-s', '88200',
            '-r', '44100',
        ],
        stdout=PIPE,
        stdin=PIPE,
        stderr=PIPE,
        env=env,
        preexec_fn=os.setpgrp,
        close_fds=True
    )


def _station_tune(args, device_id, dbpath):
    """Play, and unless ``--play-only`` also record, the given stations.

    One ``rtl_fm`` receiver is started per station; a curses screen shows
    the byte counters and a hexadecimal digit key selects which receiver
    is sent to the audio output. Ctrl-C ends the session. On every exit
    path the terminal is restored and the child processes get SIGINT.
    """
    import pyaudio

    if args.action == 'play':
        args.play_only = True
    audio = pyaudio.PyAudio()
    stream = audio.open(format=pyaudio.paInt16,
                        output_device_index=args.audio_output,
                        channels=1,
                        rate=44100,
                        output=True)

    basepath = '/var/fourier/tests'
    env = os.environ
    stations = args.stations
    receivers_count = len(stations)
    if not os.path.isdir(basepath):
        os.mkdir(basepath)
    # Decoded so the id formats as plain text on Python 2 and 3 alike.
    test_id = hexlify(os.urandom(4)).decode('ascii')

    processes = []
    transcoders = []
    totals = [0] * receivers_count
    stream_index = 0
    failure = None
    curses_started = False
    devnull = open(os.devnull, 'wb')
    try:
        for index in range(receivers_count):
            if not args.play_only:
                filename = os.path.join(
                    basepath, "{}-{}.mp3".format(test_id, index))
                transcoders.append(_spawn_transcoder(filename, env, devnull))
            processes.append(_spawn_receiver(index, stations[index], env))

        scr = curses.initscr()
        curses_started = True
        scr.addstr(0, 0, 'Sintonizadores')
        scr.addstr(receivers_count + 2, 0,
                   'Presiona un número para escuchar')
        if args.play_only:
            scr.addstr(receivers_count + 3, 0, 'Solo reproduciendo')
        else:
            # Each message gets its own row; two of them used to share
            # one row, so the first was always overwritten.
            scr.addstr(receivers_count + 3, 0, 'Reproduciendo y grabando')
            scr.addstr(receivers_count + 4, 0,
                       'ID de prueba: /var/fourier/tests/{}-x.mp3'
                       .format(test_id))
            scr.addstr(receivers_count + 5, 0,
                       'Usar --play-only para solo reproducir')
        scr.addstr(stream_index + 1, 0, '>')
        scr.refresh()
        scr.timeout(0)
        curses.noecho()
        curses.curs_set(0)

        while 1:
            refresh = False
            for index in range(receivers_count):
                data = processes[index].stdout.read(4096)
                if data:
                    if index == stream_index:
                        stream.write(data)
                    if not args.play_only:
                        transcoders[index].stdin.write(data)
                    totals[index] += len(data)
                    if totals[index] % 100 == 0:
                        scr.addstr(index + 1, 5, '{:01x} {:>4} {}\n\n'
                                   .format(index, stations[index],
                                           totals[index]))
                        refresh = True
                else:
                    # No audio from this receiver: read its stderr
                    # (the output is discarded).
                    processes[index].stderr.read(4096)
                    refresh = True
            k = scr.getch()
            if k > -1:
                try:
                    key = int(chr(k), 16) if k > 0 else 0
                except ValueError:
                    # Not a hexadecimal digit: ignore the key press.
                    key = -1
                if 0 <= key < receivers_count:
                    scr.addstr(stream_index + 1, 0, ' ')
                    stream_index = key
                    refresh = True
            if refresh:
                scr.addstr(stream_index + 1, 0, '>')
                scr.refresh()
    except KeyboardInterrupt:
        pass
    except Exception as ex:
        failure = ex
    finally:
        if curses_started:
            curses.endwin()
        for proc in transcoders + processes:
            try:
                proc.send_signal(signal.SIGINT)
            except OSError:
                # The process is already gone.
                pass
        devnull.close()
    if failure is not None:
        # Printed only after endwin() so the message stays on screen.
        print(failure)


# Handlers share the signature (args, device_id, dbpath) so that main()
# can dispatch on the action name; unused parameters are ignored.
_DB_ACTIONS = {
    'stats': _db_stats,
    'index-files': _db_index_files,
    'setup': _db_setup,
    'migrate': _db_migrate,
    'connect': _db_connect,
    'add-missing-stations': _db_add_missing_stations,
    'align-dates': _db_align_dates,
}

_STATION_ACTIONS = {
    'list': _station_list,
    'record': _station_tune,
    'play': _station_tune,
}


def main():
    """Command-line entry point.

    Usage is ``<prog> db <action> [options]`` or
    ``<prog> station <action> [stations...] [options]``. The device id
    is read from ``/etc/fourier-config.json`` and the database path is
    ``/var/fourier/<device_id>/files.db``. An unknown action ends with a
    usage error instead of being silently ignored.
    """
    parser = _build_parser()
    args = parser.parse_args()

    handlers = _DB_ACTIONS if args.entity == 'db' else _STATION_ACTIONS
    handler = handlers.get(args.action)
    if handler is None:
        parser.error('unknown {} action: {}'.format(args.entity,
                                                    args.action))

    with open('/etc/fourier-config.json', 'r') as fp:
        config = json.load(fp)
    device_id = config['device_id']
    dbpath = '/var/fourier/{}/files.db'.format(device_id)
    handler(args, device_id, dbpath)


# Run the command-line interface only when executed as a script.
if __name__ == '__main__':
    main()
|