Created
January 29, 2019 20:43
-
-
Save yanniszark/0104e0f6481804d746dade34836d0983 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package scylla_code_assignment | |
import ( | |
"context" | |
"github.com/pkg/errors" | |
log "github.com/sirupsen/logrus" | |
"golang.org/x/time/rate" | |
"net" | |
"time" | |
) | |
var _ net.Conn = &RateLimitedConn{} | |
// Decorator for TCPConn | |
type RateLimitedConn struct { | |
net.Conn | |
// Rate limiting for TCP connection. | |
// Measured in bytes/sec. | |
rl *rate.Limiter | |
// Timeout for reading/writing | |
timeout time.Duration | |
} | |
// NewRateLimitedConn decorates a TCPConn with rate limiting. | |
func NewRateLimitedConn(conn net.Conn, rl *rate.Limiter, timeout time.Duration) net.Conn { | |
return &RateLimitedConn{ | |
Conn: conn, | |
rl: rl, | |
timeout: timeout, | |
} | |
} | |
func (c *RateLimitedConn) Read(b []byte) (n int, err error) { | |
log.Info("Rate-Limited Read") | |
// Limit the size of the data to read to min(burst, len(b)) | |
maxSize := min(len(b), c.rl.Burst()) | |
if maxSize == 0 { | |
return | |
} | |
buffer := make([]byte, maxSize) | |
// Create context with max wait time for the token bucket | |
ctx, cancel := context.WithTimeout(context.TODO(), c.timeout) | |
defer cancel() | |
// Check if we can read maxSize bytes | |
if err := c.rl.WaitN(ctx, maxSize); err != nil { | |
return 0, errors.Wrap(err, "Error while waiting for token bucket") | |
} | |
n, err = c.Conn.Read(buffer) | |
if err != nil { | |
return | |
} | |
copy(b, buffer) | |
return | |
} | |
func (c *RateLimitedConn) Write(b []byte) (n int, err error) { | |
log.Info("Rate-Limited Write") | |
// Limit the size of the data to read to min(burst, len(b)) | |
maxSize := min(len(b), c.rl.Burst()) | |
if maxSize == 0 { | |
return | |
} | |
buffer := make([]byte, maxSize) | |
// Create context with max wait time for the token bucket | |
ctx, cancel := context.WithTimeout(context.TODO(), c.timeout) | |
defer cancel() | |
// Check if we can write maxSize bytes | |
if err = c.rl.WaitN(ctx, maxSize); err != nil { | |
return | |
} | |
copy(buffer, b) | |
n, err = c.Conn.Write(buffer) | |
return | |
} | |
func min(x, y int) int { | |
if x < y { | |
return x | |
} | |
return y | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment