Что я делаю не так? Здесь выдаёт ошибку при запуске:
textfile.saveToCassandra("logs", "logstable")
Весь код:
object App extends java.io.Serializable {
final val APP_NAME = "SparkBatchTest"
def main (args: Array[String]) {
val conf = new SparkConf().setAppName(APP_NAME).setMaster("local[2]")
.set("spark.cassandra.connection.host", "localhost")
.set("spark.cassandra.connection.native.port", "9042")
val sc = new SparkContext(conf)
val textfile = sc.textFile("hdfs:////tmp/kafka/test/15-12-10/FlumeDat*")
textfile.foreach(record => seperateFields(record))
textfile.saveToCassandra("logs", "logstable") //здесь ругается
}
def seperateFields(line: String): Tuple5[Object, Object, Object, Object, Object] = {
println("Waiting...")
Thread sleep 3000
println("Saving...")
val split = line.split(" ").toArray[Object]
println(line)
return (split(0) + " " + split(1), if (line contains "Down") "0" else "1", split(5), split(6), split(4))
}
}
Перед форматированием содержание файла следующее:
2015-12-10 12:04:48.299 AMP (amp-management-5-sa)[6953]: Aruba RAP-109 a63253686jypmO68380 Down System Device ID: 2490 Top > mariscos puerto vallarta 2-Standard
таблица в Cassandra выглядит следующим образом:
datetime | location | logid | status | systemid
---------------------+----------+---------+---------------------+----------
2015-12-23 15:10:01 | 1 | RAP-109 | a73225704jypmO54359 | Aruba
2015-12-23 15:55:54 | 0 | RAP-109 | a62710684jypmO66318 | Aruba
2015-12-23 15:10:02 | 1 | RAP-109 | a2222705jypmO69412 | Aruba
2015-12-23 15:09:45 | 1 | RAP-109 | a80296226jypmO21003 | Aruba
2015-12-23 15:25:29 | 1 | RAP-109 | a11170884jypmO9634 | Aruba
2015-12-23 15:55:53 | 1 | RAP-109 | a18255961jypmO91299 | Aruba
2015-12-23 16:17:27 | 1 | RAP-109 | a41956492jypmO85560 | Aruba
Ответ
Решено, переписал следующие строки:
val textfile = sc.textFile("hdfs:////tmp/kafka/test/15-12-10/FlumeDat*")
val res = sc.parallelize(Seq(seperateFields(textfile.first())))
res.saveToCassandra("logs", "logstable", SomeColumns("datetime","location","logid","status","systemid"))
и ещё тут чуть-чуть:
def seperateFields(line: String): Tuple5[String, String, String, String, String] = {...
и всё заработало!
Комментариев нет:
Отправить комментарий