Airbyte connection reset log after schema change
2023-01-07 13:38:30 INFO i.a.w.t.TemporalAttemptExecution(get):136 - Docker volume job log path: /tmp/workspace/18/0/logs.log
2023-01-07 13:38:30 INFO i.a.w.t.TemporalAttemptExecution(get):141 - Executing worker wrapper. Airbyte version: 0.40.21
2023-01-07 13:38:30 WARN i.a.m.l.MetricClientFactory(initialize):60 - Metric client is already initialized to otel
2023-01-07 13:38:30 INFO i.a.c.f.EnvVariableFeatureFlags(getEnvOrDefault):50 - Using default value for environment variable LOG_CONNECTOR_MESSAGES: 'false'
2023-01-07 13:38:30 INFO i.a.w.g.DefaultReplicationWorker(run):135 - start sync worker. job id: 18 attempt id: 0
2023-01-07 13:38:30 INFO i.a.w.g.DefaultReplicationWorker(run):150 - configured sync modes: {my_database.table_one=full_refresh - overwrite}
2023-01-07 13:38:30 INFO i.a.w.i.DefaultAirbyteDestination(start):72 - Running destination...
2023-01-07 13:38:30 INFO i.a.c.i.LineGobbler(voidCall):114 -
2023-01-07 13:38:30 INFO i.a.c.i.LineGobbler(voidCall):114 - ----- START REPLICATION -----
2023-01-07 13:38:30 INFO i.a.c.i.LineGobbler(voidCall):114 -
2023-01-07 13:38:30 INFO i.a.c.i.LineGobbler(voidCall):114 - Checking if airbyte/destination-bigquery:1.2.7 exists...
2023-01-07 13:38:30 INFO i.a.c.i.LineGobbler(voidCall):114 - airbyte/destination-bigquery:1.2.7 was found locally.
2023-01-07 13:38:30 INFO i.a.w.p.DockerProcessFactory(create):119 - Creating docker container = destination-bigquery-write-18-0-noxxk with resources null
2023-01-07 13:38:30 INFO i.a.w.p.DockerProcessFactory(create):163 - Preparing command: docker run --rm --init -i -w /data/18/0 --log-driver none --name destination-bigquery-write-18-0-noxxk --network host -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -e DEPLOYMENT_MODE=OSS -e USE_STREAM_CAPABLE_STATE=true -e WORKER_ENVIRONMENT=DOCKER -e AIRBYTE_ROLE= -e WORKER_JOB_ATTEMPT=0 -e WORKER_CONNECTOR_IMAGE=airbyte/destination-bigquery:1.2.7 -e AIRBYTE_VERSION=0.40.21 -e WORKER_JOB_ID=18 airbyte/destination-bigquery:1.2.7 write --config destination_config.json --catalog destination_catalog.json
2023-01-07 13:38:30 INFO i.a.w.i.VersionedAirbyteMessageBufferedWriterFactory(createWriter):33 - Writing messages to protocol version 0.2.0
2023-01-07 13:38:30 INFO i.a.w.i.VersionedAirbyteStreamFactory(create):97 - Reading messages from protocol version 0.2.0
2023-01-07 13:38:30 INFO i.a.w.g.DefaultReplicationWorker(lambda$readFromDstRunnable$4):243 - Destination output thread started.
2023-01-07 13:38:30 INFO i.a.w.g.DefaultReplicationWorker(replicate):217 - Waiting for source and destination threads to complete.
2023-01-07 13:38:30 INFO i.a.w.g.DefaultReplicationWorker(lambda$readFromSrcAndWriteToDstRunnable$6):291 - Replication thread started.
2023-01-07 13:38:30 INFO i.a.w.g.DefaultReplicationWorker(lambda$readFromSrcAndWriteToDstRunnable$6):333 - Total records read: 1 (0 bytes)
2023-01-07 13:38:30 INFO i.a.w.g.DefaultReplicationWorker(replicate):222 - One of source or destination thread complete. Waiting on the other.
2023-01-07 13:38:31 destination > integration args: {catalog=destination_catalog.json, write=null, config=destination_config.json}
2023-01-07 13:38:31 destination > Running integration: io.airbyte.integrations.destination.bigquery.BigQueryDestination
2023-01-07 13:38:31 destination > Command: WRITE
2023-01-07 13:38:31 destination > Integration config: IntegrationConfig{command=WRITE, configPath='destination_config.json', catalogPath='destination_catalog.json', statePath='null'}
2023-01-07 13:38:31 destination > Unknown keyword order - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2023-01-07 13:38:31 destination > Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2023-01-07 13:38:31 destination > Selected loading method is set to: GCS
2023-01-07 13:38:31 destination > S3 format config: {"format_type":"AVRO","flattening":"No flattening"}
2023-01-07 13:38:31 destination > All tmp files GCS will be kept in bucket when replication is finished
2023-01-07 13:38:31 destination > Creating BigQuery staging message consumer with staging ID d5128581-31c3-4921-ad44-fa511d9724a0 at 2023-01-07T13:38:31.419Z
2023-01-07 13:38:31 destination > BigQuery write config: BigQueryWriteConfig[streamName=table_one, namespace=my_database, datasetId=my_database, datasetLocation=asia-northeast3, tmpTableId=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=my_database, tableId=_airbyte_tmp_uko_table_one}}, targetTableId=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=my_database, tableId=_airbyte_raw_table_one}}, tableSchema=Schema{fields=[Field{name=_airbyte_ab_id, type=STRING, mode=null, description=null, policyTags=null}, Field{name=_airbyte_emitted_at, type=TIMESTAMP, mode=null, description=null, policyTags=null}, Field{name=_airbyte_data, type=STRING, mode=null, description=null, policyTags=null}]}, syncMode=overwrite, stagedFiles=[]]
2023-01-07 13:38:31 destination > class io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer started.
2023-01-07 13:38:31 destination > Preparing tmp tables in destination started for 1 streams
2023-01-07 13:38:31 destination > Creating dataset my_database
2023-01-07 13:38:32 destination > Creating tmp table GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=my_database, tableId=_airbyte_tmp_uko_table_one}}
2023-01-07 13:38:33 destination > Partitioned table created successfully: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=my_database, tableId=_airbyte_tmp_uko_table_one}}
2023-01-07 13:38:33 destination > Creating staging path for stream table_one (dataset my_database): staging/my_database_table_one/2023/01/07/13/d5128581-31c3-4921-ad44-fa511d9724a0/
2023-01-07 13:38:33 destination > Storage Object heumsi-playground-airbyte/staging/my_database_table_one/2023/01/07/13/d5128581-31c3-4921-ad44-fa511d9724a0/ does not exist in bucket; creating...
2023-01-07 13:38:33 destination > Storage Object heumsi-playground-airbyte/staging/my_database_table_one/2023/01/07/13/d5128581-31c3-4921-ad44-fa511d9724a0/ has been created in bucket.
2023-01-07 13:38:33 destination > Preparing tmp tables in destination completed.
2023-01-07 13:38:33 destination > Airbyte message consumer: succeeded.
2023-01-07 13:38:33 destination > executing on success close procedure.
2023-01-07 13:38:33 destination > Flushing all 0 current buffers (0 bytes in total)
2023-01-07 13:38:33 destination > Copying into tables in destination started for 1 streams
2023-01-07 13:38:33 destination > Uploading records from staging files to tmp table GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=my_database, tableId=_airbyte_tmp_uko_table_one}} (dataset my_database): []
2023-01-07 13:38:33 destination > Copying data from tmp table GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=my_database, tableId=_airbyte_tmp_uko_table_one}} to target table GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=my_database, tableId=_airbyte_raw_table_one}} (dataset my_database, sync mode overwrite)
2023-01-07 13:38:34 destination > successfully copied table: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=my_database, tableId=_airbyte_tmp_uko_table_one}} to table: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=my_database, tableId=_airbyte_raw_table_one}}
2023-01-07 13:38:34 destination > Finalizing tables in destination completed
2023-01-07 13:38:34 destination > Cleaning up destination started for 1 streams
2023-01-07 13:38:34 destination > Deleting tmp table GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=my_database, tableId=_airbyte_tmp_uko_table_one}} (dataset my_database)
2023-01-07 13:38:34 destination > Cleaning up destination completed.
2023-01-07 13:38:34 INFO i.a.w.g.DefaultReplicationWorker(lambda$readFromDstRunnable$4):253 - State in DefaultReplicationWorker from destination: io.airbyte.protocol.models.AirbyteMessage@400b44fc[type=STATE,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=<null>,state=io.airbyte.protocol.models.AirbyteStateMessage@4eccb28d[type=LEGACY,stream=<null>,global=<null>,data={},additionalProperties={}],trace=<null>,control=<null>,additionalProperties={}]
2023-01-07 13:38:34 destination > Completed integration: io.airbyte.integrations.destination.bigquery.BigQueryDestination
2023-01-07 13:38:34 INFO i.a.w.g.DefaultReplicationWorker(replicate):224 - Source and destination threads complete.
2023-01-07 13:38:34 INFO i.a.w.g.DefaultReplicationWorker(prepStateForLaterSaving):475 - Source output at least one state message
2023-01-07 13:38:34 INFO i.a.w.g.DefaultReplicationWorker(prepStateForLaterSaving):481 - State capture: Updated state to: Optional[io.airbyte.config.State@434196e0[state={}]]
2023-01-07 13:38:34 INFO i.a.w.g.DefaultReplicationWorker(getReplicationOutput):408 - sync summary: {
"status" : "completed",
"recordsSynced" : 0,
"bytesSynced" : 0,
"startTime" : 1673098710281,
"endTime" : 1673098714889,
"totalStats" : {
"recordsEmitted" : 0,
"bytesEmitted" : 0,
"sourceStateMessagesEmitted" : 1,
"destinationStateMessagesEmitted" : 1,
"recordsCommitted" : 0,
"meanSecondsBeforeSourceStateMessageEmitted" : 0,
"maxSecondsBeforeSourceStateMessageEmitted" : 0,
"maxSecondsBetweenStateMessageEmittedandCommitted" : 4,
"meanSecondsBetweenStateMessageEmittedandCommitted" : 4,
"replicationStartTime" : 1673098710281,
"replicationEndTime" : 1673098714889,
"sourceReadStartTime" : 1673098710342,
"sourceReadEndTime" : 1673098710343,
"destinationWriteStartTime" : 1673098710342,
"destinationWriteEndTime" : 1673098714889
},
"streamStats" : [ ]
}
2023-01-07 13:38:34 INFO i.a.w.g.DefaultReplicationWorker(getReplicationOutput):409 - failures: [ ]
2023-01-07 13:38:34 INFO i.a.c.i.LineGobbler(voidCall):114 -
2023-01-07 13:38:34 INFO i.a.w.t.TemporalAttemptExecution(get):160 - Stopping cancellation check scheduling...
2023-01-07 13:38:34 INFO i.a.c.i.LineGobbler(voidCall):114 - ----- END REPLICATION -----
2023-01-07 13:38:34 INFO i.a.c.i.LineGobbler(voidCall):114 -
2023-01-07 13:38:34 INFO i.a.w.t.s.ReplicationActivityImpl(lambda$replicate$3):198 - sync summary: io.airbyte.config.StandardSyncOutput@334174d7[standardSyncSummary=io.airbyte.config.StandardSyncSummary@331f440[status=completed,recordsSynced=0,bytesSynced=0,startTime=1673098710281,endTime=1673098714889,totalStats=io.airbyte.config.SyncStats@43be5cb1[recordsEmitted=0,bytesEmitted=0,sourceStateMessagesEmitted=1,destinationStateMessagesEmitted=1,recordsCommitted=0,meanSecondsBeforeSourceStateMessageEmitted=0,maxSecondsBeforeSourceStateMessageEmitted=0,maxSecondsBetweenStateMessageEmittedandCommitted=4,meanSecondsBetweenStateMessageEmittedandCommitted=4,replicationStartTime=1673098710281,replicationEndTime=1673098714889,sourceReadStartTime=1673098710342,sourceReadEndTime=1673098710343,destinationWriteStartTime=1673098710342,destinationWriteEndTime=1673098714889,additionalProperties={}],streamStats=[]],normalizationSummary=<null>,webhookOperationSummary=<null>,state=io.airbyte.config.State@434196e0[state={}],outputCatalog=io.airbyte.protocol.models.ConfiguredAirbyteCatalog@26b0035e[streams=[io.airbyte.protocol.models.ConfiguredAirbyteStream@1eef3c93[stream=io.airbyte.protocol.models.AirbyteStream@12f058fa[name=table_one,jsonSchema={"type":"object","properties":{"id":{"type":"number","airbyte_type":"integer"},"name":{"type":"string"},"value":{"type":"number","airbyte_type":"integer"},"updated_at":{"type":"string","format":"date-time","airbyte_type":"timestamp_with_timezone"}}},supportedSyncModes=[full_refresh, incremental],sourceDefinedCursor=<null>,defaultCursorField=[],sourceDefinedPrimaryKey=[[id]],namespace=my_database,additionalProperties={}],syncMode=full_refresh,cursorField=[name],destinationSyncMode=overwrite,primaryKey=[[id]],additionalProperties={}]],additionalProperties={}],failures=[]]
2023-01-07 13:38:34 INFO i.a.w.t.s.ReplicationActivityImpl(lambda$replicate$3):203 - Sync summary length: 1723
2023-01-07 13:38:34 INFO i.a.c.t.TemporalUtils(withBackgroundHeartbeat):283 - Stopping temporal heartbeating...
2023-01-07 13:38:34 INFO i.a.c.f.EnvVariableFeatureFlags(getEnvOrDefault):50 - Using default value for environment variable NEED_STATE_VALIDATION: 'true'
2023-01-07 13:38:35 INFO i.a.w.t.TemporalAttemptExecution(get):136 - Docker volume job log path: /tmp/workspace/18/0/logs.log
2023-01-07 13:38:35 INFO i.a.w.t.TemporalAttemptExecution(get):141 - Executing worker wrapper. Airbyte version: 0.40.21
2023-01-07 13:38:35 INFO i.a.c.i.LineGobbler(voidCall):114 -
2023-01-07 13:38:35 INFO i.a.c.i.LineGobbler(voidCall):114 - ----- START DEFAULT NORMALIZATION -----
2023-01-07 13:38:35 INFO i.a.c.i.LineGobbler(voidCall):114 -
2023-01-07 13:38:35 INFO i.a.w.n.DefaultNormalizationRunner(runProcess):134 - Running with normalization version: airbyte/normalization:0.2.24
2023-01-07 13:38:35 INFO i.a.c.i.LineGobbler(voidCall):114 - Checking if airbyte/normalization:0.2.24 exists...
2023-01-07 13:38:35 INFO i.a.c.i.LineGobbler(voidCall):114 - airbyte/normalization:0.2.24 was found locally.
2023-01-07 13:38:35 INFO i.a.w.p.DockerProcessFactory(create):119 - Creating docker container = normalization-normalize-18-0-qpkpy with resources io.airbyte.config.ResourceRequirements@27f5c46d[cpuRequest=,cpuLimit=,memoryRequest=,memoryLimit=]
2023-01-07 13:38:35 INFO i.a.w.p.DockerProcessFactory(create):163 - Preparing command: docker run --rm --init -i -w /data/18/0/normalize --log-driver none --name normalization-normalize-18-0-qpkpy --network host -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -e DEPLOYMENT_MODE=OSS -e WORKER_ENVIRONMENT=DOCKER -e AIRBYTE_ROLE= -e AIRBYTE_VERSION=0.40.21 airbyte/normalization:0.2.24 run --integration-type bigquery --config destination_config.json --catalog destination_catalog.json
2023-01-07 13:38:35 normalization > Running: transform-config --config destination_config.json --integration-type bigquery --out /data/18/0/normalize
2023-01-07 13:38:38 normalization > Namespace(config='destination_config.json', integration_type=<DestinationType.BIGQUERY: 'bigquery'>, out='/data/18/0/normalize')
2023-01-07 13:38:38 normalization > transform_bigquery
2023-01-07 13:38:38 normalization > Running: transform-catalog --integration-type bigquery --profile-config-dir /data/18/0/normalize --catalog destination_catalog.json --out /data/18/0/normalize/models/generated/ --json-column _airbyte_data
2023-01-07 13:38:42 normalization > Processing destination_catalog.json...
2023-01-07 13:38:42 normalization > Generating airbyte_ctes/my_database/table_one_ab1.sql from table_one
2023-01-07 13:38:42 normalization > Generating airbyte_ctes/my_database/table_one_ab2.sql from table_one
2023-01-07 13:38:42 normalization > Generating airbyte_ctes/my_database/table_one_ab3.sql from table_one
2023-01-07 13:38:42 normalization > Adding drop table hook for table_one_scd to table_one
2023-01-07 13:38:42 normalization > Generating airbyte_tables/my_database/table_one.sql from table_one
2023-01-07 13:38:42 normalization > detected no config file for ssh, assuming ssh is off.
2023-01-07 13:39:02 normalization > [--event-buffer-size EVENT_BUFFER_SIZE]
2023-01-07 13:39:02 normalization > --event-buffer-size EVENT_BUFFER_SIZE
2023-01-07 13:39:02 INFO i.a.w.n.NormalizationAirbyteStreamFactory(filterOutAndHandleNonAirbyteMessageLines):104 -
2023-01-07 13:39:02 normalization > DBT >=1.0.0 detected; using 10K event buffer size
2023-01-07 13:39:02 INFO i.a.w.n.NormalizationAirbyteStreamFactory(filterOutAndHandleNonAirbyteMessageLines):104 -
2023-01-07 13:39:24 normalization > Running with dbt=1.0.0
2023-01-07 13:39:24 normalization > Partial parse save file not found. Starting full parse.
2023-01-07 13:39:35 normalization > [WARNING]: Configuration paths exist in your dbt_project.yml file which do not apply to any resources.
There are 2 unused configuration paths:
- models.airbyte_utils.generated.airbyte_views
- models.airbyte_utils.generated.airbyte_incremental
2023-01-07 13:39:35 normalization > Found 4 models, 0 tests, 0 snapshots, 0 analyses, 602 macros, 0 operations, 0 seed files, 1 source, 0 exposures, 0 metrics
2023-01-07 13:39:37 normalization > Concurrency: 8 threads (target='prod')
2023-01-07 13:39:38 normalization > 1 of 1 START table model my_database.table_one.......................................................................... [RUN]
2023-01-07 13:39:41 normalization > 1 of 1 OK created table model my_database.table_one..................................................................... [CREATE TABLE (0.0 rows, 0 processed) in 3.37s]
2023-01-07 13:39:41 normalization > Finished running 1 table model in 5.61s.
2023-01-07 13:39:41 normalization > Completed successfully
2023-01-07 13:39:41 normalization > Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
2023-01-07 13:39:42 INFO i.a.w.g.DefaultNormalizationWorker(run):93 - Normalization executed in 1 minute 7 seconds.
2023-01-07 13:39:42 INFO i.a.w.g.DefaultNormalizationWorker(run):106 - Normalization summary: io.airbyte.config.NormalizationSummary@ba651b4[startTime=1673098715059,endTime=1673098782184,failures=[]]
2023-01-07 13:39:42 INFO i.a.w.t.TemporalAttemptExecution(get):160 - Stopping cancellation check scheduling...
2023-01-07 13:39:42 INFO i.a.c.t.TemporalUtils(withBackgroundHeartbeat):283 - Stopping temporal heartbeating...
2023-01-07 13:39:42 INFO i.a.c.i.LineGobbler(voidCall):114 -
2023-01-07 13:39:42 INFO i.a.c.i.LineGobbler(voidCall):114 - ----- END DEFAULT NORMALIZATION -----
2023-01-07 13:39:42 INFO i.a.c.i.LineGobbler(voidCall):114 -