Stream conversations and messages with XMTP
XMTP supports real-time message delivery and retrieval. Once you initially retrieve existing conversations, you can listen for a real-time stream of new conversations and messages.
Listen for new conversations
You can listen for new conversations being started in real-time. This enables apps to display incoming messages from new contacts.
- JavaScript
- Swift
- Dart
- Kotlin
- React
- React Native
const stream = await xmtp.conversations.stream();
for await (const conversation of stream) {
console.log(`New conversation started with ${conversation.peerAddress}`);
// Say hello to your new friend
await conversation.send("Hi there!");
// Break from the loop to stop listening
//This stream will continue infinitely. To end the stream,
//You can either break from the loop, or call `await stream.return()`.
break;
}
for try await conversation in client.conversations.stream() {
print("New conversation started with \(conversation.peerAddress)")
// Say hello to your new friend
try await conversation.send(content: "Hi there!")
// Break from the loop to stop listening
//This stream will continue infinitely
break
}
var listening = client.streamConversations().listen((convo) {
debugPrint('Got a new conversation with ${convo.peer}');
});
// When you want to stop listening:
await listening.cancel();
client.conversations.stream().collect {
print("New conversation started with ${it.peerAddress}")
// Say hello to your new friend
it.send(text = "Hi there!")
}
import { useCallback, useState } from "react";
import { useStreamConversations } from "@xmtp/react-sdk";
import type { Conversation } from "@xmtp/react-sdk";
export const NewConversations: React.FC = () => {
// track streamed conversations
const [streamedConversations, setStreamedConversations] = useState<
Conversation[]
>([]);
// callback to handle incoming conversations
const onConversation = useCallback(
(conversation: Conversation) => {
setStreamedConversations((prev) => [...prev, conversation]);
},
[],
);
const { error } = useStreamConversations(onConversation);
if (error) {
return "An error occurred while streaming conversations";
}
return (
...
);
};
const stream = await xmtp.conversations.stream();
for await (const conversation of stream) {
console.log(`New conversation started with ${conversation.peerAddress}`);
// Say hello to your new friend
await conversation.send("Hi there!");
// Break from the loop to stop listening
break;
}
Listen for new messages in a conversation
You can listen for any new messages (incoming or outgoing) in a conversation by calling conversation.streamMessages()
.
The Stream returned by the stream
methods is an asynchronous iterator and as such usable by a for-await-of loop. Note however that it is by its nature infinite, so any looping construct used with it will not terminate, unless the termination is explicitly initiated (by breaking the loop or by an external call to return
).
- JavaScript
- Swift
- Dart
- Kotlin
- React
- React Native
const conversation = await xmtp.conversations.newConversation(
"0x3F11b27F323b62B159D2642964fa27C46C841897",
);
for await (const message of await conversation.streamMessages()) {
if (message.senderAddress === xmtp.address) {
// This message was sent from me
continue;
}
console.log(`New message from ${message.senderAddress}: ${message.content}`);
}
let conversation = try await client.conversations.newConversation(
with: "0x3F11b27F323b62B159D2642964fa27C46C841897")
for try await message in conversation.streamMessages() {
if message.senderAddress == client.address {
// This message was sent from me
continue
}
print("New message from \(message.senderAddress): \(message.body)")
}
var listening = client.streamMessages(convo).listen((message) {
debugPrint('${message.sender}> ${message.content}');
});
// When you want to stop listening:
await listening.cancel();
val conversation =
client.conversations.newConversation("0x3F11b27F323b62B159D2642964fa27C46C841897")
conversation.streamMessages().collect {
if (it.senderAddress == client.address) {
// This message was sent from me
}
print("New message from ${it.senderAddress}: ${it.body}")
}
// The useStreamMessages hook streams new conversation messages on mount
// and exposes an error state.
import { useStreamMessages } from "@xmtp/react-sdk";
import type { CachedConversation, DecodedMessage } from "@xmtp/react-sdk";
import { useCallback, useEffect, useState } from "react";
export const StreamMessages: React.FC<{
conversation: CachedConversation;
}> = ({
conversation,
}) => {
// track streamed messages
const [streamedMessages, setStreamedMessages] = useState<DecodedMessage[]>(
[],
);
// callback to handle incoming messages
const onMessage = useCallback(
(message: DecodedMessage) => {
setStreamedMessages((prev) => [...prev, message]);
},
[streamedMessages],
);
useStreamMessages(conversation, onMessage);
useEffect(() => {
setStreamedMessages([]);
}, [conversation]);
return (
...
);
};
const conversation = await xmtp.conversations.newConversation(
"0x3F11b27F323b62B159D2642964fa27C46C841897",
);
for await (const message of await conversation.streamMessages()) {
if (message.senderAddress === xmtp.address) {
// This message was sent from me
continue;
}
console.log(`New message from ${message.senderAddress}: ${message.content}`);
}
Listen for new messages in all conversations
There is a chance this stream can miss messages if multiple new conversations are received in the time it takes to update the stream to include a new conversation.
- JavaScript
- React
- React Native
for await (const message of await xmtp.conversations.streamAllMessages()) {
if (message.senderAddress === xmtp.address) {
// This message was sent from me
continue;
}
console.log(`New message from ${message.senderAddress}: ${message.content}`);
}
// The useStreamAllMessages hook streams new messages from all conversations
// on mount and exposes an error state.
import { useStreamAllMessages } from "@xmtp/react-sdk";
import type { DecodedMessage } from "@xmtp/react-sdk";
import { useCallback, useState } from "react";
export const StreamAllMessages: React.FC = () => {
// track streamed messages
const [streamedMessages, setStreamedMessages] = useState<DecodedMessage[]>(
[],
);
// callback to handle incoming messages
const onMessage = useCallback(
(message: DecodedMessage) => {
setStreamedMessages((prev) => [...prev, message]);
},
[streamedMessages],
);
useStreamAllMessages(onMessage);
return (
...
);
};
for await (const message of await xmtp.conversations.streamAllMessages()) {
if (message.senderAddress === xmtp.address) {
// This message was sent from me
continue;
}
console.log(`New message from ${message.senderAddress}: ${message.content}`);
}