Last active
March 29, 2019 08:36
-
-
Save ajaygunalan/29bf34e78dea66fa818694062604d275 to your computer and use it in GitHub Desktop.
Non-Block Communication in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from non_blocking import * | |
def HigherControl(k, v0): | |
if k == 'w': | |
v = v0 + 0.001 | |
elif k == 's': | |
v = v0 - 0.001 | |
else: | |
v = v0 | |
if k == 'a': | |
omega = 0.25 | |
elif k == 'd': | |
omega = -0.25 | |
else: | |
omega = 0 | |
return [v, omega] | |
global v0 | |
v0 = 0.0 | |
# def a(val): | |
# # print val | |
# global initial | |
# initial = val + 1 | |
# print initial | |
def get_input(k): | |
global v0 | |
[v0, omega] = HigherControl(k, v0) | |
print ("V, w") | |
print ([v0, omega]) | |
kb = KBHit() | |
while True: | |
if kb.kbhit(): | |
c = kb.getch() | |
if ord(c) == 27: # ESC | |
break | |
get_input(c) | |
kb.set_normal_term() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <fstream> | |
#include <iostream> | |
#include <cstdlib> | |
#include <cstring> | |
#include <stdio.h> | |
#include <string> | |
#include <ncurses.h> | |
#include <unistd.h> | |
#define SLEEP( milliseconds ) usleep( (unsigned long) (milliseconds * 1000.0) ) | |
int main() | |
{ | |
int ch; | |
initscr(); /* Start curses mode */ | |
raw(); /* Line buffering disabled */ | |
keypad(stdscr, TRUE); /* We get F1, F2 etc.. */ | |
noecho(); /* Don't echo() while we do getch */ | |
while (1) | |
{ | |
SLEEP( 100 ); | |
printw("Exit: Esc key\n"); | |
ch = getch(); /* If raw() hadn't been called | |
* we have to press enter before it | |
* gets to the program */ | |
if(ch == 27) // Esc key | |
{ | |
endwin(); /* End curses mode */ | |
exit(0); | |
//return 0; | |
} | |
else | |
{ printw("The pressed key is "); | |
attron(A_BOLD); | |
printw("%c\n", ch); | |
attroff(A_BOLD); | |
} | |
//refresh(); /* Print it on to the real screen */ | |
//getch(); /* Wait for user input */ | |
} | |
endwin(); /* End curses mode */ | |
exit(1); | |
//return 0; | |
} | |
/**How to Compile and Run **/ | |
g++ FILE_NAME.cpp -lncurses -o EXECUTABLE_NAME | |
./EXECUTABLE_NAME |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
''' | |
A Python class implementing KBHIT, the standard keyboard-interrupt poller. | |
Works transparently on Windows and Posix (Linux, Mac OS X). Doesn't work | |
with IDLE. | |
This program is free software: you can redistribute it and/or modify | |
it under the terms of the GNU Lesser General Public License as | |
published by the Free Software Foundation, either version 3 of the | |
License, or (at your option) any later version. | |
This program is distributed in the hope that it will be useful, | |
but WITHOUT ANY WARRANTY; without even the implied warranty of | |
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
GNU General Public License for more details. | |
''' | |
import os | |
# Windows | |
if os.name == 'nt': | |
import msvcrt | |
# Posix (Linux, OS X) | |
else: | |
import sys | |
import termios | |
import atexit | |
from select import select | |
class KBHit: | |
def __init__(self): | |
'''Creates a KBHit object that you can call to do various keyboard things. | |
''' | |
if os.name == 'nt': | |
pass | |
else: | |
# Save the terminal settings | |
self.fd = sys.stdin.fileno() | |
self.new_term = termios.tcgetattr(self.fd) | |
self.old_term = termios.tcgetattr(self.fd) | |
# New terminal setting unbuffered | |
self.new_term[3] = (self.new_term[3] & ~termios.ICANON & ~termios.ECHO) | |
termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.new_term) | |
# Support normal-terminal reset at exit | |
atexit.register(self.set_normal_term) | |
def set_normal_term(self): | |
''' Resets to normal terminal. On Windows this is a no-op. | |
''' | |
if os.name == 'nt': | |
pass | |
else: | |
termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old_term) | |
def getch(self): | |
''' Returns a keyboard character after kbhit() has been called. | |
Should not be called in the same program as getarrow(). | |
''' | |
s = '' | |
if os.name == 'nt': | |
return msvcrt.getch().decode('utf-8') | |
else: | |
return sys.stdin.read(1) | |
def getarrow(self): | |
''' Returns an arrow-key code after kbhit() has been called. Codes are | |
0 : up | |
1 : right | |
2 : down | |
3 : left | |
Should not be called in the same program as getch(). | |
''' | |
if os.name == 'nt': | |
msvcrt.getch() # skip 0xE0 | |
c = msvcrt.getch() | |
vals = [72, 77, 80, 75] | |
else: | |
c = sys.stdin.read(3)[2] | |
vals = [65, 67, 66, 68] | |
return vals.index(ord(c.decode('utf-8'))) | |
def kbhit(self): | |
''' Returns True if keyboard character was hit, False otherwise. | |
''' | |
if os.name == 'nt': | |
return msvcrt.kbhit() | |
else: | |
dr,dw,de = select([sys.stdin], [], [], 0) | |
return dr != [] | |
# Test | |
if __name__ == "__main__": | |
kb = KBHit() | |
print('Hit any key, or ESC to exit') | |
while True: | |
if kb.kbhit(): | |
c = kb.getch() | |
if ord(c) == 27: # ESC | |
break | |
print(c) | |
kb.set_normal_term() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Non-Block Communication in Python
In python, if you want to have user input, you can use
input
orraw_input
but the problem is the entire script waits for the input till timeout occurs. This is known as Blocking I/O.We wanted to control the stoch robot like playing a video game. i.e., we wanted the script to run irrespective of the user input. The solution is to use the non-blokcing I/O. It can implemented via either Threading or Select.
Clone it and run
python controller.py
orpython3 controller.py