#!/usr/bin/env python3
import time
import soundfile as sf
import io
import logging
import sys
import json
import librosa
import glob
import requests
from multiprocessing.dummy import Pool as ThreadPool
import random
# 3cffc8449fc0
# --- Client tuning knobs -------------------------------------------------
# Pause between /status polls while the server answers better_wait=True.
SLEEP_BETTER_WAIT_SEC = 0.5
# Pause between /task/result polls for an already submitted task.
SLEEP_WAIT_FOR_RESULT_SEC = 0.5
# A task is abandoned once the count of non-final /task/result answers
# exceeds this value.
MAX_N_TRIES_FOR_RESULT = 10_000
# Hard cap on worker threads, whatever recognizer count the server reports.
MAX_NUM_PARALLEL_REQS = 100
# Per-request timeouts (seconds) handed to requests: status/result polls,
# and the audio upload respectively.
INFO_REQUESTS_TIMEOUT_SEC = 500
UPLOAD_REQUEST_TIMEOUT_SEC = 600
def load_wav_file(filename: str, sample_rate: int = 8000) -> bytes:
    """Decode an audio file and re-encode it as mono 16-bit PCM WAV bytes.

    The file is decoded with librosa (so non-WAV inputs such as .mp3 are
    accepted), resampled to ``sample_rate`` and downmixed to mono, then
    serialized into an in-memory WAV container.

    Args:
        filename: Path of the audio file to load.
        sample_rate: Target sample rate in Hz. Defaults to 8000, the rate
            this client has always sent. Callers may pass the rate reported
            by the server's /status endpoint instead.

    Returns:
        The complete WAV file contents as bytes.
    """
    wav, sr = librosa.load(filename, sr=sample_rate, mono=True)
    output_buffer = io.BytesIO()
    sf.write(file=output_buffer, data=wav, samplerate=sr,
             subtype='PCM_16', format='WAV')
    # getvalue() returns the whole buffer regardless of the stream position,
    # so the seek(0)/read() round-trip is unnecessary.
    return output_buffer.getvalue()
def sleep_with_rand(s, rel_delta=0.1):
    """Sleep for roughly ``s`` seconds with uniform random jitter.

    The actual duration is drawn uniformly from
    ``[s - s*rel_delta, s + s*rel_delta]`` so that many worker threads
    polling the server do not fire their requests in lock-step.
    """
    jitter = rel_delta * s
    low, high = s - jitter, s + jitter
    time.sleep(random.uniform(low, high))
def _format_error(wav_fnm: str, err: Exception) -> str:
    """Return the single-line result reported for ``wav_fnm`` after ``err``.

    Spaces in the exception text are replaced with underscores so the error
    tag stays one whitespace-free token after the file name.
    """
    return wav_fnm + ' <ERROR=' + str(err).replace(' ', '_') + '>'


def _wait_for_server_capacity(session: requests.Session, server: str) -> None:
    """Poll ``server + '/status'`` until its ``better_wait`` field is falsy."""
    while True:
        with session.get(server + '/status',
                         timeout=INFO_REQUESTS_TIMEOUT_SEC) as r:
            r.raise_for_status()
            better_wait = r.json()['better_wait']
        if not better_wait:
            return
        # Jittered back-off so parallel workers do not poll in lock-step.
        sleep_with_rand(SLEEP_BETTER_WAIT_SEC)


def _poll_result(session: requests.Session, server: str, wav_fnm: str,
                 task_id) -> str:
    """Poll ``server + '/task/result'`` for ``task_id`` until it is final.

    On status 'ok' the transcript is written to ``<wav_fnm>.txt``, the
    word-level result to ``<wav_fnm>.json``, and ``"<wav_fnm> <transcript>"``
    is returned. Status 'key_not_found', or more than MAX_N_TRIES_FOR_RESULT
    non-final answers, yields an error line instead. Request, parsing and
    file errors are not caught here; they propagate to the caller.
    """
    n_tries = 0
    while True:
        sleep_with_rand(SLEEP_WAIT_FOR_RESULT_SEC)
        with session.get(
            server + '/task/result',
            params={'task_id': task_id},
            timeout=INFO_REQUESTS_TIMEOUT_SEC,
        ) as r:
            r.raise_for_status()
            rj = r.json()
            status = rj['status']
            if status == 'ok':
                # Read both fields before writing anything, so a malformed
                # response does not leave a half-written pair of outputs.
                result = rj['result']
                result_timing = rj['result_word_level']
                with open(f"{wav_fnm}.txt", mode="w", encoding="utf8") as f:
                    f.write(result)
                with open(f"{wav_fnm}.json", mode="w", encoding="utf8") as f:
                    json.dump(fp=f, obj=result_timing, ensure_ascii=False)
                logging.info(
                    'DONE: wav_fnm=%s response.text=%s', wav_fnm, r.text)
                return wav_fnm + ' ' + result
            if status == 'key_not_found':
                logging.error(
                    'ERROR: key_not_found. wav_fnm=%s task_id=%s',
                    wav_fnm, task_id)
                return wav_fnm + ' <ERROR_key_not_found>'
            n_tries += 1
            if n_tries > MAX_N_TRIES_FOR_RESULT:
                logging.error(
                    'CLIENT result timeout : wav_fnm=%s response.text=%s',
                    wav_fnm, r.text)
                return wav_fnm + ' <ERROR_RESULT_TIMEOUT>'


def recognize_file(session: requests.Session, server: str, wav_fnm: str) -> str:
    """Recognize one audio file through the REST server and return a result line.

    Steps: wait until /status no longer reports ``better_wait``, load the
    file with ``load_wav_file``, submit it to
    ``/task/create?is_word_level=true``, then poll /task/result until the
    task is final.

    Args:
        session: HTTP session used for every request.
        server: Base URL of the server, e.g. ``http://127.0.0.1:8000``.
        wav_fnm: Path of the audio file to recognize.

    Returns:
        ``"<wav_fnm> <transcript>"`` on success, otherwise ``wav_fnm``
        followed by an ``<ERROR...>`` tag. Any ``Exception`` raised along the
        way is logged and converted into such an error line, so one bad file
        does not abort the caller's ``pool.map``.
    """
    try:
        _wait_for_server_capacity(session, server)
        # The audio is decoded only once the server has capacity. Decoding it
        # before waiting would overlap client and server work, at the cost of
        # holding the audio in memory while waiting.
        wav_bytes = load_wav_file(wav_fnm)
        with session.post(
            server + '/task/create?is_word_level=true',
            files={'file': ('wav', wav_bytes)},
            timeout=UPLOAD_REQUEST_TIMEOUT_SEC,
        ) as r:
            r.raise_for_status()
            rj = r.json()
            if rj['status'] != 'ok':
                logging.error(
                    'ERROR in /task/create: wav_fnm=%s response.text=%s',
                    wav_fnm, r.text)
                return wav_fnm + ' <ERROR_CREATE_' + rj['status'] + '>'
            task_id = rj['task_id']
        return _poll_result(session, server, wav_fnm, task_id)
    except requests.exceptions.RequestException as err:
        # Must precede the IOError clause: RequestException subclasses IOError.
        logging.error('REQUESTS ERROR : wav_fnm=%s exception=%s',
                      wav_fnm, str(err))
        return _format_error(wav_fnm, err)
    except IOError as err:
        logging.error('FILE IO ERROR: wav_fnm=%s exception=%s',
                      wav_fnm, str(err))
        return _format_error(wav_fnm, err)
    except Exception as err:
        # Top-level boundary of a worker thread. Log the traceback so that,
        # e.g., a KeyError from an unexpected response schema is diagnosable.
        logging.exception('FILE ERROR: wav_fnm=%s exception=%s',
                          wav_fnm, str(err))
        return _format_error(wav_fnm, err)
def _expand_wav_args(patterns):
    """Expand wildcard patterns from the command line into file paths.

    POSIX shells usually expand ``wavs/*.wav`` before the script runs, but
    some shells (e.g. Windows cmd.exe) do not. A pattern that matches
    nothing is kept verbatim, so its failure is still reported per file.
    """
    paths = []
    for pattern in patterns:
        matches = sorted(glob.glob(pattern))
        paths.extend(matches if matches else [pattern])
    return paths


def main() -> int:
    """Recognize every audio file named on the command line via the REST server.

    Usage: ./test_rest_files.py SERVER WAV [WAV ...]

    One result line per file (``<file> <transcript>`` or ``<file> <ERROR...>``)
    is printed to stdout; progress and errors are logged to ``<script>.log``.

    Returns:
        The process exit status: 1 on bad usage, 0 otherwise.
    """
    if len(sys.argv) < 1+2:
        print('ERROR in cmd line \n'
              'Syntax: ./test_rest_files.py server:port wav1 wav2 ... \n'
              'For example: ./test_rest_files.py http://127.0.0.1:8000 wav1.wav\n'
              ' or ./test_rest_files.py http://127.0.0.1:8000 wavs/*.wav')
        return 1
    logging.basicConfig(filename=sys.argv[0]+'.log',
                        filemode='a',
                        format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S',
                        level=logging.INFO)
    logging.info('started ...')
    # Strip a trailing slash so that server + '/status' has no '//'.
    server = sys.argv[1].rstrip('/')
    wavs = _expand_wav_args(sys.argv[2:])
    with requests.Session() as session:
        # Ask the server how many parallel requests it can serve.
        with session.get(server + '/status', timeout=INFO_REQUESTS_TIMEOUT_SEC) as r:
            r.raise_for_status()
            rj = r.json()
            server_num_recognizers = rj['num_recognizers']
            server_wav_sample_rate = rj['wav_sample_rate']
        # ThreadPool(0) raises ValueError, so always run at least one worker.
        pool_size = max(1, min(MAX_NUM_PARALLEL_REQS, server_num_recognizers))
        # requests' default adapter keeps at most 10 pooled connections per
        # host; size it to the worker count so connections are reused
        # instead of being discarded.
        adapter = requests.adapters.HTTPAdapter(pool_maxsize=pool_size)
        session.mount('http://', adapter)
        session.mount('https://', adapter)
        logging.info("creating pool for tasks=%d (server recognizers=%d); "
                     "req. sample rate=%d",
                     pool_size, server_num_recognizers, server_wav_sample_rate)
        # NOTE(review): recognize_file does not receive server_wav_sample_rate;
        # confirm the rate load_wav_file uses matches what the server expects.
        # NOTE(review): one requests.Session is shared by all worker threads;
        # Session is not documented as thread-safe. Confirm this is acceptable.

        def rec_file_wrapper(wav_fnm: str) -> str:
            return recognize_file(session, server, wav_fnm)

        # Create the pool and submit one recognition task per file.
        with ThreadPool(pool_size) as pool:
            results = pool.map(rec_file_wrapper, wavs)
    for line in results:
        print(line)
    logging.info('... done')
    return 0


if __name__ == '__main__':
    sys.exit(main())