#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import numpy as np

from pyspark.mllib.linalg import Vectors, SparseVector
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib._common import _convert_vector
26 """
27 Helper methods to load, save and pre-process data used in MLlib.
28 """

    @staticmethod
    def _parse_libsvm_line(line, multiclass):
        """
        Parses a line in LIBSVM format into (label, indices, values).
        items = line.split(None)
        label = float(items[0])
        if not multiclass:
            label = 1.0 if label > 0.5 else 0.0
        nnz = len(items) - 1
        indices = np.zeros(nnz, dtype=np.int32)
        values = np.zeros(nnz)
        for i in xrange(nnz):
            index, value = items[1 + i].split(":")
            indices[i] = int(index) - 1
            values[i] = float(value)
        return label, indices, values

    @staticmethod
    def _convert_labeled_point_to_libsvm(p):
        """
        Converts a LabeledPoint to a string in LIBSVM format.
        items = [str(p.label)]
        v = _convert_vector(p.features)
        if type(v) == np.ndarray:
            for i in xrange(len(v)):
                items.append(str(i + 1) + ":" + str(v[i]))
        elif type(v) == SparseVector:
            nnz = len(v.indices)
            for i in xrange(nnz):
                items.append(str(v.indices[i] + 1) + ":" + str(v.values[i]))
        else:
            raise TypeError("_convert_labeled_point_to_libsvm needs either ndarray or "
                            "SparseVector but got %s" % type(v))
        return " ".join(items)

    @staticmethod
    def loadLibSVMFile(sc, path, multiclass=False, numFeatures=-1, minPartitions=None):
67 """
68 Loads labeled data in the LIBSVM format into an RDD of
69 LabeledPoint. The LIBSVM format is a text-based format used by
70 LIBSVM and LIBLINEAR. Each line represents a labeled sparse
71 feature vector using the following format:
72
73 label index1:value1 index2:value2 ...
74
75 where the indices are one-based and in ascending order. This
76 method parses each line into a LabeledPoint, where the feature
77 indices are converted to zero-based.
78
79 @param sc: Spark context
80 @param path: file or directory path in any Hadoop-supported file
81 system URI
82 @param multiclass: whether the input labels contain more than
83 two classes. If false, any label with value
84 greater than 0.5 will be mapped to 1.0, or
85 0.0 otherwise. So it works for both +1/-1 and
86 1/0 cases. If true, the double value parsed
87 directly from the label string will be used
88 as the label value.
89 @param numFeatures: number of features, which will be determined
90 from the input data if a nonpositive value
91 is given. This is useful when the dataset is
92 already split into multiple files and you
93 want to load them separately, because some
94 features may not present in certain files,
95 which leads to inconsistent feature
96 dimensions.
97 @param minPartitions: min number of partitions
98 @return: labeled data stored as an RDD of LabeledPoint
99
100 >>> from tempfile import NamedTemporaryFile
101 >>> from pyspark.mllib.util import MLUtils
102 >>> tempFile = NamedTemporaryFile(delete=True)
103 >>> tempFile.write("+1 1:1.0 3:2.0 5:3.0\\n-1\\n-1 2:4.0 4:5.0 6:6.0")
104 >>> tempFile.flush()
105 >>> examples = MLUtils.loadLibSVMFile(sc, tempFile.name).collect()
106 >>> multiclass_examples = MLUtils.loadLibSVMFile(sc, tempFile.name, True).collect()
107 >>> tempFile.close()
108 >>> examples[0].label
109 1.0
110 >>> examples[0].features.size
111 6
112 >>> print examples[0].features
113 [0: 1.0, 2: 2.0, 4: 3.0]
114 >>> examples[1].label
115 0.0
116 >>> examples[1].features.size
117 6
118 >>> print examples[1].features
119 []
120 >>> examples[2].label
121 0.0
122 >>> examples[2].features.size
123 6
124 >>> print examples[2].features
125 [1: 4.0, 3: 5.0, 5: 6.0]
126 >>> multiclass_examples[1].label
127 -1.0
128 """

        lines = sc.textFile(path, minPartitions)
        parsed = lines.map(lambda l: MLUtils._parse_libsvm_line(l, multiclass))
        if numFeatures <= 0:
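            # Infer the dimension from the data: LIBSVM indices are in
            # ascending order, so each row's last index is its largest.
            # Empty rows contribute 0, so the result is at least 1.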
            parsed.cache()
            numFeatures = parsed.map(lambda x: 0 if x[1].size == 0 else x[1][-1]).reduce(max) + 1
        return parsed.map(lambda x: LabeledPoint(x[0], Vectors.sparse(numFeatures, x[1], x[2])))

    @staticmethod
    def saveAsLibSVMFile(data, dir):
        """
        Save labeled data in LIBSVM format.

        @param data: an RDD of LabeledPoint to be saved
        @param dir: directory to save the data

        >>> from tempfile import NamedTemporaryFile
        >>> from fileinput import input
        >>> from glob import glob
        >>> from pyspark.mllib.util import MLUtils
        >>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 4.56)])), \
                        LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
        >>> tempFile = NamedTemporaryFile(delete=True)
        >>> tempFile.close()
        >>> MLUtils.saveAsLibSVMFile(sc.parallelize(examples), tempFile.name)
        >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
        '0.0 1:1.01 2:2.02 3:3.03\\n1.1 1:1.23 3:4.56\\n'
        """
        lines = data.map(lambda p: MLUtils._convert_labeled_point_to_libsvm(p))
        lines.saveAsTextFile(dir)


def _test():
    import doctest
    from pyspark.context import SparkContext
    globs = globals().copy()
    # The small batch size here ensures that we see multiple batches,
    # even in these small test examples:
    globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()