name = "tokio-psql"
version = "0.1.0"
authors = ["Jinhua Luo <>"]
edition = "2018"
futures-preview = "=0.3.0-alpha.19"
tokio = "0.2.0-alpha.6"
tokio-postgres= { git = "" }
use futures::FutureExt;
use tokio_postgres::{Error, NoTls, Row};
// Example entry point: connects to a local Postgres server, prepares a
// statement that echoes its single parameter, runs it with "hello world",
// and asserts that the same string comes back in the first column of the
// first row. Connection, prepare, and query errors are propagated with `?`.
// NOTE(review): an `async fn main` needs a runtime entry-point attribute
// (e.g. `#[tokio::main]`); none is visible in this paste — confirm.
async fn main() -> Result<(), Error> {
// Connect to the database.
let (client, connection) =
tokio_postgres::connect("host=localhost user=postgres", NoTls).await?;
// The connection object performs the actual communication with the database,
// so spawn it off to run on its own.
// NOTE(review): the snippet appears truncated here — the closure and `if let`
// below are never closed, and no spawn call for `connection` is shown.
let connection =|r| {
if let Err(e) = r {
eprintln!("connection error: {}", e);
// Now we can prepare a simple statement that just returns its parameter.
let stmt = client.prepare("SELECT $1::TEXT").await?;
// And then execute it, returning a Stream of Rows which we collect into a Vec.
let rows: Vec<Row> = client.query(&stmt, &[&"hello world"]).await?;
// Now we can check that we got back the same string we sent over.
let value: &str = rows[0].get(0);
assert_eq!(value, "hello world");

indykish commented Oct 11, 2019

Pointing to the latest git revision still produces compile errors: tokio-postgres= { git = "" }.

Here is my code

use futures::channel::mpsc;
use futures::{future, stream,  StreamExt};
use futures::{ FutureExt, TryStreamExt};
use std::error::Error;
use tokio::net::TcpStream;
use tokio::prelude::*;
use tokio_postgres::tls::NoTlsStream;
use tokio_postgres::{AsyncMessage, Client, Config, Connection, NoTls};

// Opens a TCP socket, parses `s` into a `Config`, and returns the awaited
// result of `config.connect_raw(socket, NoTls)`.
//
// Both the TCP connect and the config parse are `unwrap()`ed, so a failure in
// either panics instead of being returned to the caller.
//
// NOTE(review): with `use std::error::Error;` in scope, the `Error` in the
// return type names the `std::error::Error` trait rather than the concrete
// `tokio_postgres::Error` struct that `connect_raw` yields. That is the E0308
// mismatch shown in the compiler output further down; spelling the error type
// as `tokio_postgres::Error` should resolve it.
async fn connect_raw(s: &str) -> Result<(Client, Connection<TcpStream, NoTlsStream>), Error> {
    // The address literal is empty in this paste.
    let socket = TcpStream::connect("").await.unwrap();
    let config = s.parse::<Config>().unwrap();
    config.connect_raw(socket, NoTls).await

// Connects through `connect_raw`, forwards every message polled from the
// connection into an unbounded channel, and asserts that exactly two
// notifications arrive on `test_notifications` with payloads "hello" and
// "world", in that order.
// NOTE(review): several lines of this function are missing from the paste
// (see the notes below), so it does not compile as shown.
async fn notifications() {
    let (client, mut connection) = connect_raw("user=postgres").await.unwrap();

    let (tx, rx) = mpsc::unbounded();
    // Turn `poll_message` into a stream; any polling error panics via `map_err`.
    let stream = stream::poll_fn(move |cx| connection.poll_message(cx)).map_err(|e| panic!(e));
    // Future that pipes the stream into `tx` and unwraps the forwarding result.
    // NOTE(review): nothing in this excerpt spawns or awaits this future — confirm.
    let connection = stream.forward(tx).map(|r| r.unwrap());

    // NOTE(review): the call that receives the SQL text below is missing from
    // the paste — presumably a `client.batch_execute(..)`; confirm.
            "LISTEN test_notifications;
             NOTIFY test_notifications, 'hello';
             NOTIFY test_notifications, 'world';",


    // Keep only `AsyncMessage::Notification` values; every other message is dropped.
    let notifications = rx
        .filter_map(|m| match m {
            AsyncMessage::Notification(n) => future::ready(Some(n)),
            _ => future::ready(None),
    // NOTE(review): the end of the `filter_map` chain (closing braces and
    // whatever collects the results) is missing from the paste.
    assert_eq!(notifications.len(), 2);
    assert_eq!(notifications[0].channel(), "test_notifications");
    assert_eq!(notifications[0].payload(), "hello");
    assert_eq!(notifications[1].channel(), "test_notifications");
    assert_eq!(notifications[1].payload(), "world");

This should be easy to solve, since the error being returned is of type tokio_postgres::error::Error

error[E0308]: mismatched types
  --> components/streamer/src/
19 |     config.connect_raw(socket, NoTls).await
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected trait std::error::Error, found struct `tokio_postgres::error::Error`
   = note: expected type `std::result::Result<_, (dyn std::error::Error + 'static)>`
              found type `std::result::Result<_, tokio_postgres::error::Error>`

But I reverted to 0.4.0-rc3 and downloaded it. I verified their test code in there and used it to build our stuff.

Will wait for an official build.

