Intro
구글 솔루션 챌린지를 위해 진행하는 이번 프로젝트는 청각장애인을 위한 혁신적인 양방향 통역기를 개발하는 것을 목표로 하고 있다. 이 어플리케이션은 카메라를 통해 수어를 인식하고, 이를 Google의 Text-to-Speech(TTS)를 통해 텍스트로 변환하여 일반인에게 음성으로 전달한다. 또한, 일반인의 말을 Google의 Speech-to-Text(STT) 기술을 이용하여 텍스트로 변환해서 청각장애인에게 보여줌으로써, 소통의 장벽을 없애는 데 중점을 두고 있다. 이 기술은 청각장애인이 일상 생활에서 더욱 원활하게 소통할 수 있게 하며, 모두가 접근 가능한 커뮤니케이션 환경을 조성하는 데 기여할 것으로 기대된다.
STT
음성 인식 기술, 흔히 STT(Speech-to-Text)라고 불리는 이 기술은 말소리를 인식하여 텍스트 형태로 변환하는 과정을 말한다. 이 기술은 AI의 발전에 힘입어 크게 발전했다. STT 기술은 다양한 언어와 방언, 그리고 말하는 방식을 인식할 수 있도록 설계되어 있다. 이는 디지털 어시스턴트, 음성 명령 시스템, 자동 자막 생성 등 여러 분야에서 활용된다. STT 시스템은 크게 두 단계로 작동한다. 첫째, 음성 인식 단계에서는 사람의 말소리를 디지털 신호로 변환한다. 이후, 이 신호를 분석하여 언어의 구조와 문법에 따라 단어나 문장으로 변환한다. 둘째, 텍스트 변환 단계에서는 인식된 단어를 문맥에 맞게 텍스트로 전환한다. 이 기술의 정확도는 배경 소음, 발화자의 명료성, 방언, 언어의 복잡성 등 다양한 요소에 영향을 받는다. 최근에는 딥 러닝과 신경망 기술을 활용하여 이러한 문제들을 상당 부분 해결하고 있다. STT 기술의 발전은 특히 장애를 가진 사람들에게 효과적인 의사소통 수단을 제공하며, 교육, 보건, 법률 등 다양한 분야에서 효율성과 접근성을 개선하는 데 기여하고 있다.
Google Speech-to-Text
구글 대회인만큼 Google의 Speech-to-Text api를 이용하기로 하였고 우선 api 테스트를 해보려한다.
아래의 블로그를 참고하여 설치를 진행하였다. 자세한 설명을 통해 큰 도움이 되었다.
모든 설치를 완료하였고 아래 예시 코드를 통해 테스트를 진행해보았다. 구글에서 제공하는 예시 코드이나 문제가 있어서 수정하여 실행하였다.
# Copyright 2017 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Google Cloud Speech API sample application using the streaming API.
NOTE: This module requires the additional dependency `pyaudio`. To install
using pip:
pip install pyaudio
Example usage:
python transcribe_streaming_mic.py
"""
# [START speech_transcribe_streaming_mic]
import queue
import re
import sys
from google.cloud import speech
import pyaudio
# Audio recording parameters
RATE = 16000
CHUNK = int(RATE / 10) # 100ms
class MicrophoneStream:
"""Opens a recording stream as a generator yielding the audio chunks."""
def __init__(self: object, rate: int = RATE, chunk: int = CHUNK) -> None:
"""The audio -- and generator -- is guaranteed to be on the main thread."""
self._rate = rate
self._chunk = chunk
# Create a thread-safe buffer of audio data
self._buff = queue.Queue()
self.closed = True
def __enter__(self: object) -> object:
self._audio_interface = pyaudio.PyAudio()
self._audio_stream = self._audio_interface.open(
format=pyaudio.paInt16,
# The API currently only supports 1-channel (mono) audio
# https://goo.gl/z757pE
channels=1,
rate=self._rate,
input=True,
frames_per_buffer=self._chunk,
# Run the audio stream asynchronously to fill the buffer object.
# This is necessary so that the input device's buffer doesn't
# overflow while the calling thread makes network requests, etc.
stream_callback=self._fill_buffer,
)
self.closed = False
return self
def __exit__(
self: object,
type: object,
value: object,
traceback: object,
) -> None:
"""Closes the stream, regardless of whether the connection was lost or not."""
self._audio_stream.stop_stream()
self._audio_stream.close()
self.closed = True
# Signal the generator to terminate so that the client's
# streaming_recognize method will not block the process termination.
self._buff.put(None)
self._audio_interface.terminate()
def _fill_buffer(
self: object,
in_data: object,
frame_count: int,
time_info: object,
status_flags: object,
) -> object:
"""Continuously collect data from the audio stream, into the buffer.
Args:
in_data: The audio data as a bytes object
frame_count: The number of frames captured
time_info: The time information
status_flags: The status flags
Returns:
The audio data as a bytes object
"""
self._buff.put(in_data)
return None, pyaudio.paContinue
def generator(self: object) -> object:
"""Generates audio chunks from the stream of audio data in chunks.
Args:
self: The MicrophoneStream object
Returns:
A generator that outputs audio chunks.
"""
while not self.closed:
# Use a blocking get() to ensure there's at least one chunk of
# data, and stop iteration if the chunk is None, indicating the
# end of the audio stream.
chunk = self._buff.get()
if chunk is None:
return
data = [chunk]
# Now consume whatever other data's still buffered.
while True:
try:
chunk = self._buff.get(block=False)
if chunk is None:
return
data.append(chunk)
except queue.Empty:
break
yield b"".join(data)
def listen_print_loop(responses: object) -> str:
"""Iterates through server responses and prints them.
The responses passed is a generator that will block until a response
is provided by the server.
Each response may contain multiple results, and each result may contain
multiple alternatives; for details, see https://goo.gl/tjCPAU. Here we
print only the transcription for the top alternative of the top result.
In this case, responses are provided for interim results as well. If the
response is an interim one, print a line feed at the end of it, to allow
the next result to overwrite it, until the response is a final one. For the
final one, print a newline to preserve the finalized transcription.
Args:
responses: List of server responses
Returns:
The transcribed text.
"""
num_chars_printed = 0
for response in responses:
if not response.results:
continue
# The `results` list is consecutive. For streaming, we only care about
# the first result being considered, since once it's `is_final`, it
# moves on to considering the next utterance.
result = response.results[0]
if not result.alternatives:
continue
# Display the transcription of the top alternative.
transcript = result.alternatives[0].transcript
# Display interim results, but with a carriage return at the end of the
# line, so subsequent lines will overwrite them.
#
# If the previous result was longer than this one, we need to print
# some extra spaces to overwrite the previous result
overwrite_chars = " " * (num_chars_printed - len(transcript))
if not result.is_final:
sys.stdout.write(transcript + overwrite_chars + "\r")
sys.stdout.flush()
num_chars_printed = len(transcript)
else:
print(transcript + overwrite_chars)
if re.search(r"\b(종료)\b", transcript, re.I):
print("Exiting..")
return transcript # 특정 조건에서만 반환
num_chars_printed = 0
return "" # 모든 응답을 처리한 후 빈 문자열 반환
def main() -> None:
"""Transcribe speech from audio file."""
# See http://g.co/cloud/speech/docs/languages
# for a list of supported languages.
language_code = "ko-KR" # a BCP-47 language tag
client = speech.SpeechClient()
config = speech.RecognitionConfig(
encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
sample_rate_hertz=RATE,
language_code=language_code,
)
streaming_config = speech.StreamingRecognitionConfig(
config=config, interim_results=True
)
with MicrophoneStream(RATE, CHUNK) as stream:
audio_generator = stream.generator()
requests = (
speech.StreamingRecognizeRequest(audio_content=content)
for content in audio_generator
)
responses = client.streaming_recognize(streaming_config, requests)
# Now, put the transcription responses to use.
listen_print_loop(responses)
if __name__ == "__main__":
main()
# [END speech_transcribe_streaming_mic]
영상에서 보이듯 제대로 출력된다.
'Coloring (Additional Study) > Contest' 카테고리의 다른 글
Google Solution Challenge 2024 - Voice-Bridge (0) | 2024.02.28 |
---|---|
DCC 수상 후기 (2) | 2023.12.11 |
대구 교통사고 피해 예측 AI 경진대회 - (1) (3) | 2023.11.19 |
DCC 전이학습 (2) | 2023.11.03 |
DCC 한국음식 분류 모델 (0) | 2023.10.29 |