The goal is to push data into Kafka, have Telegraf transform it, and write it into InfluxDB; everything is accomplished by configuring these middleware components.

All of the following steps are performed on a Mac.

1. Prepare Kafka

Already running; the default port is 9092.

Kafka Tool is used as the client.

2. Prepare InfluxDB

Already running; the default port is 8086.
Useful commands to remember:

show databases
use [database_name]
show measurements
SELECT * FROM "telegraf"."autogen"."kafka_consumer"
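The same queries can be issued over InfluxDB 1.x's HTTP `/query` endpoint. A minimal sketch that builds the request URL with the standard library (host and port here are assumptions matching the defaults above):

```python
from urllib.parse import urlencode

def influx_query_url(q, host="localhost", port=8086, db=None):
    """Build the URL for InfluxDB 1.x's /query HTTP endpoint."""
    params = {"q": q}
    if db:
        params["db"] = db  # database to run the query against
    return f"http://{host}:{port}/query?{urlencode(params)}"

url = influx_query_url('SELECT * FROM "telegraf"."autogen"."kafka_consumer"',
                       db="telegraf")
print(url)
```

Fetching this URL (e.g. with curl or requests) returns the query result as JSON.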

3. Install Telegraf

brew update
brew install telegraf

In InfluxDB, create a user:

create user "telegraf" with password "telegraf"

Edit the configuration file to set up the InfluxDB output:

[[outputs.influxdb]]
   urls = ["http://localhost:8086"] # required 
   database = "telegraf" # required
   retention_policy = ""
   precision = "s"
   timeout = "5s"
   username = "telegraf"
   password = "telegraf"

Start it:

telegraf -config /usr/local/etc/telegraf.conf

4. Configure the Kafka input in Telegraf

The configuration can follow the official recommendations:

https://github.com/influxdata/telegraf/tree/master/plugins/inputs/kafka_consumer

Below is the relevant section taken from the configuration file:

# # Read metrics from Kafka topics
# Uncomment this section; it must be enabled
[[inputs.kafka_consumer]]
#   ## Kafka brokers.
    # this must be set
    brokers = ["192.168.1.119:9092"]
#
#   ## Topics to consume.
    # required
    topics = ["tstkafkainflux"]
#
#   ## When set this tag will be added to all metrics with the topic as the value.
    # required
    topic_tag = "tstkafkainflux"
#
#   ## Optional Client id
#   # client_id = "Telegraf"
#
#   ## Set the minimal supported Kafka version.  Setting this enables the use of new
#   ## Kafka features and APIs.  Must be 0.10.2.0 or greater.
#   ##   ex: version = "1.1.0"
#   # version = "0.10.2.0"
#
#   ## Optional TLS Config
#   # enable_tls = true
#   # tls_ca = "/etc/telegraf/ca.pem"
#   # tls_cert = "/etc/telegraf/cert.pem"
#   # tls_key = "/etc/telegraf/key.pem"
#   ## Use TLS but skip chain & host verification
#   # insecure_skip_verify = false
#
#   ## SASL authentication credentials.  These settings should typically be used
#   ## with TLS encryption enabled using the "enable_tls" option.
#   # sasl_username = "kafka"
#   # sasl_password = "secret"
#
#   ## SASL protocol version.  When connecting to Azure EventHub set to 0.
#   # sasl_version = 1
#
#   ## Name of the consumer group.
#   # consumer_group = "telegraf_metrics_consumers"
#
#   ## Initial offset position; one of "oldest" or "newest".
    offset = "oldest"
#
#   ## Consumer group partition assignment strategy; one of "range", "roundrobin" or "sticky".
#   # balance_strategy = "range"
#
#   ## Maximum length of a message to consume, in bytes (default 0/unlimited);
#   ## larger messages are dropped
#   max_message_len = 1000000
#
#   ## Maximum messages to read from the broker that have not been written by an
#   ## output.  For best throughput set based on the number of metrics within
#   ## each message and the size of the output's metric_batch_size.
#   ##
#   ## For example, if each message from the queue contains 10 metrics and the
#   ## output metric_batch_size is 1000, setting this to 100 will ensure that a
#   ## full batch is collected and the write is triggered immediately without
#   ## waiting until the next flush_interval.
#   # max_undelivered_messages = 1000
#
#   ## Data format to consume.
#   ## Each data format has its own unique set of configuration options, read
#   ## more about them here:
#   ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
    # must be set; the values below work for me. See the link above for other formats
    data_format = "value"
    data_type = "string"

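With `data_format = "value"` and `data_type = "string"`, each Kafka message body becomes a single string field named `value`, the measurement defaults to the plugin name, and `topic_tag` attaches the topic as a tag. A rough sketch of the resulting line protocol (this only mimics the mapping; it is not Telegraf's actual serializer):

```python
def value_to_line_protocol(message, topic, tag_name, timestamp_ns,
                           measurement="kafka_consumer"):
    """Mimic Telegraf's 'value' data format: the whole message body
    becomes one string field called 'value'; the topic is attached
    as a tag whose name comes from topic_tag."""
    escaped = message.replace("\\", "\\\\").replace('"', '\\"')
    return f'{measurement},{tag_name}={topic} value="{escaped}" {timestamp_ns}'

line = value_to_line_protocol("hereTime2020-04-20 15:16:01",
                              topic="tstkafkainflux",
                              tag_name="tstkafkainflux",
                              timestamp_ns=1587369361000000000)
print(line)
```

This is why the demo string sent in section 5 shows up under the `kafka_consumer` measurement.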
4.1 Configuration for JSON-formatted data

Message body:

[{"uuid": "a", "time": 1587369361, "x": 1, "y": 2},
 {"uuid": "b", "time": 1587369361, "x": 3, "y": 4}]

The following covers only the data_format portion of the configuration; see the reference below and pick the json section:

https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md

Configuration:

data_format = "json"
json_strict = true
tag_keys = [
    "uuid"
]
json_string_fields=["x", "y"]
# set the measurement name here
name_override = "simplejson_nolevel"
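Under this configuration, each object in the top-level JSON array becomes its own point: `uuid` is a tag, the remaining keys become fields, and the measurement is `simplejson_nolevel`. A rough Python sketch of that mapping (illustrative only, not Telegraf's parser; in particular, how `json_string_fields` types the `x`/`y` values may differ in real Telegraf):

```python
import json

def json_to_points(payload, tag_keys, measurement):
    """Rough sketch of Telegraf's json data format: each object in a
    top-level array becomes one point; keys listed in tag_keys become
    tags, remaining numeric keys become fields.  Illustrative only --
    real Telegraf has more options (e.g. json_time_key)."""
    points = []
    for obj in json.loads(payload):
        tags = {k: str(obj[k]) for k in tag_keys if k in obj}
        fields = {k: v for k, v in obj.items()
                  if k not in tag_keys and isinstance(v, (int, float))}
        points.append({"measurement": measurement,
                       "tags": tags, "fields": fields})
    return points

payload = ('[{"uuid": "a", "time": 1587369361, "x": 1, "y": 2},'
           ' {"uuid": "b", "time": 1587369361, "x": 3, "y": 4}]')
points = json_to_points(payload, tag_keys=["uuid"],
                        measurement="simplejson_nolevel")
print(points)
```

Note that without `json_time_key`, the `time` key is treated as an ordinary field rather than the point's timestamp.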
4.2 Configuration for data in influx (line protocol) format

Message body:

weather,location=us-midwest temperature=82 1465839830100400200

Again, this covers only the data_format portion of the configuration; see the same reference and pick the influx section:

https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md

Configuration:

data_format = "influx"
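The sample message above breaks down as measurement `weather`, tag `location=us-midwest`, field `temperature=82`, and a nanosecond timestamp. A minimal parser sketch for simple lines like this one (it ignores line protocol escaping rules, so it is not a general parser):

```python
def parse_simple_line(line):
    """Split a simple InfluxDB line protocol line (no escaped
    spaces/commas) into measurement, tags, fields, and timestamp."""
    head, fieldset, ts = line.split(" ")
    measurement, *tag_pairs = head.split(",")
    tags = dict(p.split("=") for p in tag_pairs)
    fields = {}
    for pair in fieldset.split(","):
        k, v = pair.split("=")
        fields[k] = float(v)  # assume numeric fields for this sketch
    return measurement, tags, fields, int(ts)

m, tags, fields, ts = parse_simple_line(
    "weather,location=us-midwest temperature=82 1465839830100400200")
print(m, tags, fields, ts)
```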

5. Demo

Python code that sends a message to Kafka:

from kafka import KafkaProducer
import time

if __name__ == '__main__':
    producer = KafkaProducer(bootstrap_servers='192.168.1.119:9092')
    value = "hereTime" + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    # key and value must be bytes
    producer.send('tstkafkainflux', key=b"abcde",
                  value=value.encode("utf8"), partition=0)
    producer.flush()

Check in Kafka Tool; the record is stored as expected (screenshot omitted).

Then query InfluxDB; the data shows up there as well (screenshot omitted).

6. Telegraf Docker image

https://hub.docker.com/_/telegraf

A custom configuration file can be supplied to the image at container startup.

7. Output and input plugins supported by Telegraf

  • influxdb
  • amon
  • amqp
  • application_insights
  • postgresql
  • cloud_pubsub
  • cloudwatch
  • cratedb
  • datadog
  • discard
  • elasticsearch
  • exec
  • file
  • graphite
  • graylog
  • health
  • influxdb_v2
  • kafka
  • nats
  • nsq
  • opentsdb

In short, there are plenty of them.
