Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?

Issue 1039

Steps to reproduce:

  1. Create virtualenv and install requirements: pip install -r requirements.txt
  2. Start local Datadog agent
  3. Start backend services: docker-compose up -d
  4. Start celery worker from a shell: celery worker -A tasks -l info
  5. Start grpc server from a shell: ddtrace-run python server.py
  6. Run task from a shell: python run.py

You should see in celery logs the soft limit reached:

[2019-09-09 11:35:37,333: INFO/MainProcess] Received task: tasks.do_grpc_call[fa9a0691-6385-423e-9da0-2ce5135a880e]
[2019-09-09 11:35:39,340: WARNING/MainProcess] Soft time limit (2s) exceeded for tasks.do_grpc_call[fa9a0691-6385-423e-9da0-2ce5135a880e]
[2019-09-09 11:35:39,373: INFO/ForkPoolWorker-8] tasks.do_grpc_call[fa9a0691-6385-423e-9da0-2ce5135a880e]: Everything is fine!
[2019-09-09 11:35:39,374: INFO/ForkPoolWorker-8] Task tasks.do_grpc_call[fa9a0691-6385-423e-9da0-2ce5135a880e] succeeded in 2.0387737190000017s: None
version: "3"
services:
rabbitmq:
image: rabbitmq:3.7-alpine
ports:
- "127.0.0.1:5672:5672"
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";
package helloworld;
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: helloworld.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='helloworld.proto',
package='helloworld',
syntax='proto3',
serialized_options=_b('\n\033io.grpc.examples.helloworldB\017HelloWorldProtoP\001\242\002\003HLW'),
serialized_pb=_b('\n\x10helloworld.proto\x12\nhelloworld\"\x1c\n\x0cHelloRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\"\x1d\n\nHelloReply\x12\x0f\n\x07message\x18\x01 \x01(\t2I\n\x07Greeter\x12>\n\x08SayHello\x12\x18.helloworld.HelloRequest\x1a\x16.helloworld.HelloReply\"\x00\x42\x36\n\x1bio.grpc.examples.helloworldB\x0fHelloWorldProtoP\x01\xa2\x02\x03HLWb\x06proto3')
)
_HELLOREQUEST = _descriptor.Descriptor(
name='HelloRequest',
full_name='helloworld.HelloRequest',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='name', full_name='helloworld.HelloRequest.name', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=32,
serialized_end=60,
)
_HELLOREPLY = _descriptor.Descriptor(
name='HelloReply',
full_name='helloworld.HelloReply',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='message', full_name='helloworld.HelloReply.message', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=62,
serialized_end=91,
)
DESCRIPTOR.message_types_by_name['HelloRequest'] = _HELLOREQUEST
DESCRIPTOR.message_types_by_name['HelloReply'] = _HELLOREPLY
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
HelloRequest = _reflection.GeneratedProtocolMessageType('HelloRequest', (_message.Message,), {
'DESCRIPTOR' : _HELLOREQUEST,
'__module__' : 'helloworld_pb2'
# @@protoc_insertion_point(class_scope:helloworld.HelloRequest)
})
_sym_db.RegisterMessage(HelloRequest)
HelloReply = _reflection.GeneratedProtocolMessageType('HelloReply', (_message.Message,), {
'DESCRIPTOR' : _HELLOREPLY,
'__module__' : 'helloworld_pb2'
# @@protoc_insertion_point(class_scope:helloworld.HelloReply)
})
_sym_db.RegisterMessage(HelloReply)
DESCRIPTOR._options = None
_GREETER = _descriptor.ServiceDescriptor(
name='Greeter',
full_name='helloworld.Greeter',
file=DESCRIPTOR,
index=0,
serialized_options=None,
serialized_start=93,
serialized_end=166,
methods=[
_descriptor.MethodDescriptor(
name='SayHello',
full_name='helloworld.Greeter.SayHello',
index=0,
containing_service=None,
input_type=_HELLOREQUEST,
output_type=_HELLOREPLY,
serialized_options=None,
),
])
_sym_db.RegisterServiceDescriptor(_GREETER)
DESCRIPTOR.services_by_name['Greeter'] = _GREETER
# @@protoc_insertion_point(module_scope)
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
import grpc
import helloworld_pb2 as helloworld__pb2
class GreeterStub(object):
"""The greeting service definition.
"""
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.SayHello = channel.unary_unary(
'/helloworld.Greeter/SayHello',
request_serializer=helloworld__pb2.HelloRequest.SerializeToString,
response_deserializer=helloworld__pb2.HelloReply.FromString,
)
class GreeterServicer(object):
"""The greeting service definition.
"""
def SayHello(self, request, context):
"""Sends a greeting
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_GreeterServicer_to_server(servicer, server):
rpc_method_handlers = {
'SayHello': grpc.unary_unary_rpc_method_handler(
servicer.SayHello,
request_deserializer=helloworld__pb2.HelloRequest.FromString,
response_serializer=helloworld__pb2.HelloReply.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'helloworld.Greeter', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
celery==4.3.0
git+https://github.com/datadog/dd-trace-py@majorgreys/grpc-fix#egg=ddtrace
googleapis-common-protos==1.6.0
grpcio==1.23.0
grpcio-status==1.23.0
grpcio-tools==1.23.0
import tasks
if __name__ == "__main__":
# force the method to time out after 2 seconds
tasks.do_grpc_call.apply_async(args=[], soft_time_limit=2)
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The Python implementation of the GRPC helloworld.Greeter server."""
from concurrent import futures
import time
import logging
import grpc
import helloworld_pb2
import helloworld_pb2_grpc
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
class Greeter(helloworld_pb2_grpc.GreeterServicer):
def SayHello(self, request, context):
time.sleep(5)
return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
server.add_insecure_port('[::]:50051')
server.start()
try:
while True:
time.sleep(_ONE_DAY_IN_SECONDS)
except KeyboardInterrupt:
server.stop(0)
if __name__ == '__main__':
logging.basicConfig()
serve()
from ddtrace import patch_all
patch_all(grpc=True)
from celery import Celery
from celery.exceptions import SoftTimeLimitExceeded
from celery.utils.log import get_task_logger
import grpc
import helloworld_pb2
import helloworld_pb2_grpc
app = Celery("tasks")
logger = get_task_logger(__name__)
@app.task()
def do_grpc_call():
channel = grpc.insecure_channel("localhost:50051")
stub = helloworld_pb2_grpc.GreeterStub(channel)
try:
response = stub.SayHello(helloworld_pb2.HelloRequest(name='you'))
logger.info('Done!')
except SoftTimeLimitExceeded:
logger.info("Everything is fine!")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment