Страницы

Поиск по вопросам

вторник, 26 февраля 2019 г.

saveToCassandra() : IllegalArgumentException: Multiple constructors with the same number of parameters not allowed

Что я делаю не так? Здесь выдаёт ошибку при запуске:
textfile.saveToCassandra("logs", "logstable")
Весь код:
object App extends java.io.Serializable {
final val APP_NAME = "SparkBatchTest"
def main (args: Array[String]) {
val conf = new SparkConf().setAppName(APP_NAME).setMaster("local[2]") .set("spark.cassandra.connection.host", "localhost") .set("spark.cassandra.connection.native.port", "9042") val sc = new SparkContext(conf)
val textfile = sc.textFile("hdfs:////tmp/kafka/test/15-12-10/FlumeDat*")
textfile.foreach(record => seperateFields(record)) textfile.saveToCassandra("logs", "logstable") //здесь ругается
}
def seperateFields(line: String): Tuple5[Object, Object, Object, Object, Object] = { println("Waiting...") Thread sleep 3000 println("Saving...") val split = line.split(" ").toArray[Object] println(line) return (split(0) + " " + split(1), if (line contains "Down") "0" else "1", split(5), split(6), split(4)) } }
Перед форматированием содержание файла следующее:
2015-12-10 12:04:48.299 AMP (amp-management-5-sa)[6953]: Aruba RAP-109 a63253686jypmO68380 Down System Device ID: 2490 Top > mariscos puerto vallarta 2-Standard
таблица в Cassandra выглядит следующим образом:
datetime | location | logid | status | systemid ---------------------+----------+---------+---------------------+---------- 2015-12-23 15:10:01 | 1 | RAP-109 | a73225704jypmO54359 | Aruba 2015-12-23 15:55:54 | 0 | RAP-109 | a62710684jypmO66318 | Aruba 2015-12-23 15:10:02 | 1 | RAP-109 | a2222705jypmO69412 | Aruba 2015-12-23 15:09:45 | 1 | RAP-109 | a80296226jypmO21003 | Aruba 2015-12-23 15:25:29 | 1 | RAP-109 | a11170884jypmO9634 | Aruba 2015-12-23 15:55:53 | 1 | RAP-109 | a18255961jypmO91299 | Aruba 2015-12-23 16:17:27 | 1 | RAP-109 | a41956492jypmO85560 | Aruba


Ответ

Решено, переписал следующие строки:
val textfile = sc.textFile("hdfs:////tmp/kafka/test/15-12-10/FlumeDat*")
val res = sc.parallelize(Seq(seperateFields(textfile.first()))) res.saveToCassandra("logs", "logstable", SomeColumns("datetime","location","logid","status","systemid"))
и ещё тут чуть-чуть:
def seperateFields(line: String): Tuple5[String, String, String, String, String] = {...
и всё заработало!

Комментариев нет:

Отправить комментарий