How to store data in MySql using cygnus?

I have read all the documentation on how cygnus works, I specifically tested this one successfully. I also ended up reading this tutorial, but I'm pretty sure I haven't configured something correctly.

at cygnus_instance_1.conf Created by:



and in agent_1.conf I created:

# To be put in APACHE_FLUME_HOME/conf/cygnus.conf
# General configuration template explaining how to setup a sink of each of the available types (HDFS, CKAN, MySQL).

# The next tree fields set the sources, sinks and channels used by Cygnus. You could use different names than the
# ones suggested below, but in that case make sure you keep coherence in properties names along the configuration file.
# Regarding sinks, you can use multiple types at the same time; the only requirement is to provide a channel for each
# one of them (this example shows how to configure 3 sink types at the same time). Even, you can define more than one
# sink of the same type and sharing the channel in order to improve the performance (this is like having
# multi-threading).
cygnusagent.sources = http-source
cygnusagent.sinks = hdfs-sink mysql-sink ckan-sink
cygnusagent.channels = hdfs-channel mysql-channel ckan-channel

# source configuration
# channel name where to write the notification events
cygnusagent.sources.http-source.channels = hdfs-channel mysql-channel ckan-channel
# source class, must not be changed
cygnusagent.sources.http-source.type = org.apache.flume.source.http.HTTPSource
# listening port the Flume source will use for receiving incoming notifications
cygnusagent.sources.http-source.port = 5050
# Flume handler that will parse the notifications, must not be changed
cygnusagent.sources.http-source.handler = es.tid.fiware.fiwareconnectors.cygnus.handlers.OrionRestHandler
# URL target
cygnusagent.sources.http-source.handler.notification_target = /notify
# Default service (service semantic depends on the persistence sink)
cygnusagent.sources.http-source.handler.default_service = def_serv
# Default service path (service path semantic depends on the persistence sink)
cygnusagent.sources.http-source.handler.default_service_path = def_servpath
# Number of channel re-injection retries before a Flume event is definitely discarded (-1 means infinite retries)
cygnusagent.sources.http-source.handler.events_ttl = 10
# Source interceptors, do not change
cygnusagent.sources.http-source.interceptors = ts de
# Interceptor type, do not change
cygnusagent.sources.http-source.interceptors.ts.type = timestamp
# Destination extractor interceptor, do not change = es.tid.fiware.fiwareconnectors.cygnus.interceptors.DestinationExtractor$Builder
# Matching table for the destination extractor interceptor, put the right absolute path to the file if necessary
# See the doc/design/interceptors document for more details = /usr/cygnus/conf/matching_table.conf

# ============================================
# OrionHDFSSink configuration
# channel name from where to read notification events = hdfs-channel
# sink class, must not be changed
cygnusagent.sinks.hdfs-sink.type = es.tid.fiware.fiwareconnectors.cygnus.sinks.OrionHDFSSink
# Comma-separated list of FQDN/IP address regarding the Cosmos Namenode endpoints
# If you are using Kerberos authentication, then the usage of FQDNs instead of IP addresses is mandatory
cygnusagent.sinks.hdfs-sink.cosmos_host = x1.y1.z1.w1,x2.y2.z2.w2
# port of the Cosmos service listening for persistence operations; 14000 for httpfs, 50070 for webhdfs and free choice for inifinty
cygnusagent.sinks.hdfs-sink.cosmos_port = 14000
# default username allowed to write in HDFS
cygnusagent.sinks.hdfs-sink.cosmos_default_username = cosmos_username
# default password for the default username
cygnusagent.sinks.hdfs-sink.cosmos_default_password = xxxxxxxxxxxxx
# HDFS backend type (webhdfs, httpfs or infinity)
cygnusagent.sinks.hdfs-sink.hdfs_api = httpfs
# how the attributes are stored, either per row either per column (row, column)
cygnusagent.sinks.hdfs-sink.attr_persistence = column
# Hive FQDN/IP address of the Hive server
cygnusagent.sinks.hdfs-sink.hive_host = x.y.z.w
# Hive port for Hive external table provisioning
cygnusagent.sinks.hdfs-sink.hive_port = 10000
# Kerberos-based authentication enabling
cygnusagent.sinks.hdfs-sink.krb5_auth = false
# Kerberos username
cygnusagent.sinks.hdfs-sink.krb5_auth.krb5_user = krb5_username
# Kerberos password
cygnusagent.sinks.hdfs-sink.krb5_auth.krb5_password = xxxxxxxxxxxxx
# Kerberos login file
cygnusagent.sinks.hdfs-sink.krb5_auth.krb5_login_conf_file = /usr/cygnus/conf/krb5_login.conf
# Kerberos configuration file
cygnusagent.sinks.hdfs-sink.krb5_auth.krb5_conf_file = /usr/cygnus/conf/krb5.conf

# ============================================
# OrionCKANSink configuration
# channel name from where to read notification events = ckan-channel
# sink class, must not be changed
cygnusagent.sinks.ckan-sink.type = es.tid.fiware.fiwareconnectors.cygnus.sinks.OrionCKANSink
# the CKAN API key to use
cygnusagent.sinks.ckan-sink.api_key = ckanapikey
# the FQDN/IP address for the CKAN API endpoint
cygnusagent.sinks.ckan-sink.ckan_host = x.y.z.w
# the port for the CKAN API endpoint
cygnusagent.sinks.ckan-sink.ckan_port = 80
# Orion URL used to compose the resource URL with the convenience operation URL to query it
cygnusagent.sinks.ckan-sink.orion_url = http://localhost:1026
# how the attributes are stored, either per row either per column (row, column)
cygnusagent.sinks.ckan-sink.attr_persistence = row
# enable SSL for secure Http transportation; 'true' or 'false'
cygnusagent.sinks.ckan-sink.ssl = false

# ============================================
# OrionMySQLSink configuration
# channel name from where to read notification events = mysql-channel
# sink class, must not be changed
cygnusagent.sinks.mysql-sink.type = es.tid.fiware.fiwareconnectors.cygnus.sinks.OrionMySQLSink
# the FQDN/IP address where the MySQL server runs 
cygnusagent.sinks.mysql-sink.mysql_host = localhost
# the port where the MySQL server listes for incomming connections
cygnusagent.sinks.mysql-sink.mysql_port = 3306
# a valid user in the MySQL server
cygnusagent.sinks.mysql-sink.mysql_username = root
# password for the user above
cygnusagent.sinks.mysql-sink.mysql_password = klasika
# how the attributes are stored, either per row either per column (row, column)
cygnusagent.sinks.mysql-sink.attr_persistence = column

# hdfs-channel configuration
# channel type (must not be changed)
cygnusagent.channels.hdfs-channel.type = memory
# capacity of the channel
cygnusagent.channels.hdfs-channel.capacity = 1000
# amount of bytes that can be sent per transaction
cygnusagent.channels.hdfs-channel.transactionCapacity = 100

# ckan-channel configuration
# channel type (must not be changed)
cygnusagent.channels.ckan-channel.type = memory
# capacity of the channel
cygnusagent.channels.ckan-channel.capacity = 1000
# amount of bytes that can be sent per transaction
cygnusagent.channels.ckan-channel.transactionCapacity = 100

# mysql-channel configuration
# channel type (must not be changed)
cygnusagent.channels.mysql-channel.type = memory
# capacity of the channel
cygnusagent.channels.mysql-channel.capacity = 1000
# amount of bytes that can be sent per transaction
cygnusagent.channels.mysql-channel.transactionCapacity = 100


While I am not using OrionHDFSSink and OrionCKANSink, I have not touched on these configurations because I am really not sure what the weather should be.

When I finally subscribeContext and assign the default cygnus @ server 5050, I get a normal response but nothing is created in my database

What am I doing wrong here?


source to share

1 answer

First of all, feel free to remove the HDFS and CKAN configuration components. While running Cygnus, you will avoid unnecessary logs associated with these components. Of course, don't forget to remove all links to receivers and channels; namely:

cygnusagent.sources = http-source
cygnusagent.sinks = mysql-sink
cygnusagent.channels = mysql-channel
cygnusagent.sources.http-source.channels = mysql-channel


Secondly, the answer to your question can be found in the documentation:

<i> Within the tables, we can find two options:

  • Fixed lines of 8 fields as usual: recvTimeTs, recvTime, entityId, entityType, attrName, attrType, attrValue and attrMd. These tables (and databases) are created at runtime if the table did not exist before the row was inserted. As for attrValue, in its simplest form, this value is just a string, but since Orion 0.11.0 it can be a Json object or a Json array. As for attrMd, it contains the string serialization of the metadata array for the attribute in Json (if the attribute has no metadata, an empty array [] is inserted),
  • Two columns for each attribute of the object (one for the value and the other for the metadata), plus a column for adding data reception time (recv_time). This type of tables (and databases) must be pre-provisioned for Cygnus to run since each object can have a different number of attributes, and notifications must ensure that the value for each attribute is notified.

The behavior of a connector with respect to its internal representation is defined through the attr_persistence configuration parameter, whose values ​​can be rows or columns.

Perhaps there is a spelling problem, I think the last paragraph should be completed as "... the integer values ​​can be rows or columns, and whose behavior matches the options described above, respectively . "

those. if you are using colummn mode then the database and tables must be prepared in advanced mode.

There is a similar question where I explain this behavior in more detail.




All Articles