Skip to content

Instantly share code, notes, and snippets.

@Integralist
Last active June 17, 2020 10:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Integralist/c88f6dca57f3bc327403c9e042e11ff2 to your computer and use it in GitHub Desktop.
Save Integralist/c88f6dca57f3bc327403c9e042e11ff2 to your computer and use it in GitHub Desktop.
[NSQ Questions] some questions I received from someone new to NSQ #nsq #python #pynsq #tornado #queues #buzzfeed #work

Below is my response to a colleague who was new to NSQ and had some questions about an application he was building.

Note: bf_nsq is an internal BuzzFeed abstraction on top of pynsq.


Heya 👋🏻

Are there any limits I should be aware of when using BF NSQ? ie. queue size, message size, requeues/max tries, response times, etc.

The NSQ defaults are (these are IMHO the more important parameters to be aware of)

  • topic/channel names have a size restriction of <= 64 characters (used to be 32, see tcp spec here).
  • message size 1mb (see -max-msg-size in nsqd docs here).
  • message queue size 1000 (see -mem-queue-size in nsqd docs here).

The BuzzFeed defaults are effectively the same. You'll find this information in our Ansible configuration (i.e. ansible/roles/nsqd/templates/nsqd_bootstrap.sh.j2 and the values: ansible/roles/nsqd/defaults/main.yml).

As far as reading/writing of NSQ messages I would suggest reading through the pynsq docs (specifically the Reader/Writer classes). I know that bf_nsq is an abstraction on top of pynsq but it's worth getting a bit familiar with what bf_nsq is using 'under the covers'. Hence knowing what you can configure within nsq.Reader will help you.

As an example: bf_nsq provides bf_nsq.MultiReader which is an abstraction for pynsq's nsq.Reader class, and any additional keyword args you provide to bf_nsq.MultiReader will be passed into the constructor for nsq.Reader).Hence knowing what you can configure within nsq.Reader will help you.

One important thing to be aware of is that bf_nsq uses nsq.Reader but not nsq.Writer. My understanding is that bf_nsq's publish method is designed to support batch publishing of messages (something nsq.Writer doesn't support). So although you can read the pynsq docs to understand the nsq.Reader and nsq.Writer classes, you'll only really be able to configure the reading of messages when using bf_nsq.

Also, bf_nsq doesn't set any reader parameters when creating instances of nsq.Reader, so you'll need to check pynsq to see what those defaults are. Although in most cases the defaults are mentioned in the documentation itself, I typically will go directly to the pynsq GitHub code repository to see what the defaults are as I like to get familiar with the code I'm ultimately going to be using, and possibly debugging if things don't work how I expect them to (you'll find the defaults set here).

Specifically you'll find:

  • max_tries: 5 (the maximum number of attempts the reader will make to process a message after which messages will be automatically discarded)
  • max_in_flight: 1 (the maximum number of messages this reader will pipeline for processing.this value will be divided evenly amongst the configured/discovered nsqd producers)
  • msg_timeout: None (the amount of time (in seconds) that nsqd will wait before considering messages that have been delivered to this consumer timed out)

NOTE: msg_timeout is slightly misleading. In nearly all examples of nsq.Reader you'll see msg_timeout being provided as a parameter but in fact it will be proxied down into nsq.AsyncConn which means you have to look at a different code file to find its default value (you'll find the default set here).

Say a user uploads their Twitter history and a batch of NSQ messages are sent off to trigger tweet deletions. How can I tell when all messages have been processed successfully and ensure that none were dropped or failed?

So there's two things to consider here:

  1. instrumentation
  2. message contents

As far as I'm aware there is no built-in callback mechanism with either nsq.Writer or bf_nsq.publish and so you'll want to utilize instrumentation (e.g. bf_metrics) in your application that's reading the messages from NSQ and processing them. That way you'll be able to report whether a message was processed completely or if it needed to be requeued (or maybe even dropped).

Now what I'm about to mention I don't think will affect you as your design is different to what I'm about to describe but I want to say it anyway as it could be a useful thought experiment. If you're using bf_nsq.publish to send batched messages to NSQ and some of those messages fail to publish, then you'll get back a list of failed messages.

You can then decide if you want to try sending those failed messages again, but you'll want to build in some logic that short-circuits the retries after a set threshold has been exceeded (e.g. you don't want to retry the failed messages for the rest of time!) especially if there's some kind of formatting error that would mean they would never succeed.

I brought this up as I had a service (qr_plan_z) that would be given messages that contained log files. I would parse individual log lines from the message contents and then group each log line up into a list to be batch published to NSQ via the bf_nsq.publish method.

If any of those log lines failed and I wanted to requeue them I would be in an awkward situation because I ultimately would have to requeue the entire message and that would mean when my queue reader service (qr_plan_z) received the requeued message, then I'd have to parse the log lines again (even those that were already successfully published).

A queue reader service needs to be performant and so if I didn't just requeue the message (and I kept retrying the failing messages over and over) it would have resulted in my service not being able to context switch to another ioloop task (i.e. handling another incoming message). Not without writing extra logic for forcing a context switch, which would have been a non-trivial piece of work (especially considering the queue reader was using a tornado ioloop that was hidden away inside of pynsq so it's not like I had direct access to it because of how pynsq is designed to be used).

Ultimately it was better/easier for me to just requeue the message and suffer the consequence of re-publishing URLs that would have already been published successfully in the hope that the failed messages would go through successfully the second time around.

This made it even more important for a service upstream of mine that would be consuming these messages to have some form of deduplication logic in place.

What are some good strategies for handling duplicate requests and avoiding redundant API calls? I saw in your blog that you’ve used Redis to track duplicate requests and rate limit yourself. Can you talk more about this or do you have any examples that I can take a look at?

You can see examples of using redis for deduplication in qr_plan_z (e.g. qr_plan_z/app/handler.py#L160-L185) and you can see examples of using redis for rate limiting in qr_static_fallback_s3 (e.g. qr_static_fallback_s3/app/ratelimit.py).

It's good that you're thinking about deduplicating entries (as tolerance is useful when designing resilient systems) but I wouldn't necessarily worry about my specific implementation, as it's not only old (and not very good) but it's also not going to align with the design of your own service.

You could (depending on the number of tweets that are expected to be purged) use a simple in-memory data structure to hold state about tweets that have already been seen. Maybe see how far that approach gets you first before spinning up a redis instance.

Additionally I used redis because there was an expectation (in my use case at least) for messages to 'eventually' need to be reprocessed. What I mean by that was my messages were log files and so those log files might have a URL such as https://www.buzzfeed.com/videos multiple times (because multiple users visited that page). I only want to handle the URL once as I was going to grab a static copy of that page and store it into S3 (no point in doing that multiple times right). BUT that page will be updated at some point and I want the latest version of it. So when storing a key into redis I would have a TTL set so that it would auto-expire at a point in future. So let's say the next day I'll get a new bunch of messages and they might include the same https://www.buzzfeed.com/videos URL, my queue reader will now check redis, find no key there and reprocess the URL.

Doesn't sound like you have to worry about that sort of set-up because a tweet is a unique entity and so once it has been deleted, you're unlikely to see it pop up again. Unless due to your UI you allow someone to upload a bunch of past tweets, then that way someone might upload the same set of tweets multiple times accidentally (admittedly it wouldn't be the end of the world though if that happened as your logic could account for that by checking the twitter API response so that if it came back with a message like "tweet not found" then you can happily just drop that message. Sure it would be an extra API call to twitter, and so that might be important enough to protect by using deduplication in your application).

If the QR hits a Twitter API rate limit, I think it would make the most sense to save the current user’s remaining deletion requests for later and move onto the next user’s deletion requests since deletion rate limits are per user rather than per app. Should I just keep requeueing that user's remaining requests until their rate limit window refreshes or is there a better way of handling this?

If you know what the refresh window is, then I believe you could configure NSQ to use a pause duration to match. The nsq.AsyncConn class has a requeue_delay parameter which you can configure by passing it into the nsq.Reader class (as the parameter will be passed down into nsq.AsyncConn on your behalf).

Looks like the default value for that is 90 (see code here). But be aware the value you provide is not a time period (e.g. it's not 90 seconds). According to the code they describe this feature as...

When a message on this connection is requeued and the requeue delay has not been specified, it calculates the delay automatically by an increasing multiple of requeue_delay.

If you look at the code you can see the calculation it uses.

Is there a way to ensure that Twitter API requests can only be triggered by trusted sources (UI, job) via NSQ? For example, restricting which services are able to publish to a given topic?

I don't believe there is a built-in mechanism for this, but I could be wrong. I don't know enough about our BuzzFeed 'pixie dust' implementation (which iirc is the abstraction platform layer we've built around NSQ here at BuzzFeed).

I would suggest adding some form of shared secret key, or just contextual data to your message contents such that the service processing the NSQ messages can check if the value is present and if it is you know the message was published from a trusted source.

I don’t fully understand how to use the max in flight setting. Is there a good rule of thumb for figuring out what that should be for my service?

The answer will depend on your application throughput. I don't think there's an exact calculation. You'll need to start with a small number, let's say 10, and then monitor your application (message processing latency, observing memory and cpu consumption etc) and then if all looks well then maybe try increasing the value incrementally.

You should also look at the nsqadmin UI service (and subsequent metrics reported into Datadog) to gauge performance of NSQ such as message depth over time.

Ps, there's some "Design and Theory" + "pynsq" questions in the NSQ FAQ which might be of interest to you as someone new to NSQ (see FAQ here).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment