#scala #hadoop #cassandra #apache_spark
Что я делаю не так? Здесь выдаёт ошибку при запуске: textfile.saveToCassandra("logs", "logstable") Весь код: object App extends java.io.Serializable { final val APP_NAME = "SparkBatchTest" def main (args: Array[String]) { val conf = new SparkConf().setAppName(APP_NAME).setMaster("local[2]") .set("spark.cassandra.connection.host", "localhost") .set("spark.cassandra.connection.native.port", "9042") val sc = new SparkContext(conf) val textfile = sc.textFile("hdfs:////tmp/kafka/test/15-12-10/FlumeDat*") textfile.foreach(record => seperateFields(record)) textfile.saveToCassandra("logs", "logstable") //здесь ругается } def seperateFields(line: String): Tuple5[Object, Object, Object, Object, Object] = { println("Waiting...") Thread sleep 3000 println("Saving...") val split = line.split(" ").toArray[Object] println(line) return (split(0) + " " + split(1), if (line contains "Down") "0" else "1", split(5), split(6), split(4)) } } Перед форматированием содержание файла следующее: 2015-12-10 12:04:48.299 AMP (amp-management-5-sa)[6953]: Aruba RAP-109 a63253686jypmO68380 Down System Device ID: 2490 Top > mariscos puerto vallarta 2-Standard таблица в Cassandra выглядит следующим образом: datetime | location | logid | status | systemid ---------------------+----------+---------+---------------------+---------- 2015-12-23 15:10:01 | 1 | RAP-109 | a73225704jypmO54359 | Aruba 2015-12-23 15:55:54 | 0 | RAP-109 | a62710684jypmO66318 | Aruba 2015-12-23 15:10:02 | 1 | RAP-109 | a2222705jypmO69412 | Aruba 2015-12-23 15:09:45 | 1 | RAP-109 | a80296226jypmO21003 | Aruba 2015-12-23 15:25:29 | 1 | RAP-109 | a11170884jypmO9634 | Aruba 2015-12-23 15:55:53 | 1 | RAP-109 | a18255961jypmO91299 | Aruba 2015-12-23 16:17:27 | 1 | RAP-109 | a41956492jypmO85560 | Aruba
Ответы
Ответ 1
Решено, переписал следующие строки: val textfile = sc.textFile("hdfs:////tmp/kafka/test/15-12-10/FlumeDat*") val res = sc.parallelize(Seq(seperateFields(textfile.first()))) res.saveToCassandra("logs", "logstable", SomeColumns("datetime","location","logid","status","systemid")) и ещё тут чуть-чуть: def seperateFields(line: String): Tuple5[String, String, String, String, String] = {... и всё заработало!
Комментариев нет:
Отправить комментарий