@kingluo kingluo/Cargo.toml
Created Oct 9, 2019

[package]
name = "tokio-psql"
version = "0.1.0"
authors = ["Jinhua Luo <>"]
edition = "2018"

[dependencies]
futures-preview = "=0.3.0-alpha.19"
tokio = "0.2.0-alpha.6"
tokio-postgres = { git = "" }

use futures::FutureExt;
use tokio_postgres::{Error, NoTls, Row};

#[tokio::main]
async fn main() -> Result<(), Error> {
    // Connect to the database.
    let (client, connection) =
        tokio_postgres::connect("host=localhost user=postgres", NoTls).await?;

    // The connection object performs the actual communication with the database,
    // so spawn it off to run on its own.
    let connection = connection.map(|r| {
        if let Err(e) = r {
            eprintln!("connection error: {}", e);
        }
    });
    tokio::spawn(connection);

    // Now we can prepare a simple statement that just returns its parameter.
    let stmt = client.prepare("SELECT $1::TEXT").await?;

    // And then execute it, awaiting the resulting rows as a Vec.
    let rows: Vec<Row> = client.query(&stmt, &[&"hello world"]).await?;

    // Now we can check that we got back the same string we sent over.
    let value: &str = rows[0].get(0);
    assert_eq!(value, "hello world");

    Ok(())
}
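
The split between `client` and `connection` above can be illustrated without a database. The following is only a rough analogy, using standard-library threads and channels in place of async tasks: a handle sends requests to a background driver that owns the I/O loop, and nothing completes unless that driver is running. The names `Request`, `Client`, `run_connection`, and `connect` here are hypothetical and are not part of tokio-postgres.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical request: a query string plus a channel for the reply.
struct Request {
    query: String,
    reply: mpsc::Sender<String>,
}

// Hypothetical handle, playing the role of `client`.
struct Client {
    tx: mpsc::Sender<Request>,
}

impl Client {
    // Send a query to the driver and block until it answers.
    fn query(&self, q: &str) -> Option<String> {
        let (reply, rx) = mpsc::channel();
        self.tx
            .send(Request { query: q.to_string(), reply })
            .ok()?;
        rx.recv().ok()
    }
}

// Hypothetical driver, playing the role of `connection`.
// It simply echoes each query back; the loop ends once every
// sender (every `Client`) has been dropped.
fn run_connection(rx: mpsc::Receiver<Request>) {
    for req in rx {
        let _ = req.reply.send(req.query);
    }
}

// Create the handle and spawn the driver on its own thread.
fn connect() -> (Client, thread::JoinHandle<()>) {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || run_connection(rx));
    (Client { tx }, handle)
}

fn main() {
    let (client, driver) = connect();
    let answer = client.query("hello world");
    assert_eq!(answer.as_deref(), Some("hello world"));

    // Dropping the handle lets the driver loop finish.
    drop(client);
    driver.join().unwrap();
}
```

If the driver were never spawned, `query` would block forever, which is the thread-based counterpart of forgetting `tokio::spawn(connection)`.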

indykish commented Oct 11, 2019

Pointing to the latest git (tokio-postgres = { git = "" }) still produces compile errors.

Here is my code

use futures::channel::mpsc;
use futures::{future, stream, FutureExt, StreamExt, TryStreamExt};
use std::error::Error;
use tokio::net::TcpStream;
use tokio::prelude::*;
use tokio_postgres::tls::NoTlsStream;
use tokio_postgres::{AsyncMessage, Client, Config, Connection, NoTls};

async fn connect_raw(s: &str) -> Result<(Client, Connection<TcpStream, NoTlsStream>), Error> {
    let socket = TcpStream::connect("").await.unwrap();
    let config = s.parse::<Config>().unwrap();
    config.connect_raw(socket, NoTls).await
}

async fn notifications() {
    let (client, mut connection) = connect_raw("user=postgres").await.unwrap();

    // Forward every message from the connection into a channel.
    let (tx, rx) = mpsc::unbounded();
    let stream = stream::poll_fn(move |cx| connection.poll_message(cx)).map_err(|e| panic!(e));
    let connection = stream.forward(tx).map(|r| r.unwrap());
    tokio::spawn(connection);

    client
        .batch_execute(
            "LISTEN test_notifications;
             NOTIFY test_notifications, 'hello';
             NOTIFY test_notifications, 'world';",
        )
        .await
        .unwrap();

    // Dropping the client lets the connection stream finish.
    drop(client);

    let notifications = rx
        .filter_map(|m| match m {
            AsyncMessage::Notification(n) => future::ready(Some(n)),
            _ => future::ready(None),
        })
        .collect::<Vec<_>>()
        .await;
    assert_eq!(notifications.len(), 2);
    assert_eq!(notifications[0].channel(), "test_notifications");
    assert_eq!(notifications[0].payload(), "hello");
    assert_eq!(notifications[1].channel(), "test_notifications");
    assert_eq!(notifications[1].payload(), "world");
}

This should be easy to solve, since the error is of type tokio_postgres::error::Error:

error[E0308]: mismatched types
  --> components/streamer/src/
19 |     config.connect_raw(socket, NoTls).await
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected trait std::error::Error, found struct `tokio_postgres::error::Error`
   = note: expected type `std::result::Result<_, (dyn std::error::Error + 'static)>`
              found type `std::result::Result<_, tokio_postgres::error::Error>`

For now I have reverted to 0.4.0-rc3: I downloaded it, verified the test code in there, and used that to build our stuff.

Will wait for an official build.
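
A note on the E0308 above: the message suggests that `Error` in the `connect_raw` signature resolves to the trait imported by `use std::error::Error;`, not to the struct tokio-postgres returns. Importing `tokio_postgres::Error` instead, as the gist itself does, would likely resolve it. Boxing the trait object is the other usual option. The sketch below shows both forms in isolation; `DbError` and the functions are hypothetical stand-ins and not the tokio-postgres API.

```rust
use std::error::Error;
use std::fmt;

// Hypothetical concrete error type, standing in for a library's
// own error struct (the role `tokio_postgres::Error` plays above).
#[derive(Debug)]
struct DbError(String);

impl fmt::Display for DbError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "db error: {}", self.0)
    }
}

impl Error for DbError {}

// Hypothetical library call that fails with the concrete type.
fn connect_raw(addr: &str) -> Result<u32, DbError> {
    if addr.is_empty() {
        Err(DbError("empty address".to_string()))
    } else {
        Ok(addr.len() as u32)
    }
}

// Option 1: name the concrete error type in the signature.
fn connect_concrete(addr: &str) -> Result<u32, DbError> {
    connect_raw(addr)
}

// Option 2: return a boxed trait object; `?` converts the
// concrete error into `Box<dyn Error>` automatically.
fn connect_boxed(addr: &str) -> Result<u32, Box<dyn Error>> {
    let id = connect_raw(addr)?;
    Ok(id)
}

fn main() {
    assert!(connect_concrete("localhost").is_ok());
    if let Err(e) = connect_boxed("") {
        println!("{}", e);
    }
}
```

Option 1 keeps the precise type available to callers; option 2 is handy when one function can fail with several unrelated error types.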
