Module rtc
Classes
class AudioStream
-
Expand source code
class AudioStream:
    """
    Async iterator over audio frames backed by an asyncio queue.

    Producers put frames on ``queue``; putting ``None`` ends the iteration.
    """

    def __init__(self) -> None:
        # Unbounded queue: producers never block when adding frames.
        self.queue: asyncio.Queue = asyncio.Queue()

    def __aiter__(self) -> AsyncIterator[PcmAudioFrame]:
        """
        Returns the stream itself so it can be used in ``async for``.
        """
        return self

    async def __anext__(self) -> PcmAudioFrame:
        """
        Waits for the next queued frame.

        Returns:
            PcmAudioFrame: The next item taken from the queue.

        Raises:
            StopAsyncIteration: When the dequeued item is ``None``.
        """
        frame = await self.queue.get()
        if frame is not None:
            return frame
        # ``None`` is the end-of-stream sentinel.
        raise StopAsyncIteration
class Channel (rtc: RtcEngine, options: RtcOptions)
-
Expand source code
class Channel():
    """
    One RTC channel session: connection, published PCM audio track, remote
    audio subscriptions and a data stream for chunked text messages.

    Events produced by the channel's ChannelEventObserver are re-emitted on
    ``self.emitter``; use ``on`` / ``once`` / ``off`` to listen to them.

    The numeric states used below follow the comments already present in this
    code: connection state 3 = connected, 5 = failed, 1 = disconnected;
    audio subscribe state 3 = subscribed.
    NOTE(review): treating subscribe states 0/1 as "not subscribed" follows the
    Agora STREAM_SUBSCRIBE_STATE enum - confirm against the SDK version in use.
    """

    def __init__(
        self, rtc: "RtcEngine", options: RtcOptions
    ) -> None:
        """
        Creates the connection, observers, audio track and data stream.

        Parameters:
            rtc: The engine that owns the Agora service.
            options: Channel name, uid, sample rate and channel count.
        """
        self.loop = asyncio.get_event_loop()

        # Create the event emitter
        self.emitter = AsyncIOEventEmitter(self.loop)
        self.options = options
        self.remote_users = dict[int, Any]()
        self.rtc = rtc
        self.chat = Chat(self)
        self.channelId = options.channel_name
        self.uid = options.uid
        # Without an app certificate an empty token is used.
        self.token = options.build_token(rtc.appid, rtc.appcert) if rtc.appcert else ""
        conn_config = RTCConnConfig(
            client_role_type=ClientRoleType.CLIENT_ROLE_BROADCASTER,
            channel_profile=ChannelProfileType.CHANNEL_PROFILE_LIVE_BROADCASTING,
        )
        self.connection = self.rtc.agora_service.create_rtc_connection(conn_config)

        self.channel_event_observer = ChannelEventObserver(self.emitter, options=options,)
        self.connection.register_observer(self.channel_event_observer)

        self.local_user = self.connection.get_local_user()
        self.local_user.set_playback_audio_frame_before_mixing_parameters(
            options.channels, options.sample_rate
        )
        self.local_user.register_local_user_observer(self.channel_event_observer)
        self.local_user.register_audio_frame_observer(self.channel_event_observer)
        # self.local_user.subscribe_all_audio()

        # Outgoing audio: a custom PCM track fed through audio_pcm_data_sender.
        self.media_node_factory = self.rtc.agora_service.create_media_node_factory()
        self.audio_pcm_data_sender = (
            self.media_node_factory.create_audio_pcm_data_sender()
        )
        self.audio_track = self.rtc.agora_service.create_custom_audio_track_pcm(
            self.audio_pcm_data_sender
        )
        self.audio_track.set_enabled(1)
        self.local_user.publish_audio(self.audio_track)

        self.stream_id = self.connection.create_data_stream(False, False)
        # The attributes below are initialised here but not read by this class.
        self.received_chunks = {}
        self.waiting_message = None
        self.msg_id = ""
        self.msg_index = ""

        # Keep remote_users in sync with join/leave events.
        self.on("user_joined", lambda agora_rtc_conn, user_id: self.remote_users.update({user_id: True}))
        self.on("user_left", lambda agora_rtc_conn, user_id, reason: self.remote_users.pop(user_id, None))

        def handle_audio_subscribe_state_changed(agora_local_user, channel, user_id, old_state, new_state, elapse_since_last_state):
            if new_state == 3:  # Successfully subscribed
                self.channel_event_observer.audio_streams.update({user_id: AudioStream()})
            elif new_state == 0:
                self.channel_event_observer.audio_streams.pop(user_id, None)

        self.on("audio_subscribe_state_changed", handle_audio_subscribe_state_changed)

    async def connect(self) -> None:
        """
        Connects to the channel using this channel's token, channel name and uid.

        Returns once a "connection_state_changed" event reports state 3.

        Raises:
            Exception: If a "connection_state_changed" event reports state 5.
        """
        future = asyncio.get_running_loop().create_future()

        def callback(agora_rtc_conn: RTCConnection, conn_info: RTCConnInfo, reason):
            logger.info(f"Connection state changed: {conn_info.state}")
            if future.done():
                # A later event must not complete the future a second time.
                return
            if conn_info.state == 3:  # Connection successful
                future.set_result(None)
            elif conn_info.state == 5:  # Connection failed
                future.set_exception(Exception(f"Connection failed with state: {conn_info.state}"))

        self.on("connection_state_changed", callback)
        self.connection.connect(self.token, self.channelId, self.uid)

        try:
            await future
        except Exception as e:
            raise Exception(f"Failed to connect to channel {self.channelId}: {str(e)}") from e
        finally:
            self.off("connection_state_changed", callback)

    async def disconnect(self) -> None:
        """
        Disconnects the channel.

        Returns once a "connection_state_changed" event reports state 1.
        """
        disconnected_future = asyncio.get_running_loop().create_future()

        def callback(agora_rtc_conn: RTCConnection, conn_info: RTCConnInfo, reason):
            # Keep listening until the disconnected state is seen; intermediate
            # state changes must not drop the listener.
            if conn_info.state == 1 and not disconnected_future.done():
                disconnected_future.set_result(None)

        self.on("connection_state_changed", callback)
        self.connection.disconnect()
        try:
            await disconnected_future
        finally:
            self.off("connection_state_changed", callback)

    def get_audio_frames(self, uid: int) -> AudioStream:
        """
        Returns the audio stream registered for a user.

        Parameters:
            uid: The user ID whose stream is requested.

        Returns:
            AudioStream: The audio stream.

        Raises:
            KeyError: If no stream is registered for ``uid``.
        """
        return self.channel_event_observer.audio_streams[uid]

    async def push_audio_frame(self, frame: bytes) -> None:
        """
        Pushes an audio frame to the channel.

        Parameters:
            frame: The audio frame to push, sent as 2-byte samples with the
                channel count and sample rate from the channel options.
        """
        audio_frame = PcmAudioFrame()
        audio_frame.data = bytearray(frame)
        audio_frame.timestamp = 0
        audio_frame.bytes_per_sample = 2
        audio_frame.number_of_channels = self.options.channels
        audio_frame.sample_rate = self.options.sample_rate
        # Integer division: a trailing partial sample is not counted.
        audio_frame.samples_per_channel = (
            len(frame) // audio_frame.bytes_per_sample // audio_frame.number_of_channels
        )

        self.audio_pcm_data_sender.send_audio_pcm_data(audio_frame)

    async def subscribe_audio(self, uid: int) -> None:
        """
        Subscribes to the audio of a user.

        Returns once an "audio_subscribe_state_changed" event for that user
        reports state 3.

        Parameters:
            uid: The user ID to subscribe to.
        """
        future = asyncio.get_running_loop().create_future()

        def callback(
            agora_local_user,
            channel,
            user_id,
            old_state,
            new_state,
            elapse_since_last_state,
        ):
            # Compare as strings so the check holds whether the SDK reports
            # the user ID as int or str.
            if str(user_id) != str(uid) or future.done():
                return
            if new_state == 3:  # Successfully subscribed
                future.set_result(None)

        self.on("audio_subscribe_state_changed", callback)
        self.local_user.subscribe_audio(uid)

        try:
            await future
        except Exception as e:
            raise Exception(f"Audio subscription failed for user {uid}: {str(e)}") from e
        finally:
            self.off("audio_subscribe_state_changed", callback)

    async def unsubscribe_audio(self, uid: int) -> None:
        """
        Unsubscribes from the audio of a user.

        Returns once an "audio_subscribe_state_changed" event for that user
        reports a not-subscribed state (0 or 1).

        Parameters:
            uid: The user ID to unsubscribe from.
        """
        future = asyncio.get_running_loop().create_future()

        def callback(
            agora_local_user,
            channel,
            user_id,
            old_state,
            new_state,
            elapse_since_last_state,
        ):
            # Compare as strings so the check holds whether the SDK reports
            # the user ID as int or str.
            if str(user_id) != str(uid) or future.done():
                return
            if new_state in (0, 1):  # No longer subscribed
                future.set_result(None)

        self.on("audio_subscribe_state_changed", callback)
        self.local_user.unsubscribe_audio(uid)

        try:
            await future
        finally:
            self.off("audio_subscribe_state_changed", callback)

    def _split_string_into_chunks(self, long_string, msg_id, chunk_size=300) -> list[str]:
        """
        Splits a long string into JSON-encoded chunks of a given size.

        Parameters:
            long_string: The string to split.
            msg_id: The message ID.
            chunk_size: The size of each chunk, in characters (not bytes).

        Returns:
            list[str]: One JSON document per chunk, each carrying ``msg_id``,
            ``part_idx``, ``total_parts`` and ``content``. Empty for an empty
            string.
        """
        total_parts = (len(long_string) + chunk_size - 1) // chunk_size
        return [
            json.dumps(
                {
                    'msg_id': msg_id,
                    'part_idx': idx,
                    'total_parts': total_parts,
                    # Slicing clamps at the end, so the last chunk may be shorter.
                    'content': long_string[idx * chunk_size:(idx + 1) * chunk_size],
                },
                ensure_ascii=False,
            )
            for idx in range(total_parts)
        ]

    async def send_stream_message(self, data: str, msg_id: str) -> None:
        """
        Sends a stream message to the channel, split into chunks.

        Parameters:
            data: The data to send.
            msg_id: The message ID.
        """
        chunks = self._split_string_into_chunks(data, msg_id)
        for chunk in chunks:
            self.connection.send_stream_message(self.stream_id, chunk)

    def on(self, event_name: str, callback):
        """
        Allows external components to subscribe to events.

        Parameters:
            event_name: The name of the event to subscribe to.
            callback: The callback to call when the event is emitted.
        """
        self.emitter.on(event_name, callback)

    def once(self, event_name: str, callback):
        """
        Allows external components to subscribe to events once.

        Parameters:
            event_name: The name of the event to subscribe to.
            callback: The callback to call when the event is emitted.
        """
        self.emitter.once(event_name, callback)

    def off(self, event_name: str, callback):
        """
        Allows external components to unsubscribe from events.

        Parameters:
            event_name: The name of the event to unsubscribe from.
            callback: The callback to remove from the event.
        """
        self.emitter.remove_listener(event_name, callback)
Methods
async def connect(self) ‑> None
-
Connects to a channel.
Parameters
None. The channel name, user ID and token are taken from the options the channel was created with.
Returns
None
- Completes once the connection succeeds; raises an Exception if the connection fails.
async def disconnect(self) ‑> None
-
Disconnects the channel.
def get_audio_frames(self, uid: int) ‑> AudioStream
def off(self, event_name: str, callback)
-
Allows external components to unsubscribe from events.
Parameters
event_name: The name of the event to unsubscribe from. callback: The callback to remove from the event.
def on(self, event_name: str, callback)
-
Allows external components to subscribe to events.
Parameters
event_name: The name of the event to subscribe to. callback: The callback to call when the event is emitted.
def once(self, event_name: str, callback)
-
Allows external components to subscribe to events once.
Parameters
event_name: The name of the event to subscribe to. callback: The callback to call when the event is emitted.
async def push_audio_frame(self, frame: bytes) ‑> None
-
Pushes an audio frame to the channel.
Parameters
frame: The audio frame to push.
async def send_stream_message(self, data: str, msg_id: str) ‑> None
-
Sends a stream message to the channel.
Parameters
data: The data to send. msg_id: The message ID.
async def subscribe_audio(self, uid: int) ‑> None
-
Subscribes to the audio of a user.
Parameters
uid: The user ID to subscribe to.
async def unsubscribe_audio(self, uid: int) ‑> None
-
Unsubscribes from the audio of a user.
Parameters
uid: The user ID to unsubscribe from.
class ChannelEventObserver (event_emitter: pyee.asyncio.AsyncIOEventEmitter, options: RtcOptions)
-
Expand source code
class ChannelEventObserver(IRTCConnectionObserver, IRTCLocalUserObserver, IAudioFrameObserver):
    """
    Receives SDK connection, local-user and audio-frame callbacks and forwards
    them to the event emitter and to per-user audio streams.

    Every hand-off goes through ``loop.call_soon_threadsafe``, presumably
    because the SDK invokes these callbacks from threads other than the event
    loop's - confirm against the SDK documentation.
    """

    def __init__(self, event_emitter: AsyncIOEventEmitter, options: RtcOptions) -> None:
        """
        Parameters:
            event_emitter: The emitter that events are forwarded to.
            options: Channel options; ``sample_rate`` is stamped on received frames.
        """
        self.loop = asyncio.get_event_loop()
        self.emitter = event_emitter
        self.audio_streams = dict[int, AudioStream]()
        self.options = options
        # Partially received chunked messages, keyed by msg_id. Must exist
        # before the first stream message arrives (see handle_received_chunk).
        self.received_chunks = {}

    def emit_event(self, event_name: str, *args):
        """Helper function to emit events."""
        self.loop.call_soon_threadsafe(self.emitter.emit, event_name, *args)

    def on_connected(
        self, agora_rtc_conn: RTCConnection, conn_info: RTCConnInfo, reason
    ):
        """Logs the callback and emits "connection_state_changed"."""
        logger.info(f"Connected to RTC: {agora_rtc_conn} {conn_info} {reason}")
        self.emit_event("connection_state_changed", agora_rtc_conn, conn_info, reason)

    def on_disconnected(
        self, agora_rtc_conn: RTCConnection, conn_info: RTCConnInfo, reason
    ):
        """Logs the callback and emits "connection_state_changed"."""
        logger.info(f"Disconnected from RTC: {agora_rtc_conn} {conn_info} {reason}")
        self.emit_event("connection_state_changed", agora_rtc_conn, conn_info, reason)

    def on_connecting(
        self, agora_rtc_conn: RTCConnection, conn_info: RTCConnInfo, reason
    ):
        """Logs the callback and emits "connection_state_changed"."""
        logger.info(f"Connecting to RTC: {agora_rtc_conn} {conn_info} {reason}")
        self.emit_event("connection_state_changed", agora_rtc_conn, conn_info, reason)

    def on_connection_failure(self, agora_rtc_conn, conn_info, reason):
        """Logs the failure and emits "connection_state_changed"."""
        logger.error(f"Connection failure: {agora_rtc_conn} {conn_info} {reason}")
        self.emit_event("connection_state_changed", agora_rtc_conn, conn_info, reason)

    def on_user_joined(self, agora_rtc_conn: RTCConnection, user_id):
        """Logs the callback and emits "user_joined"."""
        logger.info(f"User joined: {agora_rtc_conn} {user_id}")
        self.emit_event("user_joined", agora_rtc_conn, user_id)

    def on_user_left(self, agora_rtc_conn: RTCConnection, user_id, reason):
        """Logs the callback and emits "user_left"."""
        logger.info(f"User left: {agora_rtc_conn} {user_id} {reason}")
        self.emit_event("user_left", agora_rtc_conn, user_id, reason)

    def handle_received_chunk(self, json_chunk):
        """
        Stores one JSON chunk and reassembles the message once all parts are in.

        Parameters:
            json_chunk: A JSON document with ``msg_id``, ``part_idx``,
                ``total_parts`` and ``content``.

        Returns:
            tuple: ``(full_message, msg_id)`` when the last missing part
            arrives, otherwise ``(None, None)``. Chunks that cannot be parsed
            are logged and skipped.
        """
        try:
            chunk = json.loads(json_chunk)
            msg_id = chunk["msg_id"]
            part_idx = chunk["part_idx"]
            total_parts = chunk["total_parts"]
            chunk["content"]  # fail here, not later while joining the parts
        except (ValueError, KeyError, TypeError) as exc:
            # Chunks come from remote peers; a bad one must not raise into
            # the SDK callback.
            logger.warning(f"Ignoring malformed stream chunk: {exc}")
            return (None, None)

        # Entries of messages that never complete are not evicted.
        if msg_id not in self.received_chunks:
            self.received_chunks[msg_id] = {"parts": {}, "total_parts": total_parts}
        parts = self.received_chunks[msg_id]["parts"]
        # Duplicates and out-of-range indices are ignored.
        if part_idx not in parts and 0 <= part_idx < total_parts:
            parts[part_idx] = chunk
            if len(parts) == total_parts:
                # all parts received, now recomposing original message and get rid it from dict
                sorted_parts = sorted(parts.values(), key=lambda c: c["part_idx"])
                full_message = "".join(part["content"] for part in sorted_parts)
                del self.received_chunks[msg_id]
                return full_message, msg_id
        return (None, None)

    def on_stream_message(
        self, agora_local_user: LocalUser, user_id, stream_id, data, length
    ):
        """Feeds a received chunk to the reassembler and logs completed messages."""
        # logger.info(f"Stream message", agora_local_user, user_id, stream_id, length)
        (reassembled_message, msg_id) = self.handle_received_chunk(data)
        if reassembled_message is not None:
            logger.info(f"Reassembled message: {msg_id} {reassembled_message}")

    def on_audio_subscribe_state_changed(
        self,
        agora_local_user,
        channel,
        user_id,
        old_state,
        new_state,
        elapse_since_last_state,
    ):
        """Logs the callback and emits "audio_subscribe_state_changed"."""
        logger.info(
            f"Audio subscribe state changed: {user_id} {new_state} {elapse_since_last_state}"
        )
        self.emit_event("audio_subscribe_state_changed", agora_local_user, channel, user_id, old_state, new_state, elapse_since_last_state)

    def on_playback_audio_frame_before_mixing(
        self, agora_local_user: LocalUser, channelId, uid, frame: AudioFrame
    ):
        """
        Copies a received frame into a PcmAudioFrame and queues it on the
        sender's AudioStream.

        Frames for a uid without a registered stream are dropped.

        Returns:
            int: Always 0.
        """
        stream = self.audio_streams.get(uid)
        if stream is None:
            # No stream registered for this uid, so there is no consumer.
            return 0

        audio_frame = PcmAudioFrame()
        audio_frame.samples_per_channel = frame.samples_per_channel
        audio_frame.bytes_per_sample = frame.bytes_per_sample
        audio_frame.number_of_channels = frame.channels
        # The configured rate is used, not a value read from the frame.
        audio_frame.sample_rate = self.options.sample_rate
        audio_frame.data = frame.buffer

        self.loop.call_soon_threadsafe(stream.queue.put_nowait, audio_frame)
        return 0
Ancestors
- agora.rtc.rtc_connection_observer.IRTCConnectionObserver
- agora.rtc.local_user_observer.IRTCLocalUserObserver
- agora.rtc.audio_frame_observer.IAudioFrameObserver
Methods
def emit_event(self, event_name: str, *args)
-
Helper function to emit events.
def handle_received_chunk(self, json_chunk)
def on_audio_subscribe_state_changed(self, agora_local_user, channel, user_id, old_state, new_state, elapse_since_last_state)
def on_connected(self, agora_rtc_conn: agora.rtc.rtc_connection.RTCConnection, conn_info: agora.rtc.rtc_connection.RTCConnInfo, reason)
def on_connecting(self, agora_rtc_conn: agora.rtc.rtc_connection.RTCConnection, conn_info: agora.rtc.rtc_connection.RTCConnInfo, reason)
def on_connection_failure(self, agora_rtc_conn, conn_info, reason)
def on_disconnected(self, agora_rtc_conn: agora.rtc.rtc_connection.RTCConnection, conn_info: agora.rtc.rtc_connection.RTCConnInfo, reason)
def on_playback_audio_frame_before_mixing(self, agora_local_user: agora.rtc.local_user.LocalUser, channelId, uid, frame: agora.rtc.agora_base.AudioFrame)
def on_stream_message(self, agora_local_user: agora.rtc.local_user.LocalUser, user_id, stream_id, data, length)
def on_user_joined(self, agora_rtc_conn: agora.rtc.rtc_connection.RTCConnection, user_id)
def on_user_left(self, agora_rtc_conn: agora.rtc.rtc_connection.RTCConnection, user_id, reason)
class Chat (channel: Channel)
-
Expand source code
class Chat():
    """
    Queues chat messages and sends them over the channel's data stream from a
    background task, one at a time.
    """

    def __init__(self, channel: Channel) -> None:
        """
        Starts the background sender task; must run inside a running event loop.

        Parameters:
            channel: The channel whose ``send_stream_message`` delivers messages.
        """
        self.channel = channel
        self.loop = self.channel.loop
        self.queue = asyncio.Queue()

        def log_exception(t: asyncio.Task[Any]) -> None:
            if not t.cancelled() and t.exception():
                logger.error(
                    "unhandled exception",
                    exc_info=t.exception(),
                )

        # Keep a reference: the event loop holds tasks only weakly, so an
        # unreferenced task can be garbage-collected before it finishes.
        self._task = asyncio.create_task(self._process_message())
        self._task.add_done_callback(log_exception)

    async def send_message(self, item: ChatMessage) -> None:
        """
        Sends a message to the channel.

        Parameters:
            item: The message to send.
        """
        await self.queue.put(item)

    async def _process_message(self) -> None:
        """
        Processes messages in the queue.

        A message that fails to send is logged and skipped so later messages
        are still delivered.
        """
        while True:
            item: ChatMessage = await self.queue.get()
            try:
                await self.channel.send_stream_message(item.message, item.msg_id)
            except Exception:
                logger.exception("failed to send chat message")
            finally:
                # Mark the item done even on failure so queue.join() cannot hang.
                self.queue.task_done()
Methods
async def send_message(self, item: ChatMessage) ‑> None
-
Sends a message to the channel.
Parameters
item: The message to send.
class ChatMessage (message: str, msg_id: str)
-
Expand source code
class ChatMessage():
    """
    A chat message together with the ID it is sent under.
    """

    def __init__(self, message: str, msg_id: str) -> None:
        """
        Parameters:
            message: The message text.
            msg_id: The message ID.
        """
        self.message, self.msg_id = message, msg_id
class RtcEngine (appid: str, appcert: str)
-
Expand source code
class RtcEngine:
    """
    Owns the Agora service instance and creates channels on it.
    """

    def __init__(self, appid: str, appcert: str):
        """
        Initializes the Agora service.

        Parameters:
            appid: The application ID; must be non-empty.
            appcert: The application certificate; channels built from this
                engine use an empty token when it is falsy.

        Raises:
            ValueError: If ``appid`` is empty.
        """
        # Validate before touching any state or the SDK.
        if not appid:
            raise ValueError("App ID is required")
        self.appid = appid
        self.appcert = appcert

        config = AgoraServiceConfig()
        config.audio_scenario = AudioScenarioType.AUDIO_SCENARIO_CHORUS
        config.appid = appid
        # The log file goes three directory levels above this file.
        config.log_path = os.path.join(
            os.path.dirname(
                os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
            ),
            "agorasdk.log",
        )
        self.agora_service = AgoraService()
        self.agora_service.initialize(config)

    def create_channel(self, options: RtcOptions) -> Channel:
        """
        Creates a channel.

        Parameters:
            options: The options (channel name, uid, audio format) for the channel.

        Returns:
            Channel: The created channel.
        """
        return Channel(self, options)

    def destroy(self) -> None:
        """
        Destroys the RTC engine.
        """
        self.agora_service.release()
Methods
def create_channel(self, options: RtcOptions) ‑> Channel
-
Creates a channel.
Parameters
options: The RtcOptions (channel name, uid, sample rate, channels) for the new channel.
Returns
Channel
- The created channel.
def destroy(self) ‑> None
-
Destroys the RTC engine.
class RtcOptions (*, channel_name: str = None, uid: int = 0, sample_rate: int = 24000, channels: int = 1)
-
Expand source code
class RtcOptions:
    """
    Keyword-only settings for a channel: name, uid and audio format.
    """

    def __init__(
        self,
        *,
        channel_name: str = None,
        uid: int = 0,
        sample_rate: int = 24000,
        channels: int = 1,
    ):
        """
        Parameters:
            channel_name: The channel name (defaults to None).
            uid: The user ID (defaults to 0).
            sample_rate: The audio sample rate (defaults to 24000).
            channels: The number of audio channels (defaults to 1).
        """
        self.channel_name, self.uid = channel_name, uid
        self.sample_rate, self.channels = sample_rate, channels

    def build_token(self, appid: str, appcert: str) -> str:
        """
        Builds a token for this channel name and uid.

        Parameters:
            appid: The application ID.
            appcert: The application certificate.

        Returns:
            str: The value returned by RealtimekitTokenBuilder.build_token.
        """
        token = RealtimekitTokenBuilder.build_token(
            appid, appcert, self.channel_name, self.uid
        )
        return token
Methods
def build_token(self, appid: str, appcert: str) ‑> str